OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
index ef64ef8..eaa20ef 100644 (file)
 package netsync
 
 import (
+       "encoding/hex"
+       "net"
        "sync"
 
        log "github.com/sirupsen/logrus"
        "gopkg.in/fatih/set.v0"
 
+       "github.com/bytom/consensus"
        "github.com/bytom/errors"
-       "github.com/bytom/p2p"
        "github.com/bytom/p2p/trust"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
 )
 
-var (
-       errClosed            = errors.New("peer set is closed")
-       errAlreadyRegistered = errors.New("peer is already registered")
-       errNotRegistered     = errors.New("peer is not registered")
-)
-
 const (
-       defaultVersion      = 1
+       maxKnownTxs         = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+       maxKnownBlocks      = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
        defaultBanThreshold = uint64(100)
 )
 
-type peer struct {
-       mtx      sync.RWMutex
-       version  int // Protocol version negotiated
-       id       string
-       height   uint64
-       hash     *bc.Hash
-       banScore trust.DynamicBanScore
+//BasePeer is the interface for connection level peer
+type BasePeer interface {
+       Addr() net.Addr
+       ID() string
+       ServiceFlag() consensus.ServiceFlag
+       TrySend(byte, interface{}) bool
+}
 
-       swPeer *p2p.Peer
+//BasePeerSet is the intergace for connection level peer manager
+type BasePeerSet interface {
+       AddBannedPeer(string) error
+       StopPeerGracefully(string)
+}
 
+// PeerInfo indicate peer status snap
+type PeerInfo struct {
+       ID         string `json:"peer_id"`
+       RemoteAddr string `json:"remote_addr"`
+       Height     uint64 `json:"height"`
+       Delay      uint32 `json:"delay"`
+}
+
+type peer struct {
+       BasePeer
+       mtx         sync.RWMutex
+       services    consensus.ServiceFlag
+       height      uint64
+       hash        *bc.Hash
+       banScore    trust.DynamicBanScore
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       filterAdds  *set.Set // Set of addresses that the spv node cares about.
 }
 
-func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
+func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
        return &peer{
-               version:     defaultVersion,
-               id:          Peer.Key,
+               BasePeer:    basePeer,
+               services:    basePeer.ServiceFlag(),
                height:      height,
                hash:        hash,
-               swPeer:      Peer,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
+               filterAdds:  set.New(),
        }
 }
 
-func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+func (p *peer) Height() uint64 {
        p.mtx.RLock()
        defer p.mtx.RUnlock()
-       return p.height, p.hash
+       return p.height
 }
 
-func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+       score := p.banScore.Increase(persistent, transient)
+       if score > defaultBanThreshold {
+               log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
+               return true
+       }
 
-       p.height = height
-       p.hash = hash
+       warnThreshold := defaultBanThreshold >> 1
+       if score > warnThreshold {
+               log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
+       }
+       return false
 }
 
-func (p *peer) requestBlockByHash(hash *bc.Hash) error {
-       msg := &BlockRequestMessage{RawHash: hash.Byte32()}
-       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
-}
+func (p *peer) addFilterAddress(address []byte) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
 
-func (p *peer) requestBlockByHeight(height uint64) error {
-       msg := &BlockRequestMessage{Height: height}
-       p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
+       if p.filterAdds.Size() >= maxFilterAddressCount {
+               log.Warn("the count of filter addresses is greater than limit")
+               return
+       }
+       if len(address) > maxFilterAddressSize {
+               log.Warn("the size of filter address is greater than limit")
+               return
+       }
+       p.filterAdds.Add(hex.EncodeToString(address))
 }
 
