8 log "github.com/sirupsen/logrus"
9 "gopkg.in/fatih/set.v0"
11 "github.com/bytom/consensus"
12 "github.com/bytom/errors"
13 "github.com/bytom/p2p/trust"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/protocol/bc/types"
19 maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
20 maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
21 defaultBanThreshold = uint64(100)
24 //BasePeer is the interface for connection level peer
25 type BasePeer interface {
28 ServiceFlag() consensus.ServiceFlag
29 TrySend(byte, interface{}) bool
32 //BasePeerSet is the intergace for connection level peer manager
33 type BasePeerSet interface {
34 AddBannedPeer(string) error
35 StopPeerGracefully(string)
38 // PeerInfo indicate peer status snap
39 type PeerInfo struct {
40 ID string `json:"peer_id"`
41 RemoteAddr string `json:"remote_addr"`
42 Height uint64 `json:"height"`
43 Delay uint32 `json:"delay"`
49 services consensus.ServiceFlag
52 banScore trust.DynamicBanScore
53 knownTxs *set.Set // Set of transaction hashes known to be known by this peer
54 knownBlocks *set.Set // Set of block hashes known to be known by this peer
55 filterAdds *set.Set // Set of addresses that the spv node cares about.
58 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
61 services: basePeer.ServiceFlag(),
65 knownBlocks: set.New(),
66 filterAdds: set.New(),
70 func (p *peer) Height() uint64 {
76 func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
77 score := p.banScore.Increase(persistent, transient)
78 if score > defaultBanThreshold {
79 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Errorf("banning and disconnecting")
83 warnThreshold := defaultBanThreshold >> 1
84 if score > warnThreshold {
85 log.WithFields(log.Fields{"address": p.Addr(), "score": score, "reason": reason}).Warning("ban score increasing")
90 func (p *peer) addFilterAddress(address []byte) {
94 if p.filterAdds.Size() >= maxFilterAddressCount {
95 log.Warn("the count of filter addresses is greater than limit")
98 if len(address) > maxFilterAddressSize {
99 log.Warn("the size of filter address is greater than limit")
102 p.filterAdds.Add(hex.EncodeToString(address))
105 func (p *peer) addFilterAddresses(addresses [][]byte) {
106 if !p.filterAdds.IsEmpty() {
109 for _, address := range addresses {
110 p.addFilterAddress(address)
114 func (p *peer) getBlockByHeight(height uint64) bool {
115 msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
116 return p.TrySend(BlockchainChannel, msg)
119 func (p *peer) getBlocks(locator []*bc.Hash, stopHash *bc.Hash) bool {
120 msg := struct{ BlockchainMessage }{NewGetBlocksMessage(locator, stopHash)}
121 return p.TrySend(BlockchainChannel, msg)
124 func (p *peer) getHeaders(locator []*bc.Hash, stopHash *bc.Hash) bool {
125 msg := struct{ BlockchainMessage }{NewGetHeadersMessage(locator, stopHash)}
126 return p.TrySend(BlockchainChannel, msg)
129 func (p *peer) getPeerInfo() *PeerInfo {
131 defer p.mtx.RUnlock()
134 RemoteAddr: p.Addr().String(),
139 func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
140 var relatedTxs []*types.Tx
141 var relatedStatuses []*bc.TxVerifyResult
142 for i, tx := range txs {
143 if p.isRelatedTx(tx) {
144 relatedTxs = append(relatedTxs, tx)
145 relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
148 return relatedTxs, relatedStatuses
151 func (p *peer) isRelatedTx(tx *types.Tx) bool {
152 for _, input := range tx.Inputs {
153 switch inp := input.TypedInput.(type) {
154 case *types.SpendInput:
155 if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
160 for _, output := range tx.Outputs {
161 if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
168 func (p *peer) isSPVNode() bool {
169 return !p.services.IsEnable(consensus.SFFullNode)
172 func (p *peer) markBlock(hash *bc.Hash) {
176 for p.knownBlocks.Size() >= maxKnownBlocks {
179 p.knownBlocks.Add(hash.String())
182 func (p *peer) markTransaction(hash *bc.Hash) {
186 for p.knownTxs.Size() >= maxKnownTxs {
189 p.knownTxs.Add(hash.String())
192 func (p *peer) sendBlock(block *types.Block) (bool, error) {
193 msg, err := NewBlockMessage(block)
195 return false, errors.Wrap(err, "fail on NewBlockMessage")
198 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
200 blcokHash := block.Hash()
201 p.knownBlocks.Add(blcokHash.String())
206 func (p *peer) sendBlocks(blocks []*types.Block) (bool, error) {
207 msg, err := NewBlocksMessage(blocks)
209 return false, errors.Wrap(err, "fail on NewBlocksMessage")
212 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
216 for _, block := range blocks {
217 blcokHash := block.Hash()
218 p.knownBlocks.Add(blcokHash.String())
223 func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
224 msg, err := NewHeadersMessage(headers)
226 return false, errors.New("fail on NewHeadersMessage")
229 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
233 func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
234 msg := NewMerkleBlockMessage()
235 if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
239 relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
241 txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
242 if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
246 statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
247 if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
251 ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
255 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
256 for _, tx := range txs {
257 if p.isSPVNode() && !p.isRelatedTx(tx) {
260 msg, err := NewTransactionMessage(tx)
262 return false, errors.Wrap(err, "failed to tx msg")
265 if p.knownTxs.Has(tx.ID.String()) {
268 if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
271 p.knownTxs.Add(tx.ID.String())
276 func (p *peer) setStatus(height uint64, hash *bc.Hash) {
283 type peerSet struct {
286 peers map[string]*peer
289 // newPeerSet creates a new peer set to track the active participants.
290 func newPeerSet(basePeerSet BasePeerSet) *peerSet {
292 BasePeerSet: basePeerSet,
293 peers: make(map[string]*peer),
297 func (ps *peerSet) addBanScore(peerID string, persistent, transient uint64, reason string) {
299 peer := ps.peers[peerID]
305 if ban := peer.addBanScore(persistent, transient, reason); !ban {
308 if err := ps.AddBannedPeer(peer.Addr().String()); err != nil {
309 log.WithField("err", err).Error("fail on add ban peer")
311 ps.removePeer(peerID)
314 func (ps *peerSet) addPeer(peer BasePeer, height uint64, hash *bc.Hash) {
316 defer ps.mtx.Unlock()
318 if _, ok := ps.peers[peer.ID()]; !ok {
319 ps.peers[peer.ID()] = newPeer(height, hash, peer)
322 log.WithField("ID", peer.ID()).Warning("add existing peer to blockKeeper")
325 func (ps *peerSet) bestPeer(flag consensus.ServiceFlag) *peer {
327 defer ps.mtx.RUnlock()
330 for _, p := range ps.peers {
331 if !p.services.IsEnable(flag) {
334 if bestPeer == nil || p.height > bestPeer.height {
341 func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
342 msg, err := NewMinedBlockMessage(block)
344 return errors.Wrap(err, "fail on broadcast mined block")
348 peers := ps.peersWithoutBlock(&hash)
349 for _, peer := range peers {
350 if peer.isSPVNode() {
353 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
354 ps.removePeer(peer.ID())
357 peer.markBlock(&hash)
362 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
363 bestBlockHash := bestBlock.Hash()
364 peers := ps.peersWithoutBlock(&bestBlockHash)
366 genesisHash := genesisBlock.Hash()
367 msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
368 for _, peer := range peers {
369 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
370 ps.removePeer(peer.ID())
377 func (ps *peerSet) broadcastTx(tx *types.Tx) error {
378 msg, err := NewTransactionMessage(tx)
380 return errors.Wrap(err, "fail on broadcast tx")
383 peers := ps.peersWithoutTx(&tx.ID)
384 for _, peer := range peers {
385 if peer.isSPVNode() && !peer.isRelatedTx(tx) {
388 if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
389 ps.removePeer(peer.ID())
392 peer.markTransaction(&tx.ID)
397 func (ps *peerSet) errorHandler(peerID string, err error) {
398 if errors.Root(err) == errPeerMisbehave {
399 ps.addBanScore(peerID, 20, 0, err.Error())
401 ps.removePeer(peerID)
405 // Peer retrieves the registered peer with the given id.
406 func (ps *peerSet) getPeer(id string) *peer {
408 defer ps.mtx.RUnlock()
412 func (ps *peerSet) getPeerInfos() []*PeerInfo {
414 defer ps.mtx.RUnlock()
416 result := []*PeerInfo{}
417 for _, peer := range ps.peers {
418 result = append(result, peer.getPeerInfo())
423 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
425 defer ps.mtx.RUnlock()
428 for _, peer := range ps.peers {
429 if !peer.knownBlocks.Has(hash.String()) {
430 peers = append(peers, peer)
436 func (ps *peerSet) peersWithoutTx(hash *bc.Hash) []*peer {
438 defer ps.mtx.RUnlock()
441 for _, peer := range ps.peers {
442 if !peer.knownTxs.Has(hash.String()) {
443 peers = append(peers, peer)
449 func (ps *peerSet) removePeer(peerID string) {
451 delete(ps.peers, peerID)
453 ps.StopPeerGracefully(peerID)