OSDN Git Service

Del txpool
authorYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 27 Aug 2018 09:24:10 +0000 (17:24 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Mon, 27 Aug 2018 09:24:10 +0000 (17:24 +0800)
api/api.go
api/query.go
netsync/handle.go
netsync/tx_keeper.go
node/node.go
protocol/protocol.go
protocol/tx.go
protocol/txpool.go [deleted file]
wallet/unconfirmed.go
wallet/wallet.go

index bd85426..d823186 100644 (file)
@@ -17,8 +17,6 @@ import (
        "github.com/bytom/dashboard"
        "github.com/bytom/equity"
        "github.com/bytom/errors"
-       "github.com/bytom/mining/cpuminer"
-       "github.com/bytom/mining/miningpool"
        "github.com/bytom/net/http/authn"
        "github.com/bytom/net/http/gzip"
        "github.com/bytom/net/http/httpjson"
@@ -111,8 +109,6 @@ type API struct {
        server        *http.Server
        handler       http.Handler
        txFeedTracker *txfeed.Tracker
-       cpuMiner      *cpuminer.CPUMiner
-       miningPool    *miningpool.MiningPool
 }
 
 func (a *API) initServer(config *cfg.Config) {
@@ -169,15 +165,13 @@ func (a *API) StartServer(address string) {
 }
 
 // NewAPI create and initialize the API
-func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
        api := &API{
                sync:          sync,
                wallet:        wallet,
                chain:         chain,
                accessTokens:  token,
                txFeedTracker: txfeeds,
-               cpuMiner:      cpuMiner,
-               miningPool:    miningPool,
        }
        api.buildHandler()
        api.initServer(config)
@@ -208,9 +202,6 @@ func (a *API) buildHandler() {
                m.Handle("/get-mining-address", jsonHandler(a.getMiningAddress))
                m.Handle("/set-mining-address", jsonHandler(a.setMiningAddress))
 
-               m.Handle("/get-coinbase-arbitrary", jsonHandler(a.getCoinbaseArbitrary))
-               m.Handle("/set-coinbase-arbitrary", jsonHandler(a.setCoinbaseArbitrary))
-
                m.Handle("/create-asset", jsonHandler(a.createAsset))
                m.Handle("/update-asset-alias", jsonHandler(a.updateAssetAlias))
                m.Handle("/get-asset", jsonHandler(a.getAsset))
@@ -257,8 +248,8 @@ func (a *API) buildHandler() {
        m.Handle("/submit-transaction", jsonHandler(a.submit))
        m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas))
 
-       m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
-       m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+       //m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
+       //m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
        m.Handle("/decode-raw-transaction", jsonHandler(a.decodeRawTransaction))
 
        m.Handle("/get-block", jsonHandler(a.getBlock))
@@ -268,14 +259,6 @@ func (a *API) buildHandler() {
        m.Handle("/get-difficulty", jsonHandler(a.getDifficulty))
        m.Handle("/get-hash-rate", jsonHandler(a.getHashRate))
 
-       m.Handle("/is-mining", jsonHandler(a.isMining))
-       m.Handle("/set-mining", jsonHandler(a.setMining))
-
-       m.Handle("/get-work", jsonHandler(a.getWork))
-       m.Handle("/get-work-json", jsonHandler(a.getWorkJSON))
-       m.Handle("/submit-work", jsonHandler(a.submitWork))
-       m.Handle("/submit-work-json", jsonHandler(a.submitWorkJSON))
-
        m.Handle("/verify-message", jsonHandler(a.verifyMessage))
        m.Handle("/decode-program", jsonHandler(a.decodeProgram))
        m.Handle("/compile", jsonHandler(a.compileEquity))
index 96a708b..86c66fd 100644 (file)
@@ -147,38 +147,38 @@ func (a *API) listTransactions(ctx context.Context, filter struct {
 }
 
 // POST /get-unconfirmed-transaction
-func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
-       TxID chainjson.HexBytes `json:"tx_id"`
-}) Response {
-       var tmpTxID [32]byte
-       copy(tmpTxID[:], filter.TxID[:])
-
-       txHash := bc.NewHash(tmpTxID)
-       txPool := a.chain.GetTxPool()
-       txDesc, err := txPool.GetTransaction(&txHash)
-       if err != nil {
-               return NewErrorResponse(err)
-       }
-
-       tx := &BlockTx{
-               ID:         txDesc.Tx.ID,
-               Version:    txDesc.Tx.Version,
-               Size:       txDesc.Tx.SerializedSize,
-               TimeRange:  txDesc.Tx.TimeRange,
-               Inputs:     []*query.AnnotatedInput{},
-               Outputs:    []*query.AnnotatedOutput{},
-               StatusFail: false,
-       }
-
-       for i := range txDesc.Tx.Inputs {
-               tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i)))
-       }
-       for i := range txDesc.Tx.Outputs {
-               tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i))
-       }
-
-       return NewSuccessResponse(tx)
-}
+//func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
+//     TxID chainjson.HexBytes `json:"tx_id"`
+//}) Response {
+//     var tmpTxID [32]byte
+//     copy(tmpTxID[:], filter.TxID[:])
+//
+//     txHash := bc.NewHash(tmpTxID)
+//     txPool := a.chain.GetTxPool()
+//     txDesc, err := txPool.GetTransaction(&txHash)
+//     if err != nil {
+//             return NewErrorResponse(err)
+//     }
+//
+//     tx := &BlockTx{
+//             ID:         txDesc.Tx.ID,
+//             Version:    txDesc.Tx.Version,
+//             Size:       txDesc.Tx.SerializedSize,
+//             TimeRange:  txDesc.Tx.TimeRange,
+//             Inputs:     []*query.AnnotatedInput{},
+//             Outputs:    []*query.AnnotatedOutput{},
+//             StatusFail: false,
+//     }
+//
+//     for i := range txDesc.Tx.Inputs {
+//             tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i)))
+//     }
+//     for i := range txDesc.Tx.Outputs {
+//             tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i))
+//     }
+//
+//     return NewSuccessResponse(tx)
+//}
 
 type unconfirmedTxsResp struct {
        Total uint64    `json:"total"`
@@ -186,20 +186,20 @@ type unconfirmedTxsResp struct {
 }
 
 // POST /list-unconfirmed-transactions
-func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
-       txIDs := []bc.Hash{}
-
-       txPool := a.chain.GetTxPool()
-       txs := txPool.GetTransactions()
-       for _, txDesc := range txs {
-               txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID))
-       }
-
-       return NewSuccessResponse(&unconfirmedTxsResp{
-               Total: uint64(len(txIDs)),
-               TxIDs: txIDs,
-       })
-}
+//func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
+//     txIDs := []bc.Hash{}
+//
+//     txPool := a.chain.GetTxPool()
+//     txs := txPool.GetTransactions()
+//     for _, txDesc := range txs {
+//             txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID))
+//     }
+//
+//     return NewSuccessResponse(&unconfirmedTxsResp{
+//             Total: uint64(len(txIDs)),
+//             TxIDs: txIDs,
+//     })
+//}
 
 // RawTx is the tx struct for getRawTransaction
 type RawTx struct {
index 85f0a16..41a3bc7 100644 (file)
@@ -18,7 +18,6 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/discover"
-       core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/version"
@@ -41,7 +40,6 @@ type Chain interface {
        GetHeaderByHeight(uint64) (*types.BlockHeader, error)
        InMainChain(bc.Hash) bool
        ProcessBlock(*types.Block) (bool, error)
-       ValidateTx(*types.Tx) (bool, error)
 }
 
 //SyncManager Sync Manager is responsible for the business layer information synchronization
@@ -49,13 +47,13 @@ type SyncManager struct {
        sw          *p2p.Switch
        genesisHash bc.Hash
 
-       privKey      crypto.PrivKeyEd25519 // local node's p2p key
-       chain        Chain
-       txPool       *core.TxPool
-       blockKeeper  *blockKeeper
-       peers        *peerSet
+       privKey     crypto.PrivKeyEd25519 // local node's p2p key
+       chain       Chain
+       blockKeeper *blockKeeper
+       peers       *peerSet
 
        newTxCh      chan *types.Tx
+       txNotifyCh   chan *types.Tx
        newBlockCh   chan *bc.Hash
        newAddrCh    chan *account.CtrlProgram
        spvAddresses []*account.CtrlProgram
@@ -66,7 +64,7 @@ type SyncManager struct {
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
@@ -75,19 +73,19 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo
        sw := p2p.NewSwitch(config)
        peers := newPeerSet(sw)
        manager := &SyncManager{
-               sw:           sw,
-               genesisHash:  genesisHeader.Hash(),
-               txPool:       txPool,
-               chain:        chain,
-               privKey:      crypto.GenPrivKeyEd25519(),
-               blockKeeper:  newBlockKeeper(chain, peers),
-               peers:        peers,
-               newTxCh:      make(chan *types.Tx, maxTxChanSize),
-               newBlockCh:   newBlockCh,
-               txSyncCh:     make(chan *txSyncMsg),
-               quitSync:     make(chan struct{}),
-               config:       config,
-               newAddrCh:    wallet.AccountMgr.NewAddrCh,
+               sw:          sw,
+               genesisHash: genesisHeader.Hash(),
+               chain:       chain,
+               privKey:     crypto.GenPrivKeyEd25519(),
+               blockKeeper: newBlockKeeper(chain, peers),
+               peers:       peers,
+               newTxCh:     make(chan *types.Tx, maxTxChanSize),
+               txNotifyCh:  wallet.GetTxCh(),
+               newBlockCh:  newBlockCh,
+               txSyncCh:    make(chan *txSyncMsg),
+               quitSync:    make(chan struct{}),
+               config:      config,
+               newAddrCh:   wallet.AccountMgr.NewAddrCh,
        }
        manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
        protocolReactor := NewProtocolReactor(manager, manager.peers)
@@ -295,10 +293,7 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
                sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
                return
        }
-
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
-               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
-       }
+       sm.txNotifyCh <- tx
 }
 
 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
