package netsync
import (
+ "encoding/hex"
"net"
"sync"
banScore trust.DynamicBanScore
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
+ filterAdds *set.Set // Set of addresses that the spv node cares about.
}
func newPeer(height uint64, hash *bc.Hash, basePeer BasePeer) *peer {
hash: hash,
knownTxs: set.New(),
knownBlocks: set.New(),
+ filterAdds: set.New(),
}
}
return false
}
+func (p *peer) addFilterAddress(address []byte) {
+ p.mtx.Lock()
+ defer p.mtx.Unlock()
+
+ if p.filterAdds.Size() >= maxFilterAddressCount {
+ log.Warn("the count of filter addresses is greater than limit")
+ return
+ }
+ if len(address) > maxFilterAddressSize {
+ log.Warn("the size of filter address is greater than limit")
+ return
+ }
+ p.filterAdds.Add(hex.EncodeToString(address))
+}
+
+func (p *peer) addFilterAddresses(addresses [][]byte) {
+ if !p.filterAdds.IsEmpty() {
+ p.filterAdds.Clear()
+ }
+ for _, address := range addresses {
+ p.addFilterAddress(address)
+ }
+}
+
func (p *peer) getBlockByHeight(height uint64) bool {
msg := struct{ BlockchainMessage }{&GetBlockMessage{Height: height}}
return p.TrySend(BlockchainChannel, msg)
}
}
+func (p *peer) getRelatedTxAndStatus(txs []*types.Tx, txStatuses *bc.TransactionStatus) ([]*types.Tx, []*bc.TxVerifyResult) {
+ var relatedTxs []*types.Tx
+ var relatedStatuses []*bc.TxVerifyResult
+ for i, tx := range txs {
+ if p.isRelatedTx(tx) {
+ relatedTxs = append(relatedTxs, tx)
+ relatedStatuses = append(relatedStatuses, txStatuses.VerifyStatus[i])
+ }
+ }
+ return relatedTxs, relatedStatuses
+}
+
+func (p *peer) isRelatedTx(tx *types.Tx) bool {
+ for _, input := range tx.Inputs {
+ switch inp := input.TypedInput.(type) {
+ case *types.SpendInput:
+ if p.filterAdds.Has(hex.EncodeToString(inp.ControlProgram)) {
+ return true
+ }
+ }
+ }
+ for _, output := range tx.Outputs {
+ if p.filterAdds.Has(hex.EncodeToString(output.ControlProgram)) {
+ return true
+ }
+ }
+ return false
+}
+
+func (p *peer) isSPVNode() bool {
+ return !p.services.IsEnable(consensus.SFFullNode)
+}
+
func (p *peer) markBlock(hash *bc.Hash) {
p.mtx.Lock()
defer p.mtx.Unlock()
return ok, nil
}
+func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionStatus) (bool, error) {
+ msg := NewMerkleBlockMessage()
+ if err := msg.setRawBlockHeader(block.BlockHeader); err != nil {
+ return false, err
+ }
+
+ relatedTxs, relatedStatuses := p.getRelatedTxAndStatus(block.Transactions, txStatuses)
+
+ txHashes, txFlags := types.GetTxMerkleTreeProof(block.Transactions, relatedTxs)
+ if err := msg.setTxInfo(txHashes, txFlags, relatedTxs); err != nil {
+ return false, nil
+ }
+
+ statusHashes := types.GetStatusMerkleTreeProof(txStatuses.VerifyStatus, txFlags)
+ if err := msg.setStatusInfo(statusHashes, relatedStatuses); err != nil {
+ return false, nil
+ }
+
+ ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ return ok, nil
+}
+
func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
for _, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) {
+ continue
+ }
msg, err := NewTransactionMessage(tx)
if err != nil {
return false, errors.Wrap(err, "failed to tx msg")
hash := block.Hash()
peers := ps.peersWithoutBlock(&hash)
for _, peer := range peers {
+ if peer.isSPVNode() {
+ continue
+ }
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
ps.removePeer(peer.ID())
continue
return nil
}
+func (ps *peerSet) broadcastNewStatus(bestBlock, genesisBlock *types.Block) error {
+ bestBlockHash := bestBlock.Hash()
+ peers := ps.peersWithoutBlock(&bestBlockHash)
+
+ genesisHash := genesisBlock.Hash()
+ msg := NewStatusResponseMessage(&bestBlock.BlockHeader, &genesisHash)
+ for _, peer := range peers {
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ ps.removePeer(peer.ID())
+ continue
+ }
+ }
+ return nil
+}
+
func (ps *peerSet) broadcastTx(tx *types.Tx) error {
msg, err := NewTransactionMessage(tx)
if err != nil {
peers := ps.peersWithoutTx(&tx.ID)
for _, peer := range peers {
+ if peer.isSPVNode() && !peer.isRelatedTx(tx) {
+ continue
+ }
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
ps.removePeer(peer.ID())
continue