OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / peer.go
index 72191c1..eaa20ef 100644 (file)
@@ -1,6 +1,7 @@
 package netsync
 
 import (
+       "encoding/hex"
        "net"
        "sync"
 
@@ -51,6 +52,7 @@ type peer struct {
        banScore    trust.DynamicBanScore
        knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
        knownBlocks *set.Set // Set of block hashes known to be known by this peer
+       filterAdds  *set.Set // Set of addresses that the spv node cares about.
 }
 
 func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
@@ -61,6 +63,7 @@ func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
                hash:        hash,
                knownTxs:    set.New(),
                knownBlocks: set.New(),
+               filterAdds:  set.New(),
        }
 }
 
@@ -84,6 +87,30 @@ func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
        return false
 }
 
+func (p *peer) addFilterAddress(address []byte) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       if p.filterAdds.Size() >= maxFilterAddressCount {
+               log.Warn("the count of filter addresses is greater than limit")
+               return
+       }
+       if len(address) > maxFilterAddressSize {
+               log.Warn("the size of filter address is greater than limit")
+               return
+       }
+       p.filterAdds.Add(hex.EncodeToString(address))
+}
+
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+       if !p.filterAdds.IsEmpty() {
+               p.filterAdds.Clear()
+       }
+       for _, address := range addresses {
+               p.addFilterAddress(address)
+       }
+}
+
 func (p *peer) getBlockByHeight(height uint64) bool {
        msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
        return p.TrySend(BlockchainChannel, msg)
@@ -109,6 +136,39 @@ func (p *peer) getPeerInfo() *PeerInfo {
        }
 }
 
+func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+       var relatedTxs []*types.Tx
+       var relatedStatuses []*bc.TxVerifyResult
+       for i, tx := range txs {
+               if p.isRelatedTx(tx) {
+                       relatedTxs = append(relatedTxs, tx)
+                       relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+               }
+       }
+       return relatedTxs, relatedStatuses
+}
+
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+       for _, input := range tx.Inputs {
+               switch inp := input.TypedInput.(type) {
+               case *types.SpendInput:
+                       if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+                               return true
+                       }
+               }
+       }
+       for _, output := range tx.Outputs {
+               if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+                       return true
+               }
+       }
+       return false
+}
+
+func (p *peer) isSPVNode() bool {
+       return !p.services.IsEnable(consensus.SFFullNode)
+}
+
 func (p *peer) markBlock(hash *bc.Hash) {
        p.mtx.Lock()
        defer p.mtx.Unlock()
@@ -170,8 +230,33 @@ func (p *peer) sendHeaders(headers []*types.BlockHeader) (bool, error) {
        return ok, nil
 }
 
+func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+       msg := NewMerkleBlockMessage()
+       if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+               return false, err
+       }
+
+       relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
+
+       txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+       if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+               return false, nil
+       }
+
+       statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
+       if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+               return false, nil
+       }
+
+       ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return ok, nil
+}
+
 func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
        for _, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) {
+                       continue
+               }
                msg, err := NewTransactionMessage(tx)
                if err != nil {
                        return false, errors.Wrap(err, "failed to tx msg")
@@ -262,6 +347,9 @@ func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
        hash := block.Hash()
        peers := ps.peersWithoutBlock(&hash)
        for _, peer := range peers {
+               if peer.isSPVNode() {
+                       continue
+               }
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
                        ps.removePeer(peer.ID())
                        continue
@@ -272,9 +360,12 @@ func (ps *peerSet) broadcastMinedBlock(block *types.Block) error {
 }
 
 func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
+       bestBlockHash := bestBlock.Hash()
+       peers := ps.peersWithoutBlock(&bestBlockHash)
+
        genesisHash := genesisBlock.Hash()
        msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
-       for _, peer := range ps.peers {
+       for _, peer := range peers {
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
                        ps.removePeer(peer.ID())
                        continue
@@ -291,6 +382,9 @@ func (ps *peerSet) broadcastTx(tx *types.Tx) error {
 
        peers := ps.peersWithoutTx(&tx.ID)
        for _, peer := range peers {
+               if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+                       continue
+               }
                if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
                        ps.removePeer(peer.ID())
                        continue