OSDN Git Service

Merge pull request #1260 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "net"
5         "sync"
6         "encoding/hex"
7
8         log "github.com/sirupsen/logrus"
9         "gopkg.in/fatih/set.v0"
10
11         "github.com/bytom/consensus"
12         "github.com/bytom/errors"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
18 const (
19         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
20         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
21         defaultBanThreshold = uint64(100)
22 )
23
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
26         Addr() net.Addr
27         ID() string
28         ServiceFlag() consensus.ServiceFlag
29         TrySend(byte, interface{}) bool
30 }
31
32 //BasePeerSet is the intergace for connection level peer manager
33 type BasePeerSet interface {
34         AddBannedPeer(string) error
35         StopPeerGracefully(string)
36 }
37
38 // PeerInfo indicate peer status snap
39 type PeerInfo struct {
40         ID         string `json:"peer_id"`
41         RemoteAddr string `json:"remote_addr"`
42         Height     uint64 `json:"height"`
43         Delay      uint32 `json:"delay"`
44 }
45
46 type peer struct {
47         BasePeer
48         mtx         sync.RWMutex
49         services    consensus.ServiceFlag
50         height      uint64
51         hash        *bc.Hash
52         banScore    trust.DynamicBanScore
53         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
54         knownBlocks *set.Set // Set of block hashes known to be known by this peer
55         filterAdds  *set.Set // Set of addresses that the spv node cares about.
56 }
57
58 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
59         return &peer{
60                 BasePeer:    basePeer,
61                 services:    basePeer.ServiceFlag(),
62                 height:      height,
63                 hash:        hash,
64                 knownTxs:    set.New(),
65                 knownBlocks: set.New(),
66                 filterAdds:  set.New(),
67         }
68 }
69
70 func (p *peer) Height() uint64 {
71         p.mtx.RLock()
72         defer p.mtx.RUnlock()
73         return p.height
74 }
75
76 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
77         score := p.banScore.Increase(persistent, transient)
78         if score > defaultBanThreshold {
79                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
80                 return true
81         }
82
83         warnThreshold := defaultBanThreshold >> 1
84         if score > warnThreshold {
85                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
86         }
87         return false
88 }
89
90 func (p *peer) addFilterAddresses(addresses [][]byte) {
91         p.mtx.Lock()
92         defer p.mtx.Unlock()
93         
94         if (!p.filterAdds.IsEmpty()) {
95                 p.filterAdds.Clear()
96         }
97         for _, address := range addresses {
98                 p.filterAdds.Add(hex.EncodeToString(address))
99         }
100 }
101
102 func (p *peer) getBlockByHeight(height uint64) bool {
103         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
104         return p.TrySend(BlockchainChannel, msg)
105 }
106
107 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
108         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
109         return p.TrySend(BlockchainChannel, msg)
110 }
111
112 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
113         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
114         return p.TrySend(BlockchainChannel, msg)
115 }
116
117 func (p *peer) getPeerInfo() *PeerInfo {
118         p.mtx.RLock()
119         defer p.mtx.RUnlock()
120         return &PeerInfo{
121                 ID:         p.ID(),
122                 RemoteAddr: p.Addr().String(),
123                 Height:     p.height,
124         }
125 }
126
127 func (p *peer) isRelatedTx(tx *types.Tx) bool {
128         for _, input := range(tx.Inputs) {
129                 switch inp := input.TypedInput.(type) {
130                 case *types.SpendInput:
131                         if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
132                                 return true
133                         }
134                 }
135         }
136         for _, output := range(tx.Outputs) {
137                 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
138                         return true
139                 }
140         }
141         return false
142 }
143
144 func (p *peer) isSPVNode() bool {
145         return !p.services.IsEnable(consensus.SFFullNode)
146 }
147
148 func (p *peer) markBlock(hash *bc.Hash) {
149         p.mtx.Lock()
150         defer p.mtx.Unlock()
151
152         for p.knownBlocks.Size() >= maxKnownBlocks {
153                 p.knownBlocks.Pop()
154         }
155         p.knownBlocks.Add(hash.String())
156 }
157
158 func (p *peer) markTransaction(hash *bc.Hash) {
159         p.mtx.Lock()
160         defer p.mtx.Unlock()
161
162         for p.knownTxs.Size() >= maxKnownTxs {
163                 p.knownTxs.Pop()
164         }
165         p.knownTxs.Add(hash.String())
166 }
167
168 func (p *peer) sendBlock(block *types.Block) (bool, error) {
169         msg, err := NewBlockMessage(block)
170         if err != nil {
171                 return false, errors.Wrap(err, "fail on NewBlockMessage")
172         }
173
174         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
175         if ok {
176                 blcokHash := block.Hash()
177                 p.knownBlocks.Add(blcokHash.String())
178         }
179         return ok, nil
180 }
181
182 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
183         msg, err := NewBlocksMessage(blocks)
184         if err != nil {
185                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
186         }
187
188         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
189                 return ok, nil
190         }
191
192         for _, block := range blocks {
193                 blcokHash := block.Hash()
194                 p.knownBlocks.Add(blcokHash.String())
195         }
196         return true, nil
197 }
198
199 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
200         msg, err := NewHeadersMessage(headers)
201         if err != nil {
202                 return false, errors.New("fail on NewHeadersMessage")
203         }
204
205         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
206         return ok, nil
207 }
208
209 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
210         for _, tx := range txs {
211                 if p.isSPVNode() && !p.isRelatedTx(tx) {
212                         continue
213                 }
214                 msg, err := NewTransactionMessage(tx)
215                 if err != nil {
216                         return false, errors.Wrap(err, "failed to tx msg")
217                 }
218
219                 if p.knownTxs.Has(tx.ID.String()) {
220                         continue
221                 }
222                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
223                         return ok, nil
224                 }
225                 p.knownTxs.Add(tx.ID.String())
226         }
227         return true, nil
228 }
229
230 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
231         p.mtx.Lock()
232         defer p.mtx.Unlock()
233         p.height = height
234         p.hash = hash
235 }
236
237 type peerSet struct {
238         BasePeerSet
239         mtx   sync.RWMutex
240         peers map[string]*peer
241 }
242
243 // newPeerSet creates a new peer set to track the active participants.
244 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
245         return &peerSet{
246                 BasePeerSet: basePeerSet,
247                 peers:       make(map[string]*peer),
248         }
249 }
250
251 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
252         ps.mtx.Lock()
253         peer := ps.peers[peerID]
254         ps.mtx.Unlock()
255
256         if peer == nil {
257                 return
258         }
259         if ban := peer.addBanScore(persistent, transient, reason); !ban {
260                 return
261         }
262         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
263                 log.WithField("err", err).Error("fail on add ban peer")
264         }
265         ps.removePeer(peerID)
266 }
267
268 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
269         ps.mtx.Lock()
270         defer ps.mtx.Unlock()
271
272         if _, ok := ps.peers[peer.ID()]; !ok {
273                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
274                 return
275         }
276         log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
277 }
278
279 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
280         ps.mtx.RLock()
281         defer ps.mtx.RUnlock()
282
283         var bestPeer *peer
284         for _, p := range ps.peers {
285                 if !p.services.IsEnable(flag) {
286                         continue
287                 }
288                 if bestPeer == nil || p.height > bestPeer.height {
289                         bestPeer = p
290                 }
291         }
292         return bestPeer
293 }
294
295 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
296         msg, err := NewMinedBlockMessage(block)
297         if err != nil {
298                 return errors.Wrap(err, "fail on broadcast mined block")
299         }
300
301         hash := block.Hash()
302         peers := ps.peersWithoutBlock(&hash)
303         for _, peer := range peers {
304                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
305                         ps.removePeer(peer.ID())
306                         continue
307                 }
308                 peer.markBlock(&hash)
309         }
310         return nil
311 }
312
313 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
314         genesisHash := genesisBlock.Hash()
315         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
316         for _, peer := range ps.peers {
317                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
318                         ps.removePeer(peer.ID())
319                         continue
320                 }
321         }
322         return nil
323 }
324
325 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
326         msg, err := NewTransactionMessage(tx)
327         if err != nil {
328                 return errors.Wrap(err, "fail on broadcast tx")
329         }
330
331         peers := ps.peersWithoutTx(&tx.ID)
332         for _, peer := range peers {
333                 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
334                         continue
335                 }
336                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
337                         ps.removePeer(peer.ID())
338                         continue
339                 }
340                 peer.markTransaction(&tx.ID)
341         }
342         return nil
343 }
344
345 func (ps *peerSet) errorHandler(peerID string, err error) {
346         if errors.Root(err) == errPeerMisbehave {
347                 ps.addBanScore(peerID, 20, 0, err.Error())
348         } else {
349                 ps.removePeer(peerID)
350         }
351 }
352
353 // Peer retrieves the registered peer with the given id.
354 func (ps *peerSet) getPeer(id string) *peer {
355         ps.mtx.RLock()
356         defer ps.mtx.RUnlock()
357         return ps.peers[id]
358 }
359
360 func (ps *peerSet) getPeerInfos() []*PeerInfo {
361         ps.mtx.RLock()
362         defer ps.mtx.RUnlock()
363
364         result := []*PeerInfo{}
365         for _, peer := range ps.peers {
366                 result = append(result, peer.getPeerInfo())
367         }
368         return result
369 }
370
371 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
372         ps.mtx.RLock()
373         defer ps.mtx.RUnlock()
374
375         peers := []*peer{}
376         for _, peer := range ps.peers {
377                 if !peer.knownBlocks.Has(hash.String()) {
378                         peers = append(peers, peer)
379                 }
380         }
381         return peers
382 }
383
384 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
385         ps.mtx.RLock()
386         defer ps.mtx.RUnlock()
387
388         peers := []*peer{}
389         for _, peer := range ps.peers {
390                 if !peer.knownTxs.Has(hash.String()) {
391                         peers = append(peers, peer)
392                 }
393         }
394         return peers
395 }
396
397 func (ps *peerSet) removePeer(peerID string) {
398         ps.mtx.Lock()
399         delete(ps.peers, peerID)
400         ps.mtx.Unlock()
401         ps.StopPeerGracefully(peerID)
402 }