7 log "github.com/sirupsen/logrus"
8 "gopkg.in/fatih/set.v0"
10 "github.com/bytom/consensus"
11 "github.com/bytom/errors"
12 "github.com/bytom/p2p"
13 "github.com/bytom/p2p/trust"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/protocol/bc/types"
// Sentinel errors for peer-set operations. errAlreadyRegistered is returned
// by Register and errNotRegistered by Unregister.
19 errClosed = errors.New("peer set is closed")
20 errAlreadyRegistered = errors.New("peer is already registered")
21 errNotRegistered = errors.New("peer is not registered")
// defaultBanThreshold is the ban score limit used by addBanScore: half of it
// is the warning threshold, and exceeding it logs the peer as banned.
26 defaultBanThreshold = uint64(100)
31 version int // Protocol version negotiated
// services holds the consensus service flags of the peer.
32 services consensus.ServiceFlag
// banScore is the score read and increased by addBanScore.
36 banScore trust.DynamicBanScore
40 knownTxs *set.Set // Set of transaction hashes (string form) known by this peer
41 knownBlocks *set.Set // Set of block hashes (string form) known by this peer
44 // PeerInfo describes the information reported about a peer.
45 type PeerInfo struct {
52 func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
53 services := consensus.SFFullNode
54 if len(Peer.Other) != 0 {
55 if serviceFlag, err := strconv.ParseUint(Peer.Other[0], 10, 64); err != nil {
56 services = consensus.ServiceFlag(serviceFlag)
61 version: defaultVersion,
68 knownBlocks: set.New(),
// GetStatus returns the height and hash currently recorded for the peer.
72 func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
75 return p.height, p.hash
// SetStatus takes a height and hash for the peer.
// NOTE(review): presumably stores them in p.height and p.hash, the fields
// GetStatus returns — confirm against the body.
78 func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
// requestBlockByHash sends a BlockRequestMessage carrying the given block hash
// to the peer on BlockchainChannel.
86 func (p *peer) requestBlockByHash(hash *bc.Hash) error {
87 msg := &BlockRequestMessage{RawHash: hash.Byte32()}
// NOTE(review): the result of TrySend is discarded here, whereas
// BroadcastMinedBlock treats a false result as a failed send — consider
// reporting it through the returned error.
88 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// requestBlockByHeight sends a BlockRequestMessage carrying the given height
// to the peer on BlockchainChannel.
92 func (p *peer) requestBlockByHeight(height uint64) error {
93 msg := &BlockRequestMessage{Height: height}
// NOTE(review): the result of TrySend is discarded here, whereas
// BroadcastMinedBlock treats a false result as a failed send — consider
// reporting it through the returned error.
94 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// SendTransactions sends each transaction in txs to the peer as a transaction
// notify message on BlockchainChannel.
//
// It returns an error when a message cannot be constructed, and can return
// errPeerDropped (NOTE(review): presumably when p.swPeer is nil — confirm).
98 func (p *peer) SendTransactions(txs []*types.Tx) error {
99 for _, tx := range txs {
100 msg, err := NewTransactionNotifyMessage(tx)
102 return errors.New("Failed construction tx msg")
// The hash is recorded as known before the send is attempted.
// NOTE(review): this adds to knownTxs directly, so the maxKnownTxs bound
// enforced by MarkTransaction does not appear to be applied on this path.
105 p.knownTxs.Add(hash.String())
107 return errPeerDropped
// The result of TrySend is discarded.
109 p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
// GetPeer returns a *p2p.Peer for this peer; the read lock on p.mtx is
// released when the function returns.
// NOTE(review): presumably the returned value is p.swPeer — confirm.
114 func (p *peer) GetPeer() *p2p.Peer {
116 defer p.mtx.RUnlock()
// GetPeerInfo builds a PeerInfo for this peer; the read lock on p.mtx is
// released when the function returns.
122 func (p *peer) GetPeerInfo() *PeerInfo {
124 defer p.mtx.RUnlock()
// RemoteAddr is taken from the underlying p2p peer.
127 RemoteAddr: p.swPeer.RemoteAddr,
133 // MarkTransaction marks a transaction as known for the peer, so that
134 // PeersWithoutTx no longer lists this peer for that transaction hash.
135 func (p *peer) MarkTransaction(hash *bc.Hash) {
139 // If we reached the memory allowance, drop a previously known transaction hash
140 for p.knownTxs.Size() >= maxKnownTxs {
// The hash is stored by its string form.
143 p.knownTxs.Add(hash.String())
146 // MarkBlock marks a block as known for the peer, so that PeersWithoutBlock
147 // no longer lists this peer for that block hash.
148 func (p *peer) MarkBlock(hash *bc.Hash) {
152 // If we reached the memory allowance, drop a previously known block hash
153 for p.knownBlocks.Size() >= maxKnownBlocks {
// The hash is stored by its string form.
156 p.knownBlocks.Add(hash.String())
159 // addBanScore increases the persistent and decaying ban score fields by the
160 // values passed as parameters. If the resulting score exceeds half of the ban
161 // threshold, a warning is logged including the reason provided. Further, if
162 // the score is above the ban threshold, an error is logged stating that the
// peer is being banned and disconnected.
// NOTE(review): presumably the bool result is true only when the ban
// threshold is exceeded — confirm against the return statements.
164 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
// Warnings start at half of the ban threshold.
165 warnThreshold := defaultBanThreshold >> 1
166 if transient == 0 && persistent == 0 {
167 // The score is not being increased, but a warning message is still
168 // logged if the score is above the warn threshold.
169 score := p.banScore.Int()
170 if score > warnThreshold {
171 log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
// Apply the increase and evaluate the resulting score.
175 score := p.banScore.Increase(persistent, transient)
176 if score > warnThreshold {
177 log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
178 if score > defaultBanThreshold {
179 log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
// peerSet holds the tracked peers, keyed by peer id.
186 type peerSet struct {
187 peers map[string]*peer
192 // newPeerSet creates a new peer set to track the active participants.
// The peers map starts out empty.
193 func newPeerSet() *peerSet {
195 peers: make(map[string]*peer),
199 // Register injects a new peer into the working set, or returns
200 // errAlreadyRegistered if a peer with the same id is already known.
201 func (ps *peerSet) Register(p *peer) error {
203 defer ps.lock.Unlock()
// Reject a peer whose id is already present in the map.
208 if _, ok := ps.peers[p.id]; ok {
209 return errAlreadyRegistered
215 // Unregister removes a remote peer from the active set, disabling any further
216 // actions to/from that particular entity. It returns errNotRegistered when no
// peer with the given id is present.
217 func (ps *peerSet) Unregister(id string) error {
219 defer ps.lock.Unlock()
// An unknown id is reported as an error.
221 if _, ok := ps.peers[id]; !ok {
222 return errNotRegistered
228 // Peer retrieves the registered peer with the given id.
// NOTE(review): presumably the bool result reports whether the id was found
// in the map — confirm against the return statement.
229 func (ps *peerSet) Peer(id string) (*peer, bool) {
231 defer ps.lock.RUnlock()
232 p, ok := ps.peers[id]
237 // getPeerInfos return all peer information of current node
238 func (ps *peerSet) GetPeerInfos() []*PeerInfo {
239 var peerInfos []*PeerInfo
240 for _, peer := range ps.peers {
241 peerInfos = append(peerInfos, peer.GetPeerInfo())
246 // Len returns the current number of peers in the set.
247 func (ps *peerSet) Len() int {
249 defer ps.lock.RUnlock()
254 // MarkTransaction marks a transaction as known for the peer with the given
255 // id. Nothing happens when no such peer is in the set.
256 func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
258 defer ps.lock.RUnlock()
260 if peer, ok := ps.peers[peerID]; ok {
261 peer.MarkTransaction(hash)
265 // MarkBlock marks a block as known for the peer with the given id; the peer
266 // is looked up in the set first.
267 func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
269 defer ps.lock.RUnlock()
271 if peer, ok := ps.peers[peerID]; ok {
276 // PeersWithoutBlock retrieves a list of peers that do not have a given block in
277 // their set of known hashes.
278 func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
280 defer ps.lock.RUnlock()
// Capacity is sized for the case where no peer knows the block.
282 list := make([]*peer, 0, len(ps.peers))
283 for _, p := range ps.peers {
// knownBlocks is keyed by the hash's string form.
284 if !p.knownBlocks.Has(hash.String()) {
285 list = append(list, p)
291 // PeersWithoutTx retrieves a list of peers that do not have a given transaction
292 // in their set of known hashes.
293 func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
295 defer ps.lock.RUnlock()
// Capacity is sized for the case where no peer knows the transaction.
297 list := make([]*peer, 0, len(ps.peers))
298 for _, p := range ps.peers {
// knownTxs is keyed by the hash's string form.
299 if !p.knownTxs.Has(hash.String()) {
300 list = append(list, p)
306 // BestPeer retrieves the known peer with the currently greatest height and
// returns its underlying p2p peer together with that height. When the set is
// empty the result is a nil peer and height 0.
307 func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
309 defer ps.lock.RUnlock()
311 var bestPeer *p2p.Peer
312 var bestHeight uint64
314 for _, p := range ps.peers {
// The first peer seen is always taken; later peers must be strictly higher.
// NOTE(review): p.height is read directly here rather than through
// GetStatus — confirm this is safe against concurrent SetStatus calls.
315 if bestPeer == nil || p.height > bestHeight {
316 bestPeer, bestHeight = p.swPeer, p.height
320 return bestPeer, bestHeight
323 // Close disconnects all peers.
324 // No new peers can be registered after Close has returned.
// The set's write lock is released when the function returns.
325 func (ps *peerSet) Close() {
327 defer ps.lock.Unlock()
329 for _, p := range ps.peers {
// AddPeer wraps the given p2p peer in a new peer record (height 0, nil hash)
// and stores it under peer.Key when that key is not yet in the set. A warning
// is logged for a key that is already present.
335 func (ps *peerSet) AddPeer(peer *p2p.Peer) {
337 defer ps.lock.Unlock()
339 if _, ok := ps.peers[peer.Key]; !ok {
340 keeperPeer := newPeer(0, nil, peer)
341 ps.peers[peer.Key] = keeperPeer
342 log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
345 log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
// RemovePeer deletes the peer with the given id from the set and logs the
// removal. The log line is emitted whether or not the id was present.
348 func (ps *peerSet) RemovePeer(peerID string) {
350 defer ps.lock.Unlock()
352 delete(ps.peers, peerID)
353 log.WithField("ID", peerID).Info("Delete peer from peerset")
// SetPeerStatus forwards the given height and hash to SetStatus of the peer
// with the given id. Nothing happens when no such peer is in the set.
356 func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
358 defer ps.lock.Unlock()
360 if peer, ok := ps.peers[peerID]; ok {
361 peer.SetStatus(height, hash)
// requestBlockByHash looks up the peer with the given id and forwards the
// block-by-hash request to it. An error is returned for a peer that cannot be
// found.
365 func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
366 peer, ok := ps.Peer(peerID)
368 return errors.New("Can't find peer. ")
370 return peer.requestBlockByHash(hash)
// requestBlockByHeight looks up the peer with the given id and forwards the
// block-by-height request to it. An error is returned for a peer that cannot
// be found.
373 func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
374 peer, ok := ps.Peer(peerID)
376 return errors.New("Can't find peer. ")
378 return peer.requestBlockByHeight(height)
// BroadcastMinedBlock sends a mined-block message for block to every peer
// returned by PeersWithoutBlock.
//
// Peers for which TrySend reports failure are collected and returned. An
// error is returned only when the message cannot be constructed.
381 func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
382 msg, err := NewMinedBlockMessage(block)
384 return nil, errors.New("Failed construction block msg")
387 peers := ps.PeersWithoutBlock(&hash)
388 abnormalPeers := make([]*peer, 0)
389 for _, peer := range peers {
// A false result from TrySend marks the peer as abnormal.
390 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
391 abnormalPeers = append(abnormalPeers, peer)
// The peer is looked up again by id, since it may have left the set meanwhile.
394 if p, ok := ps.Peer(peer.id); ok {
398 return abnormalPeers, nil
// BroadcastNewStatus delegates to BroadcastMinedBlock with the given block and
// returns its results unchanged.
401 func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
402 return ps.BroadcastMinedBlock(block)
// BroadcastTx sends a transaction notify message for tx to every peer
// returned by PeersWithoutTx for tx.ID.
//
// Peers for which TrySend reports failure are collected and returned. An
// error is returned only when the message cannot be constructed.
405 func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
406 msg, err := NewTransactionNotifyMessage(tx)
408 return nil, errors.New("Failed construction tx msg")
410 peers := ps.PeersWithoutTx(&tx.ID)
411 abnormalPeers := make([]*peer, 0)
412 for _, peer := range peers {
// A false result from TrySend marks the peer as abnormal.
413 if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
414 abnormalPeers = append(abnormalPeers, peer)
// The peer is looked up again by id and, if still in the set, the
// transaction is marked as known for it.
417 if p, ok := ps.Peer(peer.id); ok {
418 p.MarkTransaction(&tx.ID)
421 return abnormalPeers, nil