OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "sync"
7
8         log "github.com/sirupsen/logrus"
9         "gopkg.in/fatih/set.v0"
10
11         "github.com/bytom/consensus"
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
18 const (
19         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
20         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
21         defaultBanThreshold = uint64(100)
22 )
23
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
26         Addr() net.Addr
27         ID() string
28         ServiceFlag() consensus.ServiceFlag
29         TrySend(byte, interface{}) bool
30 }
31
32 //BasePeerSet is the intergace for connection level peer manager
33 type BasePeerSet interface {
34         AddBannedPeer(string) error
35         StopPeerGracefully(string)
36 }
37
38 // PeerInfo indicate peer status snap
39 type PeerInfo struct {
40         ID         string `json:"peer_id"`
41         RemoteAddr string `json:"remote_addr"`
42         Height     uint64 `json:"height"`
43         Delay      uint32 `json:"delay"`
44 }
45
46 type peer struct {
47         BasePeer
48         mtx         sync.RWMutex
49         services    consensus.ServiceFlag
50         height      uint64
51         hash        *bc.Hash
52         banScore    trust.DynamicBanScore
53         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
54         knownBlocks *set.Set // Set of block hashes known to be known by this peer
55         filterAdds  *set.Set // Set of addresses that the spv node cares about.
56 }
57
58 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
59         return &peer{
60                 BasePeer:    basePeer,
61                 services:    basePeer.ServiceFlag(),
62                 height:      height,
63                 hash:        hash,
64                 knownTxs:    set.New(),
65                 knownBlocks: set.New(),
66                 filterAdds:  set.New(),
67         }
68 }
69
70 func (p *peer) Height() uint64 {
71         p.mtx.RLock()
72         defer p.mtx.RUnlock()
73         return p.height
74 }
75
76 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
77         score := p.banScore.Increase(persistent, transient)
78         if score > defaultBanThreshold {
79                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
80                 return true
81         }
82
83         warnThreshold := defaultBanThreshold >> 1
84         if score > warnThreshold {
85                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
86         }
87         return false
88 }
89
90 func (p *peer) addFilterAddress(address []byte) {
91         p.mtx.Lock()
92         defer p.mtx.Unlock()
93
94         if p.filterAdds.Size() >= maxFilterAddressCount {
95                 log.Warn("the count of filter addresses is greater than limit")
96                 return
97         }
98         if len(address) > maxFilterAddressSize {
99                 log.Warn("the size of filter address is greater than limit")
100                 return
101         }
102         p.filterAdds.Add(hex.EncodeToString(address))
103 }
104
105 func (p *peer) addFilterAddresses(addresses [][]byte) {
106         if !p.filterAdds.IsEmpty() {
107                 p.filterAdds.Clear()
108         }
109         for _, address := range addresses {
110                 p.addFilterAddress(address)
111         }
112 }
113
114 func (p *peer) getBlockByHeight(height uint64) bool {
115         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
116         return p.TrySend(BlockchainChannel, msg)
117 }
118
119 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
120         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
121         return p.TrySend(BlockchainChannel, msg)
122 }
123
124 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
125         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
126         return p.TrySend(BlockchainChannel, msg)
127 }
128
129 func (p *peer) getPeerInfo() *PeerInfo {
130         p.mtx.RLock()
131         defer p.mtx.RUnlock()
132         return &PeerInfo{
133                 ID:         p.ID(),
134                 RemoteAddr: p.Addr().String(),
135                 Height:     p.height,
136         }
137 }
138
139 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
140         var relatedTxs []*types.Tx
141         var relatedStatuses []*bc.TxVerifyResult
142         for i, tx := range txs {
143                 if p.isRelatedTx(tx) {
144                         relatedTxs = append(relatedTxs, tx)
145                         relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
146                 }
147         }
148         return relatedTxs, relatedStatuses
149 }
150
151 func (p *peer) isRelatedTx(tx *types.Tx) bool {
152         for _, input := range tx.Inputs {
153                 switch inp := input.TypedInput.(type) {
154                 case *types.SpendInput:
155                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
156                                 return true
157                         }
158                 }
159         }
160         for _, output := range tx.Outputs {
161                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
162                         return true
163                 }
164         }
165         return false
166 }
167
168 func (p *peer) isSPVNode() bool {
169         return !p.services.IsEnable(consensus.SFFullNode)
170 }
171
172 func (p *peer) markBlock(hash *bc.Hash) {
173         p.mtx.Lock()
174         defer p.mtx.Unlock()
175
176         for p.knownBlocks.Size() >= maxKnownBlocks {
177                 p.knownBlocks.Pop()
178         }
179         p.knownBlocks.Add(hash.String())
180 }
181
182 func (p *peer) markTransaction(hash *bc.Hash) {
183         p.mtx.Lock()
184         defer p.mtx.Unlock()
185
186         for p.knownTxs.Size() >= maxKnownTxs {
187                 p.knownTxs.Pop()
188         }
189         p.knownTxs.Add(hash.String())
190 }
191
192 func (p *peer) sendBlock(block *types.Block) (bool, error) {
193         msg, err := NewBlockMessage(block)
194         if err != nil {
195                 return false, errors.Wrap(err, "fail on NewBlockMessage")
196         }
197
198         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
199         if ok {
200                 blcokHash := block.Hash()
201                 p.knownBlocks.Add(blcokHash.String())
202         }
203         return ok, nil
204 }
205
206 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
207         msg, err := NewBlocksMessage(blocks)
208         if err != nil {
209                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
210         }
211
212         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
213                 return ok, nil
214         }
215
216         for _, block := range blocks {
217                 blcokHash := block.Hash()
218                 p.knownBlocks.Add(blcokHash.String())
219         }
220         return true, nil
221 }
222
223 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
224         msg, err := NewHeadersMessage(headers)
225         if err != nil {
226                 return false, errors.New("fail on NewHeadersMessage")
227         }
228
229         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
230         return ok, nil
231 }
232
233 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
234         msg := NewMerkleBlockMessage()
235         if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
236                 return false, err
237         }
238
239         relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
240
241         txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
242         if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
243                 return false, nil
244         }
245
246         statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
247         if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
248                 return false, nil
249         }
250
251         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
252         return ok, nil
253 }
254
255 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
256         for _, tx := range txs {
257                 if p.isSPVNode() && !p.isRelatedTx(tx) {
258                         continue
259                 }
260                 msg, err := NewTransactionMessage(tx)
261                 if err != nil {
262                         return false, errors.Wrap(err, "failed to tx msg")
263                 }
264
265                 if p.knownTxs.Has(tx.ID.String()) {
266                         continue
267                 }
268                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
269                         return ok, nil
270                 }
271                 p.knownTxs.Add(tx.ID.String())
272         }
273         return true, nil
274 }
275
276 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
277         p.mtx.Lock()
278         defer p.mtx.Unlock()
279         p.height = height
280         p.hash = hash
281 }
282
283 type peerSet struct {
284         BasePeerSet
285         mtx   sync.RWMutex
286         peers map[string]*peer
287 }
288
289 // newPeerSet creates a new peer set to track the active participants.
290 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
291         return &peerSet{
292                 BasePeerSet: basePeerSet,
293                 peers:       make(map[string]*peer),
294         }
295 }
296
297 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
298         ps.mtx.Lock()
299         peer := ps.peers[peerID]
300         ps.mtx.Unlock()
301
302         if peer == nil {
303                 return
304         }
305         if ban := peer.addBanScore(persistent, transient, reason); !ban {
306                 return
307         }
308         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
309                 log.WithField("err", err).Error("fail on add ban peer")
310         }
311         ps.removePeer(peerID)
312 }
313
314 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
315         ps.mtx.Lock()
316         defer ps.mtx.Unlock()
317
318         if _, ok := ps.peers[peer.ID()]; !ok {
319                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
320                 return
321         }
322         log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
323 }
324
325 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
326         ps.mtx.RLock()
327         defer ps.mtx.RUnlock()
328
329         var bestPeer *peer
330         for _, p := range ps.peers {
331                 if !p.services.IsEnable(flag) {
332                         continue
333                 }
334                 if bestPeer == nil || p.height > bestPeer.height {
335                         bestPeer = p
336                 }
337         }
338         return bestPeer
339 }
340
341 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
342         msg, err := NewMinedBlockMessage(block)
343         if err != nil {
344                 return errors.Wrap(err, "fail on broadcast mined block")
345         }
346
347         hash := block.Hash()
348         peers := ps.peersWithoutBlock(&hash)
349         for _, peer := range peers {
350                 if peer.isSPVNode() {
351                         continue
352                 }
353                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
354                         ps.removePeer(peer.ID())
355                         continue
356                 }
357                 peer.markBlock(&hash)
358         }
359         return nil
360 }
361
362 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
363         bestBlockHash := bestBlock.Hash()
364         peers := ps.peersWithoutBlock(&bestBlockHash)
365
366         genesisHash := genesisBlock.Hash()
367         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
368         for _, peer := range peers {
369                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
370                         ps.removePeer(peer.ID())
371                         continue
372                 }
373         }
374         return nil
375 }
376
377 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
378         msg, err := NewTransactionMessage(tx)
379         if err != nil {
380                 return errors.Wrap(err, "fail on broadcast tx")
381         }
382
383         peers := ps.peersWithoutTx(&tx.ID)
384         for _, peer := range peers {
385                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
386                         continue
387                 }
388                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
389                         ps.removePeer(peer.ID())
390                         continue
391                 }
392                 peer.markTransaction(&tx.ID)
393         }
394         return nil
395 }
396
397 func (ps *peerSet) errorHandler(peerID string, err error) {
398         if errors.Root(err) == errPeerMisbehave {
399                 ps.addBanScore(peerID, 20, 0, err.Error())
400         } else {
401                 ps.removePeer(peerID)
402         }
403 }
404
405 // Peer retrieves the registered peer with the given id.
406 func (ps *peerSet) getPeer(id string) *peer {
407         ps.mtx.RLock()
408         defer ps.mtx.RUnlock()
409         return ps.peers[id]
410 }
411
412 func (ps *peerSet) getPeerInfos() []*PeerInfo {
413         ps.mtx.RLock()
414         defer ps.mtx.RUnlock()
415
416         result := []*PeerInfo{}
417         for _, peer := range ps.peers {
418                 result = append(result, peer.getPeerInfo())
419         }
420         return result
421 }
422
423 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
424         ps.mtx.RLock()
425         defer ps.mtx.RUnlock()
426
427         peers := []*peer{}
428         for _, peer := range ps.peers {
429                 if !peer.knownBlocks.Has(hash.String()) {
430                         peers = append(peers, peer)
431                 }
432         }
433         return peers
434 }
435
436 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
437         ps.mtx.RLock()
438         defer ps.mtx.RUnlock()
439
440         peers := []*peer{}
441         for _, peer := range ps.peers {
442                 if !peer.knownTxs.Has(hash.String()) {
443                         peers = append(peers, peer)
444                 }
445         }
446         return peers
447 }
448
449 func (ps *peerSet) removePeer(peerID string) {
450         ps.mtx.Lock()
451         delete(ps.peers, peerID)
452         ps.mtx.Unlock()
453         ps.StopPeerGracefully(peerID)
454 }