OSDN Git Service

Merge pull request #739 from Bytom/p2p_test
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/fatih/set.v0"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/p2p"
11         "github.com/bytom/p2p/trust"
12         "github.com/bytom/protocol/bc"
13         "github.com/bytom/protocol/bc/types"
14 )
15
16 var (
17         errClosed            = errors.New("peer set is closed")
18         errAlreadyRegistered = errors.New("peer is already registered")
19         errNotRegistered     = errors.New("peer is not registered")
20 )
21
22 const (
23         defaultVersion      = 1
24         defaultBanThreshold = uint64(100)
25 )
26
27 type peer struct {
28         mtx      sync.RWMutex
29         version  int // Protocol version negotiated
30         id       string
31         height   uint64
32         hash     *bc.Hash
33         banScore trust.DynamicBanScore
34
35         swPeer *p2p.Peer
36
37         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
38         knownBlocks *set.Set // Set of block hashes known to be known by this peer
39 }
40
41 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
42         return &peer{
43                 version:     defaultVersion,
44                 id:          Peer.Key,
45                 height:      height,
46                 hash:        hash,
47                 swPeer:      Peer,
48                 knownTxs:    set.New(),
49                 knownBlocks: set.New(),
50         }
51 }
52
53 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
54         p.mtx.RLock()
55         defer p.mtx.RUnlock()
56         return p.height, p.hash
57 }
58
59 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
60         p.mtx.Lock()
61         defer p.mtx.Unlock()
62
63         p.height = height
64         p.hash = hash
65 }
66
67 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
68         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
69         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
70         return nil
71 }
72
73 func (p *peer) requestBlockByHeight(height uint64) error {
74         msg := &BlockRequestMessage{Height: height}
75         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
76         return nil
77 }
78
79 func (p *peer) SendTransactions(txs []*types.Tx) error {
80         for _, tx := range txs {
81                 msg, err := NewTransactionNotifyMessage(tx)
82                 if err != nil {
83                         return errors.New("Failed construction tx msg")
84                 }
85                 hash := &tx.ID
86                 p.knownTxs.Add(hash.String())
87                 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
88         }
89         return nil
90 }
91
92 func (p *peer) getPeer() *p2p.Peer {
93         p.mtx.RLock()
94         defer p.mtx.RUnlock()
95
96         return p.swPeer
97 }
98
99 // MarkTransaction marks a transaction as known for the peer, ensuring that it
100 // will never be propagated to this particular peer.
101 func (p *peer) MarkTransaction(hash *bc.Hash) {
102         p.mtx.Lock()
103         defer p.mtx.Unlock()
104
105         // If we reached the memory allowance, drop a previously known transaction hash
106         for p.knownTxs.Size() >= maxKnownTxs {
107                 p.knownTxs.Pop()
108         }
109         p.knownTxs.Add(hash.String())
110 }
111
112 // MarkBlock marks a block as known for the peer, ensuring that the block will
113 // never be propagated to this particular peer.
114 func (p *peer) MarkBlock(hash *bc.Hash) {
115         p.mtx.Lock()
116         defer p.mtx.Unlock()
117
118         // If we reached the memory allowance, drop a previously known block hash
119         for p.knownBlocks.Size() >= maxKnownBlocks {
120                 p.knownBlocks.Pop()
121         }
122         p.knownBlocks.Add(hash.String())
123 }
124
125 // addBanScore increases the persistent and decaying ban score fields by the
126 // values passed as parameters. If the resulting score exceeds half of the ban
127 // threshold, a warning is logged including the reason provided. Further, if
128 // the score is above the ban threshold, the peer will be banned and
129 // disconnected.
130 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
131         warnThreshold := defaultBanThreshold >> 1
132         if transient == 0 && persistent == 0 {
133                 // The score is not being increased, but a warning message is still
134                 // logged if the score is above the warn threshold.
135                 score := p.banScore.Int()
136                 if score > warnThreshold {
137                         log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
138                 }
139                 return false
140         }
141         score := p.banScore.Increase(persistent, transient)
142         if score > warnThreshold {
143                 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
144                 if score > defaultBanThreshold {
145                         log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
146                         return true
147                 }
148         }
149         return false
150 }
151
152 type peerSet struct {
153         peers  map[string]*peer
154         lock   sync.RWMutex
155         closed bool
156 }
157
158 // newPeerSet creates a new peer set to track the active participants.
159 func newPeerSet() *peerSet {
160         return &peerSet{
161                 peers: make(map[string]*peer),
162         }
163 }
164
165 // Register injects a new peer into the working set, or returns an error if the
166 // peer is already known.
167 func (ps *peerSet) Register(p *peer) error {
168         ps.lock.Lock()
169         defer ps.lock.Unlock()
170
171         if ps.closed {
172                 return errClosed
173         }
174         if _, ok := ps.peers[p.id]; ok {
175                 return errAlreadyRegistered
176         }
177         ps.peers[p.id] = p
178         return nil
179 }
180
181 // Unregister removes a remote peer from the active set, disabling any further
182 // actions to/from that particular entity.
183 func (ps *peerSet) Unregister(id string) error {
184         ps.lock.Lock()
185         defer ps.lock.Unlock()
186
187         if _, ok := ps.peers[id]; !ok {
188                 return errNotRegistered
189         }
190         delete(ps.peers, id)
191         return nil
192 }
193
194 // Peer retrieves the registered peer with the given id.
195 func (ps *peerSet) Peer(id string) *peer {
196         ps.lock.RLock()
197         defer ps.lock.RUnlock()
198
199         return ps.peers[id]
200 }
201
202 // Len returns if the current number of peers in the set.
203 func (ps *peerSet) Len() int {
204         ps.lock.RLock()
205         defer ps.lock.RUnlock()
206
207         return len(ps.peers)
208 }
209
210 // MarkTransaction marks a transaction as known for the peer, ensuring that it
211 // will never be propagated to this particular peer.
212 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
213         ps.lock.RLock()
214         defer ps.lock.RUnlock()
215
216         if peer, ok := ps.peers[peerID]; ok {
217                 peer.MarkTransaction(hash)
218         }
219 }
220
221 // MarkBlock marks a block as known for the peer, ensuring that the block will
222 // never be propagated to this particular peer.
223 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
224         ps.lock.RLock()
225         defer ps.lock.RUnlock()
226
227         if peer, ok := ps.peers[peerID]; ok {
228                 peer.MarkBlock(hash)
229         }
230 }
231
232 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
233 // their set of known hashes.
234 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
235         ps.lock.RLock()
236         defer ps.lock.RUnlock()
237
238         list := make([]*peer, 0, len(ps.peers))
239         for _, p := range ps.peers {
240                 if !p.knownBlocks.Has(hash.String()) {
241                         list = append(list, p)
242                 }
243         }
244         return list
245 }
246
247 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
248 // in their set of known hashes.
249 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
250         ps.lock.RLock()
251         defer ps.lock.RUnlock()
252
253         list := make([]*peer, 0, len(ps.peers))
254         for _, p := range ps.peers {
255                 if !p.knownTxs.Has(hash.String()) {
256                         list = append(list, p)
257                 }
258         }
259         return list
260 }
261
262 // BestPeer retrieves the known peer with the currently highest total difficulty.
263 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
264         ps.lock.RLock()
265         defer ps.lock.RUnlock()
266
267         var bestPeer *p2p.Peer
268         var bestHeight uint64
269
270         for _, p := range ps.peers {
271                 if bestPeer == nil || p.height > bestHeight {
272                         bestPeer, bestHeight = p.swPeer, p.height
273                 }
274         }
275
276         return bestPeer, bestHeight
277 }
278
279 // Close disconnects all peers.
280 // No new peers can be registered after Close has returned.
281 func (ps *peerSet) Close() {
282         ps.lock.Lock()
283         defer ps.lock.Unlock()
284
285         for _, p := range ps.peers {
286                 p.swPeer.CloseConn()
287         }
288         ps.closed = true
289 }
290
291 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
292         ps.lock.Lock()
293         defer ps.lock.Unlock()
294
295         if _, ok := ps.peers[peer.Key]; !ok {
296                 keeperPeer := newPeer(0, nil, peer)
297                 ps.peers[peer.Key] = keeperPeer
298                 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
299                 return
300         }
301         log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
302 }
303
304 func (ps *peerSet) RemovePeer(peerID string) {
305         ps.lock.Lock()
306         defer ps.lock.Unlock()
307
308         delete(ps.peers, peerID)
309         log.WithField("ID", peerID).Info("Delete peer from peerset")
310 }
311
312 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
313         ps.lock.Lock()
314         defer ps.lock.Unlock()
315
316         if peer, ok := ps.peers[peerID]; ok {
317                 peer.SetStatus(height, hash)
318         }
319 }
320
321 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
322         ps.lock.Lock()
323         defer ps.lock.Unlock()
324
325         peer, ok := ps.peers[peerID]
326         if !ok {
327                 return errors.New("Can't find peer. ")
328         }
329         return peer.requestBlockByHash(hash)
330 }
331
332 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
333         ps.lock.Lock()
334         defer ps.lock.Unlock()
335
336         peer, ok := ps.peers[peerID]
337         if !ok {
338                 return errors.New("Can't find peer. ")
339         }
340         return peer.requestBlockByHeight(height)
341 }
342
343 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
344         msg, err := NewMinedBlockMessage(block)
345         if err != nil {
346                 return nil, errors.New("Failed construction block msg")
347         }
348         hash := block.Hash()
349         peers := ps.PeersWithoutBlock(&hash)
350         abnormalPeers := make([]*peer, 0)
351         for _, peer := range peers {
352                 if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
353                         abnormalPeers = append(abnormalPeers, peer)
354                         continue
355                 }
356                 ps.MarkBlock(peer.swPeer.Key, &hash)
357         }
358         return abnormalPeers, nil
359 }
360
361 func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
362         return ps.BroadcastMinedBlock(block)
363 }
364
365 func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
366         msg, err := NewTransactionNotifyMessage(tx)
367         if err != nil {
368                 return nil, errors.New("Failed construction tx msg")
369         }
370         peers := ps.PeersWithoutTx(&tx.ID)
371         abnormalPeers := make([]*peer, 0)
372         for _, peer := range peers {
373                 if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
374                         abnormalPeers = append(abnormalPeers, peer)
375                         continue
376                 }
377                 ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
378         }
379         return abnormalPeers, nil
380 }