7 log "github.com/sirupsen/logrus"
9 "github.com/bytom/errors"
10 "github.com/bytom/p2p"
11 "github.com/bytom/p2p/connection"
// handshakeTimeout is the duration given to the timeout timer in AddPeer; when
// that timer fires AddPeer returns errProtocolHandshakeTimeout.
15 handshakeTimeout = 10 * time.Second
// handshakeCheckPerid is the duration given to the check timer in AddPeer.
// NOTE(review): the name looks like a typo for "handshakeCheckPeriod".
16 handshakeCheckPerid = 500 * time.Millisecond
// errProtocolHandshakeTimeout is returned by AddPeer when its timeout timer fires.
20 errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
// errStatusRequest is returned by AddPeer when the status request cannot be
// queued on the peer with TrySend.
21 errStatusRequest = errors.New("Status request error")
24 // ProtocolReactor handles incoming protocol messages: Receive decodes each
// message and hands it to the SyncManager handler for its type.
25 type ProtocolReactor struct {
32 // NewProtocolReactor builds a ProtocolReactor and installs a p2p base reactor
// named "ProtocolReactor" that wraps it.
// NOTE(review): sm and peers are presumably stored on the new reactor (the
// struct literal's fields are not visible here) — confirm.
33 func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
34 pr := &ProtocolReactor{
// The base reactor receives pr itself as the implementation it wraps.
38 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
42 // GetChannels implements Reactor. It returns the channel descriptors served by
// this reactor; the descriptor visible here is for BlockchainChannel and uses
// a send queue capacity of 100.
43 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
44 return []*connection.ChannelDescriptor{
45 &connection.ChannelDescriptor{
46 ID: BlockchainChannel,
48 SendQueueCapacity: 100,
53 // OnStart implements BaseService by delegating to the embedded BaseReactor.
54 func (pr *ProtocolReactor) OnStart() error {
// NOTE(review): any value returned by BaseReactor.OnStart is discarded on this
// line — confirm that ignoring it is intended.
55 pr.BaseReactor.OnStart()
59 // OnStop implements BaseService by delegating to the embedded BaseReactor.
60 func (pr *ProtocolReactor) OnStop() {
61 pr.BaseReactor.OnStop()
64 // AddPeer implements Reactor. It queues a StatusRequestMessage for the new
// peer on BlockchainChannel, then checks whether the peer has been registered
// in pr.peers; once it has, transaction syncing is started for it.
//
// It returns errStatusRequest when TrySend reports that the request could not
// be queued, and errProtocolHandshakeTimeout when the handshakeTimeout timer
// fires.
65 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
66 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
67 return errStatusRequest
// NOTE(review): both values below are one-shot time.Timers despite the
// "Ticker" names. If the surrounding loop (not visible here) is meant to poll
// every handshakeCheckPerid, checkTicker fires only once and the next wake-up
// is the timeout; time.NewTicker (with a deferred Stop) is likely what was
// intended — confirm. Neither timer is stopped in the visible lines.
70 checkTicker := time.NewTimer(handshakeCheckPerid)
71 timeoutTicker := time.NewTimer(handshakeTimeout)
// A non-nil entry in pr.peers for this key is treated as handshake success.
75 if exist := pr.peers.getPeer(peer.Key); exist != nil {
76 pr.sm.syncTransactions(peer.Key)
80 case <-timeoutTicker.C:
81 return errProtocolHandshakeTimeout
86 // RemovePeer implements Reactor by removing the peer, looked up by its key,
// from pr.peers. The reason argument is not used on the visible line.
87 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
88 pr.peers.removePeer(peer.Key)
91 // Receive implements Reactor. It decodes msgBytes and dispatches the resulting
// message to the pr.sm handler that matches its concrete type (block, status,
// transaction, mined-block, headers and blocks requests/responses). Decode
// failures and unrecognised message types are logged.
92 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
93 msgType, msg, err := DecodeMessage(msgBytes)
95 log.WithField("err", err).Errorf("fail on reactor decoding message")
// peer is the pr.peers entry for src; it is nil when src is not registered.
99 peer := pr.peers.getPeer(src.Key)
// Only status request/response messages get past this guard for an
// unregistered peer. NOTE(review): the guard body is not visible here;
// presumably it drops the message — confirm.
100 if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
104 switch msg := msg.(type) {
105 case *GetBlockMessage:
106 pr.sm.handleGetBlockMsg(peer, msg)
109 pr.sm.handleBlockMsg(peer, msg)
// The two status handlers take the raw p2p peer (src) instead of peer, which
// may still be nil for these message types.
111 case *StatusRequestMessage:
112 pr.sm.handleStatusRequestMsg(src)
114 case *StatusResponseMessage:
115 pr.sm.handleStatusResponseMsg(src, msg)
117 case *TransactionMessage:
118 pr.sm.handleTransactionMsg(peer, msg)
120 case *MineBlockMessage:
121 pr.sm.handleMineBlockMsg(peer, msg)
123 case *GetHeadersMessage:
124 pr.sm.handleGetHeadersMsg(peer, msg)
126 case *HeadersMessage:
127 pr.sm.handleHeadersMsg(peer, msg)
129 case *GetBlocksMessage:
130 pr.sm.handleGetBlocksMsg(peer, msg)
133 pr.sm.handleBlocksMsg(peer, msg)
136 log.Errorf("unknown message type %v", reflect.TypeOf(msg))