@@ -308,12 +303,6 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        }
 
        switch msg := msg.(type) {
-       //case *GetBlockMessage:
-       //      sm.handleGetBlockMsg(peer, msg)
-       //
-       //case *BlockMessage:
-       //      sm.handleBlockMsg(peer, msg)
-
        case *StatusRequestMessage:
                sm.handleStatusRequestMsg(basePeer)
 
@@ -323,21 +312,9 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *TransactionMessage:
                sm.handleTransactionMsg(peer, msg)
 
-       //case *MineBlockMessage:
-       //      sm.handleMineBlockMsg(peer, msg)
-
-       //case *GetHeadersMessage:
-       //      sm.handleGetHeadersMsg(peer, msg)
-
        case *HeadersMessage:
                sm.handleHeadersMsg(peer, msg)
 
-       //case *GetBlocksMessage:
-       //      sm.handleGetBlocksMsg(peer, msg)
-
-       //case *BlocksMessage:
-       //      sm.handleBlocksMsg(peer, msg)
-
        case *MerkleBlockMessage:
                sm.handleMerkelBlockMsg(peer, msg)
 
index eefe2b8..9246bac 100644 (file)
@@ -19,23 +19,11 @@ type txSyncMsg struct {
        txs    []*types.Tx
 }
 
-func (sm *SyncManager) syncTransactions(peerID string) {
-       pending := sm.txPool.GetTransactions()
-       if len(pending) == 0 {
-               return
-       }
-
-       txs := make([]*types.Tx, len(pending))
-       for i, batch := range pending {
-               txs[i] = batch.Tx
-       }
-       sm.txSyncCh <- &txSyncMsg{peerID, txs}
-}
-
 func (sm *SyncManager) txBroadcastLoop() {
        for {
                select {
                case newTx := <-sm.newTxCh:
+                       sm.txNotifyCh <- newTx
                        if err := sm.peers.broadcastTx(newTx); err != nil {
                                log.Errorf("Broadcast new tx error. %v", err)
                                return
index ce972d2..a4545b0 100644 (file)
@@ -25,8 +25,6 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
-       "github.com/bytom/mining/cpuminer"
-       "github.com/bytom/mining/miningpool"
        "github.com/bytom/mining/tensority"
        "github.com/bytom/netsync"
        "github.com/bytom/protocol"
@@ -47,14 +45,11 @@ type Node struct {
 
        syncManager *netsync.SyncManager
 
-       //bcReactor    *bc.BlockchainReactor
        wallet       *w.Wallet
        accessTokens *accesstoken.CredentialStore
        api          *api.API
        chain        *protocol.Chain
        txfeed       *txfeed.Tracker
-       cpuMiner     *cpuminer.CPUMiner
-       miningPool   *miningpool.MiningPool
        miningEnable bool
 }
 
@@ -72,8 +67,7 @@ func NewNode(config *cfg.Config) *Node {
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       txPool := protocol.NewTxPool(store)
-       chain, err := protocol.NewChain(store, txPool)
+       chain, err := protocol.NewChain(store)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
        }
@@ -96,26 +90,22 @@ func NewNode(config *cfg.Config) *Node {
                cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
        }
 
-       if !config.Wallet.Disable {
-               walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
-               accounts = account.NewManager(walletDB, chain)
-               assets = asset.NewRegistry(walletDB, chain)
-               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
-               if err != nil {
-                       log.WithField("error", err).Error("init NewWallet")
-               }
+       walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+       accounts = account.NewManager(walletDB, chain)
+       assets = asset.NewRegistry(walletDB, chain)
+       wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
+       if err != nil {
+               log.WithField("error", err).Error("init NewWallet")
+       }
 
-               // trigger rescan wallet
-               if config.Wallet.Rescan {
-                       wallet.RescanBlocks()
-               }
+       // trigger rescan wallet
+       if config.Wallet.Rescan {
+               wallet.RescanBlocks()
        }
-       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh, wallet)
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       // get transaction from txPool and send it to syncManager and wallet
-       go newPoolTxListener(txPool, syncManager, wallet)
+       syncManager, _ := netsync.NewSyncManager(config, chain, newBlockCh, wallet)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -137,9 +127,6 @@ func NewNode(config *cfg.Config) *Node {
                miningEnable: config.Mining,
        }
 
-       node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
-       node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
-
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
 
        if config.Simd.Enable {
@@ -149,29 +136,6 @@ func NewNode(config *cfg.Config) *Node {
        return node
 }
 
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
-       txMsgCh := txPool.GetMsgCh()
-       syncManagerTxCh := syncManager.GetNewTxCh()
-
-       for {
-               msg := <-txMsgCh
-               switch msg.MsgType {
-               case protocol.MsgNewTx:
-                       syncManagerTxCh <- msg.Tx
-                       if wallet != nil {
-                               wallet.AddUnconfirmedTx(msg.TxDesc)
-                       }
-               case protocol.MsgRemoveTx:
-                       if wallet != nil {
-                               wallet.RemoveUnconfirmedTx(msg.TxDesc)
-                       }
-               default:
-                       log.Warn("got unknow message type from the txPool channel")
-               }
-       }
-}
-
 // Lock data directory after daemonization
 func lockDataDirectory(config *cfg.Config) error {
        _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
@@ -214,7 +178,7 @@ func launchWebBrowser(port string) {
 }
 
 func (n *Node) initAndstartApiServer() {
-       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.chain, n.config, n.accessTokens)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
        env.Parse()
@@ -222,14 +186,6 @@ func (n *Node) initAndstartApiServer() {
 }
 
 func (n *Node) OnStart() error {
-       if n.miningEnable {
-               if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
-                       n.miningEnable = false
-                       log.Error(err)
-               } else {
-                       n.cpuMiner.Start()
-               }
-       }
        if !n.config.VaultMode {
                n.syncManager.Start()
        }
@@ -246,9 +202,6 @@ func (n *Node) OnStart() error {
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
-       if n.miningEnable {
-               n.cpuMiner.Stop()
-       }
        if !n.config.VaultMode {
                n.syncManager.Stop()
        }
@@ -264,7 +217,3 @@ func (n *Node) RunForever() {
 func (n *Node) SyncManager() *netsync.SyncManager {
        return n.syncManager
 }
-
-func (n *Node) MiningPool() *miningpool.MiningPool {
-       return n.miningPool
-}
index 015cb94..d2cf7b5 100644 (file)
@@ -22,7 +22,6 @@ var ErrTheDistantFuture = errors.New("block height too far in future")
 type Chain struct {
        index          *state.BlockIndex
        orphanManage   *OrphanManage
-       txPool         *TxPool
        store          Store
        processBlockCh chan *processBlockMsg
 
@@ -31,10 +30,9 @@ type Chain struct {
 }
 
 // NewChain returns a new Chain using store as the underlying storage.
-func NewChain(store Store, txPool *TxPool) (*Chain, error) {
+func NewChain(store Store) (*Chain, error) {
        c := &Chain{
                orphanManage:   NewOrphanManage(),
-               txPool:         txPool,
                store:          store,
                processBlockCh: make(chan *processBlockMsg, maxProcessBlockChSize),
        }
@@ -158,7 +156,3 @@ func (c *Chain) BlockWaiter(height uint64) <-chan struct{} {
        return ch
 }
 
-// GetTxPool return chain txpool.
-func (c *Chain) GetTxPool() *TxPool {
-       return c.txPool
-}
index 8198e18..ef07276 100644 (file)
@@ -3,9 +3,6 @@ package protocol
 import (
        "github.com/bytom/errors"
        "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/protocol/state"
-       "github.com/bytom/protocol/validation"
 )
 
 // ErrBadTx is returned for transactions failing validation
@@ -15,27 +12,3 @@ var ErrBadTx = errors.New("invalid transaction")
 func (c *Chain) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
        return c.store.GetTransactionStatus(hash)
 }
-
-// GetTransactionsUtxo return all the utxos that related to the txs' inputs
-func (c *Chain) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
-       return c.store.GetTransactionsUtxo(view, txs)
-}
-
-// ValidateTx validates the given transaction. A cache holds
-// per-transaction validation results and is consulted before
-// performing full validation.
-func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
-       if ok := c.txPool.HaveTransaction(&tx.ID); ok {
-               return false, c.txPool.GetErrCache(&tx.ID)
-       }
-
-       bh := c.BestBlockHeader()
-       block := types.MapBlock(&types.Block{BlockHeader: *bh})
-       gasStatus, err := validation.ValidateTx(tx.Tx, block)
-       if gasStatus.GasValid == false {
-               c.txPool.AddErrCache(&tx.ID, err)
-               return false, err
-       }
-
-       return c.txPool.ProcessTransaction(tx, err != nil, block.BlockHeader.Height, gasStatus.BTMValue)
-}
diff --git a/protocol/txpool.go b/protocol/txpool.go
deleted file mode 100644 (file)
index 6924e5b..0000000
+++ /dev/null
@@ -1,336 +0,0 @@
-package protocol
-
-import (
-       "errors"
-       "sync"
-       "sync/atomic"
-       "time"
-
-       "github.com/golang/groupcache/lru"
-       log "github.com/sirupsen/logrus"
-
-       "github.com/bytom/consensus"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/protocol/state"
-)
-
-// msg type
-const (
-       MsgNewTx = iota
-       MsgRemoveTx
-)
-
-var (
-       maxCachedErrTxs = 1000
-       maxMsgChSize    = 1000
-       maxNewTxNum     = 10000
-       maxOrphanNum    = 2000
-
-       orphanTTL                = 10 * time.Minute
-       orphanExpireScanInterval = 3 * time.Minute
-
-       // ErrTransactionNotExist is the pre-defined error message
-       ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
-       // ErrPoolIsFull indicates the pool is full
-       ErrPoolIsFull = errors.New("transaction pool reach the max number")
-)
-
-// TxDesc store tx and related info for mining strategy
-type TxDesc struct {
-       Tx         *types.Tx
-       Added      time.Time
-       StatusFail bool
-       Height     uint64
-       Weight     uint64
-       Fee        uint64
-}
-
-// TxPoolMsg is use for notify pool changes
-type TxPoolMsg struct {
-       *TxDesc
-       MsgType int
-}
-
-type orphanTx struct {
-       *TxDesc
-       expiration time.Time
-}
-
-// TxPool is use for store the unconfirmed transaction
-type TxPool struct {
-       lastUpdated   int64
-       mtx           sync.RWMutex
-       store         Store
-       pool          map[bc.Hash]*TxDesc
-       utxo          map[bc.Hash]*types.Tx
-       orphans       map[bc.Hash]*orphanTx
-       orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
-       errCache      *lru.Cache
-       msgCh         chan *TxPoolMsg
-}
-
-// NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
-       tp := &TxPool{
-               lastUpdated:   time.Now().Unix(),
-               store:         store,
-               pool:          make(map[bc.Hash]*TxDesc),
-               utxo:          make(map[bc.Hash]*types.Tx),
-               orphans:       make(map[bc.Hash]*orphanTx),
-               orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
-               errCache:      lru.New(maxCachedErrTxs),
-               msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
-       }
-       go tp.orphanExpireWorker()
-       return tp
-}
-
-// AddErrCache add a failed transaction record to lru cache
-func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       tp.errCache.Add(txHash, err)
-}
-
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       for hash, orphan := range tp.orphans {
-               if orphan.expiration.Before(now) {
-                       tp.removeOrphan(&hash)
-               }
-       }
-}
-
-// GetErrCache return the error of the transaction
-func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       v, ok := tp.errCache.Get(txHash)
-       if !ok {
-               return nil
-       }
-       return v.(error)
-}
-
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
-       return tp.msgCh
-}
-
-// RemoveTransaction remove a transaction from the pool
-func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       txD, ok := tp.pool[*txHash]
-       if !ok {
-               return
-       }
-
-       for _, output := range txD.Tx.ResultIds {
-               delete(tp.utxo, *output)
-       }
-       delete(tp.pool, *txHash)
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
-       log.WithField("tx_id", txHash).Debug("remove tx from mempool")
-}
-
-// GetTransaction return the TxDesc by hash
-func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       if txD, ok := tp.pool[*txHash]; ok {
-               return txD, nil
-       }
-       return nil, ErrTransactionNotExist
-}
-
-// GetTransactions return all the transactions in the pool
-func (tp *TxPool) GetTransactions() []*TxDesc {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       txDs := make([]*TxDesc, len(tp.pool))
-       i := 0
-       for _, desc := range tp.pool {
-               txDs[i] = desc
-               i++
-       }
-       return txDs
-}
-
-// IsTransactionInPool check wheather a transaction in pool or not
-func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       _, ok := tp.pool[*txHash]
-       return ok
-}
-
-// IsTransactionInErrCache check wheather a transaction in errCache or not
-func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
-       tp.mtx.RLock()
-       defer tp.mtx.RUnlock()
-
-       _, ok := tp.errCache.Get(txHash)
-       return ok
-}
-
-// HaveTransaction IsTransactionInErrCache check is  transaction in errCache or pool
-func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
-       return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
-}
-
-// ProcessTransaction is the main entry for txpool handle new tx
-func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
-       tp.mtx.Lock()
-       defer tp.mtx.Unlock()
-
-       txD := &TxDesc{
-               Tx:         tx,
-               StatusFail: statusFail,
-               Weight:     tx.SerializedSize,
-               Height:     height,
-               Fee:        fee,
-       }
-       requireParents, err := tp.checkOrphanUtxos(tx)
-       if err != nil {
-               return false, err
-       }
-
-       if len(requireParents) > 0 {
-               return true, tp.addOrphan(txD, requireParents)
-       }
-
-       if err := tp.addTransaction(txD); err != nil {
-               return false, err
-       }
-
-       tp.processOrphans(txD)
-       return false, nil
-}
-
-func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
-       if len(tp.orphans) >= maxOrphanNum {
-               return ErrPoolIsFull
-       }
-
-       orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
-       tp.orphans[txD.Tx.ID] = orphan
-       for _, hash := range requireParents {
-               if _, ok := tp.orphansByPrev[*hash]; !ok {
-                       tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
-               }
-               tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
-       }
-       return nil
-}
-
-func (tp *TxPool) addTransaction(txD *TxDesc) error {
-       if len(tp.pool) >= maxNewTxNum {
-               return ErrPoolIsFull
-       }
-
-       tx := txD.Tx
-       txD.Added = time.Now()
-       tp.pool[tx.ID] = txD
-       for _, id := range tx.ResultIds {
-               output, err := tx.Output(*id)
-               if err != nil {
-                       // error due to it's a retirement, utxo doesn't care this output type so skip it
-                       continue
-               }
-               if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
-                       tp.utxo[*id] = tx
-               }
-       }
-
-       atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
-       log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
-       return nil
-}
-
-func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
-       view := state.NewUtxoViewpoint()
-       if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
-               return nil, err
-       }
-
-       hashes := []*bc.Hash{}
-       for _, hash := range tx.SpentOutputIDs {
-               if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
-                       hashes = append(hashes, &hash)
-               }
-       }
-       return hashes, nil
-}
-
-func (tp *TxPool) orphanExpireWorker() {
-       ticker := time.NewTicker(orphanExpireScanInterval)
-       for now := range ticker.C {
-               tp.ExpireOrphan(now)
-       }
-}
-
-func (tp *TxPool) processOrphans(txD *TxDesc) {
-       processOrphans := []*orphanTx{}
-       addRely := func(tx *types.Tx) {
-               for _, outHash := range tx.ResultIds {
-                       orphans, ok := tp.orphansByPrev[*outHash]
-                       if !ok {
-                               continue
-                       }
-
-                       for _, orphan := range orphans {
-                               processOrphans = append(processOrphans, orphan)
-                       }
-                       delete(tp.orphansByPrev, *outHash)
-               }
-       }
-
-       addRely(txD.Tx)
-       for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
-               processOrphan := processOrphans[0]
-               requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
-               if err != nil {
-                       log.WithField("err", err).Error("processOrphans got unexpect error")
-                       continue
-               }
-
-               if len(requireParents) == 0 {
-                       addRely(processOrphan.Tx)
-                       tp.removeOrphan(&processOrphan.Tx.ID)
-                       tp.addTransaction(processOrphan.TxDesc)
-               }
-       }
-}
-
-func (tp *TxPool) removeOrphan(hash *bc.Hash) {
-       orphan, ok := tp.orphans[*hash]
-       if !ok {
-               return
-       }
-
-       for _, spend := range orphan.Tx.SpentOutputIDs {
-               orphans, ok := tp.orphansByPrev[spend]
-               if !ok {
-                       continue
-               }
-
-               if delete(orphans, *hash); len(orphans) == 0 {
-                       delete(tp.orphansByPrev, spend)
-               }
-       }
-       delete(tp.orphans, *hash)
-}
index 771d7dd..68fe5cb 100644 (file)
@@ -11,7 +11,6 @@ import (
        "github.com/bytom/account"
        "github.com/bytom/blockchain/query"
        "github.com/bytom/crypto/sha3pool"
-       "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc/types"
 )
 
