--- /dev/null
+package common
+
+import (
+ "errors"
+ "sync"
+)
+
+// OrderedSet is a set with limited capacity.
+// Items are evicted according to their insertion order.
+type OrderedSet struct {
+ capacity int
+ set map[interface{}]struct{}
+ queue []interface{}
+ start int
+ end int
+
+ lock sync.RWMutex
+}
+
+// NewOrderedSet creates an ordered set with given capacity
+func NewOrderedSet(capacity int) (*OrderedSet, error) {
+ if capacity < 1 {
+ return nil, errors.New("capacity must be a positive integer")
+ }
+
+ return &OrderedSet{
+ capacity: capacity,
+ set: map[interface{}]struct{}{},
+ queue: make([]interface{}, capacity),
+ end: -1,
+ }, nil
+}
+
+// Add inserts items into the set.
+// If capacity is reached, oldest items are evicted
+func (os *OrderedSet) Add(items ...interface{}) {
+ os.lock.Lock()
+ defer os.lock.Unlock()
+
+ for _, item := range items {
+ if _, ok := os.set[item]; ok {
+ continue
+ }
+
+ next := (os.end + 1) % os.capacity
+ if os.end != -1 && next == os.start {
+ delete(os.set, os.queue[os.start])
+ os.start = (os.start + 1) % os.capacity
+ }
+ os.end = next
+ os.queue[os.end] = item
+ os.set[item] = struct{}{}
+ }
+}
+
+// Has checks if certain items exists in the set
+func (os *OrderedSet) Has(item interface{}) bool {
+ os.lock.RLock()
+ defer os.lock.RUnlock()
+
+ _, ok := os.set[item]
+ return ok
+}
+
+// Size returns the size of the set
+func (os *OrderedSet) Size() int {
+ os.lock.RLock()
+ defer os.lock.RUnlock()
+
+ return len(os.set)
+}
txMsgSub *event.Subscription
}
-//NewChainManager create a chain sync manager.
+//NewManager create a chain sync manager.
func NewManager(config *cfg.Config, sw Switch, chain Chain, mempool Mempool, dispatcher *event.Dispatcher, peers *peers.PeerSet, fastSyncDB dbm.DB) (*Manager, error) {
manager := &Manager{
sw: sw,
if err != nil {
return
}
+
m.blockKeeper.processBlock(peer.ID(), block)
}
// the number of restored Tx should be very small or most of time ZERO
// Error returned from validation is ignored, tx could still be lost if validation fails.
// TODO: adjust tx timestamp so that it won't starve in pool.
- if _, err := c.ValidateTx(tx); err != nil {
+ if _, err := c.validateTx(tx); err != nil {
log.WithFields(log.Fields{"module": logModule, "tx_id": tx.Tx.ID.String(), "error": err}).Info("restore tx fail")
}
}
return c.orphanManage.BlockExist(&blockHash), nil
}
+ c.markTransactions(block.Transactions...)
+
if _, err := c.store.GetBlockHeader(&block.PreviousBlockHash); err != nil {
c.orphanManage.Add(block)
return true, nil
"github.com/vapor/protocol/state"
)
-const maxProcessBlockChSize = 1024
+const (
+ maxProcessBlockChSize = 1024
+ maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+)
// Chain provides functions for working with the Bytom block chain.
type Chain struct {
cond sync.Cond
bestBlockHeader *types.BlockHeader // the last block on current main chain
lastIrrBlockHeader *types.BlockHeader // the last irreversible block
+
+ knownTxs *common.OrderedSet
}
// NewChain returns a new Chain using store as the underlying storage.
func NewChain(store Store, txPool *TxPool, eventDispatcher *event.Dispatcher) (*Chain, error) {
+ knownTxs, _ := common.NewOrderedSet(maxKnownTxs)
c := &Chain{
orphanManage: NewOrphanManage(),
txPool: txPool,
signatureCache: common.NewCache(maxSignatureCacheSize),
eventDispatcher: eventDispatcher,
processBlockCh: make(chan *processBlockMsg, maxProcessBlockChSize),
+ knownTxs: knownTxs,
}
c.cond.L = new(sync.Mutex)
return longestTail, nil
}
+func (c *Chain) hasSeenTx(tx *types.Tx) bool {
+ return c.knownTxs.Has(tx.ID.String())
+}
+
+func (c *Chain) markTransactions(txs ...*types.Tx) {
+ for _, tx := range txs {
+ c.knownTxs.Add(tx.ID.String())
+ }
+}
+
// This function must be called with mu lock in above level
func (c *Chain) setState(blockHeader, irrBlockHeader *types.BlockHeader, mainBlockHeaders []*types.BlockHeader, view *state.UtxoViewpoint, consensusResults []*state.ConsensusResult) error {
if err := c.store.SaveChainStatus(blockHeader, irrBlockHeader, mainBlockHeaders, view, consensusResults); err != nil {
// per-transaction validation results and is consulted before
// performing full validation.
func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
+ if c.hasSeenTx(tx) {
+ return false, nil
+ }
+
+ c.markTransactions(tx)
+
+ return c.validateTx(tx)
+}
+
+// validateTx validates the given transaction without checking duplication.
+func (c *Chain) validateTx(tx *types.Tx) (bool, error) {
if ok := c.txPool.HaveTransaction(&tx.ID); ok {
return false, c.txPool.GetErrCache(&tx.ID)
}