OSDN Git Service

set proposal have timeout (#444)
authorPaladz <yzhu101@uottawa.ca>
Thu, 21 Nov 2019 08:56:37 +0000 (16:56 +0800)
committerGitHub <noreply@github.com>
Thu, 21 Nov 2019 08:56:37 +0000 (16:56 +0800)
* set proposal have timeout

* edit for code review

application/mov/mov_core.go
proposal/blockproposer/blockproposer.go
proposal/proposal.go
protocol/protocol.go
protocol/validation/tx.go
test/bench_blockchain_test.go
test/performance/mining_test.go

index 3a4061b..9eb5b99 100644 (file)
@@ -76,14 +76,14 @@ func (m *MovCore) ApplyBlock(block *types.Block) error {
     become an infinite loop and DDoS attacks the whole network?
 */
 // BeforeProposalBlock return all transactions than can be matched, and the number of transactions cannot exceed the given capacity.
-func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error) {
+func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, error) {
        if blockHeight <= m.startBlockHeight {
-               return nil, 0, nil
+               return nil, nil
        }
 
        orderTable, err := buildOrderTable(m.movStore, txs)
        if err != nil {
-               return nil, 0, err
+               return nil, err
        }
 
        matchEngine := match.NewEngine(orderTable, maxFeeRate, nodeProgram)
@@ -91,7 +91,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
        tradePairIterator := database.NewTradePairIterator(m.movStore)
 
        var packagedTxs []*types.Tx
-       for gasLeft > 0 && tradePairIterator.HasNext() {
+       for gasLeft > 0 && !isTimeout() && tradePairIterator.HasNext() {
                tradePair := tradePairIterator.Next()
                if tradePairMap[tradePair.Key()] {
                        continue
@@ -99,10 +99,10 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
                tradePairMap[tradePair.Key()] = true
                tradePairMap[tradePair.Reverse().Key()] = true
 
-               for gasLeft > 0 && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) {
+               for gasLeft > 0 && !isTimeout() && matchEngine.HasMatchedTx(tradePair, tradePair.Reverse()) {
                        matchedTx, err := matchEngine.NextMatchedTx(tradePair, tradePair.Reverse())
                        if err != nil {
-                               return nil, 0, err
+                               return nil, err
                        }
 
                        gasUsed := calcMatchedTxGasUsed(matchedTx)
@@ -112,7 +112,7 @@ func (m *MovCore) BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, block
                        gasLeft -= gasUsed
                }
        }
-       return packagedTxs, gasLeft, nil
+       return packagedTxs, nil
 }
 
 // ChainStatus return the current block height and block hash in dex core
index 7cfe24e..7b7c0c4 100644 (file)
@@ -16,7 +16,8 @@ import (
 )
 
 const (
-       logModule     = "blockproposer"
+       logModule                 = "blockproposer"
+       timeProportionDenominator = 3
 )
 
 // BlockProposer propose several block in specified time range
@@ -74,7 +75,8 @@ func (b *BlockProposer) generateBlocks() {
                        continue
                }
 
-               block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime)
+               timeoutDuration := time.Duration(consensus.ActiveNetParams.BlockTimeInterval/timeProportionDenominator) * time.Millisecond
+               block, err := proposal.NewBlockTemplate(b.chain, b.accountManager, nextBlockTime, timeoutDuration)
                if err != nil {
                        log.WithFields(log.Fields{"module": logModule, "error": err}).Error("failed on create NewBlockTemplate")
                        continue
index b620465..c754412 100644 (file)
@@ -20,184 +20,253 @@ import (
 )
 
 const (
-       logModule = "mining"
+       logModule     = "mining"
+       batchApplyNum = 64
 )
 
-// createCoinbaseTx returns a coinbase transaction paying an appropriate subsidy
-// based on the passed block height to the provided address.  When the address
-// is nil, the coinbase transaction will instead be redeemable by anyone.
-func createCoinbaseTx(accountManager *account.Manager, chain *protocol.Chain, preBlockHeader *types.BlockHeader) (tx *types.Tx, err error) {
-       preBlockHash := preBlockHeader.Hash()
-       consensusResult, err := chain.GetConsensusResultByHash(&preBlockHash)
-       if err != nil {
-               return nil, err
-       }
+// NewBlockTemplate returns a new block template that is ready to be solved
+func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) (*types.Block, error) {
+       builder := newBlockBuilder(chain, accountManager, timestamp, timeoutDuration)
+       return builder.build()
+}
 
-       rewards, err := consensusResult.GetCoinbaseRewards(preBlockHeader.Height)
-       if err != nil {
-               return nil, err
-       }
+type blockBuilder struct {
+       chain          *protocol.Chain
+       accountManager *account.Manager
+
+       block    *types.Block
+       txStatus *bc.TransactionStatus
+       utxoView *state.UtxoViewpoint
 
-       return createCoinbaseTxByReward(accountManager, preBlockHeader.Height + 1, rewards)
+       timeoutCh   <-chan time.Time
+       gasLeft     int64
+       timeoutFlag bool
 }
 
-func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint64, rewards []state.CoinbaseReward) (tx *types.Tx, err error) {
-       arbitrary := append([]byte{0x00}, []byte(strconv.FormatUint(blockHeight, 10))...)
-       var script []byte
-       if accountManager == nil {
-               script, err = vmutil.DefaultCoinbaseProgram()
-       } else {
-               script, err = accountManager.GetCoinbaseControlProgram()
-               arbitrary = append(arbitrary, accountManager.GetCoinbaseArbitrary()...)
+func newBlockBuilder(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64, timeoutDuration time.Duration) *blockBuilder {
+       preBlockHeader := chain.BestBlockHeader()
+       block := &types.Block{
+               BlockHeader: types.BlockHeader{
+                       Version:           1,
+                       Height:            preBlockHeader.Height + 1,
+                       PreviousBlockHash: preBlockHeader.Hash(),
+                       Timestamp:         timestamp,
+                       BlockCommitment:   types.BlockCommitment{},
+                       BlockWitness:      types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)},
+               },
        }
-       if err != nil {
-               return nil, err
+
+       builder := &blockBuilder{
+               chain:          chain,
+               accountManager: accountManager,
+               block:          block,
+               txStatus:       bc.NewTransactionStatus(),
+               utxoView:       state.NewUtxoViewpoint(),
+               timeoutCh:      time.After(timeoutDuration),
+               gasLeft:        int64(consensus.ActiveNetParams.MaxBlockGas),
        }
+       return builder
+}
 
-       if len(arbitrary) > consensus.ActiveNetParams.CoinbaseArbitrarySizeLimit {
-               return nil, validation.ErrCoinbaseArbitraryOversize
+func (b *blockBuilder) applyCoinbaseTransaction() error {
+       coinbaseTx, err := b.createCoinbaseTx()
+       if err != nil {
+               return errors.Wrap(err, "fail on create coinbase tx")
        }
 
-       builder := txbuilder.NewBuilder(time.Now())
-       if err = builder.AddInput(types.NewCoinbaseInput(arbitrary), &txbuilder.SigningInstruction{}); err != nil {
-               return nil, err
+       gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: b.block.Height}, Transactions: []*bc.Tx{coinbaseTx.Tx}})
+       if err != nil {
+               return err
        }
-       if err = builder.AddOutput(types.NewIntraChainOutput(*consensus.BTMAssetID, 0, script)); err != nil {
-               return nil, err
+
+       b.block.Transactions = append(b.block.Transactions, coinbaseTx)
+       if err := b.txStatus.SetStatus(0, false); err != nil {
+               return err
        }
 
-       for _, r := range rewards {
-               if err = builder.AddOutput(types.NewIntraChainOutput(*consensus.BTMAssetID, r.Amount, r.ControlProgram)); err != nil {
-                       return nil, err
+       b.gasLeft -= gasState.GasUsed
+       return nil
+}
+func (b *blockBuilder) applyTransactions(txs []*types.Tx) error {
+       tempTxs := []*types.Tx{}
+       for i := 0; i < len(txs); i++ {
+               if tempTxs = append(tempTxs, txs[i]); len(tempTxs) < batchApplyNum && i != len(txs)-1 {
+                       continue
+               }
+
+               results, gasLeft := preValidateTxs(tempTxs, b.chain, b.utxoView, b.gasLeft)
+               for _, result := range results {
+                       if result.err != nil && !result.gasOnly {
+                               log.WithFields(log.Fields{"module": logModule, "error": result.err}).Error("mining block generation: skip tx due to")
+                               b.chain.GetTxPool().RemoveTransaction(&result.tx.ID)
+                               continue
+                       }
+
+                       if err := b.txStatus.SetStatus(len(b.block.Transactions), result.gasOnly); err != nil {
+                               return err
+                       }
+
+                       b.block.Transactions = append(b.block.Transactions, result.tx)
+               }
+
+               b.gasLeft = gasLeft
+               tempTxs = []*types.Tx{}
+               if b.isTimeout() {
+                       break
                }
        }
+       return nil
+}
 
-       _, txData, err := builder.Build()
-       if err != nil {
-               return nil, err
+func (b *blockBuilder) applyTransactionFromPool() error {
+       txDescList := b.chain.GetTxPool().GetTransactions()
+       sort.Sort(byTime(txDescList))
+
+       poolTxs := make([]*types.Tx, len(txDescList))
+       for i, txDesc := range txDescList {
+               poolTxs[i] = txDesc.Tx
        }
 
-       byteData, err := txData.MarshalText()
+       return b.applyTransactions(poolTxs)
+}
+
+func (b *blockBuilder) applyTransactionFromSubProtocol() error {
+       cp, err := b.accountManager.GetCoinbaseControlProgram()
        if err != nil {
-               return nil, err
+               return err
        }
 
-       txData.SerializedSize = uint64(len(byteData))
-       tx = &types.Tx{
-               TxData: *txData,
-               Tx:     types.MapTx(txData),
+       for i, p := range b.chain.SubProtocols() {
+               if b.gasLeft <= 0 || b.isTimeout() {
+                       break
+               }
+
+               subTxs, err := p.BeforeProposalBlock(b.block.Transactions, cp, b.block.Height, b.gasLeft, b.isTimeout)
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
+                       continue
+               }
+
+               if err := b.applyTransactions(subTxs); err != nil {
+                       return err
+               }
        }
-       return tx, nil
+       return nil
 }
 
-// NewBlockTemplate returns a new block template that is ready to be solved
-func NewBlockTemplate(chain *protocol.Chain, accountManager *account.Manager, timestamp uint64) (*types.Block, error) {
-       block := createBasicBlock(chain, timestamp)
-
-       view := state.NewUtxoViewpoint()
-       txStatus := bc.NewTransactionStatus()
+func (b *blockBuilder) build() (*types.Block, error) {
+       if err := b.applyCoinbaseTransaction(); err != nil {
+               return nil, err
+       }
 
-       gasLeft, err := applyCoinbaseTransaction(chain, block, txStatus, accountManager, int64(consensus.ActiveNetParams.MaxBlockGas))
-       if err != nil {
+       if err := b.applyTransactionFromPool(); err != nil {
                return nil, err
        }
 
-       gasLeft, err = applyTransactionFromPool(chain, view, block, txStatus, gasLeft)
-       if err != nil {
+       if err := b.applyTransactionFromSubProtocol(); err != nil {
                return nil, err
        }
-       
-       if err := applyTransactionFromSubProtocol(chain, view, block, txStatus, accountManager, gasLeft); err != nil {
+
+       if err := b.calcBlockCommitment(); err != nil {
                return nil, err
        }
 
+       _, err := b.chain.SignBlock(b.block)
+       return b.block, err
+}
+
+func (b *blockBuilder) calcBlockCommitment() (err error) {
        var txEntries []*bc.Tx
-       for _, tx := range block.Transactions {
+       for _, tx := range b.block.Transactions {
                txEntries = append(txEntries, tx.Tx)
        }
 
-       block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries)
+       b.block.BlockHeader.BlockCommitment.TransactionsMerkleRoot, err = types.TxMerkleRoot(txEntries)
        if err != nil {
-               return nil, err
+               return err
        }
 
-       block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(txStatus.VerifyStatus)
-
-       _, err = chain.SignBlock(block)
-       return block, err
+       b.block.BlockHeader.BlockCommitment.TransactionStatusHash, err = types.TxStatusMerkleRoot(b.txStatus.VerifyStatus)
+       return err
 }
 
-func createBasicBlock(chain *protocol.Chain, timestamp uint64) *types.Block {
-       preBlockHeader := chain.BestBlockHeader()
-       return &types.Block{
-               BlockHeader: types.BlockHeader{
-                       Version:           1,
-                       Height:            preBlockHeader.Height + 1,
-                       PreviousBlockHash: preBlockHeader.Hash(),
-                       Timestamp:         timestamp,
-                       BlockCommitment:   types.BlockCommitment{},
-                       BlockWitness:      types.BlockWitness{Witness: make([][]byte, consensus.ActiveNetParams.NumOfConsensusNode)},
-               },
-       }
-}
-
-func applyCoinbaseTransaction(chain *protocol.Chain, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) (int64, error) {
-       coinbaseTx, err := createCoinbaseTx(accountManager, chain, chain.BestBlockHeader())
+// createCoinbaseTx returns a coinbase transaction paying an appropriate subsidy
+// based on the passed block height to the provided address.  When the address
+// is nil, the coinbase transaction will instead be redeemable by anyone.
+func (b *blockBuilder) createCoinbaseTx() (*types.Tx, error) {
+       consensusResult, err := b.chain.GetConsensusResultByHash(&b.block.PreviousBlockHash)
        if err != nil {
-               return 0, errors.Wrap(err, "fail on create coinbase tx")
+               return nil, err
        }
 
-       gasState, err := validation.ValidateTx(coinbaseTx.Tx, &bc.Block{BlockHeader: &bc.BlockHeader{Height: chain.BestBlockHeight() + 1}, Transactions: []*bc.Tx{coinbaseTx.Tx}})
+       rewards, err := consensusResult.GetCoinbaseRewards(b.block.Height - 1)
        if err != nil {
-               return 0, err
+               return nil, err
        }
 
-       block.Transactions = append(block.Transactions, coinbaseTx)
-       if err := txStatus.SetStatus(0, false); err != nil {
-               return 0, err
+       return createCoinbaseTxByReward(b.accountManager, b.block.Height, rewards)
+}
+
+func (b *blockBuilder) isTimeout() bool {
+       if b.timeoutFlag {
+               return true
        }
 
-       return gasLeft - gasState.GasUsed, nil
+       select {
+       case <-b.timeoutCh:
+               b.timeoutFlag = true
+       default:
+       }
+       return b.timeoutFlag
 }
 
+func createCoinbaseTxByReward(accountManager *account.Manager, blockHeight uint64, rewards []state.CoinbaseReward) (tx *types.Tx, err error) {
+       arbitrary := append([]byte{0x00}, []byte(strconv.FormatUint(blockHeight, 10))...)
+       var script []byte
+       if accountManager == nil {
+               script, err = vmutil.DefaultCoinbaseProgram()
+       } else {
+               script, err = accountManager.GetCoinbaseControlProgram()
+               arbitrary = append(arbitrary, accountManager.GetCoinbaseArbitrary()...)
+       }
+       if err != nil {
+               return nil, err
+       }
 
-func applyTransactionFromPool(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, gasLeft int64) (int64, error) {
-       poolTxs := getAllTxsFromPool(chain.GetTxPool())
-       results, gasLeft := preValidateTxs(poolTxs, chain, view, gasLeft)
-       for _, result := range results {
-               if result.err != nil && !result.gasOnly {
-                       blkGenSkipTxForErr(chain.GetTxPool(), &result.tx.ID, result.err)
-                       continue
-               }
+       if len(arbitrary) > consensus.ActiveNetParams.CoinbaseArbitrarySizeLimit {
+               return nil, validation.ErrCoinbaseArbitraryOversize
+       }
 
-               if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil {
-                       return 0, err
-               }
+       builder := txbuilder.NewBuilder(time.Now())
+       if err = builder.AddInput(types.NewCoinbaseInput(arbitrary), &txbuilder.SigningInstruction{}); err != nil {
+               return nil, err
+       }
+       if err = builder.AddOutput(types.NewIntraChainOutput(*consensus.BTMAssetID, 0, script)); err != nil {
+               return nil, err
+       }
 
-               block.Transactions = append(block.Transactions, result.tx)
+       for _, r := range rewards {
+               if err = builder.AddOutput(types.NewIntraChainOutput(*consensus.BTMAssetID, r.Amount, r.ControlProgram)); err != nil {
+                       return nil, err
+               }
        }
-       return gasLeft, nil
-}
 
-func applyTransactionFromSubProtocol(chain *protocol.Chain, view *state.UtxoViewpoint, block *types.Block, txStatus *bc.TransactionStatus, accountManager *account.Manager, gasLeft int64) error {
-       txs, err := getTxsFromSubProtocols(chain, accountManager, block.Transactions, gasLeft)
+       _, txData, err := builder.Build()
        if err != nil {
-               return err
+               return nil, err
        }
 
-       results, gasLeft := preValidateTxs(txs, chain, view, gasLeft)
-       for _, result := range results {
-               if result.err != nil {
-                       return err
-               }
-
-               if err := txStatus.SetStatus(len(block.Transactions), result.gasOnly); err != nil {
-                       return err
-               }
+       byteData, err := txData.MarshalText()
+       if err != nil {
+               return nil, err
+       }
 
-               block.Transactions = append(block.Transactions, result.tx)
+       txData.SerializedSize = uint64(len(byteData))
+       tx = &types.Tx{
+               TxData: *txData,
+               Tx:     types.MapTx(txData),
        }
-       return nil
+       return tx, nil
 }
 
 type validateTxResult struct {
@@ -261,43 +330,3 @@ func validateBySubProtocols(tx *types.Tx, statusFail bool, subProtocols []protoc
        }
        return nil
 }
-
-func getAllTxsFromPool(txPool *protocol.TxPool) []*types.Tx {
-       txDescList := txPool.GetTransactions()
-       sort.Sort(byTime(txDescList))
-
-       poolTxs := make([]*types.Tx, len(txDescList))
-       for i, txDesc := range txDescList {
-               poolTxs[i] = txDesc.Tx
-       }
-       return poolTxs
-}
-
-func getTxsFromSubProtocols(chain *protocol.Chain, accountManager *account.Manager, poolTxs []*types.Tx, gasLeft int64) ([]*types.Tx, error) {
-       cp, err := accountManager.GetCoinbaseControlProgram()
-       if err != nil {
-               return nil, err
-       }
-
-       var result []*types.Tx
-       var subTxs []*types.Tx
-       for i, p := range chain.SubProtocols() {
-               if gasLeft <= 0 {
-                       break
-               }
-
-               subTxs, gasLeft, err = p.BeforeProposalBlock(poolTxs, cp, chain.BestBlockHeight() + 1, gasLeft)
-               if err != nil {
-                       log.WithFields(log.Fields{"module": logModule, "index": i, "error": err}).Error("failed on sub protocol txs package")
-                       continue
-               }
-
-               result = append(result, subTxs...)
-       }
-       return result, nil
-}
-
-func blkGenSkipTxForErr(txPool *protocol.TxPool, txHash *bc.Hash, err error) {
-       log.WithFields(log.Fields{"module": logModule, "error": err}).Error("mining block generation: skip tx due to")
-       txPool.RemoveTransaction(txHash)
-}
index 095a588..b67194a 100644 (file)
@@ -22,7 +22,7 @@ const (
 type Protocoler interface {
        Name() string
        StartHeight() uint64
-       BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64) ([]*types.Tx, int64, error)
+       BeforeProposalBlock(txs []*types.Tx, nodeProgram []byte, blockHeight uint64, gasLeft int64, isTimeout func() bool) ([]*types.Tx, error)
        ChainStatus() (uint64, *bc.Hash, error)
        ValidateBlock(block *types.Block, verifyResults []*bc.TxVerifyResult) error
        ValidateTxs(txs []*types.Tx, verifyResults []*bc.TxVerifyResult) error
@@ -238,7 +238,7 @@ func (c *Chain) syncProtocolStatus(subProtocol Protocoler) error {
                        return errors.Wrap(err, subProtocol.Name(), "sub protocol detach block err")
                }
 
-               protocolHeight, protocolHash = block.Height -1, &block.PreviousBlockHash
+               protocolHeight, protocolHash = block.Height-1, &block.PreviousBlockHash
        }
 
        for height := protocolHeight + 1; height <= c.BestBlockHeight(); height++ {
index 392c46e..af58cba 100644 (file)
@@ -3,6 +3,7 @@ package validation
 import (
        "fmt"
        "math"
+       "runtime"
        "sync"
 
        "github.com/vapor/common"
@@ -14,10 +15,6 @@ import (
        "github.com/vapor/protocol/vm"
 )
 
-const (
-       validateWorkerNum = 32
-)
-
 // validate transaction error
 var (
        ErrTxVersion                 = errors.New("invalid transaction version")
@@ -665,6 +662,7 @@ func validateTxWorker(workCh chan *validateTxWork, resultCh chan *ValidateTxResu
 // ValidateTxs validates txs in async mode
 func ValidateTxs(txs []*bc.Tx, block *bc.Block) []*ValidateTxResult {
        txSize := len(txs)
+       validateWorkerNum := runtime.NumCPU()
        //init the goroutine validate worker
        var wg sync.WaitGroup
        workCh := make(chan *validateTxWork, txSize)
index 40365d0..2d24dc7 100644 (file)
@@ -159,7 +159,7 @@ func InsertChain(chain *protocol.Chain, txPool *protocol.TxPool, txs []*types.Tx
                }
        }
 
-       block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6))
+       block, err := proposal.NewBlockTemplate(chain, nil, uint64(time.Now().UnixNano()/1e6), time.Minute)
        if err != nil {
                return err
        }
index b8cbfef..72cb004 100644 (file)
@@ -26,6 +26,6 @@ func BenchmarkNewBlockTpl(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6))
+               proposal.NewBlockTemplate(chain, accountManager, uint64(time.Now().UnixNano()/1e6), time.Minute)
        }
 }