7 log "github.com/sirupsen/logrus"
8 cmn "github.com/tendermint/tmlibs/common"
10 "github.com/bytom/errors"
11 "github.com/bytom/p2p"
12 "github.com/bytom/p2p/trust"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/protocol/bc/types"
19 // BlockchainChannel is a channel for blocks and status updates
20 BlockchainChannel = byte(0x40)
21 protocolHandshakeTimeout = time.Second * 10
25 //ErrProtocolHandshakeTimeout peers handshake timeout
26 ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
29 // Response describes the response standard.
30 type Response struct {
31 Status string `json:"status,omitempty"`
32 Msg string `json:"msg,omitempty"`
33 Data interface{} `json:"data,omitempty"`
36 type initalPeerStatus struct {
42 //ProtocolReactor handles new coming protocol message.
43 type ProtocolReactor struct {
47 blockKeeper *blockKeeper
48 txPool *protocol.TxPool
53 newPeerCh chan struct{}
54 quitReqBlockCh chan *string
56 peerStatusCh chan *initalPeerStatus
59 // NewProtocolReactor returns the reactor of whole blockchain.
60 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
61 pr := &ProtocolReactor{
63 blockKeeper: blockPeer,
70 quitReqBlockCh: quitReqBlockCh,
71 peerStatusCh: make(chan *initalPeerStatus),
73 pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
77 // GetChannels implements Reactor
78 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
79 return []*p2p.ChannelDescriptor{
80 &p2p.ChannelDescriptor{
81 ID: BlockchainChannel,
83 SendQueueCapacity: 100,
88 // OnStart implements BaseService
89 func (pr *ProtocolReactor) OnStart() error {
90 pr.BaseReactor.OnStart()
94 // OnStop implements BaseService
95 func (pr *ProtocolReactor) OnStop() {
96 pr.BaseReactor.OnStop()
99 // syncTransactions starts sending all currently pending transactions to the given peer.
100 func (pr *ProtocolReactor) syncTransactions(p *peer) {
101 pending := pr.txPool.GetTransactions()
102 if len(pending) == 0 {
105 txs := make([]*types.Tx, len(pending))
106 for i, batch := range pending {
109 pr.txSyncCh <- &txsync{p, txs}
112 // AddPeer implements Reactor by sending our state to peer.
113 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
114 peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
115 handshakeWait := time.NewTimer(protocolHandshakeTimeout)
118 case status := <-pr.peerStatusCh:
119 if status.peerID == peer.Key {
120 pr.peers.AddPeer(peer)
121 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
122 pr.syncTransactions(pr.peers.Peer(peer.Key))
123 pr.newPeerCh <- struct{}{}
126 case <-handshakeWait.C:
127 return ErrProtocolHandshakeTimeout
132 // RemovePeer implements Reactor by removing peer from the pool.
133 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
134 pr.quitReqBlockCh <- &peer.Key
135 pr.peers.RemovePeer(peer.Key)
138 // Receive implements Reactor by handling 4 types of messages (look below).
139 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
140 var tm *trust.TrustMetric
141 key := src.Connection().RemoteAddress.IP.String()
142 if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
143 log.Errorf("Can't get peer trust metric")
147 _, msg, err := DecodeMessage(msgBytes)
149 log.Errorf("Error decoding messagek %v", err)
152 log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
154 switch msg := msg.(type) {
155 case *BlockRequestMessage:
156 var block *types.Block
159 block, err = pr.chain.GetBlockByHeight(msg.Height)
161 block, err = pr.chain.GetBlockByHash(msg.GetHash())
164 log.Errorf("Fail on BlockRequestMessage get block: %v", err)
167 response, err := NewBlockResponseMessage(block)
169 log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
172 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
174 case *BlockResponseMessage:
175 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
176 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
178 case *StatusRequestMessage:
179 blockHeader := pr.chain.BestBlockHeader()
180 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
182 case *StatusResponseMessage:
183 peerStatus := &initalPeerStatus{
188 pr.peerStatusCh <- peerStatus
190 case *TransactionNotifyMessage:
191 tx, err := msg.GetTransaction()
193 log.Errorf("Error decoding new tx %v", err)
196 pr.blockKeeper.AddTx(tx, src.Key)
198 case *MineBlockMessage:
199 block, err := msg.GetMineBlock()
201 log.Errorf("Error decoding mined block %v", err)
204 // Mark the peer as owning the block and schedule it for import
206 pr.peers.MarkBlock(src.Key, &hash)
207 pr.fetcher.Enqueue(src.Key, block)
208 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
211 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))