OSDN Git Service

Add mempool new tx subscription support (#1578)
authoryahtoo <yahtoo.ma@gmail.com>
Fri, 22 Feb 2019 06:05:08 +0000 (14:05 +0800)
committerPaladz <yzhu101@uottawa.ca>
Fri, 22 Feb 2019 06:05:08 +0000 (14:05 +0800)
* Add mempool new tx subscription support

* Fix test file error

* Add test file

* Add test file

* Opz code format

* Del invalid test file

16 files changed:
account/accounts_test.go
asset/asset_test.go
net/websocket/wsnotificationmaneger.go
netsync/handle.go
netsync/tx_keeper.go
node/node.go
p2p/switch_test.go
protocol/txpool.go
protocol/txpool_test.go
test/bench_blockchain_test.go
test/util.go
test/wallet_test_util.go
wallet/unconfirmed.go
wallet/unconfirmed_test.go
wallet/wallet.go
wallet/wallet_test.go

index 7e4137d..7cb23b1 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/errors"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/testutil"
 )
@@ -211,11 +212,11 @@ func mockAccountManager(t *testing.T) *Manager {
        }
        defer os.RemoveAll(dirPath)
 
-       testDB := dbm.NewDB("testdb", "memdb", "temp")
-       defer os.RemoveAll("temp")
+       testDB := dbm.NewDB("testdb", "memdb", dirPath)
+       dispatcher := event.NewDispatcher()
 
        store := leveldb.NewStore(testDB)
-       txPool := protocol.NewTxPool(store)
+       txPool := protocol.NewTxPool(store, dispatcher)
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                t.Fatal(err)
index 2f17a31..17096ac 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/testutil"
 )
@@ -152,7 +153,8 @@ func TestListAssets(t *testing.T) {
 
 func mockChain(testDB dbm.DB) (*protocol.Chain, error) {
        store := leveldb.NewStore(testDB)
-       txPool := protocol.NewTxPool(store)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                return nil, err
index ff1e05f..fb17133 100644 (file)
@@ -7,6 +7,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -87,10 +88,12 @@ type WSNotificationManager struct {
        maxNumConcurrentReqs int
        status               statusInfo
        chain                *protocol.Chain
+       eventDispatcher      *event.Dispatcher
+       txMsgSub             *event.Subscription
 }
 
 // NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
-func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
        // init status
        var status statusInfo
        header := chain.BestBlockHeader()
@@ -106,6 +109,7 @@ func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, ch
                maxNumConcurrentReqs: maxNumConcurrentReqs,
                status:               status,
                chain:                chain,
+               eventDispatcher:      dispatcher,
        }
 }
 
@@ -214,14 +218,32 @@ func (m *WSNotificationManager) NotifyBlockDisconnected(block *types.Block) {
        }
 }
 
