OSDN Git Service

Add block fast sync function (#1104)
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/p2p"
11         "github.com/bytom/p2p/connection"
12 )
13
const (
	// handshakeTimeout bounds how long AddPeer waits for the peer to appear
	// in the peer set before it gives up with errProtocolHandshakeTimeout.
	handshakeTimeout    = 10 * time.Second
	// handshakeCheckPerid is the wait AddPeer uses before checking whether
	// the peer has been registered in the peer set. The identifier is a
	// misspelling of "Period"; it is kept because AddPeer refers to it.
	handshakeCheckPerid = 500 * time.Millisecond
)
18
var (
	// errProtocolHandshakeTimeout is returned by AddPeer when the peer is
	// still absent from the peer set once handshakeTimeout has elapsed.
	errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
	// errStatusRequest is returned by AddPeer when TrySend reports that the
	// status request was not accepted for sending.
	errStatusRequest            = errors.New("Status request error")
)
23
// ProtocolReactor handles incoming protocol messages on BlockchainChannel
// and forwards each decoded message to the SyncManager.
type ProtocolReactor struct {
	p2p.BaseReactor

	sm    *SyncManager // target of every handle*Msg call made by Receive
	peers *peerSet     // looked up by peer key in AddPeer, Receive and RemovePeer
}
31
32 // NewProtocolReactor returns the reactor of whole blockchain.
33 func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
34         pr := &ProtocolReactor{
35                 sm:    sm,
36                 peers: peers,
37         }
38         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
39         return pr
40 }
41
42 // GetChannels implements Reactor
43 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
44         return []*connection.ChannelDescriptor{
45                 &connection.ChannelDescriptor{
46                         ID:                BlockchainChannel,
47                         Priority:          5,
48                         SendQueueCapacity: 100,
49                 },
50         }
51 }
52
53 // OnStart implements BaseService
54 func (pr *ProtocolReactor) OnStart() error {
55         pr.BaseReactor.OnStart()
56         return nil
57 }
58
// OnStop implements BaseService by delegating to the embedded BaseReactor's
// OnStop; the reactor performs no additional cleanup of its own here.
func (pr *ProtocolReactor) OnStop() {
	pr.BaseReactor.OnStop()
}
63
64 // AddPeer implements Reactor by sending our state to peer.
65 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
66         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
67                 return errStatusRequest
68         }
69
70         checkTicker := time.NewTimer(handshakeCheckPerid)
71         timeoutTicker := time.NewTimer(handshakeTimeout)
72         for {
73                 select {
74                 case <-checkTicker.C:
75                         if exist := pr.peers.getPeer(peer.Key); exist != nil {
76                                 pr.sm.syncTransactions(peer.Key)
77                                 return nil
78                         }
79
80                 case <-timeoutTicker.C:
81                         return errProtocolHandshakeTimeout
82                 }
83         }
84 }
85
// RemovePeer implements Reactor by removing the peer, identified by its key,
// from the peer set. The reason argument is not used.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	pr.peers.removePeer(peer.Key)
}
90
91 // Receive implements Reactor by handling 4 types of messages (look below).
92 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
93         msgType, msg, err := DecodeMessage(msgBytes)
94         if err != nil {
95                 log.WithField("err", err).Errorf("fail on reactor decoding message")
96                 return
97         }
98
99         peer := pr.peers.getPeer(src.Key)
100         if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
101                 return
102         }
103
104         switch msg := msg.(type) {
105         case *GetBlockMessage:
106                 pr.sm.handleGetBlockMsg(peer, msg)
107
108         case *BlockMessage:
109                 pr.sm.handleBlockMsg(peer, msg)
110
111         case *StatusRequestMessage:
112                 pr.sm.handleStatusRequestMsg(src)
113
114         case *StatusResponseMessage:
115                 pr.sm.handleStatusResponseMsg(src, msg)
116
117         case *TransactionMessage:
118                 pr.sm.handleTransactionMsg(peer, msg)
119
120         case *MineBlockMessage:
121                 pr.sm.handleMineBlockMsg(peer, msg)
122
123         case *GetHeadersMessage:
124                 pr.sm.handleGetHeadersMsg(peer, msg)
125
126         case *HeadersMessage:
127                 pr.sm.handleHeadersMsg(peer, msg)
128
129         case *GetBlocksMessage:
130                 pr.sm.handleGetBlocksMsg(peer, msg)
131
132         case *BlocksMessage:
133                 pr.sm.handleBlocksMsg(peer, msg)
134
135         default:
136                 log.Errorf("unknown message type %v", reflect.TypeOf(msg))
137         }
138 }