6 log "github.com/sirupsen/logrus"
7 "gopkg.in/fatih/set.v0"
9 "github.com/bytom/errors"
10 "github.com/bytom/p2p"
11 "github.com/bytom/p2p/trust"
12 "github.com/bytom/protocol/bc"
13 "github.com/bytom/protocol/bc/types"
17 errClosed = errors.New("peer set is closed")
18 errAlreadyRegistered = errors.New("peer is already registered")
19 errNotRegistered = errors.New("peer is not registered")
24 defaultBanThreshold = uint64(100)
29 version int // Protocol version negotiated
33 banScore trust.DynamicBanScore
37 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
38 knownBlocks *set.Set // Set of block hashes known to be known by this peer
41 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
43 version: defaultVersion,
48 knownBlocks: set.New(),
52 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
55 return p.height, p.hash
58 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
66 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
67 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
68 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
72 func (p *peer) requestBlockByHeight(height uint64) error {
73 msg := &BlockRequestMessage{Height: height}
74 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
78 func (p *peer) SendTransactions(txs []*types.Tx) error {
79 for _, tx := range txs {
80 msg, err := NewTransactionNotifyMessage(tx)
82 return errors.New("Failed construction tx msg")
85 p.knownTxs.Add(hash.String())
86 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
91 func (p *peer) getPeer() *p2p.Peer {
98 // MarkTransaction marks a transaction as known for the peer, ensuring that it
99 // will never be propagated to this particular peer.
100 func (p *peer) MarkTransaction(hash *bc.Hash) {
104 // If we reached the memory allowance, drop a previously known transaction hash
105 for p.knownTxs.Size() >= maxKnownTxs {
108 p.knownTxs.Add(hash.String())
111 // MarkBlock marks a block as known for the peer, ensuring that the block will
112 // never be propagated to this particular peer.
113 func (p *peer) MarkBlock(hash *bc.Hash) {
117 // If we reached the memory allowance, drop a previously known block hash
118 for p.knownBlocks.Size() >= maxKnownBlocks {
121 p.knownBlocks.Add(hash.String())
124 // addBanScore increases the persistent and decaying ban score fields by the
125 // values passed as parameters. If the resulting score exceeds half of the ban
126 // threshold, a warning is logged including the reason provided. Further, if
127 // the score is above the ban threshold, the peer will be banned and
129 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
130 warnThreshold := defaultBanThreshold >> 1
131 if transient == 0 && persistent == 0 {
132 // The score is not being increased, but a warning message is still
133 // logged if the score is above the warn threshold.
134 score := p.banScore.Int()
135 if score > warnThreshold {
136 log.Info("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p, reason, score)
140 score := p.banScore.Increase(persistent, transient)
141 if score > warnThreshold {
142 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p, reason, score)
143 if score > defaultBanThreshold {
144 log.Errorf("Misbehaving peer %s -- banning and disconnecting", p)
151 type peerSet struct {
152 peers map[string]*peer
157 // newPeerSet creates a new peer set to track the active participants.
158 func newPeerSet() *peerSet {
160 peers: make(map[string]*peer),
164 // Register injects a new peer into the working set, or returns an error if the
165 // peer is already known.
166 func (ps *peerSet) Register(p *peer) error {
168 defer ps.lock.Unlock()
173 if _, ok := ps.peers[p.id]; ok {
174 return errAlreadyRegistered
180 // Unregister removes a remote peer from the active set, disabling any further
181 // actions to/from that particular entity.
182 func (ps *peerSet) Unregister(id string) error {
184 defer ps.lock.Unlock()
186 if _, ok := ps.peers[id]; !ok {
187 return errNotRegistered
193 // Peer retrieves the registered peer with the given id.
194 func (ps *peerSet) Peer(id string) *peer {
196 defer ps.lock.RUnlock()
201 // Len returns if the current number of peers in the set.
202 func (ps *peerSet) Len() int {
204 defer ps.lock.RUnlock()
209 // MarkTransaction marks a transaction as known for the peer, ensuring that it
210 // will never be propagated to this particular peer.
211 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
213 defer ps.lock.RUnlock()
215 if peer, ok := ps.peers[peerID]; ok {
216 peer.MarkTransaction(hash)
220 // MarkBlock marks a block as known for the peer, ensuring that the block will
221 // never be propagated to this particular peer.
222 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
224 defer ps.lock.RUnlock()
226 if peer, ok := ps.peers[peerID]; ok {
231 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
232 // their set of known hashes.
233 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
235 defer ps.lock.RUnlock()
237 list := make([]*peer, 0, len(ps.peers))
238 for _, p := range ps.peers {
239 if !p.knownBlocks.Has(hash.String()) {
240 list = append(list, p)
246 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
247 // in their set of known hashes.
248 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
250 defer ps.lock.RUnlock()
252 list := make([]*peer, 0, len(ps.peers))
253 for _, p := range ps.peers {
254 if !p.knownTxs.Has(hash.String()) {
255 list = append(list, p)
261 // BestPeer retrieves the known peer with the currently highest total difficulty.
262 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
264 defer ps.lock.RUnlock()
266 var bestPeer *p2p.Peer
267 var bestHeight uint64
269 for _, p := range ps.peers {
270 if bestPeer == nil || p.height > bestHeight {
271 bestPeer, bestHeight = p.swPeer, p.height
275 return bestPeer, bestHeight
278 // Close disconnects all peers.
279 // No new peers can be registered after Close has returned.
280 func (ps *peerSet) Close() {
282 defer ps.lock.Unlock()
284 for _, p := range ps.peers {
290 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
292 defer ps.lock.Unlock()
294 if _, ok := ps.peers[peer.Key]; !ok {
295 keeperPeer := newPeer(0, nil, peer)
296 ps.peers[peer.Key] = keeperPeer
297 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
300 log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
303 func (ps *peerSet) RemovePeer(peerID string) {
305 defer ps.lock.Unlock()
307 delete(ps.peers, peerID)
308 log.WithField("ID", peerID).Info("Delete peer from peerset")
311 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
313 defer ps.lock.Unlock()
315 if peer, ok := ps.peers[peerID]; ok {
316 peer.SetStatus(height, hash)
320 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
322 defer ps.lock.Unlock()
324 peer, ok := ps.peers[peerID]
326 return errors.New("Can't find peer. ")
328 return peer.requestBlockByHash(hash)
331 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
333 defer ps.lock.Unlock()
335 peer, ok := ps.peers[peerID]
337 return errors.New("Can't find peer. ")
339 return peer.requestBlockByHeight(height)
342 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
343 msg, err := NewMinedBlockMessage(block)
345 return errors.New("Failed construction block msg")
348 peers := ps.PeersWithoutBlock(&hash)
349 for _, peer := range peers {
350 ps.MarkBlock(peer.swPeer.Key, &hash)
351 peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
356 func (ps *peerSet) BroadcastNewStatus(block *types.Block) error {
357 return ps.BroadcastMinedBlock(block)
360 func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
361 msg, err := NewTransactionNotifyMessage(tx)
363 return errors.New("Failed construction tx msg")
365 peers := ps.PeersWithoutTx(&tx.ID)
366 for _, peer := range peers {
367 ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
368 peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})