OSDN Git Service

Merge branch 'p2p_test' into dev
[bytom/bytom.git] / netsync / protocol_reactor.go
1 package netsync
2
3 import (
4         "reflect"
5         "strings"
6         "sync"
7         "time"
8
9         log "github.com/sirupsen/logrus"
10         cmn "github.com/tendermint/tmlibs/common"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p"
14         "github.com/bytom/p2p/trust"
15         "github.com/bytom/protocol"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/types"
18 )
19
20 const (
21         // BlockchainChannel is a channel for blocks and status updates
22         BlockchainChannel        = byte(0x40)
23         protocolHandshakeTimeout = time.Second * 10
24         handshakeRetryTicker     = 4 * time.Second
25 )
26
27 var (
28         //ErrProtocolHandshakeTimeout peers handshake timeout
29         ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
30         ErrStatusRequest            = errors.New("Status request error")
31 )
32
33 // Response describes the response standard.
34 type Response struct {
35         Status string      `json:"status,omitempty"`
36         Msg    string      `json:"msg,omitempty"`
37         Data   interface{} `json:"data,omitempty"`
38 }
39
40 type initalPeerStatus struct {
41         peerID string
42         height uint64
43         hash   *bc.Hash
44 }
45
46 //ProtocolReactor handles new coming protocol message.
47 type ProtocolReactor struct {
48         p2p.BaseReactor
49
50         chain       *protocol.Chain
51         blockKeeper *blockKeeper
52         txPool      *protocol.TxPool
53         sw          *p2p.Switch
54         fetcher     *Fetcher
55         peers       *peerSet
56         handshakeMu sync.Mutex
57
58         newPeerCh      chan struct{}
59         quitReqBlockCh chan *string
60         txSyncCh       chan *txsync
61         peerStatusCh   chan *initalPeerStatus
62 }
63
64 // NewProtocolReactor returns the reactor of whole blockchain.
65 func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
66         pr := &ProtocolReactor{
67                 chain:          chain,
68                 blockKeeper:    blockPeer,
69                 txPool:         txPool,
70                 sw:             sw,
71                 fetcher:        fetcher,
72                 peers:          peers,
73                 newPeerCh:      newPeerCh,
74                 txSyncCh:       txSyncCh,
75                 quitReqBlockCh: quitReqBlockCh,
76                 peerStatusCh:   make(chan *initalPeerStatus),
77         }
78         pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
79         return pr
80 }
81
82 // GetChannels implements Reactor
83 func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
84         return []*p2p.ChannelDescriptor{
85                 &p2p.ChannelDescriptor{
86                         ID:                BlockchainChannel,
87                         Priority:          5,
88                         SendQueueCapacity: 100,
89                 },
90         }
91 }
92
93 // OnStart implements BaseService
94 func (pr *ProtocolReactor) OnStart() error {
95         pr.BaseReactor.OnStart()
96         return nil
97 }
98
99 // OnStop implements BaseService
100 func (pr *ProtocolReactor) OnStop() {
101         pr.BaseReactor.OnStop()
102 }
103
104 // syncTransactions starts sending all currently pending transactions to the given peer.
105 func (pr *ProtocolReactor) syncTransactions(p *peer) {
106         pending := pr.txPool.GetTransactions()
107         if len(pending) == 0 {
108                 return
109         }
110         txs := make([]*types.Tx, len(pending))
111         for i, batch := range pending {
112                 txs[i] = batch.Tx
113         }
114         pr.txSyncCh <- &txsync{p, txs}
115 }
116
117 // AddPeer implements Reactor by sending our state to peer.
118 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
119         pr.handshakeMu.Lock()
120         defer pr.handshakeMu.Unlock()
121
122         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
123                 return ErrStatusRequest
124         }
125         retryTicker := time.Tick(handshakeRetryTicker)
126         handshakeWait := time.NewTimer(protocolHandshakeTimeout)
127         for {
128                 select {
129                 case status := <-pr.peerStatusCh:
130                         if strings.Compare(status.peerID, peer.Key) == 0 {
131                                 pr.peers.AddPeer(peer)
132                                 pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
133                                 pr.syncTransactions(pr.peers.Peer(peer.Key))
134                                 pr.newPeerCh <- struct{}{}
135                                 return nil
136                         }
137                 case <-retryTicker:
138                         if ok := peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
139                                 return ErrStatusRequest
140                         }
141                 case <-handshakeWait.C:
142                         return ErrProtocolHandshakeTimeout
143                 }
144         }
145 }
146
147 // RemovePeer implements Reactor by removing peer from the pool.
148 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
149         pr.quitReqBlockCh <- &peer.Key
150         pr.peers.RemovePeer(peer.Key)
151 }
152
153 // Receive implements Reactor by handling 4 types of messages (look below).
154 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
155         var tm *trust.TrustMetric
156         key := src.Connection().RemoteAddress.IP.String()
157         if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
158                 log.Errorf("Can't get peer trust metric")
159                 return
160         }
161
162         _, msg, err := DecodeMessage(msgBytes)
163         if err != nil {
164                 log.Errorf("Error decoding messagek %v", err)
165                 return
166         }
167         log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
168
169         switch msg := msg.(type) {
170         case *BlockRequestMessage:
171                 var block *types.Block
172                 var err error
173                 if msg.Height != 0 {
174                         block, err = pr.chain.GetBlockByHeight(msg.Height)
175                 } else {
176                         block, err = pr.chain.GetBlockByHash(msg.GetHash())
177                 }
178                 if err != nil {
179                         log.Errorf("Fail on BlockRequestMessage get block: %v", err)
180                         return
181                 }
182                 response, err := NewBlockResponseMessage(block)
183                 if err != nil {
184                         log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
185                         return
186                 }
187                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
188
189         case *BlockResponseMessage:
190                 log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
191                 pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
192
193         case *StatusRequestMessage:
194                 blockHeader := pr.chain.BestBlockHeader()
195                 src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
196
197         case *StatusResponseMessage:
198                 peerStatus := &initalPeerStatus{
199                         peerID: src.Key,
200                         height: msg.Height,
201                         hash:   msg.GetHash(),
202                 }
203                 pr.peerStatusCh <- peerStatus
204
205         case *TransactionNotifyMessage:
206                 tx, err := msg.GetTransaction()
207                 if err != nil {
208                         log.Errorf("Error decoding new tx %v", err)
209                         return
210                 }
211                 pr.blockKeeper.AddTx(tx, src.Key)
212
213         case *MineBlockMessage:
214                 block, err := msg.GetMineBlock()
215                 if err != nil {
216                         log.Errorf("Error decoding mined block %v", err)
217                         return
218                 }
219                 // Mark the peer as owning the block and schedule it for import
220                 hash := block.Hash()
221                 pr.peers.MarkBlock(src.Key, &hash)
222                 pr.fetcher.Enqueue(src.Key, block)
223                 pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
224
225         default:
226                 log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
227         }
228 }