7 log "github.com/sirupsen/logrus"
9 cfg "github.com/bytom/config"
10 "github.com/bytom/consensus"
11 "github.com/bytom/event"
12 "github.com/bytom/p2p"
13 core "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/protocol/bc/types"
16 "github.com/tendermint/go-crypto"
22 maxFilterAddressSize = 50
23 maxFilterAddressCount = 1000
27 errVaultModeDialPeer = errors.New("can't dial peer in vault mode")
30 // Chain is the interface for Bytom core
31 type Chain interface {
32 BestBlockHeader() *types.BlockHeader
33 BestBlockHeight() uint64
34 CalcNextSeed(*bc.Hash) (*bc.Hash, error)
35 GetBlockByHash(*bc.Hash) (*types.Block, error)
36 GetBlockByHeight(uint64) (*types.Block, error)
37 GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
38 GetHeaderByHeight(uint64) (*types.BlockHeader, error)
39 GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
40 InMainChain(bc.Hash) bool
41 ProcessBlock(*types.Block) (bool, error)
42 ValidateTx(*types.Tx) (bool, error)
45 type Switch interface {
46 AddReactor(name string, reactor p2p.Reactor) p2p.Reactor
47 AddBannedPeer(string) error
48 StopPeerGracefully(string)
49 NodeInfo() *p2p.NodeInfo
53 DialPeerWithAddress(addr *p2p.NetAddress) error
57 //SyncManager Sync Manager is responsible for the business layer information synchronization
58 type SyncManager struct {
63 blockFetcher *blockFetcher
64 blockKeeper *blockKeeper
67 txSyncCh chan *txSyncMsg
68 quitSync chan struct{}
71 eventDispatcher *event.Dispatcher
72 minedBlockSub *event.Subscription
73 txMsgSub *event.Subscription
76 // CreateSyncManager create sync manager and set switch.
77 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
78 sw, err := p2p.NewSwitch(config)
83 return newSyncManager(config, sw, chain, txPool, dispatcher)
86 //NewSyncManager create a sync manager
87 func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxPool, dispatcher *event.Dispatcher) (*SyncManager, error) {
88 genesisHeader, err := chain.GetHeaderByHeight(0)
92 peers := newPeerSet(sw)
93 manager := &SyncManager{
95 genesisHash: genesisHeader.Hash(),
98 blockFetcher: newBlockFetcher(chain, peers),
99 blockKeeper: newBlockKeeper(chain, peers),
101 txSyncCh: make(chan *txSyncMsg),
102 quitSync: make(chan struct{}),
104 eventDispatcher: dispatcher,
107 if !config.VaultMode {
108 protocolReactor := NewProtocolReactor(manager, peers)
109 manager.sw.AddReactor("PROTOCOL", protocolReactor)
114 //BestPeer return the highest p2p peerInfo
115 func (sm *SyncManager) BestPeer() *PeerInfo {
116 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
118 return bestPeer.getPeerInfo()
123 func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
124 if sm.config.VaultMode {
125 return errVaultModeDialPeer
128 return sm.sw.DialPeerWithAddress(addr)
131 func (sm *SyncManager) GetNetwork() string {
132 return sm.config.ChainID
135 //GetPeerInfos return peer info of all peers
136 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
137 return sm.peers.getPeerInfos()
140 //IsCaughtUp check wheather the peer finish the sync
141 func (sm *SyncManager) IsCaughtUp() bool {
142 peer := sm.peers.bestPeer(consensus.SFFullNode)
143 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
146 //StopPeer try to stop peer by given ID
147 func (sm *SyncManager) StopPeer(peerID string) error {
148 if peer := sm.peers.getPeer(peerID); peer == nil {
149 return errors.New("peerId not exist")
151 sm.peers.removePeer(peerID)
155 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
156 block, err := msg.GetBlock()
160 sm.blockKeeper.processBlock(peer.ID(), block)
163 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
164 blocks, err := msg.GetBlocks()
166 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleBlocksMsg GetBlocks")
170 sm.blockKeeper.processBlocks(peer.ID(), blocks)
173 func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
174 peer.addFilterAddress(msg.Address)
177 func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
178 peer.filterAdds.Clear()
181 func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
182 peer.addFilterAddresses(msg.Addresses)
185 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
186 var block *types.Block
189 block, err = sm.chain.GetBlockByHeight(msg.Height)
191 block, err = sm.chain.GetBlockByHash(msg.GetHash())
194 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetBlockMsg get block from chain")
198 ok, err := peer.sendBlock(block)
200 sm.peers.removePeer(peer.ID())
203 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlockMsg sentBlock")
207 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
208 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
209 if err != nil || len(blocks) == 0 {
214 sendBlocks := []*types.Block{}
215 for _, block := range blocks {
216 rawData, err := block.MarshalText()
218 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg marshal block")
222 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
225 totalSize += len(rawData)
226 sendBlocks = append(sendBlocks, block)
229 ok, err := peer.sendBlocks(sendBlocks)
231 sm.peers.removePeer(peer.ID())
234 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetBlocksMsg sentBlock")
238 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
239 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
240 if err != nil || len(headers) == 0 {
241 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleGetHeadersMsg locateHeaders")
245 ok, err := peer.sendHeaders(headers)
247 sm.peers.removePeer(peer.ID())
250 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetHeadersMsg sentBlock")
254 func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
256 var block *types.Block
258 block, err = sm.chain.GetBlockByHeight(msg.Height)
260 block, err = sm.chain.GetBlockByHash(msg.GetHash())
263 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get block from chain")
267 blockHash := block.Hash()
268 txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
270 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleGetMerkleBlockMsg get transaction status")
274 ok, err := peer.sendMerkleBlock(block, txStatus)
276 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
281 sm.peers.removePeer(peer.ID())
285 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
286 headers, err := msg.GetHeaders()
288 log.WithFields(log.Fields{"module": logModule, "err": err}).Debug("fail on handleHeadersMsg GetHeaders")
292 sm.blockKeeper.processHeaders(peer.ID(), headers)
295 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
296 block, err := msg.GetMineBlock()
298 log.WithFields(log.Fields{"module": logModule, "err": err}).Warning("fail on handleMineBlockMsg GetMineBlock")
303 peer.markBlock(&hash)
304 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
305 peer.setStatus(block.Height, &hash)
308 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
309 bestHeader := sm.chain.BestBlockHeader()
310 genesisBlock, err := sm.chain.GetBlockByHeight(0)
312 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on handleStatusRequestMsg get genesis")
315 genesisHash := genesisBlock.Hash()
316 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
317 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
318 sm.peers.removePeer(peer.ID())
322 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
323 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
324 peer.setStatus(msg.Height, msg.GetHash())
328 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
329 log.WithFields(log.Fields{"module": logModule, "remote genesis": genesisHash.String(), "local genesis": sm.genesisHash.String()}).Warn("fail hand shake due to differnt genesis")
333 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
336 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
337 tx, err := msg.GetTransaction()
339 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
343 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
344 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
348 func (sm *SyncManager) IsListening() bool {
349 if sm.config.VaultMode {
352 return sm.sw.IsListening()
355 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
356 if sm.config.VaultMode {
357 return p2p.NewNodeInfo(sm.config, crypto.PubKeyEd25519{}, "")
359 return sm.sw.NodeInfo()
362 func (sm *SyncManager) PeerCount() int {
363 if sm.config.VaultMode {
366 return len(sm.sw.Peers().List())
369 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
370 peer := sm.peers.getPeer(basePeer.ID())
371 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
375 log.WithFields(log.Fields{
377 "peer": basePeer.Addr(),
378 "type": reflect.TypeOf(msg),
379 "message": msg.String(),
380 }).Info("receive message from peer")
382 switch msg := msg.(type) {
383 case *GetBlockMessage:
384 sm.handleGetBlockMsg(peer, msg)
387 sm.handleBlockMsg(peer, msg)
389 case *StatusRequestMessage:
390 sm.handleStatusRequestMsg(basePeer)
392 case *StatusResponseMessage:
393 sm.handleStatusResponseMsg(basePeer, msg)
395 case *TransactionMessage:
396 sm.handleTransactionMsg(peer, msg)
398 case *MineBlockMessage:
399 sm.handleMineBlockMsg(peer, msg)
401 case *GetHeadersMessage:
402 sm.handleGetHeadersMsg(peer, msg)
404 case *HeadersMessage:
405 sm.handleHeadersMsg(peer, msg)
407 case *GetBlocksMessage:
408 sm.handleGetBlocksMsg(peer, msg)
411 sm.handleBlocksMsg(peer, msg)
413 case *FilterLoadMessage:
414 sm.handleFilterLoadMsg(peer, msg)
416 case *FilterAddMessage:
417 sm.handleFilterAddMsg(peer, msg)
419 case *FilterClearMessage:
420 sm.handleFilterClearMsg(peer)
422 case *GetMerkleBlockMessage:
423 sm.handleGetMerkleBlockMsg(peer, msg)
426 log.WithFields(log.Fields{
428 "peer": basePeer.Addr(),
429 "message_type": reflect.TypeOf(msg),
430 }).Error("unhandled message type")
434 func (sm *SyncManager) Start() error {
436 if _, err = sm.sw.Start(); err != nil {
437 log.Error("switch start err")
441 sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
446 sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
451 // broadcast transactions
452 go sm.txBroadcastLoop()
453 go sm.minedBroadcastLoop()
459 //Stop stop sync manager
460 func (sm *SyncManager) Stop() {
462 sm.minedBlockSub.Unsubscribe()
463 if !sm.config.VaultMode {
468 func (sm *SyncManager) minedBroadcastLoop() {
471 case obj, ok := <-sm.minedBlockSub.Chan():
473 log.WithFields(log.Fields{"module": logModule}).Warning("mined block subscription channel closed")
477 ev, ok := obj.Data.(event.NewMinedBlockEvent)
479 log.WithFields(log.Fields{"module": logModule}).Error("event type error")
483 if err := sm.peers.broadcastMinedBlock(ev.Block); err != nil {
484 log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast mine block")