OSDN Git Service

Add block fast sync function (#1104)
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "strconv"
9         "strings"
10
11         log "github.com/sirupsen/logrus"
12         "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/consensus"
17         "github.com/bytom/p2p"
18         "github.com/bytom/p2p/discover"
19         "github.com/bytom/p2p/pex"
20         core "github.com/bytom/protocol"
21         "github.com/bytom/protocol/bc"
22         "github.com/bytom/protocol/bc/types"
23         "github.com/bytom/version"
24 )
25
26 const (
27         maxTxChanSize = 10000
28 )
29
30 //SyncManager Sync Manager is responsible for the business layer information synchronization
31 type SyncManager struct {
32         sw          *p2p.Switch
33         genesisHash bc.Hash
34
35         privKey      crypto.PrivKeyEd25519 // local node's p2p key
36         chain        *core.Chain
37         txPool       *core.TxPool
38         blockFetcher *blockFetcher
39         blockKeeper  *blockKeeper
40         peers        *peerSet
41
42         newTxCh    chan *types.Tx
43         newBlockCh chan *bc.Hash
44         txSyncCh   chan *txSyncMsg
45         quitSync   chan struct{}
46         config     *cfg.Config
47 }
48
49 //NewSyncManager create a sync manager
50 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
51         genesisHeader, err := chain.GetHeaderByHeight(0)
52         if err != nil {
53                 return nil, err
54         }
55
56         sw := p2p.NewSwitch(config)
57         peers := newPeerSet(sw)
58         manager := &SyncManager{
59                 sw:           sw,
60                 genesisHash:  genesisHeader.Hash(),
61                 txPool:       txPool,
62                 chain:        chain,
63                 privKey:      crypto.GenPrivKeyEd25519(),
64                 blockFetcher: newBlockFetcher(chain, peers),
65                 blockKeeper:  newBlockKeeper(chain, peers),
66                 peers:        peers,
67                 newTxCh:      make(chan *types.Tx, maxTxChanSize),
68                 newBlockCh:   newBlockCh,
69                 txSyncCh:     make(chan *txSyncMsg),
70                 quitSync:     make(chan struct{}),
71                 config:       config,
72         }
73
74         protocolReactor := NewProtocolReactor(manager, manager.peers)
75         manager.sw.AddReactor("PROTOCOL", protocolReactor)
76
77         // Create & add listener
78         var listenerStatus bool
79         var l p2p.Listener
80         if !config.VaultMode {
81                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
82                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
83                 manager.sw.AddListener(l)
84
85                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
86                 if err != nil {
87                         return nil, err
88                 }
89
90                 pexReactor := pex.NewPEXReactor(discv)
91                 manager.sw.AddReactor("PEX", pexReactor)
92         }
93         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
94         manager.sw.SetNodePrivKey(manager.privKey)
95         return manager, nil
96 }
97
98 //BestPeer return the highest p2p peerInfo
99 func (sm *SyncManager) BestPeer() *PeerInfo {
100         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
101         if bestPeer != nil {
102                 return bestPeer.getPeerInfo()
103         }
104         return nil
105 }
106
107 // GetNewTxCh return a unconfirmed transaction feed channel
108 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
109         return sm.newTxCh
110 }
111
112 //GetPeerInfos return peer info of all peers
113 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
114         return sm.peers.getPeerInfos()
115 }
116
117 //IsCaughtUp check wheather the peer finish the sync
118 func (sm *SyncManager) IsCaughtUp() bool {
119         peer := sm.peers.bestPeer(consensus.SFFullNode)
120         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
121 }
122
123 //NodeInfo get P2P peer node info
124 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
125         return sm.sw.NodeInfo()
126 }
127
128 //StopPeer try to stop peer by given ID
129 func (sm *SyncManager) StopPeer(peerID string) error {
130         if peer := sm.peers.getPeer(peerID); peer == nil {
131                 return errors.New("peerId not exist")
132         }
133         sm.peers.removePeer(peerID)
134         return nil
135 }
136
137 //Switch get sync manager switch
138 func (sm *SyncManager) Switch() *p2p.Switch {
139         return sm.sw
140 }
141
142 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
143         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
144 }
145
146 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
147         blocks, err := msg.GetBlocks()
148         if err != nil {
149                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
150                 return
151         }
152
153         sm.blockKeeper.processBlocks(peer.ID(), blocks)
154 }
155
156 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
157         var block *types.Block
158         var err error
159         if msg.Height != 0 {
160                 block, err = sm.chain.GetBlockByHeight(msg.Height)
161         } else {
162                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
163         }
164         if err != nil {
165                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
166                 return
167         }
168
169         ok, err := peer.sendBlock(block)
170         if !ok {
171                 sm.peers.removePeer(peer.ID())
172         }
173         if err != nil {
174                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
175         }
176 }
177
178 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
179         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
180         if err != nil || len(blocks) == 0 {
181                 return
182         }
183
184         totalSize := 0
185         sendBlocks := []*types.Block{}
186         for _, block := range blocks {
187                 rawData, err := block.MarshalText()
188                 if err != nil {
189                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
190                         continue
191                 }
192
193                 if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
194                         break
195                 }
196                 totalSize += len(rawData)
197                 sendBlocks = append(sendBlocks, block)
198         }
199
200         ok, err := peer.sendBlocks(sendBlocks)
201         if !ok {
202                 sm.peers.removePeer(peer.ID())
203         }
204         if err != nil {
205                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
206         }
207 }
208
209 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
210         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
211         if err != nil || len(headers) == 0 {
212                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
213                 return
214         }
215
216         ok, err := peer.sendHeaders(headers)
217         if !ok {
218                 sm.peers.removePeer(peer.ID())
219         }
220         if err != nil {
221                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
222         }
223 }
224
225 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
226         headers, err := msg.GetHeaders()
227         if err != nil {
228                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
229                 return
230         }
231
232         sm.blockKeeper.processHeaders(peer.ID(), headers)
233 }
234
235 func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
236         block, err := msg.GetMineBlock()
237         if err != nil {
238                 log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
239                 return
240         }
241
242         hash := block.Hash()
243         peer.markBlock(&hash)
244         sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
245         peer.setStatus(block.Height, &hash)
246 }
247
248 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
249         bestHeader := sm.chain.BestBlockHeader()
250         genesisBlock, err := sm.chain.GetBlockByHeight(0)
251         if err != nil {
252                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
253         }
254
255         genesisHash := genesisBlock.Hash()
256         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
257         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
258                 sm.peers.removePeer(peer.ID())
259         }
260 }
261
262 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
263         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
264                 peer.setStatus(msg.Height, msg.GetHash())
265                 return
266         }
267
268         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
269                 log.WithFields(log.Fields{
270                         "remote genesis": genesisHash.String(),
271                         "local genesis":  sm.genesisHash.String(),
272                 }).Warn("fail hand shake due to differnt genesis")
273                 return
274         }
275
276         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
277 }
278
279 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
280         tx, err := msg.GetTransaction()
281         if err != nil {
282                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
283                 return
284         }
285
286         if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
287                 sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
288         }
289 }
290
291 // Defaults to tcp
292 func protocolAndAddress(listenAddr string) (string, string) {
293         p, address := "tcp", listenAddr
294         parts := strings.SplitN(address, "://", 2)
295         if len(parts) == 2 {
296                 p, address = parts[0], parts[1]
297         }
298         return p, address
299 }
300
301 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
302         nodeInfo := &p2p.NodeInfo{
303                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
304                 Moniker: sm.config.Moniker,
305                 Network: sm.config.ChainID,
306                 Version: version.Version,
307                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
308         }
309
310         if !sm.sw.IsListening() {
311                 return nodeInfo
312         }
313
314         p2pListener := sm.sw.Listeners()[0]
315
316         // We assume that the rpcListener has the same ExternalAddress.
317         // This is probably true because both P2P and RPC listeners use UPnP,
318         // except of course if the rpc is only bound to localhost
319         if listenerStatus {
320                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
321         } else {
322                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
323         }
324         return nodeInfo
325 }
326
327 //Start start sync manager service
328 func (sm *SyncManager) Start() {
329         if _, err := sm.sw.Start(); err != nil {
330                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
331         }
332         // broadcast transactions
333         go sm.txBroadcastLoop()
334         go sm.minedBroadcastLoop()
335         go sm.txSyncLoop()
336 }
337
338 //Stop stop sync manager
339 func (sm *SyncManager) Stop() {
340         close(sm.quitSync)
341         sm.sw.Stop()
342 }
343
344 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
345         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
346         if err != nil {
347                 return nil, err
348         }
349
350         conn, err := net.ListenUDP("udp", addr)
351         if err != nil {
352                 return nil, err
353         }
354
355         realaddr := conn.LocalAddr().(*net.UDPAddr)
356         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
357         if err != nil {
358                 return nil, err
359         }
360
361         // add the seeds node to the discover table
362         if config.P2P.Seeds == "" {
363                 return ntab, nil
364         }
365         nodes := []*discover.Node{}
366         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
367                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
368                 nodes = append(nodes, discover.MustParseNode(url))
369         }
370         if err = ntab.SetFallbackNodes(nodes); err != nil {
371                 return nil, err
372         }
373         return ntab, nil
374 }
375
376 func (sm *SyncManager) minedBroadcastLoop() {
377         for {
378                 select {
379                 case blockHash := <-sm.newBlockCh:
380                         block, err := sm.chain.GetBlockByHash(blockHash)
381                         if err != nil {
382                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
383                                 return
384                         }
385                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
386                                 log.Errorf("Broadcast mine block error. %v", err)
387                                 return
388                         }
389                 case <-sm.quitSync:
390                         return
391                 }
392         }
393 }