package netsync
import (
- "strconv"
+ "encoding/hex"
+ "net"
"sync"
log "github.com/sirupsen/logrus"
"github.com/bytom/consensus"
"github.com/bytom/errors"
- "github.com/bytom/p2p"
"github.com/bytom/p2p/trust"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
-var (
- errClosed = errors.New("peer set is closed")
- errAlreadyRegistered = errors.New("peer is already registered")
- errNotRegistered = errors.New("peer is not registered")
-)
-
const (
- defaultVersion = 1
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+ maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
defaultBanThreshold = uint64(100)
)
-type peer struct {
- mtx sync.RWMutex
- version int // Protocol version negotiated
- services consensus.ServiceFlag
- id string
- height uint64
- hash *bc.Hash
- banScore trust.DynamicBanScore
-
- swPeer *p2p.Peer
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+ Addr() net.Addr
+ ID() string
+ ServiceFlag() consensus.ServiceFlag
+ TrySend(byte, interface{}) bool
+}
- knownTxs *set.Set // Set of transaction hashes known to be known by this peer
- knownBlocks *set.Set // Set of block hashes known to be known by this peer
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+ AddBannedPeer(string) error
+ StopPeerGracefully(string)
}
-// PeerInfo indicate peer information
+// PeerInfo indicate peer status snap
type PeerInfo struct {
- Id string `json:"id"`
+ ID string `json:"peer_id"`
RemoteAddr string `json:"remote_addr"`
Height uint64 `json:"height"`
Delay uint32 `json:"delay"`
}
-func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
- services := consensus.SFFullNode
- if len(Peer.Other) != 0 {
- if serviceFlag, err := strconv.ParseUint(Peer.Other[0], 10, 64); err != nil {
- services = consensus.ServiceFlag(serviceFlag)
- }
- }
+type peer struct {
+ BasePeer
+ mtx sync.RWMutex
+ services consensus.ServiceFlag
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+ knownTxs *set.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
+}
+func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
return &peer{
- version: defaultVersion,
- services: services,
- id: Peer.Key,
+ BasePeer: basePeer,
+ services: basePeer.ServiceFlag(),
height: height,
hash: hash,
- swPeer: Peer,
knownTxs: set.New(),
knownBlocks: set.New(),
+ filterAdds: set.New(),
}
}
-func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+func (p *peer) Height() uint64 {
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.height, p.hash
+ return p.height
}
-func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+ score := p.banScore.Increase(persistent, transient)
+ if score > defaultBanThreshold {
+ log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
+ return true
+ }
- p.height = height
- p.hash = hash
+ warnThreshold := defaultBanThreshold >> 1
+ if score > warnThreshold {
+ log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
+ }
+ return false
}
-func (p *peer) requestBlockByHash(hash *bc.Hash) error {
- msg := &BlockRequestMessage{RawHash: hash.Byte32()}
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
-}
+func (p *peer) addFilterAddress(address []byte) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
-func (p *peer) requestBlockByHeight(height uint64) error {
- msg := &BlockRequestMessage{Height: height}
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
- return nil
+ if p.filterAdds.Size() >= maxFilterAddressCount {
+ log.Warn("the count of filter addresses is greater than limit")
+ return
+ }
+ if len(address) > maxFilterAddressSize {
+ log.Warn("the size of filter address is greater than limit")
+ return
+ }
+ p.filterAdds.Add(hex.EncodeToString(address))
}
-func (p *peer) SendTransactions(txs []*types.Tx) error {
- for _, tx := range txs {
- msg, err := NewTransactionNotifyMessage(tx)
- if err != nil {
- return errors.New("Failed construction tx msg")
- }
- hash := &tx.ID
- p.knownTxs.Add(hash.String())
- if p.swPeer == nil {
- return errPeerDropped
- }
- p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+ if !p.filterAdds.IsEmpty() {
+ p.filterAdds.Clear()
+ }
+ for _, address := range addresses {
+ p.addFilterAddress(address)
}
- return nil
}
-func (p *peer) GetPeer() *p2p.Peer {
- p.mtx.RLock()
- defer p.mtx.RUnlock()
+func (p *peer) getBlockByHeight(height uint64) bool {
+ msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
+ return p.TrySend(BlockchainChannel, msg)
+}
- return p.swPeer
+func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+ return p.TrySend(BlockchainChannel, msg)
}
+func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+ msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+ return p.TrySend(BlockchainChannel, msg)
+}
-func (p *peer) GetPeerInfo() *PeerInfo {
+func (p *peer) getPeerInfo() *PeerInfo {
p.mtx.RLock()
defer p.mtx.RUnlock()
return &PeerInfo{
- Id: p.id,
- RemoteAddr: p.swPeer.RemoteAddr,
+ ID: p.ID(),
+ RemoteAddr: p.Addr().String(),
Height: p.height,
- Delay: 0, // TODO
}
}
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (p *peer) MarkTransaction(hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- // If we reached the memory allowance, drop a previously known transaction hash
- for p.knownTxs.Size() >= maxKnownTxs {
- p.knownTxs.Pop()
- }
- p.knownTxs.Add(hash.String())
-}
-
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (p *peer) MarkBlock(hash *bc.Hash) {
- p.mtx.Lock()
- defer p.mtx.Unlock()
-
- // If we reached the memory allowance, drop a previously known block hash
- for p.knownBlocks.Size() >= maxKnownBlocks {
- p.knownBlocks.Pop()
+func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+ var relatedTxs []*types.Tx
+ var relatedStatuses []*bc.TxVerifyResult
+ for i, tx := range txs {
+ if p.isRelatedTx(tx) {
+ relatedTxs = append(relatedTxs, tx)
+ relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+ }
}
- p.knownBlocks.Add(hash.String())
+ return relatedTxs, relatedStatuses
}
-// addBanScore increases the persistent and decaying ban score fields by the
-// values passed as parameters. If the resulting score exceeds half of the ban
-// threshold, a warning is logged including the reason provided. Further, if
-// the score is above the ban threshold, the peer will be banned and
-// disconnected.
-func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
- warnThreshold := defaultBanThreshold >> 1
- if transient == 0 && persistent == 0 {
- // The score is not being increased, but a warning message is still
- // logged if the score is above the warn threshold.
- score := p.banScore.Int()
- if score > warnThreshold {
- log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+ for _, input := range tx.Inputs {
+ switch inp := input.TypedInput.(type) {
+ case *types.SpendInput:
+ if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+ return true
+ }
}
- return false
}
- score := p.banScore.Increase(persistent, transient)
- if score > warnThreshold {
- log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
- if score > defaultBanThreshold {
- log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
+ for _, output := range tx.Outputs {
+ if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
return true
}
}
return false
}
-type peerSet struct {
- peers map[string]*peer
- lock sync.RWMutex
- closed bool
+func (p *peer) isSPVNode() bool {
+ return !p.services.IsEnable(consensus.SFFullNode)
}
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
- return &peerSet{
- peers: make(map[string]*peer),
+func (p *peer) markBlock(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ for p.knownBlocks.Size() >= maxKnownBlocks {
+ p.knownBlocks.Pop()
}
+ p.knownBlocks.Add(hash.String())
}
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *peerSet) Register(p *peer) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (p *peer) markTransaction(hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
- if ps.closed {
- return errClosed
- }
- if _, ok := ps.peers[p.id]; ok {
- return errAlreadyRegistered
+ for p.knownTxs.Size() >= maxKnownTxs {
+ p.knownTxs.Pop()
}
- ps.peers[p.id] = p
- return nil
+ p.knownTxs.Add(hash.String())
}
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *peerSet) Unregister(id string) error {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (p *peer) sendBlock(block *types.Block) (bool, error) {
+ msg, err := NewBlockMessage(block)
+ if err != nil {
+ return false, errors.Wrap(err, "fail on NewBlockMessage")
+ }
- if _, ok := ps.peers[id]; !ok {
- return errNotRegistered
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ if ok {
+ blcokHash := block.Hash()
+ p.knownBlocks.Add(blcokHash.String())
}
- delete(ps.peers, id)
- return nil
+ return ok, nil
}
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) (*peer, bool) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
- p, ok := ps.peers[id]
- return p, ok
-}
+func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
+ msg, err := NewBlocksMessage(blocks)
+ if err != nil {
+ return false, errors.Wrap(err, "fail on NewBlocksMessage")
+ }
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return ok, nil
+ }
-// getPeerInfos return all peer information of current node
-func (ps *peerSet) GetPeerInfos() []*PeerInfo {
- var peerInfos []*PeerInfo
- for _, peer := range ps.peers {
- peerInfos = append(peerInfos, peer.GetPeerInfo())
+ for _, block := range blocks {
+ blcokHash := block.Hash()
+ p.knownBlocks.Add(blcokHash.String())
}
- return peerInfos
+ return true, nil
}
-// Len returns if the current number of peers in the set.
-func (ps *peerSet) Len() int {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
+ msg, err := NewHeadersMessage(headers)
+ if err != nil {
+ return false, errors.New("fail on NewHeadersMessage")
+ }
- return len(ps.peers)
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return ok, nil
}
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
-
- if peer, ok := ps.peers[peerID]; ok {
- peer.MarkTransaction(hash)
+func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+ msg := NewMerkleBlockMessage()
+ if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+ return false, err
}
-}
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+ relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
- if peer, ok := ps.peers[peerID]; ok {
- peer.MarkBlock(hash)
+ txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+ if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+ return false, nil
}
-}
-
-// PeersWithoutBlock retrieves a list of peers that do not have a given block in
-// their set of known hashes.
-func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownBlocks.Has(hash.String()) {
- list = append(list, p)
- }
+ statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
+ if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+ return false, nil
}
- return list
+
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return ok, nil
}
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
+ for _, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) {
+ continue
+ }
+ msg, err := NewTransactionMessage(tx)
+ if err != nil {
+ return false, errors.Wrap(err, "failed to tx msg")
+ }
- list := make([]*peer, 0, len(ps.peers))
- for _, p := range ps.peers {
- if !p.knownTxs.Has(hash.String()) {
- list = append(list, p)
+ if p.knownTxs.Has(tx.ID.String()) {
+ continue
+ }
+ if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ return ok, nil
}
+ p.knownTxs.Add(tx.ID.String())
}
- return list
+ return true, nil
}
-// BestPeer retrieves the known peer with the currently highest total difficulty.
-func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
- ps.lock.RLock()
- defer ps.lock.RUnlock()
+func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+ p.height = height
+ p.hash = hash
+}
- var bestPeer *p2p.Peer
- var bestHeight uint64
+type peerSet struct {
+ BasePeerSet
+ mtx sync.RWMutex
+ peers map[string]*peer
+}
- for _, p := range ps.peers {
- if bestPeer == nil || p.height > bestHeight {
- bestPeer, bestHeight = p.swPeer, p.height
- }
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet(basePeerSet BasePeerSet) *peerSet {
+ return &peerSet{
+ BasePeerSet: basePeerSet,
+ peers: make(map[string]*peer),
}
-
- return bestPeer, bestHeight
}
-// Close disconnects all peers.
-// No new peers can be registered after Close has returned.
-func (ps *peerSet) Close() {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
- for _, p := range ps.peers {
- p.swPeer.CloseConn()
+ if peer == nil {
+ return
+ }
+ if ban := peer.addBanScore(persistent, transient, reason); !ban {
+ return
}
- ps.closed = true
+ if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+ log.WithField("err", err).Error("fail on add ban peer")
+ }
+ ps.removePeer(peerID)
}
-func (ps *peerSet) AddPeer(peer *p2p.Peer) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
- if _, ok := ps.peers[peer.Key]; !ok {
- keeperPeer := newPeer(0, nil, peer)
- ps.peers[peer.Key] = keeperPeer
- log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
+ if _, ok := ps.peers[peer.ID()]; !ok {
+ ps.peers[peer.ID()] = newPeer(height, hash, peer)
return
}
- log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
+ log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
}
-func (ps *peerSet) RemovePeer(peerID string) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
+func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
- delete(ps.peers, peerID)
- log.WithField("ID", peerID).Info("Delete peer from peerset")
+ var bestPeer *peer
+ for _, p := range ps.peers {
+ if !p.services.IsEnable(flag) {
+ continue
+ }
+ if bestPeer == nil || p.height > bestPeer.height {
+ bestPeer = p
+ }
+ }
+ return bestPeer
}
-func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
- ps.lock.Lock()
- defer ps.lock.Unlock()
-
- if peer, ok := ps.peers[peerID]; ok {
- peer.SetStatus(height, hash)
+func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
+ msg, err := NewMinedBlockMessage(block)
+ if err != nil {
+ return errors.Wrap(err, "fail on broadcast mined block")
}
-}
-func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
- peer, ok := ps.Peer(peerID)
- if !ok {
- return errors.New("Can't find peer. ")
+ hash := block.Hash()
+ peers := ps.peersWithoutBlock(&hash)
+ for _, peer := range peers {
+ if peer.isSPVNode() {
+ continue
+ }
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
+ }
+ peer.markBlock(&hash)
}
- return peer.requestBlockByHash(hash)
+ return nil
}
-func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
- peer, ok := ps.Peer(peerID)
- if !ok {
- return errors.New("Can't find peer. ")
+func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
+ bestBlockHash := bestBlock.Hash()
+ peers := ps.peersWithoutBlock(&bestBlockHash)
+
+ genesisHash := genesisBlock.Hash()
+ msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+ for _, peer := range peers {
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
+ }
}
- return peer.requestBlockByHeight(height)
+ return nil
}
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
- msg, err := NewMinedBlockMessage(block)
+func (ps *peerSet) broadcastTx(tx *types.Tx) error {
+ msg, err := NewTransactionMessage(tx)
if err != nil {
- return nil, errors.New("Failed construction block msg")
+ return errors.Wrap(err, "fail on broadcast tx")
}
- hash := block.Hash()
- peers := ps.PeersWithoutBlock(&hash)
- abnormalPeers := make([]*peer, 0)
+
+ peers := ps.peersWithoutTx(&tx.ID)
for _, peer := range peers {
- if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- abnormalPeers = append(abnormalPeers, peer)
+ if peer.isSPVNode() && !peer.isRelatedTx(tx) {
continue
}
- if p, ok := ps.Peer(peer.id); ok {
- p.MarkBlock(&hash)
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
}
+ peer.markTransaction(&tx.ID)
+ }
+ return nil
+}
+
+func (ps *peerSet) errorHandler(peerID string, err error) {
+ if errors.Root(err) == errPeerMisbehave {
+ ps.addBanScore(peerID, 20, 0, err.Error())
+ } else {
+ ps.removePeer(peerID)
}
- return abnormalPeers, nil
}
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
- return ps.BroadcastMinedBlock(block)
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) getPeer(id string) *peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+ return ps.peers[id]
}
-func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
- msg, err := NewTransactionNotifyMessage(tx)
- if err != nil {
- return nil, errors.New("Failed construction tx msg")
+func (ps *peerSet) getPeerInfos() []*PeerInfo {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ result := []*PeerInfo{}
+ for _, peer := range ps.peers {
+ result = append(result, peer.getPeerInfo())
}
- peers := ps.PeersWithoutTx(&tx.ID)
- abnormalPeers := make([]*peer, 0)
- for _, peer := range peers {
- if ok := peer.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- abnormalPeers = append(abnormalPeers, peer)
- continue
+ return result
+}
+
+func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ peers := []*peer{}
+ for _, peer := range ps.peers {
+ if !peer.knownBlocks.Has(hash.String()) {
+ peers = append(peers, peer)
}
- if p, ok := ps.Peer(peer.id); ok {
- p.MarkTransaction(&tx.ID)
+ }
+ return peers
+}
+
+func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ peers := []*peer{}
+ for _, peer := range ps.peers {
+ if !peer.knownTxs.Has(hash.String()) {
+ peers = append(peers, peer)
}
}
- return abnormalPeers, nil
+ return peers
+}
+
+func (ps *peerSet) removePeer(peerID string) {
+ ps.mtx.Lock()
+ delete(ps.peers, peerID)
+ ps.mtx.Unlock()
+ ps.StopPeerGracefully(peerID)
}