OSDN Git Service

Del unused block_fetcher function
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         "github.com/bytom/account"
17         cfg "github.com/bytom/config"
18         "github.com/bytom/consensus"
19         "github.com/bytom/p2p"
20         "github.com/bytom/p2p/discover"
21         core "github.com/bytom/protocol"
22         "github.com/bytom/protocol/bc"
23         "github.com/bytom/protocol/bc/types"
24         "github.com/bytom/version"
25         "github.com/bytom/wallet"
26         "sync"
27 )
28
29 const (
30         maxTxChanSize = 10000
31 )
32
33 // Chain is the interface for Bytom core
34 type Chain interface {
35         BestBlockHeader() *types.BlockHeader
36         BestBlockHeight() uint64
37         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
38         GetBlockByHash(*bc.Hash) (*types.Block, error)
39         GetBlockByHeight(uint64) (*types.Block, error)
40         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
41         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
42         InMainChain(bc.Hash) bool
43         ProcessBlock(*types.Block) (bool, error)
44         ValidateTx(*types.Tx) (bool, error)
45 }
46
47 //SyncManager Sync Manager is responsible for the business layer information synchronization
48 type SyncManager struct {
49         sw          *p2p.Switch
50         genesisHash bc.Hash
51
52         privKey      crypto.PrivKeyEd25519 // local node's p2p key
53         chain        Chain
54         txPool       *core.TxPool
55         blockKeeper  *blockKeeper
56         peers        *peerSet
57
58         newTxCh      chan *types.Tx
59         newBlockCh   chan *bc.Hash
60         newAddrCh    chan *account.CtrlProgram
61         spvAddresses []*account.CtrlProgram
62         addrMutex    sync.RWMutex
63         txSyncCh     chan *txSyncMsg
64         quitSync     chan struct{}
65         config       *cfg.Config
66 }
67
68 //NewSyncManager create a sync manager
69 func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
70         genesisHeader, err := chain.GetHeaderByHeight(0)
71         if err != nil {
72                 return nil, err
73         }
74
75         sw := p2p.NewSwitch(config)
76         peers := newPeerSet(sw)
77         manager := &SyncManager{
78                 sw:           sw,
79                 genesisHash:  genesisHeader.Hash(),
80                 txPool:       txPool,
81                 chain:        chain,
82                 privKey:      crypto.GenPrivKeyEd25519(),
83                 blockKeeper:  newBlockKeeper(chain, peers),
84                 peers:        peers,
85                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
86                 newBlockCh:   newBlockCh,
87                 txSyncCh:     make(chan *txSyncMsg),
88                 quitSync:     make(chan struct{}),
89                 config:       config,
90                 newAddrCh:    wallet.AccountMgr.NewAddrCh,
91         }
92         manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
93         protocolReactor := NewProtocolReactor(manager, manager.peers)
94         manager.sw.AddReactor("PROTOCOL", protocolReactor)
95
96         // Create & add listener
97         var listenerStatus bool
98         var l p2p.Listener
99         if !config.VaultMode {
100                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
101                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
102                 //manager.sw.AddListener(l)
103
104                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
105                 if err != nil {
106                         return nil, err
107                 }
108                 manager.sw.SetDiscv(discv)
109         }
110         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
111         manager.sw.SetNodePrivKey(manager.privKey)
112         return manager, nil
113 }
114
115 //BestPeer return the highest p2p peerInfo
116 func (sm *SyncManager) BestPeer() *PeerInfo {
117         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
118         if bestPeer != nil {
119                 return bestPeer.getPeerInfo()
120         }
121         return nil
122 }
123
124 // GetNewTxCh return a unconfirmed transaction feed channel
125 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
126         return sm.newTxCh
127 }
128
129 //GetPeerInfos return peer info of all peers
130 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
131         return sm.peers.getPeerInfos()
132 }
133
134 //IsCaughtUp check wheather the peer finish the sync
135 func (sm *SyncManager) IsCaughtUp() bool {
136         peer := sm.peers.bestPeer(consensus.SFFullNode)
137         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
138 }
139
140 //NodeInfo get P2P peer node info
141 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
142         return sm.sw.NodeInfo()
143 }
144
145 //StopPeer try to stop peer by given ID
146 func (sm *SyncManager) StopPeer(peerID string) error {
147         if peer := sm.peers.getPeer(peerID); peer == nil {
148                 return errors.New("peerId not exist")
149         }
150         sm.peers.removePeer(peerID)
151         return nil
152 }
153
154 //Switch get sync manager switch
155 func (sm *SyncManager) Switch() *p2p.Switch {
156         return sm.sw
157 }
158
159 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
160         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
161 }
162
163 func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
164         sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
165 }
166
167 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
168         blocks, err := msg.GetBlocks()
169         if err != nil {
170                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
171                 return
172         }
173
174         sm.blockKeeper.processBlocks(peer.ID(), blocks)
175 }
176
177 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
178         var block *types.Block
179         var err error
180         if msg.Height != 0 {
181                 block, err = sm.chain.GetBlockByHeight(msg.Height)
182         } else {
183                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
184         }
185         if err != nil {
186                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
187                 return
188         }
189
190         ok, err := peer.sendBlock(block)
191         if !ok {
192                 sm.peers.removePeer(peer.ID())
193         }
194         if err != nil {
195                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
196         }
197 }
198
199 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
200         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
201         if err != nil || len(blocks) == 0 {
202                 return
203         }
204
205         totalSize := 0
206         sendBlocks := []*types.Block{}
207         for _, block := range blocks {
208                 rawData, err := block.MarshalText()
209                 if err != nil {
210                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
211                         continue
212                 }
213
214                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
215                         break
216                 }
217                 totalSize += len(rawData)
218                 sendBlocks = append(sendBlocks, block)
219         }
220
221         ok, err := peer.sendBlocks(sendBlocks)
222         if !ok {
223                 sm.peers.removePeer(peer.ID())
224         }
225         if err != nil {
226                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
227         }
228 }
229
230 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
231         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
232         if err != nil || len(headers) == 0 {
233                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
234                 return
235         }
236
237         ok, err := peer.sendHeaders(headers)
238         if !ok {
239                 sm.peers.removePeer(peer.ID())
240         }
241         if err != nil {
242                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
243         }
244 }
245
246 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
247         headers, err := msg.GetHeaders()
248         if err != nil {
249                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
250                 return
251         }
252
253         sm.blockKeeper.processHeaders(peer.ID(), headers)
254 }
255
256 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
257         bestHeader := sm.chain.BestBlockHeader()
258         genesisBlock, err := sm.chain.GetBlockByHeight(0)
259         if err != nil {
260                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
261         }
262
263         genesisHash := genesisBlock.Hash()
264         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
265         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
266                 sm.peers.removePeer(peer.ID())
267         }
268 }
269
270 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
271         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
272                 peer.setStatus(msg.Height, msg.GetHash())
273                 return
274         }
275
276         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
277                 log.WithFields(log.Fields{
278                         "remote genesis": genesisHash.String(),
279                         "local genesis":  sm.genesisHash.String(),
280                 }).Warn("fail hand shake due to differnt genesis")
281                 return
282         }
283         if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
284                 log.WithFields(log.Fields{
285                         "peer ServiceFlag": basePeer.ServiceFlag(),
286                 }).Warn("fail hand shake due to remote peer is not full node support spv proof")
287                 return
288         }
289         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
290 }
291
292 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
293         tx, err := msg.GetTransaction()
294         if err != nil {
295                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
296                 return
297         }
298
299         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
300                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
301         }
302 }
303
304 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
305         peer := sm.peers.getPeer(basePeer.ID())
306         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
307                 return
308         }
309
310         switch msg := msg.(type) {
311         //case *GetBlockMessage:
312         //      sm.handleGetBlockMsg(peer, msg)
313         //
314         //case *BlockMessage:
315         //      sm.handleBlockMsg(peer, msg)
316
317         case *StatusRequestMessage:
318                 sm.handleStatusRequestMsg(basePeer)
319
320         case *StatusResponseMessage:
321                 sm.handleStatusResponseMsg(basePeer, msg)
322
323         case *TransactionMessage:
324                 sm.handleTransactionMsg(peer, msg)
325
326         //case *MineBlockMessage:
327         //      sm.handleMineBlockMsg(peer, msg)
328
329         //case *GetHeadersMessage:
330         //      sm.handleGetHeadersMsg(peer, msg)
331
332         case *HeadersMessage:
333                 sm.handleHeadersMsg(peer, msg)
334
335         //case *GetBlocksMessage:
336         //      sm.handleGetBlocksMsg(peer, msg)
337
338         //case *BlocksMessage:
339         //      sm.handleBlocksMsg(peer, msg)
340
341         case *MerkleBlockMessage:
342                 sm.handleMerkelBlockMsg(peer, msg)
343
344         default:
345                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
346         }
347 }
348
349 // Defaults to tcp
350 func protocolAndAddress(listenAddr string) (string, string) {
351         p, address := "tcp", listenAddr
352         parts := strings.SplitN(address, "://", 2)
353         if len(parts) == 2 {
354                 p, address = parts[0], parts[1]
355         }
356         return p, address
357 }
358
359 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
360         nodeInfo := &p2p.NodeInfo{
361                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
362                 Moniker: sm.config.Moniker,
363                 Network: sm.config.ChainID,
364                 Version: version.Version,
365                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
366         }
367
368         if !sm.sw.IsListening() {
369                 return nodeInfo
370         }
371
372         p2pListener := sm.sw.Listeners()[0]
373
374         // We assume that the rpcListener has the same ExternalAddress.
375         // This is probably true because both P2P and RPC listeners use UPnP,
376         // except of course if the rpc is only bound to localhost
377         if listenerStatus {
378                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
379         } else {
380                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
381         }
382         return nodeInfo
383 }
384
385 //Start start sync manager service
386 func (sm *SyncManager) Start() {
387         if _, err := sm.sw.Start(); err != nil {
388                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
389         }
390         go sm.spvAddressMgr()
391         // broadcast transactions
392         go sm.txBroadcastLoop()
393         go sm.minedBroadcastLoop()
394         go sm.txSyncLoop()
395 }
396
397 //Stop stop sync manager
398 func (sm *SyncManager) Stop() {
399         close(sm.quitSync)
400         sm.sw.Stop()
401 }
402
403 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
404         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
405         if err != nil {
406                 return nil, err
407         }
408
409         conn, err := net.ListenUDP("udp", addr)
410         if err != nil {
411                 return nil, err
412         }
413
414         realaddr := conn.LocalAddr().(*net.UDPAddr)
415         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
416         if err != nil {
417                 return nil, err
418         }
419
420         // add the seeds node to the discover table
421         if config.P2P.Seeds == "" {
422                 return ntab, nil
423         }
424         nodes := []*discover.Node{}
425         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
426                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
427                 nodes = append(nodes, discover.MustParseNode(url))
428         }
429         if err = ntab.SetFallbackNodes(nodes); err != nil {
430                 return nil, err
431         }
432         return ntab, nil
433 }
434
435 func (sm *SyncManager) minedBroadcastLoop() {
436         for {
437                 select {
438                 case blockHash := <-sm.newBlockCh:
439                         block, err := sm.chain.GetBlockByHash(blockHash)
440                         if err != nil {
441                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
442                                 return
443                         }
444                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
445                                 log.Errorf("Broadcast mine block error. %v", err)
446                                 return
447                         }
448                 case <-sm.quitSync:
449                         return
450                 }
451         }
452 }