11 log "github.com/sirupsen/logrus"
12 "github.com/tendermint/go-crypto"
13 cmn "github.com/tendermint/tmlibs/common"
15 cfg "github.com/bytom/config"
16 "github.com/bytom/consensus"
17 "github.com/bytom/p2p"
18 "github.com/bytom/p2p/discover"
19 "github.com/bytom/p2p/pex"
20 core "github.com/bytom/protocol"
21 "github.com/bytom/protocol/bc"
22 "github.com/bytom/protocol/bc/types"
23 "github.com/bytom/version"
30 // SyncManager is responsible for the business layer information synchronization
// (blocks, headers and transactions) with remote peers.
31 type SyncManager struct {
35 privKey crypto.PrivKeyEd25519 // local node's p2p key
// blockFetcher is handed the blocks carried by mined-block messages
// (see handleMineBlockMsg).
38 blockFetcher *blockFetcher
// blockKeeper processes blocks and headers received from peers and locates
// local blocks/headers requested by peers.
39 blockKeeper *blockKeeper
// newTxCh is a buffered transaction channel (capacity maxTxChanSize).
42 newTxCh chan *types.Tx
// newBlockCh is supplied by the caller of NewSyncManager; block hashes read
// from it are broadcast to peers by minedBroadcastLoop.
43 newBlockCh chan *bc.Hash
// txSyncCh is an unbuffered channel of transaction sync messages.
44 txSyncCh chan *txSyncMsg
// quitSync is a signal-only channel.
// NOTE(review): presumably used to stop the background loops; confirm.
45 quitSync chan struct{}
49 // NewSyncManager creates a sync manager from the given configuration and chain.
// A fresh Ed25519 p2p key is generated on every call, and newBlockCh is stored
// as the source of block hashes to broadcast.
50 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
// The genesis header's hash is kept so that status handshakes can compare it
// with the genesis hash reported by a remote peer.
51 genesisHeader, err := chain.GetHeaderByHeight(0)
56 sw := p2p.NewSwitch(config)
57 peers := newPeerSet(sw)
58 manager := &SyncManager{
60 genesisHash: genesisHeader.Hash(),
63 privKey: crypto.GenPrivKeyEd25519(),
64 blockFetcher: newBlockFetcher(chain, peers),
65 blockKeeper: newBlockKeeper(chain, peers),
67 newTxCh: make(chan *types.Tx, maxTxChanSize),
68 newBlockCh: newBlockCh,
69 txSyncCh: make(chan *txSyncMsg),
70 quitSync: make(chan struct{}),
// Register the protocol reactor on the switch under the name "PROTOCOL".
74 protocolReactor := NewProtocolReactor(manager, manager.peers)
75 manager.sw.AddReactor("PROTOCOL", protocolReactor)
77 // Create & add listener
78 var listenerStatus bool
// The listener is only set up when the node is not in vault mode.
80 if !config.VaultMode {
81 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
82 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
83 manager.sw.AddListener(l)
// Node discovery binds a UDP socket on the same port number as the
// listener's external address (see initDiscover).
85 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
90 pexReactor := pex.NewPEXReactor(discv)
91 manager.sw.AddReactor("PEX", pexReactor)
// Publish the node info and the p2p private key on the switch.
93 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
94 manager.sw.SetNodePrivKey(manager.privKey)
98 // BestPeer returns the PeerInfo of the peer selected by the peer set's
// bestPeer lookup for the consensus.SFFullNode service flag.
99 func (sm *SyncManager) BestPeer() *PeerInfo {
100 bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
102 return bestPeer.getPeerInfo()
107 // GetNewTxCh returns an unconfirmed transaction feed channel.
108 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
112 // GetPeerInfos returns the peer info of all peers in the peer set.
113 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
114 return sm.peers.getPeerInfos()
117 // IsCaughtUp reports whether the local node has finished syncing.
// It is true when no best full-node peer is known, or when that peer's height
// does not exceed the local chain's best block height.
118 func (sm *SyncManager) IsCaughtUp() bool {
119 peer := sm.peers.bestPeer(consensus.SFFullNode)
120 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
123 // NodeInfo returns the P2P node info held by the switch.
124 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
125 return sm.sw.NodeInfo()
128 // StopPeer tries to stop the peer with the given ID.
// It returns an error when no peer with that ID is in the peer set; otherwise
// the peer is removed from the peer set.
129 func (sm *SyncManager) StopPeer(peerID string) error {
130 if peer := sm.peers.getPeer(peerID); peer == nil {
131 return errors.New("peerId not exist")
133 sm.peers.removePeer(peerID)
137 // Switch returns the sync manager's p2p switch.
138 func (sm *SyncManager) Switch() *p2p.Switch {
// handleBlockMsg hands the block carried by msg to the block keeper, tagged
// with the ID of the peer that sent it.
142 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
143 sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
// handleBlocksMsg extracts the blocks carried by msg and hands them to the
// block keeper, tagged with the sending peer's ID. A GetBlocks failure is
// logged at debug level.
146 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
147 blocks, err := msg.GetBlocks()
149 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
153 sm.blockKeeper.processBlocks(peer.ID(), blocks)
// handleGetBlockMsg answers a peer's request for a single block. The block is
// fetched from the local chain either by msg.Height or by msg.GetHash(), and
// then sent back to the requesting peer.
156 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
157 var block *types.Block
160 block, err = sm.chain.GetBlockByHeight(msg.Height)
162 block, err = sm.chain.GetBlockByHash(msg.GetHash())
165 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
169 ok, err := peer.sendBlock(block)
// NOTE(review): presumably the peer is dropped only when sendBlock reports
// !ok; confirm against the guarding condition.
171 sm.peers.removePeer(peer.ID())
174 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
// handleGetBlocksMsg answers a peer's request for a range of blocks. The blocks
// are located from the locator and stop hash in msg, and a size-limited subset
// is sent back to the requesting peer.
178 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
179 blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
// Nothing to answer with when the lookup failed or found no blocks.
180 if err != nil || len(blocks) == 0 {
185 sendBlocks := []*types.Block{}
186 for _, block := range blocks {
// The marshalled text length is what counts against the response size budget.
187 rawData, err := block.MarshalText()
189 log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
// Checks whether adding this block would push the accumulated size past
// maxBlockchainResponseSize-16.
// NOTE(review): presumably the loop stops here; confirm.
193 if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
196 totalSize += len(rawData)
197 sendBlocks = append(sendBlocks, block)
200 ok, err := peer.sendBlocks(sendBlocks)
// NOTE(review): presumably the peer is dropped only when sendBlocks reports
// !ok; confirm against the guarding condition.
202 sm.peers.removePeer(peer.ID())
205 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
// handleGetHeadersMsg answers a peer's request for block headers. The headers
// are located from the locator and stop hash in msg and sent back to the
// requesting peer.
209 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
210 headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
// A failed lookup and an empty result are both logged at debug level.
211 if err != nil || len(headers) == 0 {
212 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
216 ok, err := peer.sendHeaders(headers)
// NOTE(review): presumably the peer is dropped only when sendHeaders reports
// !ok; confirm against the guarding condition.
218 sm.peers.removePeer(peer.ID())
221 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
// handleHeadersMsg extracts the headers carried by msg and hands them to the
// block keeper, tagged with the sending peer's ID. A GetHeaders failure is
// logged at debug level.
225 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
226 headers, err := msg.GetHeaders()
228 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
232 sm.blockKeeper.processHeaders(peer.ID(), headers)
// handleMineBlockMsg handles a mined-block message from a peer. The block is
// extracted from msg, its hash is marked on the peer, the block is passed to
// the block fetcher, and the peer's status is updated to the block's height
// and hash.
235 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
236 block, err := msg.GetMineBlock()
238 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
243 peer.markBlock(&hash)
244 sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
245 peer.setStatus(block.Height, &hash)
// handleStatusRequestMsg replies to a status request with the local best block
// header and the genesis block hash. The peer is removed from the peer set
// when the reply cannot be sent.
248 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
249 bestHeader := sm.chain.BestBlockHeader()
250 genesisBlock, err := sm.chain.GetBlockByHeight(0)
252 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
255 genesisHash := genesisBlock.Hash()
256 msg := NewStatusResponseMessage(bestHeader, &genesisHash)
// TrySend reporting false drops the peer.
257 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
258 sm.peers.removePeer(peer.ID())
// handleStatusResponseMsg handles a status response from a peer. A peer that is
// already in the peer set only has its status updated. Otherwise the reported
// genesis hash is compared with the local one, a mismatch is logged as a
// warning, and the peer is added with the reported height and hash.
// NOTE(review): presumably a known peer and a genesis mismatch each return
// before addPeer; confirm.
262 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
263 if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
264 peer.setStatus(msg.Height, msg.GetHash())
// Peers built on a different genesis block are on another chain.
268 if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
269 log.WithFields(log.Fields{
270 "remote genesis": genesisHash.String(),
271 "local genesis": sm.genesisHash.String(),
272 }).Warn("fail hand shake due to differnt genesis")
276 sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
// handleTransactionMsg handles a transaction relayed by a peer. The peer's ban
// score is raised when the transaction cannot be extracted from msg, and also
// when chain validation fails for a transaction that is not an orphan.
279 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
280 tx, err := msg.GetTransaction()
282 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
// Validation errors on orphan transactions are not held against the peer.
286 if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
287 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
// protocolAndAddress splits a listen address of the form "protocol://address"
// into its protocol and address parts. The protocol defaults to "tcp" and the
// address to the whole input string.
292 func protocolAndAddress(listenAddr string) (string, string) {
293 p, address := "tcp", listenAddr
// Split on the first "://" only, so the address part may itself contain "://".
294 parts := strings.SplitN(address, "://", 2)
296 p, address = parts[0], parts[1]
// makeNodeInfo builds the NodeInfo advertised by the local node: public key,
// moniker, network (chain ID), version, and the default service flags encoded
// as a decimal string in Other. ListenAddr is filled in from the switch's
// first listener.
301 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
302 nodeInfo := &p2p.NodeInfo{
303 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
304 Moniker: sm.config.Moniker,
305 Network: sm.config.ChainID,
306 Version: version.Version,
307 Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
// NOTE(review): presumably returns early without a ListenAddr when the
// switch has no listener; confirm.
310 if !sm.sw.IsListening() {
314 p2pListener := sm.sw.Listeners()[0]
316 // We assume that the rpcListener has the same ExternalAddress.
317 // This is probably true because both P2P and RPC listeners use UPnP,
318 // except of course if the rpc is only bound to localhost
// NOTE(review): the choice between external and internal address presumably
// depends on listenerStatus; confirm.
320 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
322 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
327 // Start starts the sync manager service: it starts the p2p switch and then
// launches the transaction and mined-block broadcast loops in their own
// goroutines.
328 func (sm *SyncManager) Start() {
// A switch that fails to start is reported through cmn.Exit.
329 if _, err := sm.sw.Start(); err != nil {
330 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
332 // broadcast transactions
333 go sm.txBroadcastLoop()
334 go sm.minedBroadcastLoop()
338 // Stop stops the sync manager.
339 func (sm *SyncManager) Stop() {
// initDiscover starts the node discovery network. It binds a UDP socket on
// 0.0.0.0 at the given port, starts discovery on it with priv as the node key
// and a database at <config.DBDir()>/discover.db, and registers the configured
// seeds as fallback nodes.
344 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
345 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
350 conn, err := net.ListenUDP("udp", addr)
// Use the address the socket actually bound to.
355 realaddr := conn.LocalAddr().(*net.UDPAddr)
356 ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
361 // add the seeds node to the discover table
// NOTE(review): presumably returns the network as-is when no seeds are
// configured; confirm.
362 if config.P2P.Seeds == "" {
365 nodes := []*discover.Node{}
// Seeds are a comma-separated list; each becomes an enode URL whose ID part is
// the hex-encoded SHA-256 of the seed string itself.
366 for _, seed := range strings.Split(config.P2P.Seeds, ",") {
367 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
368 nodes = append(nodes, discover.MustParseNode(url))
370 if err = ntab.SetFallbackNodes(nodes); err != nil {
376 func (sm *SyncManager) minedBroadcastLoop() {
379 case blockHash := <-sm.newBlockCh:
380 block, err := sm.chain.GetBlockByHash(blockHash)
382 log.Errorf("Failed on mined broadcast loop get block %v", err)
385 if err := sm.peers.broadcastMinedBlock(block); err != nil {
386 log.Errorf("Broadcast mine block error. %v", err)