-// NotifyMempoolTx passes a transaction desc accepted by mempool to the
-// notification manager for transaction notification processing.  If
-// isNew is true, the tx is is a new transaction, rather than one
-// added to the mempool during a reorg.
-func (m *WSNotificationManager) NotifyMempoolTx(txDesc *protocol.TxDesc) {
-       select {
-       case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(txDesc):
-       case <-m.quit:
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+       for {
+               select {
+               case obj, ok := <-m.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+                               return
+                       }
+
+                       ev, ok := obj.Data.(protocol.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if ev.TxMsg.MsgType == protocol.MsgNewTx {
+                               select {
+                               case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+                               default:
+                               }
+                       }
+               case <-m.quit:
+                       return
+               }
        }
 }
 
@@ -437,11 +459,19 @@ func (m *WSNotificationManager) blockWaiter() {
 }
 
 // Start starts the goroutines required for the manager to queue and process websocket client notifications.
-func (m *WSNotificationManager) Start() {
-       m.wg.Add(3)
+func (m *WSNotificationManager) Start() error {
+       var err error
+       m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+       if err != nil {
+               return err
+       }
+
+       m.wg.Add(4)
        go m.blockNotify()
        go m.queueHandler()
        go m.notificationHandler()
+       go m.memPoolTxQueryLoop()
+       return nil
 }
 
 // WaitForShutdown blocks until all notification manager goroutines have finished.
index df4d54f..e8857c0 100644 (file)
@@ -64,13 +64,13 @@ type SyncManager struct {
        blockKeeper  *blockKeeper
        peers        *peerSet
 
-       newTxCh  chan *types.Tx
        txSyncCh chan *txSyncMsg
        quitSync chan struct{}
        config   *cfg.Config
 
        eventDispatcher *event.Dispatcher
        minedBlockSub   *event.Subscription
+       txMsgSub        *event.Subscription
 }
 
 // CreateSyncManager create sync manager and set switch.
@@ -98,7 +98,6 @@ func newSyncManager(config *cfg.Config, sw Switch, chain Chain, txPool *core.TxP
                blockFetcher:    newBlockFetcher(chain, peers),
                blockKeeper:     newBlockKeeper(chain, peers),
                peers:           peers,
-               newTxCh:         make(chan *types.Tx, maxTxChanSize),
                txSyncCh:        make(chan *txSyncMsg),
                quitSync:        make(chan struct{}),
                config:          config,
@@ -129,11 +128,6 @@ func (sm *SyncManager) DialPeerWithAddress(addr *p2p.NetAddress) error {
        return sm.sw.DialPeerWithAddress(addr)
 }
 
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
-       return sm.newTxCh
-}
-
 func (sm *SyncManager) GetNetwork() string {
        return sm.config.ChainID
 }
@@ -448,14 +442,18 @@ func (sm *SyncManager) Start() error {
                return err
        }
 
-       // broadcast transactions
-       go sm.txBroadcastLoop()
-
        sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
        if err != nil {
                return err
        }
 
+       sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       if err != nil {
+               return err
+       }
+
+       // broadcast transactions
+       go sm.txBroadcastLoop()
        go sm.minedBroadcastLoop()
        go sm.txSyncLoop()
 
index 2770ce3..e1b5fbb 100644 (file)
@@ -5,6 +5,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
+       core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc/types"
 )
 
@@ -35,11 +36,24 @@ func (sm *SyncManager) syncTransactions(peerID string) {
 func (sm *SyncManager) txBroadcastLoop() {
        for {
                select {
-               case newTx := <-sm.newTxCh:
-                       if err := sm.peers.broadcastTx(newTx); err != nil {
-                               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
+               case obj, ok := <-sm.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
                                return
                        }
+
+                       ev, ok := obj.Data.(core.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       if ev.TxMsg.MsgType == core.MsgNewTx {
+                               if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
+                                       log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
+                                       continue
+                               }
+                       }
                case <-sm.quitSync:
                        return
                }
index f75814d..fbc2c94 100644 (file)
@@ -76,7 +76,8 @@ func NewNode(config *cfg.Config) *Node {
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       txPool := protocol.NewTxPool(store)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
@@ -104,7 +105,7 @@ func NewNode(config *cfg.Config) *Node {
                walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
                accounts = account.NewManager(walletDB, chain)
                assets = asset.NewRegistry(walletDB, chain)
-               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
+               wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher)
                if err != nil {
                        log.WithField("error", err).Error("init NewWallet")
                }
@@ -115,16 +116,12 @@ func NewNode(config *cfg.Config) *Node {
                }
        }
 
-       dispatcher := event.NewDispatcher()
        syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher)
        if err != nil {
                cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
        }
 
-       notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
-
-       // get transaction from txPool and send it to syncManager and wallet
-       go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
+       notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -163,30 +160,6 @@ func NewNode(config *cfg.Config) *Node {
        return node
 }
 
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
-       txMsgCh := txPool.GetMsgCh()
-       syncManagerTxCh := syncManager.GetNewTxCh()
-
-       for {
-               msg := <-txMsgCh
-               switch msg.MsgType {
-               case protocol.MsgNewTx:
-                       syncManagerTxCh <- msg.Tx
-                       if wallet != nil {
-                               wallet.AddUnconfirmedTx(msg.TxDesc)
-                       }
-                       notificationMgr.NotifyMempoolTx(msg.TxDesc)
-               case protocol.MsgRemoveTx:
-                       if wallet != nil {
-                               wallet.RemoveUnconfirmedTx(msg.TxDesc)
-                       }
-               default:
-                       log.Warn("got unknow message type from the txPool channel")
-               }
-       }
-}
-
 // Lock data directory after daemonization
 func lockDataDirectory(config *cfg.Config) error {
        _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
@@ -256,7 +229,10 @@ func (n *Node) OnStart() error {
        }
 
        n.initAndstartAPIServer()
-       n.notificationMgr.Start()
+       if err := n.notificationMgr.Start(); err != nil {
+               return err
+       }
+
        if !n.config.Web.Closed {
                _, port, err := net.SplitHostPort(n.config.ApiAddress)
                if err != nil {
index 3851521..642a52e 100644 (file)
@@ -230,7 +230,7 @@ func TestDuplicateInBoundPeer(t *testing.T) {
        }
 
        if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
-               t.Fatal("TestSwitchAddInboundPeer peer size error", outbound, inbound, dialing)
+               t.Fatal("TestDuplicateInBoundPeer peer size error", outbound, inbound, dialing)
        }
 }
 
@@ -266,12 +266,12 @@ func TestAddInboundPeer(t *testing.T) {
        }
 
        if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
-               t.Fatal("TestSwitchAddInboundPeer peer size error")
+               t.Fatal("TestAddInboundPeer peer size error")
        }
        inp2 := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
 
        if err := inp2.dial(addr); err == nil {
-               t.Fatal("TestSwitchAddInboundPeer MaxNumPeers limit error")
+               t.Fatal("TestAddInboundPeer MaxNumPeers limit error")
        }
 }
 
@@ -307,16 +307,16 @@ func TestStopPeer(t *testing.T) {
        }
 
        if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
-               t.Fatal("TestSwitchAddInboundPeer peer size error")
+               t.Fatal("TestStopPeer peer size error")
        }
 
        s1.StopPeerGracefully(s1.peers.list[0].Key)
        if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
