OSDN Git Service

Merge pull request #660 from Bytom/p2p_test
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "sync"
6         "time"
7
8         log "github.com/sirupsen/logrus"
9         cmn "github.com/tendermint/tmlibs/common"
10
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/protocol/bc/types"
17 )
18
const (
	// BlockchainChannel is the p2p channel ID used for blocks and status
	// updates.
	BlockchainChannel byte = 0x40

	// protocolHandshakeTimeout is how long AddPeer waits for a peer's status
	// before giving up on the handshake.
	protocolHandshakeTimeout = 10 * time.Second
	// handshakeRetryTicker is the interval at which AddPeer re-sends the
	// status request while it is still waiting.
	handshakeRetryTicker = 4 * time.Second
)
25
var (
	// ErrProtocolHandshakeTimeout is returned by AddPeer when the peer's
	// status has not arrived before the handshake timer fires.
	ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
	// ErrStatusRequest is returned by AddPeer when a status request could
	// not be sent to the peer.
	ErrStatusRequest = errors.New("Status request error")
)
31
// Response describes the response standard: a status, a message and an
// arbitrary data payload. Each field is left out of the JSON encoding when
// it is empty.
type Response struct {
	Status string      `json:"status,omitempty"`
	Msg    string      `json:"msg,omitempty"`
	Data   interface{} `json:"data,omitempty"`
}
38
// initalPeerStatus is the chain status a peer reports during the handshake.
// Receive builds one from each StatusResponseMessage and AddPeer consumes it
// from peerStatusCh.
type initalPeerStatus struct {
	peerID string   // key of the peer that sent the status response
	height uint64   // height taken from the status response
	hash   *bc.Hash // hash taken from the status response
}
44
// ProtocolReactor is the p2p reactor for BlockchainChannel. It handshakes
// with newly added peers (AddPeer), forgets removed ones (RemovePeer) and
// dispatches incoming protocol messages (Receive).
type ProtocolReactor struct {
	p2p.BaseReactor

	chain       *protocol.Chain
	blockKeeper *blockKeeper
	txPool      *protocol.TxPool
	sw          *p2p.Switch
	fetcher     *Fetcher
	peers       *peerSet
	handshakeMu sync.Mutex // held by AddPeer for the whole handshake, so handshakes run one at a time

	newPeerCh      chan struct{}          // signalled by AddPeer once a peer has completed the handshake
	quitReqBlockCh chan *string           // receives the key of every peer passed to RemovePeer
	txSyncCh       chan *txsync           // receives the pending transactions queued for a new peer
	peerStatusCh   chan *initalPeerStatus // carries status responses from Receive to AddPeer
}
62
63 // NewProtocolReactor returns the reactor of whole blockchain.
64 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
65         pr := &ProtocolReactor{
66                 chain:          chain,
67                 blockKeeper:    blockPeer,
68                 txPool:         txPool,
69                 sw:             sw,
70                 fetcher:        fetcher,
71                 peers:          peers,
72                 newPeerCh:      newPeerCh,
73                 txSyncCh:       txSyncCh,
74                 quitReqBlockCh: quitReqBlockCh,
75                 peerStatusCh:   make(chan *initalPeerStatus),
76         }
77         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
78         return pr
79 }
80
81 // GetChannels implements Reactor
82 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
83         return []*p2p.ChannelDescriptor{
84                 &p2p.ChannelDescriptor{
85                         ID:                BlockchainChannel,
86                         Priority:          5,
87                         SendQueueCapacity: 100,
88                 },
89         }
90 }
91
92 // OnStart implements BaseService
93 func (pr *ProtocolReactor) OnStart() error {
94         pr.BaseReactor.OnStart()
95         return nil
96 }
97
// OnStop implements BaseService by delegating to the embedded BaseReactor.
func (pr *ProtocolReactor) OnStop() {
	pr.BaseReactor.OnStop()
}
102
103 // syncTransactions starts sending all currently pending transactions to the given peer.
104 func (pr *ProtocolReactor) syncTransactions(p *peer) {
105         pending := pr.txPool.GetTransactions()
106         if len(pending) == 0 {
107                 return
108         }
109         txs := make([]*types.Tx, len(pending))
110         for i, batch := range pending {
111                 txs[i] = batch.Tx
112         }
113         pr.txSyncCh <- &txsync{p, txs}
114 }
115
116 // AddPeer implements Reactor by sending our state to peer.
117 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
118         pr.handshakeMu.Lock()
119         defer pr.handshakeMu.Unlock()
120
121         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
122                 return ErrStatusRequest
123         }
124         retryTicker := time.Tick(handshakeRetryTicker)
125         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
126         for {
127                 select {
128                 case status := <-pr.peerStatusCh:
129                         if status.peerID == peer.Key {
130                                 pr.peers.AddPeer(peer)
131                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
132                                 pr.syncTransactions(pr.peers.Peer(peer.Key))
133                                 pr.newPeerCh <- struct{}{}
134                                 return nil
135                         }
136                 case <-retryTicker:
137                         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
138                                 return ErrStatusRequest
139                         }
140                 case <-handshakeWait.C:
141                         return ErrProtocolHandshakeTimeout
142                 }
143         }
144 }
145
146 // RemovePeer implements Reactor by removing peer from the pool.
147 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
148         pr.quitReqBlockCh <- &peer.Key
149         pr.peers.RemovePeer(peer.Key)
150 }
151
152 // Receive implements Reactor by handling 4 types of messages (look below).
153 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
154         var tm *trust.TrustMetric
155         key := src.Connection().RemoteAddress.IP.String()
156         if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
157                 log.Errorf("Can't get peer trust metric")
158                 return
159         }
160
161         _, msg, err := DecodeMessage(msgBytes)
162         if err != nil {
163                 log.Errorf("Error decoding messagek %v", err)
164                 return
165         }
166         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
167
168         switch msg := msg.(type) {
169         case *BlockRequestMessage:
170                 var block *types.Block
171                 var err error
172                 if msg.Height != 0 {
173                         block, err = pr.chain.GetBlockByHeight(msg.Height)
174                 } else {
175                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
176                 }
177                 if err != nil {
178                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
179                         return
180                 }
181                 response, err := NewBlockResponseMessage(block)
182                 if err != nil {
183                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
184                         return
185                 }
186                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
187
188         case *BlockResponseMessage:
189                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
190                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
191
192         case *StatusRequestMessage:
193                 blockHeader := pr.chain.BestBlockHeader()
194                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
195
196         case *StatusResponseMessage:
197                 peerStatus := &initalPeerStatus{
198                         peerID: src.Key,
199                         height: msg.Height,
200                         hash:   msg.GetHash(),
201                 }
202                 pr.peerStatusCh <- peerStatus
203
204         case *TransactionNotifyMessage:
205                 tx, err := msg.GetTransaction()
206                 if err != nil {
207                         log.Errorf("Error decoding new tx %v", err)
208                         return
209                 }
210                 pr.blockKeeper.AddTx(tx, src.Key)
211
212         case *MineBlockMessage:
213                 block, err := msg.GetMineBlock()
214                 if err != nil {
215                         log.Errorf("Error decoding mined block %v", err)
216                         return
217                 }
218                 // Mark the peer as owning the block and schedule it for import
219                 hash := block.Hash()
220                 pr.peers.MarkBlock(src.Key, &hash)
221                 pr.fetcher.Enqueue(src.Key, block)
222                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
223
224         default:
225                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
226         }
227 }