OSDN Git Service

move connection to its folder
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "strings"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         cmn "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p"
14         "github.com/bytom/p2p/connection"
15         "github.com/bytom/protocol"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/types"
18 )
19
const (
	// BlockchainChannel is a channel for blocks and status updates
	BlockchainChannel = byte(0x40)
	// protocolHandshakeTimeout is the upper bound on how long AddPeer waits
	// for a peer's status response before giving up.
	protocolHandshakeTimeout = 10 * time.Second
	// handshakeRetryTicker is the interval at which AddPeer re-sends the
	// status request while the handshake is still pending.
	handshakeRetryTicker = 4 * time.Second
)
26
var (
	// ErrProtocolHandshakeTimeout is returned by AddPeer when the peer does
	// not answer the status request before protocolHandshakeTimeout elapses.
	ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
	// ErrStatusRequest is returned by AddPeer when the status request could
	// not be queued for sending to the peer.
	ErrStatusRequest            = errors.New("Status request error")
	// ErrDiffGenesisHash is returned by AddPeer when the genesis hash reported
	// by the peer differs from the local one.
	ErrDiffGenesisHash          = errors.New("Different genesis hash")
)
33
// Response describes the response standard.
// Every field is omitted from the JSON encoding when it holds its zero value.
type Response struct {
	// Status is encoded under the "status" key.
	Status string      `json:"status,omitempty"`
	// Msg is encoded under the "msg" key.
	Msg    string      `json:"msg,omitempty"`
	// Data is an arbitrary payload encoded under the "data" key.
	Data   interface{} `json:"data,omitempty"`
}
40
// initalPeerStatus carries the content of a peer's status response from
// Receive to the handshake loop in AddPeer.
type initalPeerStatus struct {
	// peerID is the key of the peer that sent the status response.
	peerID      string
	// height is the block height reported by the peer.
	height      uint64
	// hash is the block hash reported by the peer.
	hash        *bc.Hash
	// genesisHash is the genesis block hash reported by the peer; AddPeer
	// compares it against the local genesis hash.
	genesisHash *bc.Hash
}
47
// ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
	p2p.BaseReactor

	chain       *protocol.Chain // source of blocks and the best block header served to peers
	blockKeeper *blockKeeper    // receives blocks and transactions delivered by peers
	txPool      *protocol.TxPool // source of pending transactions synced to new peers
	sw          *p2p.Switch
	fetcher     *Fetcher // receives freshly mined blocks announced by peers
	peers       *peerSet // set of peers that completed the status handshake
	handshakeMu sync.Mutex // held for the whole of AddPeer, so handshakes run one at a time
	genesisHash bc.Hash    // hash of the local block at height 0, set by NewProtocolReactor

	newPeerCh      chan struct{}          // signalled by AddPeer after a successful handshake
	quitReqBlockCh chan *string           // receives the peer key when RemovePeer is called
	txSyncCh       chan *txsync           // receives the pending transactions to sync to a new peer
	peerStatusCh   chan *initalPeerStatus // unbuffered; carries status responses from Receive to AddPeer
}
66
67 // NewProtocolReactor returns the reactor of whole blockchain.
68 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
69         pr := &ProtocolReactor{
70                 chain:          chain,
71                 blockKeeper:    blockPeer,
72                 txPool:         txPool,
73                 sw:             sw,
74                 fetcher:        fetcher,
75                 peers:          peers,
76                 newPeerCh:      newPeerCh,
77                 txSyncCh:       txSyncCh,
78                 quitReqBlockCh: quitReqBlockCh,
79                 peerStatusCh:   make(chan *initalPeerStatus),
80         }
81         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
82         genesisBlock, _ := pr.chain.GetBlockByHeight(0)
83         pr.genesisHash = genesisBlock.Hash()
84
85         return pr
86 }
87
88 // GetChannels implements Reactor
89 func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
90         return []*connection.ChannelDescriptor{
91                 &connection.ChannelDescriptor{
92                         ID:                BlockchainChannel,
93                         Priority:          5,
94                         SendQueueCapacity: 100,
95                 },
96         }
97 }
98
99 // OnStart implements BaseService
100 func (pr *ProtocolReactor) OnStart() error {
101         pr.BaseReactor.OnStart()
102         return nil
103 }
104
// OnStop implements BaseService. It only delegates to the embedded
// BaseReactor's OnStop.
func (pr *ProtocolReactor) OnStop() {
	pr.BaseReactor.OnStop()
}
109
110 // syncTransactions starts sending all currently pending transactions to the given peer.
111 func (pr *ProtocolReactor) syncTransactions(p *peer) {
112         if p == nil {
113                 return
114         }
115         pending := pr.txPool.GetTransactions()
116         if len(pending) == 0 {
117                 return
118         }
119         txs := make([]*types.Tx, len(pending))
120         for i, batch := range pending {
121                 txs[i] = batch.Tx
122         }
123         pr.txSyncCh <- &txsync{p, txs}
124 }
125
126 // AddPeer implements Reactor by sending our state to peer.
127 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
128         pr.handshakeMu.Lock()
129         defer pr.handshakeMu.Unlock()
130         if peer == nil {
131                 return errPeerDropped
132         }
133         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
134                 return ErrStatusRequest
135         }
136         retryTicker := time.Tick(handshakeRetryTicker)
137         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
138         for {
139                 select {
140                 case status := <-pr.peerStatusCh:
141                         if status.peerID == peer.Key {
142                                 if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
143                                         log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
144                                         return ErrDiffGenesisHash
145                                 }
146                                 pr.peers.AddPeer(peer)
147                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
148                                 prPeer, ok := pr.peers.Peer(peer.Key)
149                                 if !ok {
150                                         return errPeerDropped
151                                 }
152                                 pr.syncTransactions(prPeer)
153                                 pr.newPeerCh <- struct{}{}
154                                 return nil
155                         }
156                 case <-retryTicker:
157                         if peer == nil {
158                                 return errPeerDropped
159                         }
160                         if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
161                                 return ErrStatusRequest
162                         }
163                 case <-handshakeWait.C:
164                         return ErrProtocolHandshakeTimeout
165                 }
166         }
167 }
168
169 // RemovePeer implements Reactor by removing peer from the pool.
170 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
171         select {
172         case pr.quitReqBlockCh <- &peer.Key:
173         default:
174                 log.Warning("quitReqBlockCh is full")
175         }
176         pr.peers.RemovePeer(peer.Key)
177 }
178
179 // Receive implements Reactor by handling 4 types of messages (look below).
180 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
181         _, msg, err := DecodeMessage(msgBytes)
182         if err != nil {
183                 log.Errorf("Error decoding message %v", err)
184                 return
185         }
186         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
187
188         switch msg := msg.(type) {
189         case *BlockRequestMessage:
190                 var block *types.Block
191                 var err error
192                 if msg.Height != 0 {
193                         block, err = pr.chain.GetBlockByHeight(msg.Height)
194                 } else {
195                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
196                 }
197                 if err != nil {
198                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
199                         return
200                 }
201                 response, err := NewBlockResponseMessage(block)
202                 if err != nil {
203                         log.Errorf("Fail on BlockRequestMessage create response: %v", err)
204                         return
205                 }
206                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
207
208         case *BlockResponseMessage:
209                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
210                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
211
212         case *StatusRequestMessage:
213                 blockHeader := pr.chain.BestBlockHeader()
214                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
215
216         case *StatusResponseMessage:
217                 peerStatus := &initalPeerStatus{
218                         peerID:      src.Key,
219                         height:      msg.Height,
220                         hash:        msg.GetHash(),
221                         genesisHash: msg.GetGenesisHash(),
222                 }
223                 pr.peerStatusCh <- peerStatus
224
225         case *TransactionNotifyMessage:
226                 tx, err := msg.GetTransaction()
227                 if err != nil {
228                         log.Errorf("Error decoding new tx %v", err)
229                         return
230                 }
231                 pr.blockKeeper.AddTx(tx, src.Key)
232
233         case *MineBlockMessage:
234                 block, err := msg.GetMineBlock()
235                 if err != nil {
236                         log.Errorf("Error decoding mined block %v", err)
237                         return
238                 }
239                 // Mark the peer as owning the block and schedule it for import
240                 hash := block.Hash()
241                 pr.peers.MarkBlock(src.Key, &hash)
242                 pr.fetcher.Enqueue(src.Key, block)
243                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
244
245         default:
246                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
247         }
248 }