-func (p *peer) SendTransactions(txs []*types.Tx) error {
-       for _, tx := range txs {
-               msg, err := NewTransactionNotifyMessage(tx)
-               if err != nil {
-                       return errors.New("Failed construction tx msg")
-               }
-               hash := &tx.ID
-               p.knownTxs.Add(hash.String())
-               if p.swPeer == nil {
-                       return errPeerDropped
-               }
-               p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+       if !p.filterAdds.IsEmpty() {
+               p.filterAdds.Clear()
+       }
+       for _, address := range addresses {
+               p.addFilterAddress(address)
        }
-       return nil
 }
 
-func (p *peer) getPeer() *p2p.Peer {
-       p.mtx.RLock()
-       defer p.mtx.RUnlock()
+func (p *peer) getBlockByHeight(height uint64) bool {
+       msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
+       return p.TrySend(BlockchainChannel, msg)
+}
 
-       return p.swPeer
+func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
+       return p.TrySend(BlockchainChannel, msg)
 }
 
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (p *peer) MarkTransaction(hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
+func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
+       msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
+       return p.TrySend(BlockchainChannel, msg)
+}
 
-       // If we reached the memory allowance, drop a previously known transaction hash
-       for p.knownTxs.Size() >= maxKnownTxs {
-               p.knownTxs.Pop()
+func (p *peer) getPeerInfo() *PeerInfo {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+       return &PeerInfo{
+               ID:         p.ID(),
+               RemoteAddr: p.Addr().String(),
+               Height:     p.height,
        }
-       p.knownTxs.Add(hash.String())
 }
 
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (p *peer) MarkBlock(hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       // If we reached the memory allowance, drop a previously known block hash
-       for p.knownBlocks.Size() >= maxKnownBlocks {
-               p.knownBlocks.Pop()
+func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+       var relatedTxs []*types.Tx
+       var relatedStatuses []*bc.TxVerifyResult
+       for i, tx := range txs {
+               if p.isRelatedTx(tx) {
+                       relatedTxs = append(relatedTxs, tx)
+                       relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+               }
        }
-       p.knownBlocks.Add(hash.String())
+       return relatedTxs, relatedStatuses
 }
 
-// addBanScore increases the persistent and decaying ban score fields by the
-// values passed as parameters. If the resulting score exceeds half of the ban
-// threshold, a warning is logged including the reason provided. Further, if
-// the score is above the ban threshold, the peer will be banned and
-// disconnected.
-func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
-       warnThreshold := defaultBanThreshold >> 1
-       if transient == 0 && persistent == 0 {
-               // The score is not being increased, but a warning message is still
-               // logged if the score is above the warn threshold.
-               score := p.banScore.Int()
-               if score > warnThreshold {
-                       log.Infof("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p.id, reason, score)
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+       for _, input := range tx.Inputs {
+               switch inp := input.TypedInput.(type) {
+               case *types.SpendInput:
+                       if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+                               return true
+                       }
                }
-               return false
        }
-       score := p.banScore.Increase(persistent, transient)
-       if score > warnThreshold {
-               log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p.id, reason, score)
-               if score > defaultBanThreshold {
-                       log.Errorf("Misbehaving peer %s -- banning and disconnecting", p.id)
+       for _, output := range tx.Outputs {
+               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
                        return true
                }
        }
        return false
 }
 
-type peerSet struct {
-       peers  map[string]*peer
-       lock   sync.RWMutex
-       closed bool
+func (p *peer) isSPVNode() bool {
+       return !p.services.IsEnable(consensus.SFFullNode)
 }
 
-// newPeerSet creates a new peer set to track the active participants.
-func newPeerSet() *peerSet {
-       return &peerSet{
-               peers: make(map[string]*peer),
+func (p *peer) markBlock(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       for p.knownBlocks.Size() >= maxKnownBlocks {
+               p.knownBlocks.Pop()
        }
+       p.knownBlocks.Add(hash.String())
 }
 
-// Register injects a new peer into the working set, or returns an error if the
-// peer is already known.
-func (ps *peerSet) Register(p *peer) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (p *peer) markTransaction(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
 
-       if ps.closed {
-               return errClosed
-       }
-       if _, ok := ps.peers[p.id]; ok {
-               return errAlreadyRegistered
+       for p.knownTxs.Size() >= maxKnownTxs {
+               p.knownTxs.Pop()
        }
-       ps.peers[p.id] = p
-       return nil
+       p.knownTxs.Add(hash.String())
 }
 
-// Unregister removes a remote peer from the active set, disabling any further
-// actions to/from that particular entity.
-func (ps *peerSet) Unregister(id string) error {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (p *peer) sendBlock(block *types.Block) (bool, error) {
+       msg, err := NewBlockMessage(block)
+       if err != nil {
+               return false, errors.Wrap(err, "fail on NewBlockMessage")
+       }
 
-       if _, ok := ps.peers[id]; !ok {
-               return errNotRegistered
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       if ok {
+               blcokHash := block.Hash()
+               p.knownBlocks.Add(blcokHash.String())
        }
-       delete(ps.peers, id)
-       return nil
+       return ok, nil
 }
 
-// Peer retrieves the registered peer with the given id.
-func (ps *peerSet) Peer(id string) (*peer, bool) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-       p, ok := ps.peers[id]
-       return p, ok
-}
+func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
+       msg, err := NewBlocksMessage(blocks)
+       if err != nil {
+               return false, errors.Wrap(err, "fail on NewBlocksMessage")
+       }
 
-// Len returns if the current number of peers in the set.
-func (ps *peerSet) Len() int {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+       if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               return ok, nil
+       }
 
-       return len(ps.peers)
+       for _, block := range blocks {
+               blcokHash := block.Hash()
+               p.knownBlocks.Add(blcokHash.String())
+       }
+       return true, nil
 }
 
-// MarkTransaction marks a transaction as known for the peer, ensuring that it
-// will never be propagated to this particular peer.
-func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.MarkTransaction(hash)
+func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
+       msg, err := NewHeadersMessage(headers)
+       if err != nil {
+               return false, errors.New("fail on NewHeadersMessage")
        }
-}
 
-// MarkBlock marks a block as known for the peer, ensuring that the block will
-// never be propagated to this particular peer.
-func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return ok, nil
+}
 
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.MarkBlock(hash)
+func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+       msg := NewMerkleBlockMessage()
+       if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+               return false, err
        }
-}
 
-// PeersWithoutBlock retrieves a list of peers that do not have a given block in
-// their set of known hashes.
-func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+       relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
 
-       list := make([]*peer, 0, len(ps.peers))
-       for _, p := range ps.peers {
-               if !p.knownBlocks.Has(hash.String()) {
-                       list = append(list, p)
-               }
+       txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+       if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+               return false, nil
        }
-       return list
-}
 
-// PeersWithoutTx retrieves a list of peers that do not have a given transaction
-// in their set of known hashes.
-func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
-
-       list := make([]*peer, 0, len(ps.peers))
-       for _, p := range ps.peers {
-               if !p.knownTxs.Has(hash.String()) {
-                       list = append(list, p)
-               }
+       statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
+       if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+               return false, nil
        }
-       return list
-}
 
-// BestPeer retrieves the known peer with the currently highest total difficulty.
-func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
-       ps.lock.RLock()
-       defer ps.lock.RUnlock()
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return ok, nil
+}
 
-       var bestPeer *p2p.Peer
-       var bestHeight uint64
+func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
+       for _, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) {
+                       continue
+               }
+               msg, err := NewTransactionMessage(tx)
+               if err != nil {
+                       return false, errors.Wrap(err, "failed to tx msg")
+               }
 