@@ -34,14 +33,10 @@ func (a SortByTimestamp) Swap(i, j int)      { a[i], a[j] = a[j], a[i] }
 func (a SortByTimestamp) Less(i, j int) bool { return a[i].Timestamp > a[j].Timestamp }
 
 // AddUnconfirmedTx handle wallet status update when tx add into txpool
-func (w *Wallet) AddUnconfirmedTx(txD *protocol.TxDesc) {
-       if err := w.saveUnconfirmedTx(txD.Tx); err != nil {
+func (w *Wallet) AddUnconfirmedTx(tx *types.Tx) {
+       if err := w.saveUnconfirmedTx(tx); err != nil {
                log.WithField("err", err).Error("wallet fail on saveUnconfirmedTx")
        }
-
-       utxos := txOutToUtxos(txD.Tx, txD.StatusFail, 0)
-       utxos = w.filterAccountUtxo(utxos)
-       w.AccountMgr.AddUnconfirmedUtxo(utxos)
 }
 
 // GetUnconfirmedTxs get account unconfirmed transactions, filter transactions by accountID when accountID is not empty
@@ -83,8 +78,8 @@ func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error)
 }
 
 // RemoveUnconfirmedTx handle wallet status update when tx removed from txpool
-func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) {
-       w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds)
+func (w *Wallet) RemoveUnconfirmedTx(tx *types.Tx) {
+       w.AccountMgr.RemoveUnconfirmedUtxo(tx.ResultIds)
 }
 
 func (w *Wallet) buildAnnotatedUnconfirmedTx(tx *types.Tx) *query.AnnotatedTx {
@@ -176,3 +171,11 @@ func (w *Wallet) delUnconfirmedTx() {
                }
        }
 }
