9 log "github.com/sirupsen/logrus"
10 cmn "github.com/tendermint/tmlibs/common"
12 "github.com/bytom/errors"
13 "github.com/bytom/p2p"
14 "github.com/bytom/p2p/connection"
15 "github.com/bytom/protocol"
16 "github.com/bytom/protocol/bc"
17 "github.com/bytom/protocol/bc/types"
21 // BlockchainChannel is a channel for blocks and status updates
22 BlockchainChannel = byte(0x40)
23 protocolHandshakeTimeout = time.Second * 10
24 handshakeRetryTicker = 4 * time.Second
28 // ErrProtocolHandshakeTimeout is returned when the protocol handshake with a peer times out.
29 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
30 ErrStatusRequest = errors.New("Status request error")
31 ErrDiffGenesisHash = errors.New("Different genesis hash")
34 // Response describes the standard response format.
35 type Response struct {
36 Status string `json:"status,omitempty"`
37 Msg string `json:"msg,omitempty"`
38 Data interface{} `json:"data,omitempty"`
41 type initalPeerStatus struct {
48 // ProtocolReactor handles incoming protocol messages.
49 type ProtocolReactor struct {
53 blockKeeper *blockKeeper
54 txPool *protocol.TxPool
58 handshakeMu sync.Mutex
61 newPeerCh chan struct{}
62 quitReqBlockCh chan *string
64 peerStatusCh chan *initalPeerStatus
67 // NewProtocolReactor returns the reactor of the whole blockchain.
68 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
69 pr := &ProtocolReactor{
71 blockKeeper: blockPeer,
78 quitReqBlockCh: quitReqBlockCh,
79 peerStatusCh: make(chan *initalPeerStatus),
81 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
82 genesisBlock, _ := pr.chain.GetBlockByHeight(0)
83 pr.genesisHash = genesisBlock.Hash()
88 // GetChannels implements Reactor
89 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
90 return []*connection.ChannelDescriptor{
91 &connection.ChannelDescriptor{
92 ID: BlockchainChannel,
94 SendQueueCapacity: 100,
99 // OnStart implements BaseService
100 func (pr *ProtocolReactor) OnStart() error {
101 pr.BaseReactor.OnStart()
105 // OnStop implements BaseService
106 func (pr *ProtocolReactor) OnStop() {
107 pr.BaseReactor.OnStop()
110 // syncTransactions starts sending all currently pending transactions to the given peer.
111 func (pr *ProtocolReactor) syncTransactions(p *peer) {
115 pending := pr.txPool.GetTransactions()
116 if len(pending) == 0 {
119 txs := make([]*types.Tx, len(pending))
120 for i, batch := range pending {
123 pr.txSyncCh <- &txsync{p, txs}
126 // AddPeer implements Reactor by requesting the peer's status and adding the peer once its status response arrives.
127 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
128 pr.handshakeMu.Lock()
129 defer pr.handshakeMu.Unlock()
131 return errPeerDropped
133 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
134 return ErrStatusRequest
136 retryTicker := time.Tick(handshakeRetryTicker)
137 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
140 case status := <-pr.peerStatusCh:
141 if status.peerID == peer.Key {
142 if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
143 log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
144 return ErrDiffGenesisHash
146 pr.peers.AddPeer(peer)
147 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
148 prPeer, ok := pr.peers.Peer(peer.Key)
150 return errPeerDropped
152 pr.syncTransactions(prPeer)
153 pr.newPeerCh <- struct{}{}
158 return errPeerDropped
160 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
161 return ErrStatusRequest
163 case <-handshakeWait.C:
164 return ErrProtocolHandshakeTimeout
169 // RemovePeer implements Reactor by removing peer from the pool.
170 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
172 case pr.quitReqBlockCh <- &peer.Key:
174 log.Warning("quitReqBlockCh is full")
176 pr.peers.RemovePeer(peer.Key)
179 // Receive implements Reactor by decoding the message and dispatching on its type (see the switch below).
180 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
181 _, msg, err := DecodeMessage(msgBytes)
183 log.Errorf("Error decoding message %v", err)
186 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
188 switch msg := msg.(type) {
189 case *BlockRequestMessage:
190 var block *types.Block
193 block, err = pr.chain.GetBlockByHeight(msg.Height)
195 block, err = pr.chain.GetBlockByHash(msg.GetHash())
198 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
201 response, err := NewBlockResponseMessage(block)
203 log.Errorf("Fail on BlockRequestMessage create response: %v", err)
206 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
208 case *BlockResponseMessage:
209 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
210 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
212 case *StatusRequestMessage:
213 blockHeader := pr.chain.BestBlockHeader()
214 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
216 case *StatusResponseMessage:
217 peerStatus := &initalPeerStatus{
221 genesisHash: msg.GetGenesisHash(),
223 pr.peerStatusCh <- peerStatus
225 case *TransactionNotifyMessage:
226 tx, err := msg.GetTransaction()
228 log.Errorf("Error decoding new tx %v", err)
231 pr.blockKeeper.AddTx(tx, src.Key)
233 case *MineBlockMessage:
234 block, err := msg.GetMineBlock()
236 log.Errorf("Error decoding mined block %v", err)
239 // Mark the peer as owning the block and schedule it for import
241 pr.peers.MarkBlock(src.Key, &hash)
242 pr.fetcher.Enqueue(src.Key, block)
243 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
246 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))