OSDN Git Service

filter out known txs (#280)
authorapolloww <32606824+apolloww@users.noreply.github.com>
Mon, 15 Jul 2019 10:58:18 +0000 (18:58 +0800)
committerPaladz <yzhu101@uottawa.ca>
Mon, 15 Jul 2019 10:58:18 +0000 (18:58 +0800)
* filter out known txs

* move filtering logic into chain level

* skip filtering when restoring tx back to pool

* refactor

common/ordered_set.go [new file with mode: 0644]
netsync/chainmgr/handle.go
protocol/block.go
protocol/protocol.go
protocol/tx.go

diff --git a/common/ordered_set.go b/common/ordered_set.go
new file mode 100644 (file)
index 0000000..ee9ec39
--- /dev/null
@@ -0,0 +1,71 @@
+package common
+
+import (
+       "errors"
+       "sync"
+)
+
+// OrderedSet is a set with limited capacity.
+// Items are evicted according to their insertion order.
+type OrderedSet struct {
+       capacity int
+       set      map[interface{}]struct{}
+       queue    []interface{}
+       start    int
+       end      int
+
+       lock sync.RWMutex
+}
+
+// NewOrderedSet creates an ordered set with given capacity
+func NewOrderedSet(capacity int) (*OrderedSet, error) {
+       if capacity < 1 {
+               return nil, errors.New("capacity must be a positive integer")
+       }
+
+       return &OrderedSet{
+               capacity: capacity,
+               set:      map[interface{}]struct{}{},
+               queue:    make([]interface{}, capacity),
+               end:      -1,
+       }, nil
+}
+
+// Add inserts items into the set.
+// If capacity is reached, oldest items are evicted
+func (os *OrderedSet) Add(items ...interface{}) {
+       os.lock.Lock()
+       defer os.lock.Unlock()
+
+       for _, item := range items {
+               if _, ok := os.set[item]; ok {
+                       continue
+               }
+
+               next := (os.end + 1) % os.capacity
+               if os.end != -1 && next == os.start {
+                       delete(os.set, os.queue[os.start])
+                       os.start = (os.start + 1) % os.capacity
+               }
+               os.end = next
+               os.queue[os.end] = item
+               os.set[item] = struct{}{}
+       }
+}
+
+// Has checks if certain items exists in the set
+func (os *OrderedSet) Has(item interface{}) bool {
+       os.lock.RLock()
+       defer os.lock.RUnlock()
+
+       _, ok := os.set[item]
+       return ok
+}
+
+// Size returns the size of the set
+func (os *OrderedSet) Size() int {
+       os.lock.RLock()
+       defer os.lock.RUnlock()
+
+       return len(os.set)
+}
index 91f3949..656a950 100644 (file)
@@ -68,7 +68,7 @@ type Manager struct {
        txMsgSub        *event.Subscription
 }
 
-//NewChainManager create a chain sync manager.
+//NewManager create a chain sync manager.
 func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
        manager := &Manager{
                sw:              sw,
@@ -104,6 +104,7 @@ func (m *Manager) handleBlockMsg(peer *peers.Peer, msg *msgs.BlockMessage) {
        if err != nil {
                return
        }
+
        m.blockKeeper.processBlock(peer.ID(), block)
 }
 
index 4870de3..9552a01 100644 (file)
@@ -238,7 +238,7 @@ func (c *Chain) reorganizeChain(blockHeader *types.BlockHeader) error {
                // the number of restored Tx should be very small or most of time ZERO
                // Error returned from validation is ignored, tx could still be lost if validation fails.
                // TODO: adjust tx timestamp so that it won't starve in pool.
-               if _, err := c.ValidateTx(tx); err != nil {
+               if _, err := c.validateTx(tx); err != nil {
                        log.WithFields(log.Fields{"module": logModule, "tx_id": tx.Tx.ID.String(), "error": err}).Info("restore tx fail")
                }
        }
@@ -357,6 +357,8 @@ func (c *Chain) processBlock(block *types.Block) (bool, error) {
                return c.orphanManage.BlockExist(&blockHash), nil
        }
 
+       c.markTransactions(block.Transactions...)
+
        if _, err := c.store.GetBlockHeader(&block.PreviousBlockHash); err != nil {
                c.orphanManage.Add(block)
                return true, nil
index 38cab47..d7ea944 100644 (file)
@@ -13,7 +13,10 @@ import (
        "github.com/vapor/protocol/state"
 )
 
-const maxProcessBlockChSize = 1024
+const (
+       maxProcessBlockChSize = 1024
+       maxKnownTxs           = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+)
 
 // Chain provides functions for working with the Bytom block chain.
 type Chain struct {
@@ -28,10 +31,13 @@ type Chain struct {
        cond               sync.Cond
        bestBlockHeader    *types.BlockHeader // the last block on current main chain
        lastIrrBlockHeader *types.BlockHeader // the last irreversible block
+
+       knownTxs *common.OrderedSet
 }
 
 // NewChain returns a new Chain using store as the underlying storage.
 func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*Chain, error) {
+       knownTxs, _ := common.NewOrderedSet(maxKnownTxs)
        c := &Chain{
                orphanManage:    NewOrphanManage(),
                txPool:          txPool,
@@ -39,6 +45,7 @@ func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*
                signatureCache:  common.NewCache(maxSignatureCacheSize),
                eventDispatcher: eventDispatcher,
                processBlockCh:  make(chan *processBlockMsg, maxProcessBlockChSize),
+               knownTxs:        knownTxs,
        }
        c.cond.L = new(sync.Mutex)
 
@@ -165,6 +172,16 @@ func (c *Chain) traceLongestChainTail(blockHeader *types.BlockHeader) (*types.Bl
        return longestTail, nil
 }
 
+func (c *Chain) hasSeenTx(tx *types.Tx) bool {
+       return c.knownTxs.Has(tx.ID.String())
+}
+
+func (c *Chain) markTransactions(txs ...*types.Tx) {
+       for _, tx := range txs {
+               c.knownTxs.Add(tx.ID.String())
+       }
+}
+
 // This function must be called with mu lock in above level
 func (c *Chain) setState(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error {
        if err := c.store.SaveChainStatus(blockHeader, irrBlockHeader, mainBlockHeaders, view, consensusResults); err != nil {
index 6eadc3a..966b292 100644 (file)
@@ -27,6 +27,17 @@ func (c *Chain) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) err
 // per-transaction validation results and is consulted before
 // performing full validation.
 func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
+       if c.hasSeenTx(tx) {
+               return false, nil
+       }
+
+       c.markTransactions(tx)
+
+       return c.validateTx(tx)
+}
+
+// validateTx validates the given transaction without checking duplication.
+func (c *Chain) validateTx(tx *types.Tx) (bool, error) {
        if ok := c.txPool.HaveTransaction(&tx.ID); ok {
                return false, c.txPool.GetErrCache(&tx.ID)
        }