OSDN Git Service

Remove p2p peer exchange module (#1196)
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/p2p"
19         "github.com/bytom/p2p/discover"
20         core "github.com/bytom/protocol"
21         "github.com/bytom/protocol/bc"
22         "github.com/bytom/protocol/bc/types"
23         "github.com/bytom/version"
24 )
25
26 const (
27         maxTxChanSize = 10000
28 )
29
30 // Chain is the interface for Bytom core
31 type Chain interface {
32         BestBlockHeader() *types.BlockHeader
33         BestBlockHeight() uint64
34         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
35         GetBlockByHash(*bc.Hash) (*types.Block, error)
36         GetBlockByHeight(uint64) (*types.Block, error)
37         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
38         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
39         InMainChain(bc.Hash) bool
40         ProcessBlock(*types.Block) (bool, error)
41         ValidateTx(*types.Tx) (bool, error)
42 }
43
44 //SyncManager Sync Manager is responsible for the business layer information synchronization
45 type SyncManager struct {
46         sw          *p2p.Switch
47         genesisHash bc.Hash
48
49         privKey      crypto.PrivKeyEd25519 // local node's p2p key
50         chain        Chain
51         txPool       *core.TxPool
52         blockFetcher *blockFetcher
53         blockKeeper  *blockKeeper
54         peers        *peerSet
55
56         newTxCh    chan *types.Tx
57         newBlockCh chan *bc.Hash
58         txSyncCh   chan *txSyncMsg
59         quitSync   chan struct{}
60         config     *cfg.Config
61 }
62
63 //NewSyncManager create a sync manager
64 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
65         genesisHeader, err := chain.GetHeaderByHeight(0)
66         if err != nil {
67                 return nil, err
68         }
69
70         sw := p2p.NewSwitch(config)
71         peers := newPeerSet(sw)
72         manager := &SyncManager{
73                 sw:           sw,
74                 genesisHash:  genesisHeader.Hash(),
75                 txPool:       txPool,
76                 chain:        chain,
77                 privKey:      crypto.GenPrivKeyEd25519(),
78                 blockFetcher: newBlockFetcher(chain, peers),
79                 blockKeeper:  newBlockKeeper(chain, peers),
80                 peers:        peers,
81                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
82                 newBlockCh:   newBlockCh,
83                 txSyncCh:     make(chan *txSyncMsg),
84                 quitSync:     make(chan struct{}),
85                 config:       config,
86         }
87
88         protocolReactor := NewProtocolReactor(manager, manager.peers)
89         manager.sw.AddReactor("PROTOCOL", protocolReactor)
90
91         // Create & add listener
92         var listenerStatus bool
93         var l p2p.Listener
94         if !config.VaultMode {
95                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
96                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
97                 manager.sw.AddListener(l)
98
99                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
100                 if err != nil {
101                         return nil, err
102                 }
103                 manager.sw.SetDiscv(discv)
104         }
105         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
106         manager.sw.SetNodePrivKey(manager.privKey)
107         return manager, nil
108 }
109
110 //BestPeer return the highest p2p peerInfo
111 func (sm *SyncManager) BestPeer() *PeerInfo {
112         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
113         if bestPeer != nil {
114                 return bestPeer.getPeerInfo()
115         }
116         return nil
117 }
118
119 // GetNewTxCh return a unconfirmed transaction feed channel
120 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
121         return sm.newTxCh
122 }
123
124 //GetPeerInfos return peer info of all peers
125 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
126         return sm.peers.getPeerInfos()
127 }
128
129 //IsCaughtUp check wheather the peer finish the sync
130 func (sm *SyncManager) IsCaughtUp() bool {
131         peer := sm.peers.bestPeer(consensus.SFFullNode)
132         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
133 }
134
135 //NodeInfo get P2P peer node info
136 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
137         return sm.sw.NodeInfo()
138 }
139
140 //StopPeer try to stop peer by given ID
141 func (sm *SyncManager) StopPeer(peerID string) error {
142         if peer := sm.peers.getPeer(peerID); peer == nil {
143                 return errors.New("peerId not exist")
144         }
145         sm.peers.removePeer(peerID)
146         return nil
147 }
148
149 //Switch get sync manager switch
150 func (sm *SyncManager) Switch() *p2p.Switch {
151         return sm.sw
152 }
153
154 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
155         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
156 }
157
158 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
159         blocks, err := msg.GetBlocks()
160         if err != nil {
161                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
162                 return
163         }
164
165         sm.blockKeeper.processBlocks(peer.ID(), blocks)
166 }
167
168 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
169         var block *types.Block
170         var err error
171         if msg.Height != 0 {
172                 block, err = sm.chain.GetBlockByHeight(msg.Height)
173         } else {
174                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
175         }
176         if err != nil {
177                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
178                 return
179         }
180
181         ok, err := peer.sendBlock(block)
182         if !ok {
183                 sm.peers.removePeer(peer.ID())
184         }
185         if err != nil {
186                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
187         }
188 }
189
190 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
191         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
192         if err != nil || len(blocks) == 0 {
193                 return
194         }
195
196         totalSize := 0
197         sendBlocks := []*types.Block{}
198         for _, block := range blocks {
199                 rawData, err := block.MarshalText()
200                 if err != nil {
201                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
202                         continue
203                 }
204
205                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
206                         break
207                 }
208                 totalSize += len(rawData)
209                 sendBlocks = append(sendBlocks, block)
210         }
211
212         ok, err := peer.sendBlocks(sendBlocks)
213         if !ok {
214                 sm.peers.removePeer(peer.ID())
215         }
216         if err != nil {
217                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
218         }
219 }
220
221 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
222         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
223         if err != nil || len(headers) == 0 {
224                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
225                 return
226         }
227
228         ok, err := peer.sendHeaders(headers)
229         if !ok {
230                 sm.peers.removePeer(peer.ID())
231         }
232         if err != nil {
233                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
234         }
235 }
236
237 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
238         headers, err := msg.GetHeaders()
239         if err != nil {
240                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
241                 return
242         }
243
244         sm.blockKeeper.processHeaders(peer.ID(), headers)
245 }
246
247 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
248         block, err := msg.GetMineBlock()
249         if err != nil {
250                 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
251                 return
252         }
253
254         hash := block.Hash()
255         peer.markBlock(&hash)
256         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
257         peer.setStatus(block.Height, &hash)
258 }
259
260 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
261         bestHeader := sm.chain.BestBlockHeader()
262         genesisBlock, err := sm.chain.GetBlockByHeight(0)
263         if err != nil {
264                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
265         }
266
267         genesisHash := genesisBlock.Hash()
268         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
269         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
270                 sm.peers.removePeer(peer.ID())
271         }
272 }
273
274 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
275         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
276                 peer.setStatus(msg.Height, msg.GetHash())
277                 return
278         }
279
280         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
281                 log.WithFields(log.Fields{
282                         "remote genesis": genesisHash.String(),
283                         "local genesis":  sm.genesisHash.String(),
284                 }).Warn("fail hand shake due to differnt genesis")
285                 return
286         }
287
288         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
289 }
290
291 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
292         tx, err := msg.GetTransaction()
293         if err != nil {
294                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
295                 return
296         }
297
298         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
299                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
300         }
301 }
302
303 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
304         peer := sm.peers.getPeer(basePeer.ID())
305         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
306                 return
307         }
308
309         switch msg := msg.(type) {
310         case *GetBlockMessage:
311                 sm.handleGetBlockMsg(peer, msg)
312
313         case *BlockMessage:
314                 sm.handleBlockMsg(peer, msg)
315
316         case *StatusRequestMessage:
317                 sm.handleStatusRequestMsg(basePeer)
318
319         case *StatusResponseMessage:
320                 sm.handleStatusResponseMsg(basePeer, msg)
321
322         case *TransactionMessage:
323                 sm.handleTransactionMsg(peer, msg)
324
325         case *MineBlockMessage:
326                 sm.handleMineBlockMsg(peer, msg)
327
328         case *GetHeadersMessage:
329                 sm.handleGetHeadersMsg(peer, msg)
330
331         case *HeadersMessage:
332                 sm.handleHeadersMsg(peer, msg)
333
334         case *GetBlocksMessage:
335                 sm.handleGetBlocksMsg(peer, msg)
336
337         case *BlocksMessage:
338                 sm.handleBlocksMsg(peer, msg)
339
340         default:
341                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
342         }
343 }
344
345 // Defaults to tcp
346 func protocolAndAddress(listenAddr string) (string, string) {
347         p, address := "tcp", listenAddr
348         parts := strings.SplitN(address, "://", 2)
349         if len(parts) == 2 {
350                 p, address = parts[0], parts[1]
351         }
352         return p, address
353 }
354
355 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
356         nodeInfo := &p2p.NodeInfo{
357                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
358                 Moniker: sm.config.Moniker,
359                 Network: sm.config.ChainID,
360                 Version: version.Version,
361                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
362         }
363
364         if !sm.sw.IsListening() {
365                 return nodeInfo
366         }
367
368         p2pListener := sm.sw.Listeners()[0]
369
370         // We assume that the rpcListener has the same ExternalAddress.
371         // This is probably true because both P2P and RPC listeners use UPnP,
372         // except of course if the rpc is only bound to localhost
373         if listenerStatus {
374                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
375         } else {
376                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
377         }
378         return nodeInfo
379 }
380
381 //Start start sync manager service
382 func (sm *SyncManager) Start() {
383         if _, err := sm.sw.Start(); err != nil {
384                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
385         }
386         // broadcast transactions
387         go sm.txBroadcastLoop()
388         go sm.minedBroadcastLoop()
389         go sm.txSyncLoop()
390 }
391
392 //Stop stop sync manager
393 func (sm *SyncManager) Stop() {
394         close(sm.quitSync)
395         sm.sw.Stop()
396 }
397
398 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
399         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
400         if err != nil {
401                 return nil, err
402         }
403
404         conn, err := net.ListenUDP("udp", addr)
405         if err != nil {
406                 return nil, err
407         }
408
409         realaddr := conn.LocalAddr().(*net.UDPAddr)
410         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
411         if err != nil {
412                 return nil, err
413         }
414
415         // add the seeds node to the discover table
416         if config.P2P.Seeds == "" {
417                 return ntab, nil
418         }
419         nodes := []*discover.Node{}
420         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
421                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
422                 nodes = append(nodes, discover.MustParseNode(url))
423         }
424         if err = ntab.SetFallbackNodes(nodes); err != nil {
425                 return nil, err
426         }
427         return ntab, nil
428 }
429
430 func (sm *SyncManager) minedBroadcastLoop() {
431         for {
432                 select {
433                 case blockHash := <-sm.newBlockCh:
434                         block, err := sm.chain.GetBlockByHash(blockHash)
435                         if err != nil {
436                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
437                                 return
438                         }
439                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
440                                 log.Errorf("Broadcast mine block error. %v", err)
441                                 return
442                         }
443                 case <-sm.quitSync:
444                         return
445                 }
446         }
447 }