OSDN Git Service

Merge pull request #681 from Bytom/fix-tx-feed
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "sync"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9         cmn "github.com/tendermint/tmlibs/common"
10
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
const (
	// BlockchainChannel is the p2p channel ID this reactor uses for blocks,
	// transactions and status updates.
	BlockchainChannel = byte(0x40)

	// protocolHandshakeTimeout is how long AddPeer waits for the status
	// response of a new peer before giving up.
	protocolHandshakeTimeout = 10 * time.Second

	// handshakeRetryTicker is the interval at which AddPeer re-sends the
	// status request while the handshake is still pending.
	handshakeRetryTicker = time.Second * 4
)
24
25 var (
26         //ErrProtocolHandshakeTimeout peers handshake timeout
27         ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
28         ErrStatusRequest            = errors.New("Status request error")
29 )
30
// Response is the standard response envelope. Every field is dropped from
// the JSON encoding when it holds its zero value.
type Response struct {
	// Status is encoded under the "status" key.
	Status string `json:"status,omitempty"`
	// Msg is encoded under the "msg" key.
	Msg string `json:"msg,omitempty"`
	// Data is an arbitrary payload encoded under the "data" key.
	Data interface{} `json:"data,omitempty"`
}
37
38 type initalPeerStatus struct {
39         peerID string
40         height uint64
41         hash   *bc.Hash
42 }
43
44 //ProtocolReactor handles new coming protocol message.
45 type ProtocolReactor struct {
46         p2p.BaseReactor
47
48         chain       *protocol.Chain
49         blockKeeper *blockKeeper
50         txPool      *protocol.TxPool
51         sw          *p2p.Switch
52         fetcher     *Fetcher
53         peers       *peerSet
54         handshakeMu sync.Mutex
55
56         newPeerCh      chan struct{}
57         quitReqBlockCh chan *string
58         txSyncCh       chan *txsync
59         peerStatusCh   chan *initalPeerStatus
60 }
61
62 // NewProtocolReactor returns the reactor of whole blockchain.
63 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
64         pr := &ProtocolReactor{
65                 chain:          chain,
66                 blockKeeper:    blockPeer,
67                 txPool:         txPool,
68                 sw:             sw,
69                 fetcher:        fetcher,
70                 peers:          peers,
71                 newPeerCh:      newPeerCh,
72                 txSyncCh:       txSyncCh,
73                 quitReqBlockCh: quitReqBlockCh,
74                 peerStatusCh:   make(chan *initalPeerStatus),
75         }
76         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
77         return pr
78 }
79
80 // GetChannels implements Reactor
81 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
82         return []*p2p.ChannelDescriptor{
83                 &p2p.ChannelDescriptor{
84                         ID:                BlockchainChannel,
85                         Priority:          5,
86                         SendQueueCapacity: 100,
87                 },
88         }
89 }
90
91 // OnStart implements BaseService
92 func (pr *ProtocolReactor) OnStart() error {
93         pr.BaseReactor.OnStart()
94         return nil
95 }
96
97 // OnStop implements BaseService
98 func (pr *ProtocolReactor) OnStop() {
99         pr.BaseReactor.OnStop()
100 }
101
102 // syncTransactions starts sending all currently pending transactions to the given peer.
103 func (pr *ProtocolReactor) syncTransactions(p *peer) {
104         pending := pr.txPool.GetTransactions()
105         if len(pending) == 0 {
106                 return
107         }
108         txs := make([]*types.Tx, len(pending))
109         for i, batch := range pending {
110                 txs[i] = batch.Tx
111         }
112         pr.txSyncCh <- &txsync{p, txs}
113 }
114
115 // AddPeer implements Reactor by sending our state to peer.
116 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
117         pr.handshakeMu.Lock()
118         defer pr.handshakeMu.Unlock()
119
120         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
121                 return ErrStatusRequest
122         }
123         retryTicker := time.Tick(handshakeRetryTicker)
124         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
125         for {
126                 select {
127                 case status := <-pr.peerStatusCh:
128                         if status.peerID == peer.Key {
129                                 pr.peers.AddPeer(peer)
130                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
131                                 pr.syncTransactions(pr.peers.Peer(peer.Key))
132                                 pr.newPeerCh <- struct{}{}
133                                 return nil
134                         }
135                 case <-retryTicker:
136                         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
137                                 return ErrStatusRequest
138                         }
139                 case <-handshakeWait.C:
140                         return ErrProtocolHandshakeTimeout
141                 }
142         }
143 }
144
145 // RemovePeer implements Reactor by removing peer from the pool.
146 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
147         pr.quitReqBlockCh <- &peer.Key
148         pr.peers.RemovePeer(peer.Key)
149 }
150
151 // Receive implements Reactor by handling 4 types of messages (look below).
152 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
153         _, msg, err := DecodeMessage(msgBytes)
154         if err != nil {
155                 log.Errorf("Error decoding messagek %v", err)
156                 return
157         }
158         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
159
160         switch msg := msg.(type) {
161         case *BlockRequestMessage:
162                 var block *types.Block
163                 var err error
164                 if msg.Height != 0 {
165                         block, err = pr.chain.GetBlockByHeight(msg.Height)
166                 } else {
167                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
168                 }
169                 if err != nil {
170                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
171                         return
172                 }
173                 response, err := NewBlockResponseMessage(block)
174                 if err != nil {
175                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
176                         return
177                 }
178                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
179
180         case *BlockResponseMessage:
181                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
182                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
183
184         case *StatusRequestMessage:
185                 blockHeader := pr.chain.BestBlockHeader()
186                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
187
188         case *StatusResponseMessage:
189                 peerStatus := &initalPeerStatus{
190                         peerID: src.Key,
191                         height: msg.Height,
192                         hash:   msg.GetHash(),
193                 }
194                 pr.peerStatusCh <- peerStatus
195
196         case *TransactionNotifyMessage:
197                 tx, err := msg.GetTransaction()
198                 if err != nil {
199                         log.Errorf("Error decoding new tx %v", err)
200                         return
201                 }
202                 pr.blockKeeper.AddTx(tx, src.Key)
203
204         case *MineBlockMessage:
205                 block, err := msg.GetMineBlock()
206                 if err != nil {
207                         log.Errorf("Error decoding mined block %v", err)
208                         return
209                 }
210                 // Mark the peer as owning the block and schedule it for import
211                 hash := block.Hash()
212                 pr.peers.MarkBlock(src.Key, &hash)
213                 pr.fetcher.Enqueue(src.Key, block)
214                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
215
216         default:
217                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
218         }
219 }