-               t.Fatal("TestSwitchAddInboundPeer peer size error")
+               t.Fatal("TestStopPeer peer size error")
        }
 
        s1.StopPeerForError(s1.peers.list[0], "stop for test")
        if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
-               t.Fatal("TestSwitchAddInboundPeer peer size error")
+               t.Fatal("TestStopPeer peer size error")
        }
 }
index 605e718..0916f16 100644 (file)
@@ -10,6 +10,7 @@ import (
        log "github.com/sirupsen/logrus"
 
        "github.com/bytom/consensus"
+       "github.com/bytom/event"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/protocol/state"
@@ -36,6 +37,8 @@ var (
        ErrPoolIsFull = errors.New("transaction pool reach the max number")
 )
 
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
 // TxDesc store tx and related info for mining strategy
 type TxDesc struct {
        Tx         *types.Tx `json:"transaction"`
@@ -59,28 +62,28 @@ type orphanTx struct {
 
 // TxPool is use for store the unconfirmed transaction
 type TxPool struct {
-       lastUpdated   int64
-       mtx           sync.RWMutex
-       store         Store
-       pool          map[bc.Hash]*TxDesc
-       utxo          map[bc.Hash]*types.Tx
-       orphans       map[bc.Hash]*orphanTx
-       orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
-       errCache      *lru.Cache
-       msgCh         chan *TxPoolMsg
+       lastUpdated     int64
+       mtx             sync.RWMutex
+       store           Store
+       pool            map[bc.Hash]*TxDesc
+       utxo            map[bc.Hash]*types.Tx
+       orphans         map[bc.Hash]*orphanTx
+       orphansByPrev   map[bc.Hash]map[bc.Hash]*orphanTx
+       errCache        *lru.Cache
+       eventDispatcher *event.Dispatcher
 }
 
 // NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
        tp := &TxPool{
-               lastUpdated:   time.Now().Unix(),
-               store:         store,
-               pool:          make(map[bc.Hash]*TxDesc),
-               utxo:          make(map[bc.Hash]*types.Tx),
-               orphans:       make(map[bc.Hash]*orphanTx),
-               orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
-               errCache:      lru.New(maxCachedErrTxs),
-               msgCh:         make(chan *TxPoolMsg, maxMsgChSize),
+               lastUpdated:     time.Now().Unix(),
+               store:           store,
+               pool:            make(map[bc.Hash]*TxDesc),
+               utxo:            make(map[bc.Hash]*types.Tx),
+               orphans:         make(map[bc.Hash]*orphanTx),
+               orphansByPrev:   make(map[bc.Hash]map[bc.Hash]*orphanTx),
+               errCache:        lru.New(maxCachedErrTxs),
+               eventDispatcher: dispatcher,
        }
        go tp.orphanExpireWorker()
        return tp
@@ -118,11 +121,6 @@ func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
        return v.(error)
 }
 
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
-       return tp.msgCh
-}
-
 // RemoveTransaction remove a transaction from the pool
 func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        tp.mtx.Lock()
@@ -139,7 +137,7 @@ func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
        delete(tp.pool, *txHash)
 
        atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
+       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
        log.WithField("tx_id", txHash).Debug("remove tx from mempool")
 }
 
@@ -256,7 +254,7 @@ func (tp *TxPool) addTransaction(txD *TxDesc) error {
        }
 
        atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
-       tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
+       tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
        log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
        return nil
 }
