OSDN Git Service

add network access control api (#1118)
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "strconv"
5         "sync"
6
7         log "github.com/sirupsen/logrus"
8         "gopkg.in/fatih/set.v0"
9
10         "github.com/bytom/consensus"
11         "github.com/bytom/errors"
12         "github.com/bytom/p2p"
13         "github.com/bytom/p2p/trust"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/protocol/bc/types"
16 )
17
18 var (
19         errClosed            = errors.New("peer set is closed")
20         errAlreadyRegistered = errors.New("peer is already registered")
21         errNotRegistered     = errors.New("peer is not registered")
22 )
23
24 const (
25         defaultVersion      = 1
26         defaultBanThreshold = uint64(100)
27 )
28
29 type peer struct {
30         mtx      sync.RWMutex
31         version  int // Protocol version negotiated
32         services consensus.ServiceFlag
33         id       string
34         height   uint64
35         hash     *bc.Hash
36         banScore trust.DynamicBanScore
37
38         swPeer *p2p.Peer
39
40         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
41         knownBlocks *set.Set // Set of block hashes known to be known by this peer
42 }
43
44 // PeerInfo indicate peer information
45 type PeerInfo struct {
46         Id         string
47         RemoteAddr string
48         Height     uint64
49         delay      uint32
50 }
51
52 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
53         services := consensus.SFFullNode
54         if len(Peer.Other) != 0 {
55                 if serviceFlag, err := strconv.ParseUint(Peer.Other[0], 10, 64); err != nil {
56                         services = consensus.ServiceFlag(serviceFlag)
57                 }
58         }
59
60         return &peer{
61                 version:     defaultVersion,
62                 services:    services,
63                 id:          Peer.Key,
64                 height:      height,
65                 hash:        hash,
66                 swPeer:      Peer,
67                 knownTxs:    set.New(),
68                 knownBlocks: set.New(),
69         }
70 }
71
72 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
73         p.mtx.RLock()
74         defer p.mtx.RUnlock()
75         return p.height, p.hash
76 }
77
78 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
79         p.mtx.Lock()
80         defer p.mtx.Unlock()
81
82         p.height = height
83         p.hash = hash
84 }
85
86 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
87         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
88         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
89         return nil
90 }
91
92 func (p *peer) requestBlockByHeight(height uint64) error {
93         msg := &BlockRequestMessage{Height: height}
94         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
95         return nil
96 }
97
98 func (p *peer) SendTransactions(txs []*types.Tx) error {
99         for _, tx := range txs {
100                 msg, err := NewTransactionNotifyMessage(tx)
101                 if err != nil {
102                         return errors.New("Failed construction tx msg")
103                 }
104                 hash := &tx.ID
105                 p.knownTxs.Add(hash.String())
106                 if p.swPeer == nil {
107                         return errPeerDropped
108                 }
109                 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
110         }
111         return nil
112 }
113
114 func (p *peer) GetPeer() *p2p.Peer {
115         p.mtx.RLock()
116         defer p.mtx.RUnlock()
117
118         return p.swPeer
119 }
120
121
122 func (p *peer) GetPeerInfo() *PeerInfo {
123         p.mtx.RLock()
124         defer p.mtx.RUnlock()
125         return &PeerInfo{
126                 Id:         p.id,
127                 RemoteAddr: p.swPeer.RemoteAddr,
128                 Height:     p.height,
129                 delay:      0, // TODO
130         }
131 }
132
133 // MarkTransaction marks a transaction as known for the peer, ensuring that it
134 // will never be propagated to this particular peer.
135 func (p *peer) MarkTransaction(hash *bc.Hash) {
136         p.mtx.Lock()
137         defer p.mtx.Unlock()
138
139         // If we reached the memory allowance, drop a previously known transaction hash
140         for p.knownTxs.Size() >= maxKnownTxs {
141                 p.knownTxs.Pop()
142         }
143         p.knownTxs.Add(hash.String())
144 }
145
146 // MarkBlock marks a block as known for the peer, ensuring that the block will
147 // never be propagated to this particular peer.
148 func (p *peer) MarkBlock(hash *bc.Hash) {
149         p.mtx.Lock()
150         defer p.mtx.Unlock()
151
152         // If we reached the memory allowance, drop a previously known block hash
153         for p.knownBlocks.Size() >= maxKnownBlocks {
154                 p.knownBlocks.Pop()
155         }
156         p.knownBlocks.Add(hash.String())
157 }
158
159 // addBanScore increases the persistent and decaying ban score fields by the
160 // values passed as parameters. If the resulting score exceeds half of the ban
161 // threshold, a warning is logged including the reason provided. Further, if
162 // the score is above the ban threshold, the peer will be banned and
163 // disconnected.
164 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
165         warnThreshold := defaultBanThreshold >> 1
166         if transient == 0 && persistent == 0 {
167                 // The score is not being increased, but a warning message is still
168                 // logged if the score is above the warn threshold.
169                 score := p.banScore.Int()
170                 if score > warnThreshold {
171                         log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
172                 }
173                 return false
174         }
175         score := p.banScore.Increase(persistent, transient)
176         if score > warnThreshold {
177                 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
178                 if score > defaultBanThreshold {
179                         log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
180                         return true
181                 }
182         }
183         return false
184 }
185
186 type peerSet struct {
187         peers  map[string]*peer
188         lock   sync.RWMutex
189         closed bool
190 }
191
192 // newPeerSet creates a new peer set to track the active participants.
193 func newPeerSet() *peerSet {
194         return &peerSet{
195                 peers: make(map[string]*peer),
196         }
197 }
198
199 // Register injects a new peer into the working set, or returns an error if the
200 // peer is already known.
201 func (ps *peerSet) Register(p *peer) error {
202         ps.lock.Lock()
203         defer ps.lock.Unlock()
204
205         if ps.closed {
206                 return errClosed
207         }
208         if _, ok := ps.peers[p.id]; ok {
209                 return errAlreadyRegistered
210         }
211         ps.peers[p.id] = p
212         return nil
213 }
214
215 // Unregister removes a remote peer from the active set, disabling any further
216 // actions to/from that particular entity.
217 func (ps *peerSet) Unregister(id string) error {
218         ps.lock.Lock()
219         defer ps.lock.Unlock()
220
221         if _, ok := ps.peers[id]; !ok {
222                 return errNotRegistered
223         }
224         delete(ps.peers, id)
225         return nil
226 }
227
228 // Peer retrieves the registered peer with the given id.
229 func (ps *peerSet) Peer(id string) (*peer, bool) {
230         ps.lock.RLock()
231         defer ps.lock.RUnlock()
232         p, ok := ps.peers[id]
233         return p, ok
234 }
235
236
237 // getPeerInfos return all peer information of current node
238 func (ps *peerSet) GetPeerInfos() []*PeerInfo {
239         var peerInfos []*PeerInfo
240         for _, peer := range ps.peers {
241                 peerInfos = append(peerInfos, peer.GetPeerInfo())
242         }
243         return peerInfos
244 }
245
246 // Len returns if the current number of peers in the set.
247 func (ps *peerSet) Len() int {
248         ps.lock.RLock()
249         defer ps.lock.RUnlock()
250
251         return len(ps.peers)
252 }
253
254 // MarkTransaction marks a transaction as known for the peer, ensuring that it
255 // will never be propagated to this particular peer.
256 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
257         ps.lock.RLock()
258         defer ps.lock.RUnlock()
259
260         if peer, ok := ps.peers[peerID]; ok {
261                 peer.MarkTransaction(hash)
262         }
263 }
264
265 // MarkBlock marks a block as known for the peer, ensuring that the block will
266 // never be propagated to this particular peer.
267 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
268         ps.lock.RLock()
269         defer ps.lock.RUnlock()
270
271         if peer, ok := ps.peers[peerID]; ok {
272                 peer.MarkBlock(hash)
273         }
274 }
275
276 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
277 // their set of known hashes.
278 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
279         ps.lock.RLock()
280         defer ps.lock.RUnlock()
281
282         list := make([]*peer, 0, len(ps.peers))
283         for _, p := range ps.peers {
284                 if !p.knownBlocks.Has(hash.String()) {
285                         list = append(list, p)
286                 }
287         }
288         return list
289 }
290
291 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
292 // in their set of known hashes.
293 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
294         ps.lock.RLock()
295         defer ps.lock.RUnlock()
296
297         list := make([]*peer, 0, len(ps.peers))
298         for _, p := range ps.peers {
299                 if !p.knownTxs.Has(hash.String()) {
300                         list = append(list, p)
301                 }
302         }
303         return list
304 }
305
306 // BestPeer retrieves the known peer with the currently highest total difficulty.
307 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
308         ps.lock.RLock()
309         defer ps.lock.RUnlock()
310
311         var bestPeer *p2p.Peer
312         var bestHeight uint64
313
314         for _, p := range ps.peers {
315                 if bestPeer == nil || p.height > bestHeight {
316                         bestPeer, bestHeight = p.swPeer, p.height
317                 }
318         }
319
320         return bestPeer, bestHeight
321 }
322
323 // Close disconnects all peers.
324 // No new peers can be registered after Close has returned.
325 func (ps *peerSet) Close() {
326         ps.lock.Lock()
327         defer ps.lock.Unlock()
328
329         for _, p := range ps.peers {
330                 p.swPeer.CloseConn()
331         }
332         ps.closed = true
333 }
334
335 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
336         ps.lock.Lock()
337         defer ps.lock.Unlock()
338
339         if _, ok := ps.peers[peer.Key]; !ok {
340                 keeperPeer := newPeer(0, nil, peer)
341                 ps.peers[peer.Key] = keeperPeer
342                 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
343                 return
344         }
345         log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
346 }
347
348 func (ps *peerSet) RemovePeer(peerID string) {
349         ps.lock.Lock()
350         defer ps.lock.Unlock()
351
352         delete(ps.peers, peerID)
353         log.WithField("ID", peerID).Info("Delete peer from peerset")
354 }
355
356 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
357         ps.lock.Lock()
358         defer ps.lock.Unlock()
359
360         if peer, ok := ps.peers[peerID]; ok {
361                 peer.SetStatus(height, hash)
362         }
363 }
364
365 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
366         peer, ok := ps.Peer(peerID)
367         if !ok {
368                 return errors.New("Can't find peer. ")
369         }
370         return peer.requestBlockByHash(hash)
371 }
372
373 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
374         peer, ok := ps.Peer(peerID)
375         if !ok {
376                 return errors.New("Can't find peer. ")
377         }
378         return peer.requestBlockByHeight(height)
379 }
380
381 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
382         msg, err := NewMinedBlockMessage(block)
383         if err != nil {
384                 return nil, errors.New("Failed construction block msg")
385         }
386         hash := block.Hash()
387         peers := ps.PeersWithoutBlock(&hash)
388         abnormalPeers := make([]*peer, 0)
389         for _, peer := range peers {
390                 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
391                         abnormalPeers = append(abnormalPeers, peer)
392                         continue
393                 }
394                 if p, ok := ps.Peer(peer.id); ok {
395                         p.MarkBlock(&hash)
396                 }
397         }
398         return abnormalPeers, nil
399 }
400
401 func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
402         return ps.BroadcastMinedBlock(block)
403 }
404
405 func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
406         msg, err := NewTransactionNotifyMessage(tx)
407         if err != nil {
408                 return nil, errors.New("Failed construction tx msg")
409         }
410         peers := ps.PeersWithoutTx(&tx.ID)
411         abnormalPeers := make([]*peer, 0)
412         for _, peer := range peers {
413                 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
414                         abnormalPeers = append(abnormalPeers, peer)
415                         continue
416                 }
417                 if p, ok := ps.Peer(peer.id); ok {
418                         p.MarkTransaction(&tx.ID)
419                 }
420         }
421         return abnormalPeers, nil
422 }