OSDN Git Service

Merge pull request #671 from Bytom/dev
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "time"
6
7         log "github.com/sirupsen/logrus"
8         cmn "github.com/tendermint/tmlibs/common"
9
10         "github.com/bytom/errors"
11         "github.com/bytom/p2p"
12         "github.com/bytom/p2p/trust"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
const (
	// BlockchainChannel is the p2p channel ID used for blocks and status updates.
	BlockchainChannel = byte(0x40)

	// protocolHandshakeTimeout bounds how long AddPeer waits for a peer's
	// status response before giving up.
	protocolHandshakeTimeout = 10 * time.Second
)
23
// ErrProtocolHandshakeTimeout is returned by AddPeer when the peer's status
// does not arrive before the handshake timer fires.
var ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
28
// Response describes the response standard.
//
// Every field is tagged omitempty, so a field holding its zero value is left
// out of the JSON encoding.
type Response struct {
	Status string      `json:"status,omitempty"`
	Msg    string      `json:"msg,omitempty"`
	Data   interface{} `json:"data,omitempty"`
}
35
// initalPeerStatus carries the chain status a peer reported in a
// StatusResponseMessage. Receive builds one per status response and hands it
// to AddPeer through ProtocolReactor.peerStatusCh.
type initalPeerStatus struct {
	// peerID is the key of the peer that sent the status.
	peerID string
	// height is the height taken from the status message.
	height uint64
	// hash is the hash taken from the status message.
	hash *bc.Hash
}
41
// ProtocolReactor handles incoming protocol messages on BlockchainChannel and
// runs the status handshake for newly added peers.
type ProtocolReactor struct {
	p2p.BaseReactor

	// chain is queried for blocks and for the best block header when
	// answering block and status requests.
	chain *protocol.Chain
	// blockKeeper receives the blocks and transactions delivered by peers.
	blockKeeper *blockKeeper
	// txPool supplies the pending transactions sent to a newly added peer.
	txPool *protocol.TxPool
	// sw is used to look up the trust metric of a message's sender.
	sw *p2p.Switch
	// fetcher receives blocks announced through MineBlockMessage.
	fetcher *Fetcher
	// peers is the set of peers that completed the status handshake.
	peers *peerSet

	// newPeerCh is signalled once for each peer that finishes the handshake.
	newPeerCh chan struct{}
	// quitReqBlockCh receives the key of every peer being removed.
	quitReqBlockCh chan *string
	// txSyncCh receives the pending-transaction batches built by syncTransactions.
	txSyncCh chan *txsync
	// peerStatusCh passes status responses from Receive to the AddPeer call
	// waiting on the handshake; it is created unbuffered by NewProtocolReactor.
	peerStatusCh chan *initalPeerStatus
}
58
59 // NewProtocolReactor returns the reactor of whole blockchain.
60 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
61         pr := &ProtocolReactor{
62                 chain:          chain,
63                 blockKeeper:    blockPeer,
64                 txPool:         txPool,
65                 sw:             sw,
66                 fetcher:        fetcher,
67                 peers:          peers,
68                 newPeerCh:      newPeerCh,
69                 txSyncCh:       txSyncCh,
70                 quitReqBlockCh: quitReqBlockCh,
71                 peerStatusCh:   make(chan *initalPeerStatus),
72         }
73         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
74         return pr
75 }
76
77 // GetChannels implements Reactor
78 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
79         return []*p2p.ChannelDescriptor{
80                 &p2p.ChannelDescriptor{
81                         ID:                BlockchainChannel,
82                         Priority:          5,
83                         SendQueueCapacity: 100,
84                 },
85         }
86 }
87
88 // OnStart implements BaseService
89 func (pr *ProtocolReactor) OnStart() error {
90         pr.BaseReactor.OnStart()
91         return nil
92 }
93
// OnStop implements BaseService by delegating to the embedded BaseReactor.
// The reactor performs no shutdown work of its own here.
func (pr *ProtocolReactor) OnStop() {
	pr.BaseReactor.OnStop()
}
98
99 // syncTransactions starts sending all currently pending transactions to the given peer.
100 func (pr *ProtocolReactor) syncTransactions(p *peer) {
101         pending := pr.txPool.GetTransactions()
102         if len(pending) == 0 {
103                 return
104         }
105         txs := make([]*types.Tx, len(pending))
106         for i, batch := range pending {
107                 txs[i] = batch.Tx
108         }
109         pr.txSyncCh <- &txsync{p, txs}
110 }
111
112 // AddPeer implements Reactor by sending our state to peer.
113 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
114         peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
115         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
116         for {
117                 select {
118                 case status := <-pr.peerStatusCh:
119                         if status.peerID == peer.Key {
120                                 pr.peers.AddPeer(peer)
121                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
122                                 pr.syncTransactions(pr.peers.Peer(peer.Key))
123                                 pr.newPeerCh <- struct{}{}
124                                 return nil
125                         }
126                 case <-handshakeWait.C:
127                         return ErrProtocolHandshakeTimeout
128                 }
129         }
130 }
131
// RemovePeer implements Reactor. It first publishes the peer's key on
// quitReqBlockCh and then removes the peer from the peer set.
// The reason argument is not used.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
	// The pointer sent refers to the Key field of peer itself, not to a copy.
	pr.quitReqBlockCh <- &peer.Key
	pr.peers.RemovePeer(peer.Key)
}
137
138 // Receive implements Reactor by handling 4 types of messages (look below).
139 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
140         var tm *trust.TrustMetric
141         key := src.Connection().RemoteAddress.IP.String()
142         if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
143                 log.Errorf("Can't get peer trust metric")
144                 return
145         }
146
147         _, msg, err := DecodeMessage(msgBytes)
148         if err != nil {
149                 log.Errorf("Error decoding messagek %v", err)
150                 return
151         }
152         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
153
154         switch msg := msg.(type) {
155         case *BlockRequestMessage:
156                 var block *types.Block
157                 var err error
158                 if msg.Height != 0 {
159                         block, err = pr.chain.GetBlockByHeight(msg.Height)
160                 } else {
161                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
162                 }
163                 if err != nil {
164                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
165                         return
166                 }
167                 response, err := NewBlockResponseMessage(block)
168                 if err != nil {
169                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
170                         return
171                 }
172                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
173
174         case *BlockResponseMessage:
175                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
176                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
177
178         case *StatusRequestMessage:
179                 blockHeader := pr.chain.BestBlockHeader()
180                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
181
182         case *StatusResponseMessage:
183                 peerStatus := &initalPeerStatus{
184                         peerID: src.Key,
185                         height: msg.Height,
186                         hash:   msg.GetHash(),
187                 }
188                 pr.peerStatusCh <- peerStatus
189
190         case *TransactionNotifyMessage:
191                 tx, err := msg.GetTransaction()
192                 if err != nil {
193                         log.Errorf("Error decoding new tx %v", err)
194                         return
195                 }
196                 pr.blockKeeper.AddTx(tx, src.Key)
197
198         case *MineBlockMessage:
199                 block, err := msg.GetMineBlock()
200                 if err != nil {
201                         log.Errorf("Error decoding mined block %v", err)
202                         return
203                 }
204                 // Mark the peer as owning the block and schedule it for import
205                 hash := block.Hash()
206                 pr.peers.MarkBlock(src.Key, &hash)
207                 pr.fetcher.Enqueue(src.Key, block)
208                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
209
210         default:
211                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
212         }
213 }