OSDN Git Service

add unit test for netsync (#1162)
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/p2p"
19         "github.com/bytom/p2p/discover"
20         "github.com/bytom/p2p/pex"
21         core "github.com/bytom/protocol"
22         "github.com/bytom/protocol/bc"
23         "github.com/bytom/protocol/bc/types"
24         "github.com/bytom/version"
25 )
26
27 const (
28         maxTxChanSize = 10000
29 )
30
31 // Chain is the interface for Bytom core
32 type Chain interface {
33         BestBlockHeader() *types.BlockHeader
34         BestBlockHeight() uint64
35         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
36         GetBlockByHash(*bc.Hash) (*types.Block, error)
37         GetBlockByHeight(uint64) (*types.Block, error)
38         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
39         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
40         InMainChain(bc.Hash) bool
41         ProcessBlock(*types.Block) (bool, error)
42         ValidateTx(*types.Tx) (bool, error)
43 }
44
45 //SyncManager Sync Manager is responsible for the business layer information synchronization
46 type SyncManager struct {
47         sw          *p2p.Switch
48         genesisHash bc.Hash
49
50         privKey      crypto.PrivKeyEd25519 // local node's p2p key
51         chain        Chain
52         txPool       *core.TxPool
53         blockFetcher *blockFetcher
54         blockKeeper  *blockKeeper
55         peers        *peerSet
56
57         newTxCh    chan *types.Tx
58         newBlockCh chan *bc.Hash
59         txSyncCh   chan *txSyncMsg
60         quitSync   chan struct{}
61         config     *cfg.Config
62 }
63
64 //NewSyncManager create a sync manager
65 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
66         genesisHeader, err := chain.GetHeaderByHeight(0)
67         if err != nil {
68                 return nil, err
69         }
70
71         sw := p2p.NewSwitch(config)
72         peers := newPeerSet(sw)
73         manager := &SyncManager{
74                 sw:           sw,
75                 genesisHash:  genesisHeader.Hash(),
76                 txPool:       txPool,
77                 chain:        chain,
78                 privKey:      crypto.GenPrivKeyEd25519(),
79                 blockFetcher: newBlockFetcher(chain, peers),
80                 blockKeeper:  newBlockKeeper(chain, peers),
81                 peers:        peers,
82                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
83                 newBlockCh:   newBlockCh,
84                 txSyncCh:     make(chan *txSyncMsg),
85                 quitSync:     make(chan struct{}),
86                 config:       config,
87         }
88
89         protocolReactor := NewProtocolReactor(manager, manager.peers)
90         manager.sw.AddReactor("PROTOCOL", protocolReactor)
91
92         // Create & add listener
93         var listenerStatus bool
94         var l p2p.Listener
95         if !config.VaultMode {
96                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
97                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
98                 manager.sw.AddListener(l)
99
100                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
101                 if err != nil {
102                         return nil, err
103                 }
104
105                 pexReactor := pex.NewPEXReactor(discv)
106                 manager.sw.AddReactor("PEX", pexReactor)
107         }
108         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
109         manager.sw.SetNodePrivKey(manager.privKey)
110         return manager, nil
111 }
112
113 //BestPeer return the highest p2p peerInfo
114 func (sm *SyncManager) BestPeer() *PeerInfo {
115         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
116         if bestPeer != nil {
117                 return bestPeer.getPeerInfo()
118         }
119         return nil
120 }
121
122 // GetNewTxCh return a unconfirmed transaction feed channel
123 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
124         return sm.newTxCh
125 }
126
127 //GetPeerInfos return peer info of all peers
128 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
129         return sm.peers.getPeerInfos()
130 }
131
132 //IsCaughtUp check wheather the peer finish the sync
133 func (sm *SyncManager) IsCaughtUp() bool {
134         peer := sm.peers.bestPeer(consensus.SFFullNode)
135         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
136 }
137
138 //NodeInfo get P2P peer node info
139 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
140         return sm.sw.NodeInfo()
141 }
142
143 //StopPeer try to stop peer by given ID
144 func (sm *SyncManager) StopPeer(peerID string) error {
145         if peer := sm.peers.getPeer(peerID); peer == nil {
146                 return errors.New("peerId not exist")
147         }
148         sm.peers.removePeer(peerID)
149         return nil
150 }
151
152 //Switch get sync manager switch
153 func (sm *SyncManager) Switch() *p2p.Switch {
154         return sm.sw
155 }
156
157 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
158         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
159 }
160
161 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
162         blocks, err := msg.GetBlocks()
163         if err != nil {
164                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
165                 return
166         }
167
168         sm.blockKeeper.processBlocks(peer.ID(), blocks)
169 }
170
171 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
172         var block *types.Block
173         var err error
174         if msg.Height != 0 {
175                 block, err = sm.chain.GetBlockByHeight(msg.Height)
176         } else {
177                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
178         }
179         if err != nil {
180                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
181                 return
182         }
183
184         ok, err := peer.sendBlock(block)
185         if !ok {
186                 sm.peers.removePeer(peer.ID())
187         }
188         if err != nil {
189                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
190         }
191 }
192
193 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
194         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
195         if err != nil || len(blocks) == 0 {
196                 return
197         }
198
199         totalSize := 0
200         sendBlocks := []*types.Block{}
201         for _, block := range blocks {
202                 rawData, err := block.MarshalText()
203                 if err != nil {
204                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
205                         continue
206                 }
207
208                 if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
209                         break
210                 }
211                 totalSize += len(rawData)
212                 sendBlocks = append(sendBlocks, block)
213         }
214
215         ok, err := peer.sendBlocks(sendBlocks)
216         if !ok {
217                 sm.peers.removePeer(peer.ID())
218         }
219         if err != nil {
220                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
221         }
222 }
223
224 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
225         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
226         if err != nil || len(headers) == 0 {
227                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
228                 return
229         }
230
231         ok, err := peer.sendHeaders(headers)
232         if !ok {
233                 sm.peers.removePeer(peer.ID())
234         }
235         if err != nil {
236                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
237         }
238 }
239
240 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
241         headers, err := msg.GetHeaders()
242         if err != nil {
243                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
244                 return
245         }
246
247         sm.blockKeeper.processHeaders(peer.ID(), headers)
248 }
249
250 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
251         block, err := msg.GetMineBlock()
252         if err != nil {
253                 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
254                 return
255         }
256
257         hash := block.Hash()
258         peer.markBlock(&hash)
259         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
260         peer.setStatus(block.Height, &hash)
261 }
262
263 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
264         bestHeader := sm.chain.BestBlockHeader()
265         genesisBlock, err := sm.chain.GetBlockByHeight(0)
266         if err != nil {
267                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
268         }
269
270         genesisHash := genesisBlock.Hash()
271         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
272         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
273                 sm.peers.removePeer(peer.ID())
274         }
275 }
276
277 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
278         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
279                 peer.setStatus(msg.Height, msg.GetHash())
280                 return
281         }
282
283         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
284                 log.WithFields(log.Fields{
285                         "remote genesis": genesisHash.String(),
286                         "local genesis":  sm.genesisHash.String(),
287                 }).Warn("fail hand shake due to differnt genesis")
288                 return
289         }
290
291         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
292 }
293
294 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
295         tx, err := msg.GetTransaction()
296         if err != nil {
297                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
298                 return
299         }
300
301         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
302                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
303         }
304 }
305
306 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
307         peer := sm.peers.getPeer(basePeer.ID())
308         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
309                 return
310         }
311
312         switch msg := msg.(type) {
313         case *GetBlockMessage:
314                 sm.handleGetBlockMsg(peer, msg)
315
316         case *BlockMessage:
317                 sm.handleBlockMsg(peer, msg)
318
319         case *StatusRequestMessage:
320                 sm.handleStatusRequestMsg(basePeer)
321
322         case *StatusResponseMessage:
323                 sm.handleStatusResponseMsg(basePeer, msg)
324
325         case *TransactionMessage:
326                 sm.handleTransactionMsg(peer, msg)
327
328         case *MineBlockMessage:
329                 sm.handleMineBlockMsg(peer, msg)
330
331         case *GetHeadersMessage:
332                 sm.handleGetHeadersMsg(peer, msg)
333
334         case *HeadersMessage:
335                 sm.handleHeadersMsg(peer, msg)
336
337         case *GetBlocksMessage:
338                 sm.handleGetBlocksMsg(peer, msg)
339
340         case *BlocksMessage:
341                 sm.handleBlocksMsg(peer, msg)
342
343         default:
344                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
345         }
346 }
347
348 // Defaults to tcp
349 func protocolAndAddress(listenAddr string) (string, string) {
350         p, address := "tcp", listenAddr
351         parts := strings.SplitN(address, "://", 2)
352         if len(parts) == 2 {
353                 p, address = parts[0], parts[1]
354         }
355         return p, address
356 }
357
358 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
359         nodeInfo := &p2p.NodeInfo{
360                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
361                 Moniker: sm.config.Moniker,
362                 Network: sm.config.ChainID,
363                 Version: version.Version,
364                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
365         }
366
367         if !sm.sw.IsListening() {
368                 return nodeInfo
369         }
370
371         p2pListener := sm.sw.Listeners()[0]
372
373         // We assume that the rpcListener has the same ExternalAddress.
374         // This is probably true because both P2P and RPC listeners use UPnP,
375         // except of course if the rpc is only bound to localhost
376         if listenerStatus {
377                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
378         } else {
379                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
380         }
381         return nodeInfo
382 }
383
384 //Start start sync manager service
385 func (sm *SyncManager) Start() {
386         if _, err := sm.sw.Start(); err != nil {
387                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
388         }
389         // broadcast transactions
390         go sm.txBroadcastLoop()
391         go sm.minedBroadcastLoop()
392         go sm.txSyncLoop()
393 }
394
395 //Stop stop sync manager
396 func (sm *SyncManager) Stop() {
397         close(sm.quitSync)
398         sm.sw.Stop()
399 }
400
401 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
402         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
403         if err != nil {
404                 return nil, err
405         }
406
407         conn, err := net.ListenUDP("udp", addr)
408         if err != nil {
409                 return nil, err
410         }
411
412         realaddr := conn.LocalAddr().(*net.UDPAddr)
413         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
414         if err != nil {
415                 return nil, err
416         }
417
418         // add the seeds node to the discover table
419         if config.P2P.Seeds == "" {
420                 return ntab, nil
421         }
422         nodes := []*discover.Node{}
423         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
424                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
425                 nodes = append(nodes, discover.MustParseNode(url))
426         }
427         if err = ntab.SetFallbackNodes(nodes); err != nil {
428                 return nil, err
429         }
430         return ntab, nil
431 }
432
433 func (sm *SyncManager) minedBroadcastLoop() {
434         for {
435                 select {
436                 case blockHash := <-sm.newBlockCh:
437                         block, err := sm.chain.GetBlockByHash(blockHash)
438                         if err != nil {
439                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
440                                 return
441                         }
442                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
443                                 log.Errorf("Broadcast mine block error. %v", err)
444                                 return
445                         }
446                 case <-sm.quitSync:
447                         return
448                 }
449         }
450 }