+
+// newTxListener listener transaction from txPool, and send it to syncManager and wallet
+func (w *Wallet) newTxListener() {
+       for {
+               tx := <-w.newTxCh
+               w.AddUnconfirmedTx(tx)
+       }
+}
index c00651a..d64a075 100644 (file)
@@ -18,6 +18,8 @@ import (
 const (
        //SINGLE single sign
        SINGLE = 1
+       //channel size for notifying tx msg
+       NewTxChSize = 1024
 )
 
 var walletKey = []byte("walletInfo")
@@ -40,6 +42,7 @@ type Wallet struct {
        Hsm        *pseudohsm.HSM
        chain      *protocol.Chain
        rescanCh   chan struct{}
+       newTxCh    chan *types.Tx
 }
 
 //NewWallet return a new wallet instance
@@ -51,6 +54,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
                chain:      chain,
                Hsm:        hsm,
                rescanCh:   make(chan struct{}, 1),
+               newTxCh:    make(chan *types.Tx, NewTxChSize),
        }
 
        if err := w.loadWalletInfo(); err != nil {
@@ -58,6 +62,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
        }
 
        go w.walletUpdater()
+       go w.newTxListener()
        go w.delUnconfirmedTx()
        return w, nil
 }
@@ -212,3 +217,7 @@ func (w *Wallet) GetWalletStatusInfo() StatusInfo {
 
        return w.status
 }
+
+func (w *Wallet) GetTxCh() chan *types.Tx {
+       return w.newTxCh
+}