OSDN Git Service

Merge pull request #680 from Bytom/fix-print-json-invalid-type-assertion
[bytom/bytom.git] / netsync / peer.go
1 package netsync
2
3 import (
4         "sync"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/fatih/set.v0"
8
9         "github.com/bytom/errors"
10         "github.com/bytom/p2p"
11         "github.com/bytom/p2p/trust"
12         "github.com/bytom/protocol/bc"
13         "github.com/bytom/protocol/bc/types"
14 )
15
16 var (
17         errClosed            = errors.New("peer set is closed")
18         errAlreadyRegistered = errors.New("peer is already registered")
19         errNotRegistered     = errors.New("peer is not registered")
20 )
21
22 const (
23         defaultVersion      = 1
24         defaultBanThreshold = uint64(100)
25 )
26
27 type peer struct {
28         mtx      sync.RWMutex
29         version  int // Protocol version negotiated
30         id       string
31         height   uint64
32         hash     *bc.Hash
33         banScore trust.DynamicBanScore
34
35         swPeer *p2p.Peer
36
37         knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
38         knownBlocks *set.Set // Set of block hashes known to be known by this peer
39 }
40
41 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
42         return &peer{
43                 version:     defaultVersion,
44                 height:      height,
45                 hash:        hash,
46                 swPeer:        Peer,
47                 knownTxs:    set.New(),
48                 knownBlocks: set.New(),
49         }
50 }
51
52 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
53         p.mtx.RLock()
54         defer p.mtx.RUnlock()
55         return p.height, p.hash
56 }
57
58 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
59         p.mtx.Lock()
60         defer p.mtx.Unlock()
61
62         p.height = height
63         p.hash = hash
64 }
65
66 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
67         msg := &BlockRequestMessage{RawHash: hash.Byte32()}
68         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
69         return nil
70 }
71
72 func (p *peer) requestBlockByHeight(height uint64) error {
73         msg := &BlockRequestMessage{Height: height}
74         p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
75         return nil
76 }
77
78 func (p *peer) SendTransactions(txs []*types.Tx) error {
79         for _, tx := range txs {
80                 msg, err := NewTransactionNotifyMessage(tx)
81                 if err != nil {
82                         return errors.New("Failed construction tx msg")
83                 }
84                 hash := &tx.ID
85                 p.knownTxs.Add(hash.String())
86                 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
87         }
88         return nil
89 }
90
91 func (p *peer) getPeer() *p2p.Peer {
92         p.mtx.RLock()
93         defer p.mtx.RUnlock()
94
95         return p.swPeer
96 }
97
98 // MarkTransaction marks a transaction as known for the peer, ensuring that it
99 // will never be propagated to this particular peer.
100 func (p *peer) MarkTransaction(hash *bc.Hash) {
101         p.mtx.Lock()
102         defer p.mtx.Unlock()
103
104         // If we reached the memory allowance, drop a previously known transaction hash
105         for p.knownTxs.Size() >= maxKnownTxs {
106                 p.knownTxs.Pop()
107         }
108         p.knownTxs.Add(hash.String())
109 }
110
111 // MarkBlock marks a block as known for the peer, ensuring that the block will
112 // never be propagated to this particular peer.
113 func (p *peer) MarkBlock(hash *bc.Hash) {
114         p.mtx.Lock()
115         defer p.mtx.Unlock()
116
117         // If we reached the memory allowance, drop a previously known block hash
118         for p.knownBlocks.Size() >= maxKnownBlocks {
119                 p.knownBlocks.Pop()
120         }
121         p.knownBlocks.Add(hash.String())
122 }
123
124 // addBanScore increases the persistent and decaying ban score fields by the
125 // values passed as parameters. If the resulting score exceeds half of the ban
126 // threshold, a warning is logged including the reason provided. Further, if
127 // the score is above the ban threshold, the peer will be banned and
128 // disconnected.
129 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
130         warnThreshold := defaultBanThreshold >> 1
131         if transient == 0 && persistent == 0 {
132                 // The score is not being increased, but a warning message is still
133                 // logged if the score is above the warn threshold.
134                 score := p.banScore.Int()
135                 if score > warnThreshold {
136                         log.Info("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p, reason, score)
137                 }
138                 return false
139         }
140         score := p.banScore.Increase(persistent, transient)
141         if score > warnThreshold {
142                 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p, reason, score)
143                 if score > defaultBanThreshold {
144                         log.Errorf("Misbehaving peer %s -- banning and disconnecting", p)
145                         return true
146                 }
147         }
148         return false
149 }
150
151 type peerSet struct {
152         peers  map[string]*peer
153         lock   sync.RWMutex
154         closed bool
155 }
156
157 // newPeerSet creates a new peer set to track the active participants.
158 func newPeerSet() *peerSet {
159         return &peerSet{
160                 peers: make(map[string]*peer),
161         }
162 }
163
164 // Register injects a new peer into the working set, or returns an error if the
165 // peer is already known.
166 func (ps *peerSet) Register(p *peer) error {
167         ps.lock.Lock()
168         defer ps.lock.Unlock()
169
170         if ps.closed {
171                 return errClosed
172         }
173         if _, ok := ps.peers[p.id]; ok {
174                 return errAlreadyRegistered
175         }
176         ps.peers[p.id] = p
177         return nil
178 }
179
180 // Unregister removes a remote peer from the active set, disabling any further
181 // actions to/from that particular entity.
182 func (ps *peerSet) Unregister(id string) error {
183         ps.lock.Lock()
184         defer ps.lock.Unlock()
185
186         if _, ok := ps.peers[id]; !ok {
187                 return errNotRegistered
188         }
189         delete(ps.peers, id)
190         return nil
191 }
192
193 // Peer retrieves the registered peer with the given id.
194 func (ps *peerSet) Peer(id string) *peer {
195         ps.lock.RLock()
196         defer ps.lock.RUnlock()
197
198         return ps.peers[id]
199 }
200
201 // Len returns if the current number of peers in the set.
202 func (ps *peerSet) Len() int {
203         ps.lock.RLock()
204         defer ps.lock.RUnlock()
205
206         return len(ps.peers)
207 }
208
209 // MarkTransaction marks a transaction as known for the peer, ensuring that it
210 // will never be propagated to this particular peer.
211 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
212         ps.lock.RLock()
213         defer ps.lock.RUnlock()
214
215         if peer, ok := ps.peers[peerID]; ok {
216                 peer.MarkTransaction(hash)
217         }
218 }
219
220 // MarkBlock marks a block as known for the peer, ensuring that the block will
221 // never be propagated to this particular peer.
222 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
223         ps.lock.RLock()
224         defer ps.lock.RUnlock()
225
226         if peer, ok := ps.peers[peerID]; ok {
227                 peer.MarkBlock(hash)
228         }
229 }
230
231 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
232 // their set of known hashes.
233 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
234         ps.lock.RLock()
235         defer ps.lock.RUnlock()
236
237         list := make([]*peer, 0, len(ps.peers))
238         for _, p := range ps.peers {
239                 if !p.knownBlocks.Has(hash.String()) {
240                         list = append(list, p)
241                 }
242         }
243         return list
244 }
245
246 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
247 // in their set of known hashes.
248 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
249         ps.lock.RLock()
250         defer ps.lock.RUnlock()
251
252         list := make([]*peer, 0, len(ps.peers))
253         for _, p := range ps.peers {
254                 if !p.knownTxs.Has(hash.String()) {
255                         list = append(list, p)
256                 }
257         }
258         return list
259 }
260
261 // BestPeer retrieves the known peer with the currently highest total difficulty.
262 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
263         ps.lock.RLock()
264         defer ps.lock.RUnlock()
265
266         var bestPeer *p2p.Peer
267         var bestHeight uint64
268
269         for _, p := range ps.peers {
270                 if bestPeer == nil || p.height > bestHeight {
271                         bestPeer, bestHeight = p.swPeer, p.height
272                 }
273         }
274
275         return bestPeer, bestHeight
276 }
277
278 // Close disconnects all peers.
279 // No new peers can be registered after Close has returned.
280 func (ps *peerSet) Close() {
281         ps.lock.Lock()
282         defer ps.lock.Unlock()
283
284         for _, p := range ps.peers {
285                 p.swPeer.CloseConn()
286         }
287         ps.closed = true
288 }
289
290 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
291         ps.lock.Lock()
292         defer ps.lock.Unlock()
293
294         if _, ok := ps.peers[peer.Key]; !ok {
295                 keeperPeer := newPeer(0, nil, peer)
296                 ps.peers[peer.Key] = keeperPeer
297                 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
298                 return
299         }
300         log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
301 }
302
303 func (ps *peerSet) RemovePeer(peerID string) {
304         ps.lock.Lock()
305         defer ps.lock.Unlock()
306
307         delete(ps.peers, peerID)
308         log.WithField("ID", peerID).Info("Delete peer from peerset")
309 }
310
311 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
312         ps.lock.Lock()
313         defer ps.lock.Unlock()
314
315         if peer, ok := ps.peers[peerID]; ok {
316                 peer.SetStatus(height, hash)
317         }
318 }
319
320 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
321         ps.lock.Lock()
322         defer ps.lock.Unlock()
323
324         peer, ok := ps.peers[peerID]
325         if !ok {
326                 return errors.New("Can't find peer. ")
327         }
328         return peer.requestBlockByHash(hash)
329 }
330
331 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
332         ps.lock.Lock()
333         defer ps.lock.Unlock()
334
335         peer, ok := ps.peers[peerID]
336         if !ok {
337                 return errors.New("Can't find peer. ")
338         }
339         return peer.requestBlockByHeight(height)
340 }
341
342 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
343         msg, err := NewMinedBlockMessage(block)
344         if err != nil {
345                 return errors.New("Failed construction block msg")
346         }
347         hash := block.Hash()
348         peers := ps.PeersWithoutBlock(&hash)
349         for _, peer := range peers {
350                 ps.MarkBlock(peer.swPeer.Key, &hash)
351                 peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
352         }
353         return nil
354 }
355
356 func (ps *peerSet) BroadcastNewStatus(block *types.Block) error {
357         return ps.BroadcastMinedBlock(block)
358 }
359
360 func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
361         msg, err := NewTransactionNotifyMessage(tx)
362         if err != nil {
363                 return errors.New("Failed construction tx msg")
364         }
365         peers := ps.PeersWithoutTx(&tx.ID)
366         for _, peer := range peers {
367                 ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
368                 peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
369         }
370         return nil
371 }