-       for _, p := range ps.peers {
-               if bestPeer == nil || p.height > bestHeight {
-                       bestPeer, bestHeight = p.swPeer, p.height
+               if p.knownTxs.Has(tx.ID.String()) {
+                       continue
                }
+               if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       return ok, nil
+               }
+               p.knownTxs.Add(tx.ID.String())
        }
+       return true, nil
+}
 
-       return bestPeer, bestHeight
+func (p *peer) setStatus(height uint64, hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+       p.height = height
+       p.hash = hash
 }
 
-// Close disconnects all peers.
-// No new peers can be registered after Close has returned.
-func (ps *peerSet) Close() {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+type peerSet struct {
+       BasePeerSet
+       mtx   sync.RWMutex
+       peers map[string]*peer
+}
 
-       for _, p := range ps.peers {
-               p.swPeer.CloseConn()
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet(basePeerSet BasePeerSet) *peerSet {
+       return &peerSet{
+               BasePeerSet: basePeerSet,
+               peers:       make(map[string]*peer),
        }
-       ps.closed = true
 }
 
-func (ps *peerSet) AddPeer(peer *p2p.Peer) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
 
-       if _, ok := ps.peers[peer.Key]; !ok {
-               keeperPeer := newPeer(0, nil, peer)
-               ps.peers[peer.Key] = keeperPeer
-               log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to blockKeeper")
+       if peer == nil {
                return
        }
-       log.WithField("ID", peer.Key).Warning("Add existing peer to blockKeeper")
-}
-
-func (ps *peerSet) RemovePeer(peerID string) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
-
-       delete(ps.peers, peerID)
-       log.WithField("ID", peerID).Info("Delete peer from peerset")
+       if ban := peer.addBanScore(persistent, transient, reason); !ban {
+               return
+       }
+       if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
+               log.WithField("err", err).Error("fail on add ban peer")
+       }
+       ps.removePeer(peerID)
 }
 
-func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
-       ps.lock.Lock()
-       defer ps.lock.Unlock()
+func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
+       ps.mtx.Lock()
+       defer ps.mtx.Unlock()
 
-       if peer, ok := ps.peers[peerID]; ok {
-               peer.SetStatus(height, hash)
+       if _, ok := ps.peers[peer.ID()]; !ok {
+               ps.peers[peer.ID()] = newPeer(height, hash, peer)
+               return
        }
+       log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
 }
 
