8 log "github.com/sirupsen/logrus"
9 cmn "github.com/tendermint/tmlibs/common"
11 "github.com/bytom/errors"
12 "github.com/bytom/p2p"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/protocol/bc/types"
19 // BlockchainChannel is a channel for blocks and status updates
20 BlockchainChannel = byte(0x40)
21 protocolHandshakeTimeout = time.Second * 10
22 handshakeRetryTicker = 4 * time.Second
26 //ErrProtocolHandshakeTimeout peers handshake timeout
27 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
28 ErrStatusRequest = errors.New("Status request error")
31 // Response describes the response standard.
32 type Response struct {
33 Status string `json:"status,omitempty"`
34 Msg string `json:"msg,omitempty"`
35 Data interface{} `json:"data,omitempty"`
38 type initalPeerStatus struct {
44 //ProtocolReactor handles new coming protocol message.
45 type ProtocolReactor struct {
49 blockKeeper *blockKeeper
50 txPool *protocol.TxPool
54 handshakeMu sync.Mutex
56 newPeerCh chan struct{}
57 quitReqBlockCh chan *string
59 peerStatusCh chan *initalPeerStatus
62 // NewProtocolReactor returns the reactor of whole blockchain.
63 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
64 pr := &ProtocolReactor{
66 blockKeeper: blockPeer,
73 quitReqBlockCh: quitReqBlockCh,
74 peerStatusCh: make(chan *initalPeerStatus),
76 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
80 // GetChannels implements Reactor
81 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
82 return []*p2p.ChannelDescriptor{
83 &p2p.ChannelDescriptor{
84 ID: BlockchainChannel,
86 SendQueueCapacity: 100,
91 // OnStart implements BaseService
92 func (pr *ProtocolReactor) OnStart() error {
93 pr.BaseReactor.OnStart()
97 // OnStop implements BaseService
98 func (pr *ProtocolReactor) OnStop() {
99 pr.BaseReactor.OnStop()
102 // syncTransactions starts sending all currently pending transactions to the given peer.
103 func (pr *ProtocolReactor) syncTransactions(p *peer) {
104 pending := pr.txPool.GetTransactions()
105 if len(pending) == 0 {
108 txs := make([]*types.Tx, len(pending))
109 for i, batch := range pending {
112 pr.txSyncCh <- &txsync{p, txs}
115 // AddPeer implements Reactor by sending our state to peer.
116 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
117 pr.handshakeMu.Lock()
118 defer pr.handshakeMu.Unlock()
120 if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
121 return ErrStatusRequest
123 retryTicker := time.Tick(handshakeRetryTicker)
124 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
127 case status := <-pr.peerStatusCh:
128 if status.peerID == peer.Key {
129 pr.peers.AddPeer(peer)
130 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
131 pr.syncTransactions(pr.peers.Peer(peer.Key))
132 pr.newPeerCh <- struct{}{}
136 if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
137 return ErrStatusRequest
139 case <-handshakeWait.C:
140 return ErrProtocolHandshakeTimeout
145 // RemovePeer implements Reactor by removing peer from the pool.
146 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
147 pr.quitReqBlockCh <- &peer.Key
148 pr.peers.RemovePeer(peer.Key)
151 // Receive implements Reactor by handling 4 types of messages (look below).
152 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
153 _, msg, err := DecodeMessage(msgBytes)
155 log.Errorf("Error decoding messagek %v", err)
158 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
160 switch msg := msg.(type) {
161 case *BlockRequestMessage:
162 var block *types.Block
165 block, err = pr.chain.GetBlockByHeight(msg.Height)
167 block, err = pr.chain.GetBlockByHash(msg.GetHash())
170 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
173 response, err := NewBlockResponseMessage(block)
175 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
178 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
180 case *BlockResponseMessage:
181 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
182 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
184 case *StatusRequestMessage:
185 blockHeader := pr.chain.BestBlockHeader()
186 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
188 case *StatusResponseMessage:
189 peerStatus := &initalPeerStatus{
194 pr.peerStatusCh <- peerStatus
196 case *TransactionNotifyMessage:
197 tx, err := msg.GetTransaction()
199 log.Errorf("Error decoding new tx %v", err)
202 pr.blockKeeper.AddTx(tx, src.Key)
204 case *MineBlockMessage:
205 block, err := msg.GetMineBlock()
207 log.Errorf("Error decoding mined block %v", err)
210 // Mark the peer as owning the block and schedule it for import
212 pr.peers.MarkBlock(src.Key, &hash)
213 pr.fetcher.Enqueue(src.Key, block)
214 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
217 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))