OSDN Git Service

Use best block for default in /get-hash-rate (#1243)
[bytom/bytom-spv.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "net"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8         "gopkg.in/fatih/set.v0"
9
10         "github.com/bytom/consensus"
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p/trust"
13         "github.com/bytom/protocol/bc"
14         "github.com/bytom/protocol/bc/types"
15 )
16
17 const (
18         maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
19         maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
20         defaultBanThreshold = uint64(100)
21 )
22
23 //BasePeer is the interface for connection level peer
24 type BasePeer interface {
25         Addr() net.Addr
26         ID() string
27         ServiceFlag() consensus.ServiceFlag
28         TrySend(byte, interface{}) bool
29 }
30
31 //BasePeerSet is the intergace for connection level peer manager
32 type BasePeerSet interface {
33         AddBannedPeer(string) error
34         StopPeerGracefully(string)
35 }
36
37 // PeerInfo indicate peer status snap
38 type PeerInfo struct {
39         ID         string `json:"peer_id"`
40         RemoteAddr string `json:"remote_addr"`
41         Height     uint64 `json:"height"`
42         Delay      uint32 `json:"delay"`
43 }
44
45 type peer struct {
46         BasePeer
47         mtx         sync.RWMutex
48         services    consensus.ServiceFlag
49         height      uint64
50         hash        *bc.Hash
51         banScore    trust.DynamicBanScore
52         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
53         knownBlocks *set.Set // Set of block hashes known to be known by this peer
54 }
55
56 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
57         return &peer{
58                 BasePeer:    basePeer,
59                 services:    basePeer.ServiceFlag(),
60                 height:      height,
61                 hash:        hash,
62                 knownTxs:    set.New(),
63                 knownBlocks: set.New(),
64         }
65 }
66
67 func (p *peer) Height() uint64 {
68         p.mtx.RLock()
69         defer p.mtx.RUnlock()
70         return p.height
71 }
72
73 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
74         score := p.banScore.Increase(persistent, transient)
75         if score > defaultBanThreshold {
76                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
77                 return true
78         }
79
80         warnThreshold := defaultBanThreshold >> 1
81         if score > warnThreshold {
82                 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
83         }
84         return false
85 }
86
87 func (p *peer) getBlockByHeight(height uint64) bool {
88         msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
89         return p.TrySend(BlockchainChannel, msg)
90 }
91
92 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
93         msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
94         return p.TrySend(BlockchainChannel, msg)
95 }
96
97 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
98         msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
99         return p.TrySend(BlockchainChannel, msg)
100 }
101
102 func (p *peer) getPeerInfo() *PeerInfo {
103         p.mtx.RLock()
104         defer p.mtx.RUnlock()
105         return &PeerInfo{
106                 ID:         p.ID(),
107                 RemoteAddr: p.Addr().String(),
108                 Height:     p.height,
109         }
110 }
111
112 func (p *peer) markBlock(hash *bc.Hash) {
113         p.mtx.Lock()
114         defer p.mtx.Unlock()
115
116         for p.knownBlocks.Size() >= maxKnownBlocks {
117                 p.knownBlocks.Pop()
118         }
119         p.knownBlocks.Add(hash.String())
120 }
121
122 func (p *peer) markTransaction(hash *bc.Hash) {
123         p.mtx.Lock()
124         defer p.mtx.Unlock()
125
126         for p.knownTxs.Size() >= maxKnownTxs {
127                 p.knownTxs.Pop()
128         }
129         p.knownTxs.Add(hash.String())
130 }
131
132 func (p *peer) sendBlock(block *types.Block) (bool, error) {
133         msg, err := NewBlockMessage(block)
134         if err != nil {
135                 return false, errors.Wrap(err, "fail on NewBlockMessage")
136         }
137
138         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
139         if ok {
140                 blcokHash := block.Hash()
141                 p.knownBlocks.Add(blcokHash.String())
142         }
143         return ok, nil
144 }
145
146 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
147         msg, err := NewBlocksMessage(blocks)
148         if err != nil {
149                 return false, errors.Wrap(err, "fail on NewBlocksMessage")
150         }
151
152         if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
153                 return ok, nil
154         }
155
156         for _, block := range blocks {
157                 blcokHash := block.Hash()
158                 p.knownBlocks.Add(blcokHash.String())
159         }
160         return true, nil
161 }
162
163 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
164         msg, err := NewHeadersMessage(headers)
165         if err != nil {
166                 return false, errors.New("fail on NewHeadersMessage")
167         }
168
169         ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
170         return ok, nil
171 }
172
173 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
174         for _, tx := range txs {
175                 msg, err := NewTransactionMessage(tx)
176                 if err != nil {
177                         return false, errors.Wrap(err, "failed to tx msg")
178                 }
179
180                 if p.knownTxs.Has(tx.ID.String()) {
181                         continue
182                 }
183                 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
184                         return ok, nil
185                 }
186                 p.knownTxs.Add(tx.ID.String())
187         }
188         return true, nil
189 }
190
191 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
192         p.mtx.Lock()
193         defer p.mtx.Unlock()
194         p.height = height
195         p.hash = hash
196 }
197
198 type peerSet struct {
199         BasePeerSet
200         mtx   sync.RWMutex
201         peers map[string]*peer
202 }
203
204 // newPeerSet creates a new peer set to track the active participants.
205 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
206         return &peerSet{
207                 BasePeerSet: basePeerSet,
208                 peers:       make(map[string]*peer),
209         }
210 }
211
212 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
213         ps.mtx.Lock()
214         peer := ps.peers[peerID]
215         ps.mtx.Unlock()
216
217         if peer == nil {
218                 return
219         }
220         if ban := peer.addBanScore(persistent, transient, reason); !ban {
221                 return
222         }
223         if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
224                 log.WithField("err", err).Error("fail on add ban peer")
225         }
226         ps.removePeer(peerID)
227 }
228
229 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
230         ps.mtx.Lock()
231         defer ps.mtx.Unlock()
232
233         if _, ok := ps.peers[peer.ID()]; !ok {
234                 ps.peers[peer.ID()] = newPeer(height, hash, peer)
235                 return
236         }
237         log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
238 }
239
240 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
241         ps.mtx.RLock()
242         defer ps.mtx.RUnlock()
243
244         var bestPeer *peer
245         for _, p := range ps.peers {
246                 if !p.services.IsEnable(flag) {
247                         continue
248                 }
249                 if bestPeer == nil || p.height > bestPeer.height {
250                         bestPeer = p
251                 }
252         }
253         return bestPeer
254 }
255
256 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
257         msg, err := NewMinedBlockMessage(block)
258         if err != nil {
259                 return errors.Wrap(err, "fail on broadcast mined block")
260         }
261
262         hash := block.Hash()
263         peers := ps.peersWithoutBlock(&hash)
264         for _, peer := range peers {
265                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
266                         ps.removePeer(peer.ID())
267                         continue
268                 }
269                 peer.markBlock(&hash)
270         }
271         return nil
272 }
273
274 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
275         genesisHash := genesisBlock.Hash()
276         msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
277         for _, peer := range ps.peers {
278                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
279                         ps.removePeer(peer.ID())
280                         continue
281                 }
282         }
283         return nil
284 }
285
286 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
287         msg, err := NewTransactionMessage(tx)
288         if err != nil {
289                 return errors.Wrap(err, "fail on broadcast tx")
290         }
291
292         peers := ps.peersWithoutTx(&tx.ID)
293         for _, peer := range peers {
294                 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
295                         ps.removePeer(peer.ID())
296                         continue
297                 }
298                 peer.markTransaction(&tx.ID)
299         }
300         return nil
301 }
302
303 func (ps *peerSet) errorHandler(peerID string, err error) {
304         if errors.Root(err) == errPeerMisbehave {
305                 ps.addBanScore(peerID, 20, 0, err.Error())
306         } else {
307                 ps.removePeer(peerID)
308         }
309 }
310
311 // Peer retrieves the registered peer with the given id.
312 func (ps *peerSet) getPeer(id string) *peer {
313         ps.mtx.RLock()
314         defer ps.mtx.RUnlock()
315         return ps.peers[id]
316 }
317
318 func (ps *peerSet) getPeerInfos() []*PeerInfo {
319         ps.mtx.RLock()
320         defer ps.mtx.RUnlock()
321
322         result := []*PeerInfo{}
323         for _, peer := range ps.peers {
324                 result = append(result, peer.getPeerInfo())
325         }
326         return result
327 }
328
329 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
330         ps.mtx.RLock()
331         defer ps.mtx.RUnlock()
332
333         peers := []*peer{}
334         for _, peer := range ps.peers {
335                 if !peer.knownBlocks.Has(hash.String()) {
336                         peers = append(peers, peer)
337                 }
338         }
339         return peers
340 }
341
342 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
343         ps.mtx.RLock()
344         defer ps.mtx.RUnlock()
345
346         peers := []*peer{}
347         for _, peer := range ps.peers {
348                 if !peer.knownTxs.Has(hash.String()) {
349                         peers = append(peers, peer)
350                 }
351         }
352         return peers
353 }
354
355 func (ps *peerSet) removePeer(peerID string) {
356         ps.mtx.Lock()
357         delete(ps.peers, peerID)
358         ps.mtx.Unlock()
359         ps.StopPeerGracefully(peerID)
360 }