-func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       peer, ok := ps.Peer(peerID)
-       if !ok {
-               return errors.New("Can't find peer. ")
-       }
-       return peer.requestBlockByHash(hash)
-}
+func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
 
-func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
-       peer, ok := ps.Peer(peerID)
-       if !ok {
-               return errors.New("Can't find peer. ")
+       var bestPeer *peer
+       for _, p := range ps.peers {
+               if !p.services.IsEnable(flag) {
+                       continue
+               }
+               if bestPeer == nil || p.height > bestPeer.height {
+                       bestPeer = p
+               }
        }
-       return peer.requestBlockByHeight(height)
+       return bestPeer
 }
 
-func (ps *peerSet) BroadcastMinedBlock(block *types.Block) ([]*peer, error) {
+func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
        msg, err := NewMinedBlockMessage(block)
        if err != nil {
-               return nil, errors.New("Failed construction block msg")
+               return errors.Wrap(err, "fail on broadcast mined block")
        }
+
        hash := block.Hash()
-       peers := ps.PeersWithoutBlock(&hash)
-       abnormalPeers := make([]*peer, 0)
+       peers := ps.peersWithoutBlock(&hash)
        for _, peer := range peers {
-               if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       abnormalPeers = append(abnormalPeers, peer)
+               if peer.isSPVNode() {
                        continue
                }
-               if p, ok := ps.Peer(peer.id); ok {
-                       p.MarkBlock(&hash)
+               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       ps.removePeer(peer.ID())
+                       continue
                }
+               peer.markBlock(&hash)
        }
-       return abnormalPeers, nil
+       return nil
 }
 
-func (ps *peerSet) BroadcastNewStatus(block *types.Block) ([]*peer, error) {
-       return ps.BroadcastMinedBlock(block)
+func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
+       bestBlockHash := bestBlock.Hash()
+       peers := ps.peersWithoutBlock(&bestBlockHash)
+
+       genesisHash := genesisBlock.Hash()
+       msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+       for _, peer := range peers {
+               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       ps.removePeer(peer.ID())
+                       continue
+               }
+       }
+       return nil
 }
 
-func (ps *peerSet) BroadcastTx(tx *types.Tx) ([]*peer, error) {
-       msg, err := NewTransactionNotifyMessage(tx)
+func (ps *peerSet) broadcastTx(tx *types.Tx) error {
+       msg, err := NewTransactionMessage(tx)
        if err != nil {
-               return nil, errors.New("Failed construction tx msg")
+               return errors.Wrap(err, "fail on broadcast tx")
        }
-       peers := ps.PeersWithoutTx(&tx.ID)
-       abnormalPeers := make([]*peer, 0)
+
+       peers := ps.peersWithoutTx(&tx.ID)
        for _, peer := range peers {
-               if ok := peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       abnormalPeers = append(abnormalPeers, peer)
+               if peer.isSPVNode() && !peer.isRelatedTx(tx) {
                        continue
                }
-               if p, ok := ps.Peer(peer.id); ok {
-                       p.MarkTransaction(&tx.ID)
+               if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+                       ps.removePeer(peer.ID())
+                       continue
+               }
+               peer.markTransaction(&tx.ID)
+       }
+       return nil
+}
+
+func (ps *peerSet) errorHandler(peerID string, err error) {
+       if errors.Root(err) == errPeerMisbehave {
+               ps.addBanScore(peerID, 20, 0, err.Error())
+       } else {
+               ps.removePeer(peerID)
+       }
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) getPeer(id string) *peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+       return ps.peers[id]
+}
+
+func (ps *peerSet) getPeerInfos() []*PeerInfo {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       result := []*PeerInfo{}
+       for _, peer := range ps.peers {
+               result = append(result, peer.getPeerInfo())
+       }
+       return result
+}
+
+func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       peers := []*peer{}
+       for _, peer := range ps.peers {
+               if !peer.knownBlocks.Has(hash.String()) {
+                       peers = append(peers, peer)
+               }
+       }
+       return peers
+}
+
+func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       peers := []*peer{}
+       for _, peer := range ps.peers {
+               if !peer.knownTxs.Has(hash.String()) {
+                       peers = append(peers, peer)
                }
        }
-       return abnormalPeers, nil
+       return peers
+}
+
+func (ps *peerSet) removePeer(peerID string) {
+       ps.mtx.Lock()
+       delete(ps.peers, peerID)
+       ps.mtx.Unlock()
+       ps.StopPeerGracefully(peerID)
 }