index 1d7af8d..34986ac 100644 (file)
@@ -6,6 +6,7 @@ import (
 
        "github.com/bytom/consensus"
        "github.com/bytom/database/storage"
+       "github.com/bytom/event"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/protocol/state"
@@ -91,15 +92,15 @@ func TestAddOrphan(t *testing.T) {
                        },
                        after: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
                                                        },
@@ -113,15 +114,15 @@ func TestAddOrphan(t *testing.T) {
                {
                        before: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
                                                        },
@@ -131,25 +132,25 @@ func TestAddOrphan(t *testing.T) {
                        },
                        after: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
                                                },
                                        },
-                                       testTxs[1].ID: &orphanTx{
+                                       testTxs[1].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[1],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
                                                        },
                                                },
-                                               testTxs[1].ID: &orphanTx{
+                                               testTxs[1].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[1],
                                                        },
@@ -167,15 +168,15 @@ func TestAddOrphan(t *testing.T) {
                        },
                        after: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[2].ID: &orphanTx{
+                                       testTxs[2].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[2],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[2].SpentOutputIDs[1]: map[bc.Hash]*orphanTx{
-                                               testTxs[2].ID: &orphanTx{
+                                       testTxs[2].SpentOutputIDs[1]: {
+                                               testTxs[2].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[2],
                                                        },
@@ -205,6 +206,7 @@ func TestAddOrphan(t *testing.T) {
 }
 
 func TestAddTransaction(t *testing.T) {
+       dispatcher := event.NewDispatcher()
        cases := []struct {
                before *TxPool
                after  *TxPool
@@ -212,13 +214,13 @@ func TestAddTransaction(t *testing.T) {
        }{
                {
                        before: &TxPool{
-                               pool:  map[bc.Hash]*TxDesc{},
-                               utxo:  map[bc.Hash]*types.Tx{},
-                               msgCh: make(chan *TxPoolMsg, 1),
+                               pool:            map[bc.Hash]*TxDesc{},
+                               utxo:            map[bc.Hash]*types.Tx{},
+                               eventDispatcher: dispatcher,
                        },
                        after: &TxPool{
                                pool: map[bc.Hash]*TxDesc{
-                                       testTxs[2].ID: &TxDesc{
+                                       testTxs[2].ID: {
                                                Tx:         testTxs[2],
                                                StatusFail: false,
                                        },
@@ -235,13 +237,13 @@ func TestAddTransaction(t *testing.T) {
                },
                {
                        before: &TxPool{
-                               pool:  map[bc.Hash]*TxDesc{},
-                               utxo:  map[bc.Hash]*types.Tx{},
-                               msgCh: make(chan *TxPoolMsg, 1),
+                               pool:            map[bc.Hash]*TxDesc{},
+                               utxo:            map[bc.Hash]*types.Tx{},
+                               eventDispatcher: dispatcher,
                        },
                        after: &TxPool{
                                pool: map[bc.Hash]*TxDesc{
-                                       testTxs[2].ID: &TxDesc{
+                                       testTxs[2].ID: {
                                                Tx:         testTxs[2],
                                                StatusFail: true,
                                        },
@@ -274,13 +276,13 @@ func TestAddTransaction(t *testing.T) {
 func TestExpireOrphan(t *testing.T) {
        before := &TxPool{
                orphans: map[bc.Hash]*orphanTx{
-                       testTxs[0].ID: &orphanTx{
+                       testTxs[0].ID: {
                                expiration: time.Unix(1533489701, 0),
                                TxDesc: &TxDesc{
                                        Tx: testTxs[0],
                                },
                        },
-                       testTxs[1].ID: &orphanTx{
+                       testTxs[1].ID: {
                                expiration: time.Unix(1633489701, 0),
                                TxDesc: &TxDesc{
                                        Tx: testTxs[1],
@@ -288,14 +290,14 @@ func TestExpireOrphan(t *testing.T) {
                        },
                },
                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                               testTxs[0].ID: &orphanTx{
+                       testTxs[0].SpentOutputIDs[0]: {
+                               testTxs[0].ID: {
                                        expiration: time.Unix(1533489701, 0),
                                        TxDesc: &TxDesc{
                                                Tx: testTxs[0],
                                        },
                                },
-                               testTxs[1].ID: &orphanTx{
+                               testTxs[1].ID: {
                                        expiration: time.Unix(1633489701, 0),
                                        TxDesc: &TxDesc{
                                                Tx: testTxs[1],
@@ -307,7 +309,7 @@ func TestExpireOrphan(t *testing.T) {
 
        want := &TxPool{
                orphans: map[bc.Hash]*orphanTx{
-                       testTxs[1].ID: &orphanTx{
+                       testTxs[1].ID: {
                                expiration: time.Unix(1633489701, 0),
                                TxDesc: &TxDesc{
                                        Tx: testTxs[1],
@@ -315,8 +317,8 @@ func TestExpireOrphan(t *testing.T) {
                        },
                },
                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                               testTxs[1].ID: &orphanTx{
+                       testTxs[0].SpentOutputIDs[0]: {
+                               testTxs[1].ID: {
                                        expiration: time.Unix(1633489701, 0),
                                        TxDesc: &TxDesc{
                                                Tx: testTxs[1],
@@ -333,6 +335,7 @@ func TestExpireOrphan(t *testing.T) {
 }
 
 func TestProcessOrphans(t *testing.T) {
+       dispatcher := event.NewDispatcher()
        cases := []struct {
                before    *TxPool
                after     *TxPool
@@ -340,29 +343,29 @@ func TestProcessOrphans(t *testing.T) {
        }{
                {
                        before: &TxPool{
-                               pool: map[bc.Hash]*TxDesc{},
-                               utxo: map[bc.Hash]*types.Tx{},
+                               pool:            map[bc.Hash]*TxDesc{},
+                               utxo:            map[bc.Hash]*types.Tx{},
+                               eventDispatcher: dispatcher,
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[3].ID: &orphanTx{
+                                       testTxs[3].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[3],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[3].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[3].ID: &orphanTx{
+                                       testTxs[3].SpentOutputIDs[0]: {
+                                               testTxs[3].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[3],
                                                        },
                                                },
                                        },
                                },
-                               msgCh: make(chan *TxPoolMsg, 10),
                        },
                        after: &TxPool{
                                pool: map[bc.Hash]*TxDesc{
-                                       testTxs[3].ID: &TxDesc{
+                                       testTxs[3].ID: {
                                                Tx:         testTxs[3],
                                                StatusFail: false,
                                        },
@@ -371,52 +374,53 @@ func TestProcessOrphans(t *testing.T) {
                                        *testTxs[3].ResultIds[0]: testTxs[3],
                                        *testTxs[3].ResultIds[1]: testTxs[3],
                                },
-                               orphans:       map[bc.Hash]*orphanTx{},
-                               orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
+                               eventDispatcher: dispatcher,
+                               orphans:         map[bc.Hash]*orphanTx{},
+                               orphansByPrev:   map[bc.Hash]map[bc.Hash]*orphanTx{},
                        },
                        processTx: &TxDesc{Tx: testTxs[2]},
                },
                {
                        before: &TxPool{
-                               pool: map[bc.Hash]*TxDesc{},
-                               utxo: map[bc.Hash]*types.Tx{},
+                               pool:            map[bc.Hash]*TxDesc{},
+                               utxo:            map[bc.Hash]*types.Tx{},
+                               eventDispatcher: dispatcher,
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[3].ID: &orphanTx{
+                                       testTxs[3].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[3],
                                                },
                                        },
-                                       testTxs[4].ID: &orphanTx{
+                                       testTxs[4].ID: {
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[4],
                                                },
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[3].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[3].ID: &orphanTx{
+                                       testTxs[3].SpentOutputIDs[0]: {
+                                               testTxs[3].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[3],
                                                        },
                                                },
                                        },
-                                       testTxs[4].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[4].ID: &orphanTx{
+                                       testTxs[4].SpentOutputIDs[0]: {
+                                               testTxs[4].ID: {
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[4],
                                                        },
                                                },
                                        },
                                },
-                               msgCh: make(chan *TxPoolMsg, 10),
                        },
                        after: &TxPool{
                                pool: map[bc.Hash]*TxDesc{
-                                       testTxs[3].ID: &TxDesc{
+                                       testTxs[3].ID: {
                                                Tx:         testTxs[3],
                                                StatusFail: false,
                                        },
-                                       testTxs[4].ID: &TxDesc{
+                                       testTxs[4].ID: {
                                                Tx:         testTxs[4],
                                                StatusFail: false,
                                        },
@@ -427,8 +431,9 @@ func TestProcessOrphans(t *testing.T) {
                                        *testTxs[4].ResultIds[0]: testTxs[4],
                                        *testTxs[4].ResultIds[1]: testTxs[4],
                                },
-                               orphans:       map[bc.Hash]*orphanTx{},
-                               orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
+                               eventDispatcher: dispatcher,
+                               orphans:         map[bc.Hash]*orphanTx{},
+                               orphansByPrev:   map[bc.Hash]map[bc.Hash]*orphanTx{},
                        },
                        processTx: &TxDesc{Tx: testTxs[2]},
                },
@@ -440,7 +445,6 @@ func TestProcessOrphans(t *testing.T) {
                c.before.processOrphans(c.processTx)
                c.before.RemoveTransaction(&c.processTx.Tx.ID)
                c.before.store = nil
-               c.before.msgCh = nil
                c.before.lastUpdated = 0
                for _, txD := range c.before.pool {
                        txD.Added = time.Time{}
@@ -461,7 +465,7 @@ func TestRemoveOrphan(t *testing.T) {
                {
                        before: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                expiration: time.Unix(1533489701, 0),
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
@@ -469,8 +473,8 @@ func TestRemoveOrphan(t *testing.T) {
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        expiration: time.Unix(1533489701, 0),
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
@@ -490,13 +494,13 @@ func TestRemoveOrphan(t *testing.T) {
                {
                        before: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                expiration: time.Unix(1533489701, 0),
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
                                                },
                                        },
-                                       testTxs[1].ID: &orphanTx{
+                                       testTxs[1].ID: {
                                                expiration: time.Unix(1533489701, 0),
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[1],
@@ -504,14 +508,14 @@ func TestRemoveOrphan(t *testing.T) {
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        expiration: time.Unix(1533489701, 0),
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
                                                        },
                                                },
-                                               testTxs[1].ID: &orphanTx{
+                                               testTxs[1].ID: {
                                                        expiration: time.Unix(1533489701, 0),
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[1],
@@ -522,7 +526,7 @@ func TestRemoveOrphan(t *testing.T) {
                        },
                        after: &TxPool{
                                orphans: map[bc.Hash]*orphanTx{
-                                       testTxs[0].ID: &orphanTx{
+                                       testTxs[0].ID: {
                                                expiration: time.Unix(1533489701, 0),
                                                TxDesc: &TxDesc{
                                                        Tx: testTxs[0],
@@ -530,8 +534,8 @@ func TestRemoveOrphan(t *testing.T) {
                                        },
                                },
                                orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
-                                       testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
-                                               testTxs[0].ID: &orphanTx{
+                                       testTxs[0].SpentOutputIDs[0]: {
+                                               testTxs[0].ID: {
                                                        expiration: time.Unix(1533489701, 0),
                                                        TxDesc: &TxDesc{
                                                                Tx: testTxs[0],
index 5a3192e..307e624 100644 (file)
@@ -18,6 +18,7 @@ import (
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/database/storage"
+       "github.com/bytom/event"
        "github.com/bytom/mining"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
@@ -139,7 +140,8 @@ func GenerateChainData(dirPath string, testDB dbm.DB, txNumber, otherAssetNum in
        }
 
        store := leveldb.NewStore(testDB)
-       txPool := protocol.NewTxPool(store)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
                return nil, nil, nil, err
@@ -191,11 +193,6 @@ func InsertChain(chain *protocol.Chain, txPool *protocol.TxPool, txs []*types.Tx
 }
 
 func processNewTxch(txPool *protocol.TxPool) {
-       newTxCh := txPool.GetMsgCh()
-       for tx := range newTxCh {
-               if tx == nil {
-               }
-       }
 }
 
 func SolveBlock(seed *bc.Hash, block *types.Block) error {
index 3e927eb..e062dc8 100644 (file)
@@ -12,6 +12,7 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -26,7 +27,8 @@ const (
 // MockChain mock chain with genesis block
 func MockChain(testDB dbm.DB) (*protocol.Chain, *leveldb.Store, *protocol.TxPool, error) {
        store := leveldb.NewStore(testDB)
-       txPool := protocol.NewTxPool(store)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
        chain, err := protocol.NewChain(store, txPool)
        return chain, store, txPool, err
 }
index 1d35ee4..6d486a3 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/signers"
        "github.com/bytom/crypto/ed25519/chainkd"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc/types"
        w "github.com/bytom/wallet"
@@ -259,7 +260,8 @@ func (cfg *walletTestConfig) Run() error {
        walletDB := dbm.NewDB("wallet", "leveldb", path.Join(dirPath, "wallet_db"))
        accountManager := account.NewManager(walletDB, chain)
        assets := asset.NewRegistry(walletDB, chain)
-       wallet, err := w.NewWallet(walletDB, accountManager, assets, hsm, chain)
+       dispatcher := event.NewDispatcher()
+       wallet, err := w.NewWallet(walletDB, accountManager, assets, hsm, chain, dispatcher)
        if err != nil {
                return err
        }
index 771d7dd..2aba1e6 100644 (file)
@@ -84,6 +84,10 @@ func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error)
 
 // RemoveUnconfirmedTx handle wallet status update when tx removed from txpool
 func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) {
+       if !w.checkRelatedTransaction(txD.Tx) {
+               return
+       }
+       w.DB.Delete(calcUnconfirmedTxKey(txD.Tx.ID.String()))
        w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds)
 }
 
index 573822a..f8f8979 100644 (file)
@@ -14,6 +14,7 @@ import (
        "github.com/bytom/blockchain/signers"
        "github.com/bytom/consensus"
        "github.com/bytom/crypto/ed25519/chainkd"
+       "github.com/bytom/event"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/testutil"
 )
@@ -57,7 +58,8 @@ func TestWalletUnconfirmedTxs(t *testing.T) {
                t.Fatal(err)
        }
 
-       w := mockWallet(testDB, accountManager, reg, nil)
+       dispatcher := event.NewDispatcher()
+       w := mockWallet(testDB, accountManager, reg, nil, dispatcher)
        utxos := []*account.UTXO{}
        btmUtxo := mockUTXO(controlProg, consensus.BTMAssetID)
        utxos = append(utxos, btmUtxo)
index 6b6f20a..08b2f56 100644 (file)
@@ -10,6 +10,7 @@ import (
        "github.com/bytom/account"
        "github.com/bytom/asset"
        "github.com/bytom/blockchain/pseudohsm"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -17,7 +18,8 @@ import (
 
 const (
        //SINGLE single sign
-       SINGLE = 1
+       SINGLE    = 1
+       logModule = "wallet"
 )
 
 var walletKey = []byte("walletInfo")
@@ -32,27 +34,31 @@ type StatusInfo struct {
 
 //Wallet is related to storing account unspent outputs
 type Wallet struct {
-       DB          db.DB
-       rw          sync.RWMutex
-       status      StatusInfo
-       AccountMgr  *account.Manager
-       AssetReg    *asset.Registry
-       Hsm         *pseudohsm.HSM
-       chain       *protocol.Chain
-       RecoveryMgr *recoveryManager
-       rescanCh    chan struct{}
+       DB              db.DB
+       rw              sync.RWMutex
+       status          StatusInfo
+       AccountMgr      *account.Manager
+       AssetReg        *asset.Registry
+       Hsm             *pseudohsm.HSM
+       chain           *protocol.Chain
+       RecoveryMgr     *recoveryManager
+       eventDispatcher *event.Dispatcher
+       txMsgSub        *event.Subscription
+
+       rescanCh chan struct{}
 }
 
 //NewWallet return a new wallet instance
-func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain) (*Wallet, error) {
+func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher) (*Wallet, error) {
        w := &Wallet{
-               DB:          walletDB,
-               AccountMgr:  account,
-               AssetReg:    asset,
-               chain:       chain,
-               Hsm:         hsm,
-               RecoveryMgr: newRecoveryManager(walletDB, account),
-               rescanCh:    make(chan struct{}, 1),
+               DB:              walletDB,
+               AccountMgr:      account,
+               AssetReg:        asset,
+               chain:           chain,
+               Hsm:             hsm,
+               RecoveryMgr:     newRecoveryManager(walletDB, account),
+               eventDispatcher: dispatcher,
+               rescanCh:        make(chan struct{}, 1),
        }
 
        if err := w.loadWalletInfo(); err != nil {
@@ -63,11 +69,46 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
                return nil, err
        }
 
+       var err error
+       w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+       if err != nil {
+               return nil, err
+       }
+
        go w.walletUpdater()
        go w.delUnconfirmedTx()
+       go w.memPoolTxQueryLoop()
        return w, nil
 }
 
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
+func (w *Wallet) memPoolTxQueryLoop() {
+       for {
+               select {
+               case obj, ok := <-w.txMsgSub.Chan():
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+                               return
+                       }
+
+                       ev, ok := obj.Data.(protocol.TxMsgEvent)
+                       if !ok {
+                               log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+                               continue
+                       }
+
+                       switch ev.TxMsg.MsgType {
+                       case protocol.MsgNewTx:
+                               w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
+                       case protocol.MsgRemoveTx:
+                               w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
+                       default:
+                               log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
+                       }
+               }
+       }
+}
+
 //GetWalletInfo return stored wallet info and nil,if error,
 //return initial wallet info and err
 func (w *Wallet) loadWalletInfo() error {
index 1914016..5481bfd 100644 (file)
@@ -16,6 +16,7 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
+       "github.com/bytom/event"
        "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -32,7 +33,8 @@ func TestWalletUpdate(t *testing.T) {
        defer os.RemoveAll("temp")
 
        store := leveldb.NewStore(testDB)
-       txPool := protocol.NewTxPool(store)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
 
        chain, err := protocol.NewChain(store, txPool)
        if err != nil {
@@ -85,7 +87,7 @@ func TestWalletUpdate(t *testing.T) {
        txStatus.SetStatus(0, false)
        store.SaveBlock(block, txStatus)
 
-       w := mockWallet(testDB, accountManager, reg, chain)
+       w := mockWallet(testDB, accountManager, reg, chain, dispatcher)
        err = w.AttachBlock(block)
        if err != nil {
                t.Fatal(err)
@@ -101,6 +103,89 @@ func TestWalletUpdate(t *testing.T) {
        }
 }
 
+func TestMemPoolTxQueryLoop(t *testing.T) {
+       dirPath, err := ioutil.TempDir(".", "")
+       if err != nil {
+               t.Fatal(err)
+       }
+       defer os.RemoveAll(dirPath)
+
+       testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+
+       store := leveldb.NewStore(testDB)
+       dispatcher := event.NewDispatcher()
+       txPool := protocol.NewTxPool(store, dispatcher)
+
+       chain, err := protocol.NewChain(store, txPool)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       accountManager := account.NewManager(testDB, chain)
+       hsm, err := pseudohsm.New(dirPath)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       xpub1, _, err := hsm.XCreate("test_pub1", "password", "en")
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       testAccount, err := accountManager.Create([]chainkd.XPub{xpub1.XPub}, 1, "testAccount", signers.BIP0044)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       controlProg, err := accountManager.CreateAddress(testAccount.ID, false)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       controlProg.KeyIndex = 1
+
+       reg := asset.NewRegistry(testDB, chain)
+       asset, err := reg.Define([]chainkd.XPub{xpub1.XPub}, 1, nil, "TESTASSET", nil)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       utxos := []*account.UTXO{}
+       btmUtxo := mockUTXO(controlProg, consensus.BTMAssetID)
+       utxos = append(utxos, btmUtxo)
+       OtherUtxo := mockUTXO(controlProg, &asset.AssetID)
+       utxos = append(utxos, OtherUtxo)
+
+       _, txData, err := mockTxData(utxos, testAccount)
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       tx := types.NewTx(*txData)
+       //block := mockSingleBlock(tx)
+       txStatus := bc.NewTransactionStatus()
+       txStatus.SetStatus(0, false)
+       w, err := NewWallet(testDB, accountManager, reg, hsm, chain, dispatcher)
+       go w.memPoolTxQueryLoop()
+       w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: protocol.MsgNewTx}})
+       time.Sleep(time.Millisecond * 10)
+       if _, err = w.GetUnconfirmedTxByTxID(tx.ID.String()); err != nil {
+               t.Fatal("disaptch new tx msg error:", err)
+       }
+       w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: protocol.MsgRemoveTx}})
+       time.Sleep(time.Millisecond * 10)
+       txs, err := w.GetUnconfirmedTxs(testAccount.ID)
+       if err != nil {
+               t.Fatal("get unconfirmed tx error:", err)
+       }
+
+       if len(txs) != 0 {
+               t.Fatal("disaptch remove tx msg error")
+       }
+
+       w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: 2}})
+}
+
 func mockUTXO(controlProg *account.CtrlProgram, assetID *bc.AssetID) *account.UTXO {
        utxo := &account.UTXO{}
        utxo.OutputID = bc.Hash{V0: 1}
@@ -137,14 +222,16 @@ func mockTxData(utxos []*account.UTXO, testAccount *account.Account) (*txbuilder
        return tplBuilder.Build()
 }
 
-func mockWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, chain *protocol.Chain) *Wallet {
+func mockWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, chain *protocol.Chain, dispatcher *event.Dispatcher) *Wallet {
        wallet := &Wallet{
-               DB:          walletDB,
-               AccountMgr:  account,
-               AssetReg:    asset,
-               chain:       chain,
-               RecoveryMgr: newRecoveryManager(walletDB, account),
+               DB:              walletDB,
+               AccountMgr:      account,
+               AssetReg:        asset,
+               chain:           chain,
+               RecoveryMgr:     newRecoveryManager(walletDB, account),
+               eventDispatcher: dispatcher,
        }
+       wallet.txMsgSub, _ = wallet.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
        return wallet
 }