12 log "github.com/sirupsen/logrus"
13 "github.com/tendermint/go-crypto"
14 cmn "github.com/tendermint/tmlibs/common"
16 "github.com/bytom/account"
17 cfg "github.com/bytom/config"
18 "github.com/bytom/consensus"
19 "github.com/bytom/p2p"
20 "github.com/bytom/p2p/discover"
21 core "github.com/bytom/protocol"
22 "github.com/bytom/protocol/bc"
23 "github.com/bytom/protocol/bc/types"
24 "github.com/bytom/version"
25 "github.com/bytom/wallet"
// Chain is the interface for Bytom core
type Chain interface {
	// BestBlockHeader returns the header of the current best (main-chain tip) block.
	BestBlockHeader() *types.BlockHeader
	// BestBlockHeight returns the height of the current best block.
	BestBlockHeight() uint64
	// CalcNextSeed computes the seed hash for the block following the given hash.
	CalcNextSeed(*bc.Hash) (*bc.Hash, error)
	// Block/header lookups by hash or by height.
	GetBlockByHash(*bc.Hash) (*types.Block, error)
	GetBlockByHeight(uint64) (*types.Block, error)
	GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
	GetHeaderByHeight(uint64) (*types.BlockHeader, error)
	// InMainChain reports whether the block with the given hash is on the main chain.
	InMainChain(bc.Hash) bool
	// ProcessBlock submits a block for validation/attachment.
	ProcessBlock(*types.Block) (bool, error)
	// ValidateTx validates a transaction; the bool result is used by callers
	// as an "is orphan" flag (see handleTransactionMsg).
	ValidateTx(*types.Tx) (bool, error)
	// NOTE(review): closing brace of this interface is elided in this excerpt.
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
	privKey crypto.PrivKeyEd25519 // local node's p2p key

	blockKeeper *blockKeeper // tracks block/header download state per peer

	newTxCh      chan *types.Tx            // feed of unconfirmed transactions (buffered, see NewSyncManager)
	newBlockCh   chan *bc.Hash             // hashes of newly mined blocks to broadcast
	newAddrCh    chan *account.CtrlProgram // wallet's new-address feed (wired from wallet.AccountMgr.NewAddrCh)
	spvAddresses []*account.CtrlProgram    // wallet control programs; presumably guarded by addrMutex — confirm
	addrMutex    sync.RWMutex              // protects SPV address state
	txSyncCh     chan *txSyncMsg           // transaction-sync requests (unbuffered)
	quitSync     chan struct{}             // closed to stop sync goroutines
	// NOTE(review): additional fields (switch, config, chain, peers, ...) are
	// elided in this excerpt — they are referenced elsewhere as sm.sw, sm.config,
	// sm.chain, sm.peers, sm.genesisHash.
//NewSyncManager create a sync manager
func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
	// The genesis header pins the chain identity used during peer handshakes.
	genesisHeader, err := chain.GetHeaderByHeight(0)
	// NOTE(review): the error check for GetHeaderByHeight appears to be elided
	// in this excerpt — confirm err is handled before genesisHeader is used.
	sw := p2p.NewSwitch(config)
	peers := newPeerSet(sw)
	manager := &SyncManager{
		genesisHash: genesisHeader.Hash(),
		// A fresh ephemeral ed25519 key is generated for this node's p2p identity.
		privKey:     crypto.GenPrivKeyEd25519(),
		blockKeeper: newBlockKeeper(chain, peers),
		newTxCh:     make(chan *types.Tx, maxTxChanSize),
		newBlockCh:  newBlockCh,
		txSyncCh:    make(chan *txSyncMsg),
		quitSync:    make(chan struct{}),
		// New wallet addresses flow in from the account manager.
		newAddrCh: wallet.AccountMgr.NewAddrCh,
	// Seed the SPV address list from the wallet; the error is deliberately ignored here.
	manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
	protocolReactor := NewProtocolReactor(manager, manager.peers)
	manager.sw.AddReactor("PROTOCOL", protocolReactor)

	// Create & add listener
	var listenerStatus bool
	// Vault mode runs offline: no listener and no peer discovery are started.
	if !config.VaultMode {
		p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
		l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
		//manager.sw.AddListener(l)

		// UDP peer discovery reuses the listener's externally visible port.
		discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
		// NOTE(review): the err check for initDiscover is elided in this excerpt.
		manager.sw.SetDiscv(discv)

	manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
	manager.sw.SetNodePrivKey(manager.privKey)
	// NOTE(review): the trailing "return manager, nil" and closing braces are
	// elided in this excerpt.
//BestPeer return the highest p2p peerInfo
func (sm *SyncManager) BestPeer() *PeerInfo {
	// Only full nodes are candidates for "best peer".
	bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
	// NOTE(review): a nil-guard for bestPeer appears to be elided in this
	// excerpt — getPeerInfo on a nil peer would panic; confirm.
	return bestPeer.getPeerInfo()
// GetNewTxCh return a unconfirmed transaction feed channel
func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
	// NOTE(review): body elided in this excerpt; presumably returns sm.newTxCh — confirm.
129 //GetPeerInfos return peer info of all peers
130 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
131 return sm.peers.getPeerInfos()
134 //IsCaughtUp check wheather the peer finish the sync
135 func (sm *SyncManager) IsCaughtUp() bool {
136 peer := sm.peers.bestPeer(consensus.SFFullNode)
137 return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
140 //NodeInfo get P2P peer node info
141 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
142 return sm.sw.NodeInfo()
//StopPeer try to stop peer by given ID
func (sm *SyncManager) StopPeer(peerID string) error {
	// Unknown ID is an error; callers get a plain sentinel-style message.
	if peer := sm.peers.getPeer(peerID); peer == nil {
		return errors.New("peerId not exist")
	// Known peer: drop it from the peer set.
	sm.peers.removePeer(peerID)
	// NOTE(review): the trailing "return nil" and closing brace are elided in this excerpt.
//Switch get sync manager switch
func (sm *SyncManager) Switch() *p2p.Switch {
	// NOTE(review): body elided in this excerpt; presumably returns sm.sw — confirm.
159 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
160 sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
163 func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
164 sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
// handleBlocksMsg decodes a batch of blocks from a peer and forwards
// them to the block keeper.
func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
	blocks, err := msg.GetBlocks()
	// Decode failure is logged at debug level; the error branch is
	// partially elided in this excerpt.
		log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")

	sm.blockKeeper.processBlocks(peer.ID(), blocks)
// handleGetBlockMsg serves a single block to the requesting peer,
// looked up either by height or by hash.
func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
	var block *types.Block
	// NOTE(review): the selection conditions (by-height vs by-hash) and the
	// err declaration are elided in this excerpt.
		block, err = sm.chain.GetBlockByHeight(msg.Height)
		block, err = sm.chain.GetBlockByHash(msg.GetHash())
		log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")

	// A failed send means the connection is unusable: drop the peer.
	ok, err := peer.sendBlock(block)
		sm.peers.removePeer(peer.ID())
		log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
// handleGetBlocksMsg serves a run of blocks located via the peer's block
// locator, capping the response size below the protocol maximum.
func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
	blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
	// Nothing to send (error or empty result): bail out.
	if err != nil || len(blocks) == 0 {

	sendBlocks := []*types.Block{}
	for _, block := range blocks {
		// Serialized size is used only to enforce the response cap.
		rawData, err := block.MarshalText()
			log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")

		// Stop once half the maximum response size would be exceeded;
		// NOTE(review): totalSize's declaration is elided in this excerpt.
		if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
		totalSize += len(rawData)
		sendBlocks = append(sendBlocks, block)

	// A failed send means the connection is unusable: drop the peer.
	ok, err := peer.sendBlocks(sendBlocks)
		sm.peers.removePeer(peer.ID())
		log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
// handleGetHeadersMsg serves block headers located via the peer's block
// locator up to the given stop hash.
func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
	headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
	// Nothing to send (error or empty result): log at debug and bail out.
	if err != nil || len(headers) == 0 {
		log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")

	// A failed send means the connection is unusable: drop the peer.
	ok, err := peer.sendHeaders(headers)
		sm.peers.removePeer(peer.ID())
		log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
// handleHeadersMsg decodes a batch of headers from a peer and forwards
// them to the block keeper.
func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
	headers, err := msg.GetHeaders()
	// Decode failure is logged at debug level; the error branch is
	// partially elided in this excerpt.
		log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")

	sm.blockKeeper.processHeaders(peer.ID(), headers)
// handleStatusRequestMsg replies to a status request with our best block
// header and genesis hash so the remote peer can vet chain compatibility.
func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
	bestHeader := sm.chain.BestBlockHeader()
	genesisBlock, err := sm.chain.GetBlockByHeight(0)
	// Failing to load the genesis block is logged; branch partially elided here.
		log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")

	genesisHash := genesisBlock.Hash()
	msg := NewStatusResponseMessage(bestHeader, &genesisHash)
	// A failed TrySend is treated as a dead connection: drop the peer.
	if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
		sm.peers.removePeer(peer.ID())
// handleStatusResponseMsg records a peer's reported chain status. A peer
// already in the set just has its status refreshed; a new peer is vetted
// for matching genesis hash and required service flags before being added.
func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
	// Known peer: refresh its best height/hash and stop there
	// (the early return appears elided in this excerpt).
	if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
		peer.setStatus(msg.Height, msg.GetHash())

	// Reject peers on a different chain (mismatched genesis hash).
	if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
		log.WithFields(log.Fields{
			"remote genesis": genesisHash.String(),
			"local genesis":  sm.genesisHash.String(),
		}).Warn("fail hand shake due to differnt genesis")

	// Require a full node that also serves SPV proofs.
	if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
		log.WithFields(log.Fields{
			"peer ServiceFlag": basePeer.ServiceFlag(),
		}).Warn("fail hand shake due to remote peer is not full node support spv proof")

	// Handshake passed: admit the peer with its reported status.
	sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
// handleTransactionMsg decodes a transaction from a peer, validates it,
// and penalizes the peer's ban score on failure.
func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
	tx, err := msg.GetTransaction()
	// Undecodable tx: small persistent ban-score penalty; branch partially elided here.
		sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")

	// Invalid (and non-orphan) tx: larger transient ban-score penalty.
	if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
		sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
// processMsg dispatches an incoming wire message to its handler. Only the
// status handshake messages may come from a peer that has not yet been
// admitted to the peer set.
func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
	peer := sm.peers.getPeer(basePeer.ID())
	// Unadmitted peers may only send status request/response
	// (the early return appears elided in this excerpt).
	if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {

	switch msg := msg.(type) {
	// Disabled handlers are kept for reference; this build routes only the
	// message types below.
	//case *GetBlockMessage:
	//	sm.handleGetBlockMsg(peer, msg)

	//case *BlockMessage:
	//	sm.handleBlockMsg(peer, msg)

	case *StatusRequestMessage:
		sm.handleStatusRequestMsg(basePeer)

	case *StatusResponseMessage:
		sm.handleStatusResponseMsg(basePeer, msg)

	case *TransactionMessage:
		sm.handleTransactionMsg(peer, msg)

	//case *MineBlockMessage:
	//	sm.handleMineBlockMsg(peer, msg)

	//case *GetHeadersMessage:
	//	sm.handleGetHeadersMsg(peer, msg)

	case *HeadersMessage:
		sm.handleHeadersMsg(peer, msg)

	//case *GetBlocksMessage:
	//	sm.handleGetBlocksMsg(peer, msg)

	//case *BlocksMessage:
	//	sm.handleBlocksMsg(peer, msg)

	case *MerkleBlockMessage:
		sm.handleMerkelBlockMsg(peer, msg)

	// Unknown message types are logged, not fatal
	// (the default case label appears elided in this excerpt).
		log.Errorf("unknown message type %v", reflect.TypeOf(msg))
// protocolAndAddress splits a listen address of the form
// "protocol://address" into its protocol and address parts, defaulting
// the protocol to "tcp" when no "://" separator is present.
func protocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"
	if i := strings.Index(listenAddr, sep); i >= 0 {
		return listenAddr[:i], listenAddr[i+len(sep):]
	}
	return "tcp", listenAddr
}
// makeNodeInfo assembles this node's handshake information: p2p public
// key, moniker, chain ID, software version, advertised services, and
// listen address.
func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
	nodeInfo := &p2p.NodeInfo{
		PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
		Moniker: sm.config.Moniker,
		Network: sm.config.ChainID,
		Version: version.Version,
		// Advertise the default service flags as a decimal string.
		Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},

	// Not listening (e.g. vault mode): no listen address to advertise
	// (the early return appears elided in this excerpt).
	if !sm.sw.IsListening() {

	p2pListener := sm.sw.Listeners()[0]

	// We assume that the rpcListener has the same ExternalAddress.
	// This is probably true because both P2P and RPC listeners use UPnP,
	// except of course if the rpc is only bound to localhost
	// NOTE(review): the branch selecting external vs internal address
	// (presumably on listenerStatus) is elided in this excerpt.
		nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
		nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
//Start start sync manager service
func (sm *SyncManager) Start() {
	// Starting the switch is mandatory; abort the process on failure.
	if _, err := sm.sw.Start(); err != nil {
		cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
	// Track newly created wallet addresses (fed via newAddrCh).
	go sm.spvAddressMgr()
	// broadcast transactions
	go sm.txBroadcastLoop()
	// Broadcast newly mined blocks (fed via newBlockCh).
	go sm.minedBroadcastLoop()
//Stop stop sync manager
func (sm *SyncManager) Stop() {
	// NOTE(review): body elided in this excerpt; presumably closes quitSync
	// and stops the switch — confirm.
// initDiscover starts the UDP-based peer discovery table on the given
// port and seeds it with the nodes listed in config.P2P.Seeds.
func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
	// Discovery listens on UDP using the same port number as the TCP listener.
	addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
	// NOTE(review): the error checks between these calls are elided in this excerpt.

	conn, err := net.ListenUDP("udp", addr)

	realaddr := conn.LocalAddr().(*net.UDPAddr)
	// Known nodes are persisted across restarts in "discover.db".
	ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)

	// add the seeds node to the discover table
	// No seeds configured: nothing more to do (early return elided here).
	if config.P2P.Seeds == "" {

	nodes := []*discover.Node{}
	for _, seed := range strings.Split(config.P2P.Seeds, ",") {
		// A deterministic node ID is derived from the seed address string.
		url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
		nodes = append(nodes, discover.MustParseNode(url))
	if err = ntab.SetFallbackNodes(nodes); err != nil {
435 func (sm *SyncManager) minedBroadcastLoop() {
438 case blockHash := <-sm.newBlockCh:
439 block, err := sm.chain.GetBlockByHash(blockHash)
441 log.Errorf("Failed on mined broadcast loop get block %v", err)
444 if err := sm.peers.broadcastMinedBlock(block); err != nil {
445 log.Errorf("Broadcast mine block error. %v", err)