OSDN Git Service

Add txstatus to core saveBlock
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "errors"
6         "net"
7         "path"
8         "reflect"
9         "strconv"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15
16         "github.com/bytom/account"
17         cfg "github.com/bytom/config"
18         "github.com/bytom/consensus"
19         "github.com/bytom/p2p"
20         "github.com/bytom/p2p/discover"
21         "github.com/bytom/protocol/bc"
22         "github.com/bytom/protocol/bc/types"
23         "github.com/bytom/version"
24         "github.com/bytom/wallet"
25         "sync"
26 )
27
28 const (
29         maxTxChanSize = 10000
30 )
31
32 // Chain is the interface for Bytom core
33 type Chain interface {
34         BestBlockHeader() *types.BlockHeader
35         BestBlockHeight() uint64
36         CalcNextSeed(*bc.Hash) (*bc.Hash, error)
37         GetBlockByHash(*bc.Hash) (*types.Block, error)
38         GetBlockByHeight(uint64) (*types.Block, error)
39         GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
40         GetHeaderByHeight(uint64) (*types.BlockHeader, error)
41         InMainChain(bc.Hash) bool
42         ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
43 }
44
45 //SyncManager Sync Manager is responsible for the business layer information synchronization
46 type SyncManager struct {
47         sw          *p2p.Switch
48         genesisHash bc.Hash
49
50         privKey     crypto.PrivKeyEd25519 // local node's p2p key
51         chain       Chain
52         blockKeeper *blockKeeper
53         peers       *peerSet
54
55         newTxCh      chan *types.Tx
56         txNotifyCh   chan *types.Tx
57         newBlockCh   chan *bc.Hash
58         newAddrCh    chan *account.CtrlProgram
59         spvAddresses []*account.CtrlProgram
60         addrMutex    sync.RWMutex
61         txSyncCh     chan *txSyncMsg
62         quitSync     chan struct{}
63         config       *cfg.Config
64 }
65
66 //NewSyncManager create a sync manager
67 func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
68         genesisHeader, err := chain.GetHeaderByHeight(0)
69         if err != nil {
70                 return nil, err
71         }
72
73         sw := p2p.NewSwitch(config)
74         peers := newPeerSet(sw)
75         manager := &SyncManager{
76                 sw:          sw,
77                 genesisHash: genesisHeader.Hash(),
78                 chain:       chain,
79                 privKey:     crypto.GenPrivKeyEd25519(),
80                 blockKeeper: newBlockKeeper(chain, peers),
81                 peers:       peers,
82                 newTxCh:     make(chan *types.Tx, maxTxChanSize),
83                 txNotifyCh:  wallet.GetTxCh(),
84                 newBlockCh:  newBlockCh,
85                 txSyncCh:    make(chan *txSyncMsg),
86                 quitSync:    make(chan struct{}),
87                 config:      config,
88                 newAddrCh:   wallet.AccountMgr.NewAddrCh,
89         }
90         manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
91         protocolReactor := NewProtocolReactor(manager, manager.peers)
92         manager.sw.AddReactor("PROTOCOL", protocolReactor)
93
94         // Create & add listener
95         var listenerStatus bool
96         var l p2p.Listener
97         if !config.VaultMode {
98                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
99                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
100                 //manager.sw.AddListener(l)
101
102                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
103                 if err != nil {
104                         return nil, err
105                 }
106                 manager.sw.SetDiscv(discv)
107         }
108         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
109         manager.sw.SetNodePrivKey(manager.privKey)
110         return manager, nil
111 }
112
113 //BestPeer return the highest p2p peerInfo
114 func (sm *SyncManager) BestPeer() *PeerInfo {
115         bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
116         if bestPeer != nil {
117                 return bestPeer.getPeerInfo()
118         }
119         return nil
120 }
121
122 // GetNewTxCh return a unconfirmed transaction feed channel
123 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
124         return sm.newTxCh
125 }
126
127 //GetPeerInfos return peer info of all peers
128 func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
129         return sm.peers.getPeerInfos()
130 }
131
132 //IsCaughtUp check wheather the peer finish the sync
133 func (sm *SyncManager) IsCaughtUp() bool {
134         peer := sm.peers.bestPeer(consensus.SFFullNode)
135         return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
136 }
137
138 //NodeInfo get P2P peer node info
139 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
140         return sm.sw.NodeInfo()
141 }
142
143 //StopPeer try to stop peer by given ID
144 func (sm *SyncManager) StopPeer(peerID string) error {
145         if peer := sm.peers.getPeer(peerID); peer == nil {
146                 return errors.New("peerId not exist")
147         }
148         sm.peers.removePeer(peerID)
149         return nil
150 }
151
152 //Switch get sync manager switch
153 func (sm *SyncManager) Switch() *p2p.Switch {
154         return sm.sw
155 }
156
157 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
158         sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
159 }
160
161 func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
162         sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
163 }
164
165 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
166         blocks, err := msg.GetBlocks()
167         if err != nil {
168                 log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
169                 return
170         }
171
172         sm.blockKeeper.processBlocks(peer.ID(), blocks)
173 }
174
175 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
176         var block *types.Block
177         var err error
178         if msg.Height != 0 {
179                 block, err = sm.chain.GetBlockByHeight(msg.Height)
180         } else {
181                 block, err = sm.chain.GetBlockByHash(msg.GetHash())
182         }
183         if err != nil {
184                 log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
185                 return
186         }
187
188         ok, err := peer.sendBlock(block)
189         if !ok {
190                 sm.peers.removePeer(peer.ID())
191         }
192         if err != nil {
193                 log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
194         }
195 }
196
197 func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
198         blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
199         if err != nil || len(blocks) == 0 {
200                 return
201         }
202
203         totalSize := 0
204         sendBlocks := []*types.Block{}
205         for _, block := range blocks {
206                 rawData, err := block.MarshalText()
207                 if err != nil {
208                         log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
209                         continue
210                 }
211
212                 if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
213                         break
214                 }
215                 totalSize += len(rawData)
216                 sendBlocks = append(sendBlocks, block)
217         }
218
219         ok, err := peer.sendBlocks(sendBlocks)
220         if !ok {
221                 sm.peers.removePeer(peer.ID())
222         }
223         if err != nil {
224                 log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
225         }
226 }
227
228 func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
229         headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
230         if err != nil || len(headers) == 0 {
231                 log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
232                 return
233         }
234
235         ok, err := peer.sendHeaders(headers)
236         if !ok {
237                 sm.peers.removePeer(peer.ID())
238         }
239         if err != nil {
240                 log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
241         }
242 }
243
244 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
245         headers, err := msg.GetHeaders()
246         if err != nil {
247                 log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
248                 return
249         }
250
251         sm.blockKeeper.processHeaders(peer.ID(), headers)
252 }
253
254 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
255         bestHeader := sm.chain.BestBlockHeader()
256         genesisBlock, err := sm.chain.GetBlockByHeight(0)
257         if err != nil {
258                 log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
259         }
260
261         genesisHash := genesisBlock.Hash()
262         msg := NewStatusResponseMessage(bestHeader, &genesisHash)
263         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
264                 sm.peers.removePeer(peer.ID())
265         }
266 }
267
268 func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
269         if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
270                 peer.setStatus(msg.Height, msg.GetHash())
271                 return
272         }
273
274         if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
275                 log.WithFields(log.Fields{
276                         "remote genesis": genesisHash.String(),
277                         "local genesis":  sm.genesisHash.String(),
278                 }).Warn("fail hand shake due to differnt genesis")
279                 return
280         }
281         if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
282                 log.WithFields(log.Fields{
283                         "peer ServiceFlag": basePeer.ServiceFlag(),
284                 }).Warn("fail hand shake due to remote peer is not full node support spv proof")
285                 return
286         }
287         sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
288 }
289
290 func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
291         tx, err := msg.GetTransaction()
292         if err != nil {
293                 sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
294                 return
295         }
296         sm.txNotifyCh <- tx
297 }
298
299 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
300         peer := sm.peers.getPeer(basePeer.ID())
301         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
302                 return
303         }
304
305         switch msg := msg.(type) {
306         case *StatusRequestMessage:
307                 sm.handleStatusRequestMsg(basePeer)
308
309         case *StatusResponseMessage:
310                 sm.handleStatusResponseMsg(basePeer, msg)
311
312         case *TransactionMessage:
313                 sm.handleTransactionMsg(peer, msg)
314
315         case *HeadersMessage:
316                 sm.handleHeadersMsg(peer, msg)
317
318         case *MerkleBlockMessage:
319                 sm.handleMerkelBlockMsg(peer, msg)
320
321         default:
322                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
323         }
324 }
325
326 // Defaults to tcp
327 func protocolAndAddress(listenAddr string) (string, string) {
328         p, address := "tcp", listenAddr
329         parts := strings.SplitN(address, "://", 2)
330         if len(parts) == 2 {
331                 p, address = parts[0], parts[1]
332         }
333         return p, address
334 }
335
336 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
337         nodeInfo := &p2p.NodeInfo{
338                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
339                 Moniker: sm.config.Moniker,
340                 Network: sm.config.ChainID,
341                 Version: version.Version,
342                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
343         }
344
345         if !sm.sw.IsListening() {
346                 return nodeInfo
347         }
348
349         p2pListener := sm.sw.Listeners()[0]
350
351         // We assume that the rpcListener has the same ExternalAddress.
352         // This is probably true because both P2P and RPC listeners use UPnP,
353         // except of course if the rpc is only bound to localhost
354         if listenerStatus {
355                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
356         } else {
357                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
358         }
359         return nodeInfo
360 }
361
362 //Start start sync manager service
363 func (sm *SyncManager) Start() {
364         if _, err := sm.sw.Start(); err != nil {
365                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
366         }
367         go sm.spvAddressMgr()
368         // broadcast transactions
369         go sm.txBroadcastLoop()
370         go sm.minedBroadcastLoop()
371         go sm.txSyncLoop()
372 }
373
374 //Stop stop sync manager
375 func (sm *SyncManager) Stop() {
376         close(sm.quitSync)
377         sm.sw.Stop()
378 }
379
380 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
381         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
382         if err != nil {
383                 return nil, err
384         }
385
386         conn, err := net.ListenUDP("udp", addr)
387         if err != nil {
388                 return nil, err
389         }
390
391         realaddr := conn.LocalAddr().(*net.UDPAddr)
392         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
393         if err != nil {
394                 return nil, err
395         }
396
397         // add the seeds node to the discover table
398         if config.P2P.Seeds == "" {
399                 return ntab, nil
400         }
401         nodes := []*discover.Node{}
402         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
403                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
404                 nodes = append(nodes, discover.MustParseNode(url))
405         }
406         if err = ntab.SetFallbackNodes(nodes); err != nil {
407                 return nil, err
408         }
409         return ntab, nil
410 }
411
412 func (sm *SyncManager) minedBroadcastLoop() {
413         for {
414                 select {
415                 case blockHash := <-sm.newBlockCh:
416                         block, err := sm.chain.GetBlockByHash(blockHash)
417                         if err != nil {
418                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
419                                 return
420                         }
421                         if err := sm.peers.broadcastMinedBlock(block); err != nil {
422                                 log.Errorf("Broadcast mine block error. %v", err)
423                                 return
424                         }
425                 case <-sm.quitSync:
426                         return
427                 }
428         }
429 }