"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
"github.com/bytom/errors"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/testutil"
)
}
defer os.RemoveAll(dirPath)
- testDB := dbm.NewDB("testdb", "memdb", "temp")
- defer os.RemoveAll("temp")
+ testDB := dbm.NewDB("testdb", "memdb", dirPath)
+ dispatcher := event.NewDispatcher()
store := leveldb.NewStore(testDB)
- txPool := protocol.NewTxPool(store)
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
if err != nil {
t.Fatal(err)
"github.com/bytom/consensus"
"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/testutil"
)
func mockChain(testDB dbm.DB) (*protocol.Chain, error) {
store := leveldb.NewStore(testDB)
- txPool := protocol.NewTxPool(store)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
if err != nil {
return nil, err
log "github.com/sirupsen/logrus"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
maxNumConcurrentReqs int
status statusInfo
chain *protocol.Chain
+ eventDispatcher *event.Dispatcher
+ txMsgSub *event.Subscription
}
// NewWsNotificationManager returns a new notification manager ready for use. See WSNotificationManager for more details.
-func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain) *WSNotificationManager {
+func NewWsNotificationManager(maxNumWebsockets int, maxNumConcurrentReqs int, chain *protocol.Chain, dispatcher *event.Dispatcher) *WSNotificationManager {
// init status
var status statusInfo
header := chain.BestBlockHeader()
maxNumConcurrentReqs: maxNumConcurrentReqs,
status: status,
chain: chain,
+ eventDispatcher: dispatcher,
}
}
}
}
-// NotifyMempoolTx passes a transaction desc accepted by mempool to the
-// notification manager for transaction notification processing. If
-// isNew is true, the tx is is a new transaction, rather than one
-// added to the mempool during a reorg.
-func (m *WSNotificationManager) NotifyMempoolTx(txDesc *protocol.TxDesc) {
- select {
- case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(txDesc):
- case <-m.quit:
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the
+// notification manager for transaction notification processing.
+func (m *WSNotificationManager) memPoolTxQueryLoop() {
+ for {
+ select {
+ case obj, ok := <-m.txMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+ return
+ }
+
+ ev, ok := obj.Data.(protocol.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ if ev.TxMsg.MsgType == protocol.MsgNewTx {
+ select {
+ case m.queueNotification <- (*notificationTxDescAcceptedByMempool)(ev.TxMsg.TxDesc):
+ default:
+ }
+ }
+ case <-m.quit:
+ return
+ }
}
}
}
// Start starts the goroutines required for the manager to queue and process websocket client notifications.
-func (m *WSNotificationManager) Start() {
- m.wg.Add(3)
+func (m *WSNotificationManager) Start() error {
+ var err error
+ m.txMsgSub, err = m.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+ if err != nil {
+ return err
+ }
+
+ m.wg.Add(4)
go m.blockNotify()
go m.queueHandler()
go m.notificationHandler()
+ go m.memPoolTxQueryLoop()
+ return nil
}
// WaitForShutdown blocks until all notification manager goroutines have finished.
blockKeeper *blockKeeper
peers *peerSet
- newTxCh chan *types.Tx
txSyncCh chan *txSyncMsg
quitSync chan struct{}
config *cfg.Config
eventDispatcher *event.Dispatcher
minedBlockSub *event.Subscription
+ txMsgSub *event.Subscription
}
// CreateSyncManager create sync manager and set switch.
blockFetcher: newBlockFetcher(chain, peers),
blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
- newTxCh: make(chan *types.Tx, maxTxChanSize),
txSyncCh: make(chan *txSyncMsg),
quitSync: make(chan struct{}),
config: config,
return sm.sw.DialPeerWithAddress(addr)
}
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
- return sm.newTxCh
-}
-
func (sm *SyncManager) GetNetwork() string {
return sm.config.ChainID
}
return err
}
- // broadcast transactions
- go sm.txBroadcastLoop()
-
sm.minedBlockSub, err = sm.eventDispatcher.Subscribe(event.NewMinedBlockEvent{})
if err != nil {
return err
}
+ sm.txMsgSub, err = sm.eventDispatcher.Subscribe(core.TxMsgEvent{})
+ if err != nil {
+ return err
+ }
+
+ // broadcast transactions
+ go sm.txBroadcastLoop()
go sm.minedBroadcastLoop()
go sm.txSyncLoop()
log "github.com/sirupsen/logrus"
+ core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc/types"
)
func (sm *SyncManager) txBroadcastLoop() {
for {
select {
- case newTx := <-sm.newTxCh:
- if err := sm.peers.broadcastTx(newTx); err != nil {
- log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
+ case obj, ok := <-sm.txMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("mempool tx msg subscription channel closed")
return
}
+
+ ev, ok := obj.Data.(core.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ if ev.TxMsg.MsgType == core.MsgNewTx {
+ if err := sm.peers.broadcastTx(ev.TxMsg.Tx); err != nil {
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on broadcast new tx.")
+ continue
+ }
+ }
case <-sm.quitSync:
return
}
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)
- txPool := protocol.NewTxPool(store)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
accounts = account.NewManager(walletDB, chain)
assets = asset.NewRegistry(walletDB, chain)
- wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
+ wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain, dispatcher)
if err != nil {
log.WithField("error", err).Error("init NewWallet")
}
}
}
- dispatcher := event.NewDispatcher()
syncManager, err := netsync.NewSyncManager(config, chain, txPool, dispatcher)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create sync manager: %v", err))
}
- notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain)
-
- // get transaction from txPool and send it to syncManager and wallet
- go newPoolTxListener(txPool, syncManager, wallet, notificationMgr)
+ notificationMgr := websocket.NewWsNotificationManager(config.Websocket.MaxNumWebsockets, config.Websocket.MaxNumConcurrentReqs, chain, dispatcher)
// run the profile server
profileHost := config.ProfListenAddress
return node
}
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet, notificationMgr *websocket.WSNotificationManager) {
- txMsgCh := txPool.GetMsgCh()
- syncManagerTxCh := syncManager.GetNewTxCh()
-
- for {
- msg := <-txMsgCh
- switch msg.MsgType {
- case protocol.MsgNewTx:
- syncManagerTxCh <- msg.Tx
- if wallet != nil {
- wallet.AddUnconfirmedTx(msg.TxDesc)
- }
- notificationMgr.NotifyMempoolTx(msg.TxDesc)
- case protocol.MsgRemoveTx:
- if wallet != nil {
- wallet.RemoveUnconfirmedTx(msg.TxDesc)
- }
- default:
- log.Warn("got unknow message type from the txPool channel")
- }
- }
-}
-
// Lock data directory after daemonization
func lockDataDirectory(config *cfg.Config) error {
_, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
}
n.initAndstartAPIServer()
- n.notificationMgr.Start()
+ if err := n.notificationMgr.Start(); err != nil {
+ return err
+ }
+
if !n.config.Web.Closed {
_, port, err := net.SplitHostPort(n.config.ApiAddress)
if err != nil {
}
if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
- t.Fatal("TestSwitchAddInboundPeer peer size error", outbound, inbound, dialing)
+ t.Fatal("TestDuplicateInBoundPeer peer size error", outbound, inbound, dialing)
}
}
}
if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
- t.Fatal("TestSwitchAddInboundPeer peer size error")
+ t.Fatal("TestAddInboundPeer peer size error")
}
inp2 := &inboundPeer{PrivKey: crypto.GenPrivKeyEd25519(), config: testCfg}
if err := inp2.dial(addr); err == nil {
- t.Fatal("TestSwitchAddInboundPeer MaxNumPeers limit error")
+ t.Fatal("TestAddInboundPeer MaxNumPeers limit error")
}
}
}
if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 2 {
- t.Fatal("TestSwitchAddInboundPeer peer size error")
+ t.Fatal("TestStopPeer peer size error")
}
s1.StopPeerGracefully(s1.peers.list[0].Key)
if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 1 {
- t.Fatal("TestSwitchAddInboundPeer peer size error")
+ t.Fatal("TestStopPeer peer size error")
}
s1.StopPeerForError(s1.peers.list[0], "stop for test")
if outbound, inbound, dialing := s1.NumPeers(); outbound+inbound+dialing != 0 {
- t.Fatal("TestSwitchAddInboundPeer peer size error")
+ t.Fatal("TestStopPeer peer size error")
}
}
log "github.com/sirupsen/logrus"
"github.com/bytom/consensus"
+ "github.com/bytom/event"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/protocol/state"
ErrPoolIsFull = errors.New("transaction pool reach the max number")
)
+type TxMsgEvent struct{ TxMsg *TxPoolMsg }
+
// TxDesc store tx and related info for mining strategy
type TxDesc struct {
Tx *types.Tx `json:"transaction"`
// TxPool is use for store the unconfirmed transaction
type TxPool struct {
- lastUpdated int64
- mtx sync.RWMutex
- store Store
- pool map[bc.Hash]*TxDesc
- utxo map[bc.Hash]*types.Tx
- orphans map[bc.Hash]*orphanTx
- orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
- errCache *lru.Cache
- msgCh chan *TxPoolMsg
+ lastUpdated int64
+ mtx sync.RWMutex
+ store Store
+ pool map[bc.Hash]*TxDesc
+ utxo map[bc.Hash]*types.Tx
+ orphans map[bc.Hash]*orphanTx
+ orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
+ errCache *lru.Cache
+ eventDispatcher *event.Dispatcher
}
// NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
+func NewTxPool(store Store, dispatcher *event.Dispatcher) *TxPool {
tp := &TxPool{
- lastUpdated: time.Now().Unix(),
- store: store,
- pool: make(map[bc.Hash]*TxDesc),
- utxo: make(map[bc.Hash]*types.Tx),
- orphans: make(map[bc.Hash]*orphanTx),
- orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
- errCache: lru.New(maxCachedErrTxs),
- msgCh: make(chan *TxPoolMsg, maxMsgChSize),
+ lastUpdated: time.Now().Unix(),
+ store: store,
+ pool: make(map[bc.Hash]*TxDesc),
+ utxo: make(map[bc.Hash]*types.Tx),
+ orphans: make(map[bc.Hash]*orphanTx),
+ orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
+ errCache: lru.New(maxCachedErrTxs),
+ eventDispatcher: dispatcher,
}
go tp.orphanExpireWorker()
return tp
return v.(error)
}
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
- return tp.msgCh
-}
-
// RemoveTransaction remove a transaction from the pool
func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
tp.mtx.Lock()
delete(tp.pool, *txHash)
atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}})
log.WithField("tx_id", txHash).Debug("remove tx from mempool")
}
}
atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
+ tp.eventDispatcher.Post(TxMsgEvent{TxMsg: &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}})
log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
return nil
}
"github.com/bytom/consensus"
"github.com/bytom/database/storage"
+ "github.com/bytom/event"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/protocol/state"
},
after: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
{
before: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
after: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
TxDesc: &TxDesc{
Tx: testTxs[1],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
TxDesc: &TxDesc{
Tx: testTxs[1],
},
},
after: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[2].ID: &orphanTx{
+ testTxs[2].ID: {
TxDesc: &TxDesc{
Tx: testTxs[2],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[2].SpentOutputIDs[1]: map[bc.Hash]*orphanTx{
- testTxs[2].ID: &orphanTx{
+ testTxs[2].SpentOutputIDs[1]: {
+ testTxs[2].ID: {
TxDesc: &TxDesc{
Tx: testTxs[2],
},
}
func TestAddTransaction(t *testing.T) {
+ dispatcher := event.NewDispatcher()
cases := []struct {
before *TxPool
after *TxPool
}{
{
before: &TxPool{
- pool: map[bc.Hash]*TxDesc{},
- utxo: map[bc.Hash]*types.Tx{},
- msgCh: make(chan *TxPoolMsg, 1),
+ pool: map[bc.Hash]*TxDesc{},
+ utxo: map[bc.Hash]*types.Tx{},
+ eventDispatcher: dispatcher,
},
after: &TxPool{
pool: map[bc.Hash]*TxDesc{
- testTxs[2].ID: &TxDesc{
+ testTxs[2].ID: {
Tx: testTxs[2],
StatusFail: false,
},
},
{
before: &TxPool{
- pool: map[bc.Hash]*TxDesc{},
- utxo: map[bc.Hash]*types.Tx{},
- msgCh: make(chan *TxPoolMsg, 1),
+ pool: map[bc.Hash]*TxDesc{},
+ utxo: map[bc.Hash]*types.Tx{},
+ eventDispatcher: dispatcher,
},
after: &TxPool{
pool: map[bc.Hash]*TxDesc{
- testTxs[2].ID: &TxDesc{
+ testTxs[2].ID: {
Tx: testTxs[2],
StatusFail: true,
},
func TestExpireOrphan(t *testing.T) {
before := &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
expiration: time.Unix(1633489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
expiration: time.Unix(1633489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
want := &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
expiration: time.Unix(1633489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[1].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[1].ID: {
expiration: time.Unix(1633489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
}
func TestProcessOrphans(t *testing.T) {
+ dispatcher := event.NewDispatcher()
cases := []struct {
before *TxPool
after *TxPool
}{
{
before: &TxPool{
- pool: map[bc.Hash]*TxDesc{},
- utxo: map[bc.Hash]*types.Tx{},
+ pool: map[bc.Hash]*TxDesc{},
+ utxo: map[bc.Hash]*types.Tx{},
+ eventDispatcher: dispatcher,
orphans: map[bc.Hash]*orphanTx{
- testTxs[3].ID: &orphanTx{
+ testTxs[3].ID: {
TxDesc: &TxDesc{
Tx: testTxs[3],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[3].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[3].ID: &orphanTx{
+ testTxs[3].SpentOutputIDs[0]: {
+ testTxs[3].ID: {
TxDesc: &TxDesc{
Tx: testTxs[3],
},
},
},
},
- msgCh: make(chan *TxPoolMsg, 10),
},
after: &TxPool{
pool: map[bc.Hash]*TxDesc{
- testTxs[3].ID: &TxDesc{
+ testTxs[3].ID: {
Tx: testTxs[3],
StatusFail: false,
},
*testTxs[3].ResultIds[0]: testTxs[3],
*testTxs[3].ResultIds[1]: testTxs[3],
},
- orphans: map[bc.Hash]*orphanTx{},
- orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
+ eventDispatcher: dispatcher,
+ orphans: map[bc.Hash]*orphanTx{},
+ orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
},
processTx: &TxDesc{Tx: testTxs[2]},
},
{
before: &TxPool{
- pool: map[bc.Hash]*TxDesc{},
- utxo: map[bc.Hash]*types.Tx{},
+ pool: map[bc.Hash]*TxDesc{},
+ utxo: map[bc.Hash]*types.Tx{},
+ eventDispatcher: dispatcher,
orphans: map[bc.Hash]*orphanTx{
- testTxs[3].ID: &orphanTx{
+ testTxs[3].ID: {
TxDesc: &TxDesc{
Tx: testTxs[3],
},
},
- testTxs[4].ID: &orphanTx{
+ testTxs[4].ID: {
TxDesc: &TxDesc{
Tx: testTxs[4],
},
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[3].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[3].ID: &orphanTx{
+ testTxs[3].SpentOutputIDs[0]: {
+ testTxs[3].ID: {
TxDesc: &TxDesc{
Tx: testTxs[3],
},
},
},
- testTxs[4].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[4].ID: &orphanTx{
+ testTxs[4].SpentOutputIDs[0]: {
+ testTxs[4].ID: {
TxDesc: &TxDesc{
Tx: testTxs[4],
},
},
},
},
- msgCh: make(chan *TxPoolMsg, 10),
},
after: &TxPool{
pool: map[bc.Hash]*TxDesc{
- testTxs[3].ID: &TxDesc{
+ testTxs[3].ID: {
Tx: testTxs[3],
StatusFail: false,
},
- testTxs[4].ID: &TxDesc{
+ testTxs[4].ID: {
Tx: testTxs[4],
StatusFail: false,
},
*testTxs[4].ResultIds[0]: testTxs[4],
*testTxs[4].ResultIds[1]: testTxs[4],
},
- orphans: map[bc.Hash]*orphanTx{},
- orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
+ eventDispatcher: dispatcher,
+ orphans: map[bc.Hash]*orphanTx{},
+ orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{},
},
processTx: &TxDesc{Tx: testTxs[2]},
},
c.before.processOrphans(c.processTx)
c.before.RemoveTransaction(&c.processTx.Tx.ID)
c.before.store = nil
- c.before.msgCh = nil
c.before.lastUpdated = 0
for _, txD := range c.before.pool {
txD.Added = time.Time{}
{
before: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
{
before: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
- testTxs[1].ID: &orphanTx{
+ testTxs[1].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[1],
},
after: &TxPool{
orphans: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
},
},
orphansByPrev: map[bc.Hash]map[bc.Hash]*orphanTx{
- testTxs[0].SpentOutputIDs[0]: map[bc.Hash]*orphanTx{
- testTxs[0].ID: &orphanTx{
+ testTxs[0].SpentOutputIDs[0]: {
+ testTxs[0].ID: {
expiration: time.Unix(1533489701, 0),
TxDesc: &TxDesc{
Tx: testTxs[0],
"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
"github.com/bytom/database/storage"
+ "github.com/bytom/event"
"github.com/bytom/mining"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
}
store := leveldb.NewStore(testDB)
- txPool := protocol.NewTxPool(store)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
if err != nil {
return nil, nil, nil, err
}
func processNewTxch(txPool *protocol.TxPool) {
- newTxCh := txPool.GetMsgCh()
- for tx := range newTxCh {
- if tx == nil {
- }
- }
}
func SolveBlock(seed *bc.Hash, block *types.Block) error {
"github.com/bytom/consensus"
"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
// MockChain mock chain with genesis block
func MockChain(testDB dbm.DB) (*protocol.Chain, *leveldb.Store, *protocol.TxPool, error) {
store := leveldb.NewStore(testDB)
- txPool := protocol.NewTxPool(store)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
return chain, store, txPool, err
}
"github.com/bytom/blockchain/pseudohsm"
"github.com/bytom/blockchain/signers"
"github.com/bytom/crypto/ed25519/chainkd"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc/types"
w "github.com/bytom/wallet"
walletDB := dbm.NewDB("wallet", "leveldb", path.Join(dirPath, "wallet_db"))
accountManager := account.NewManager(walletDB, chain)
assets := asset.NewRegistry(walletDB, chain)
- wallet, err := w.NewWallet(walletDB, accountManager, assets, hsm, chain)
+ dispatcher := event.NewDispatcher()
+ wallet, err := w.NewWallet(walletDB, accountManager, assets, hsm, chain, dispatcher)
if err != nil {
return err
}
// RemoveUnconfirmedTx handle wallet status update when tx removed from txpool
func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) {
+ if !w.checkRelatedTransaction(txD.Tx) {
+ return
+ }
+ w.DB.Delete(calcUnconfirmedTxKey(txD.Tx.ID.String()))
w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds)
}
"github.com/bytom/blockchain/signers"
"github.com/bytom/consensus"
"github.com/bytom/crypto/ed25519/chainkd"
+ "github.com/bytom/event"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/testutil"
)
t.Fatal(err)
}
- w := mockWallet(testDB, accountManager, reg, nil)
+ dispatcher := event.NewDispatcher()
+ w := mockWallet(testDB, accountManager, reg, nil, dispatcher)
utxos := []*account.UTXO{}
btmUtxo := mockUTXO(controlProg, consensus.BTMAssetID)
utxos = append(utxos, btmUtxo)
"github.com/bytom/account"
"github.com/bytom/asset"
"github.com/bytom/blockchain/pseudohsm"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
const (
//SINGLE single sign
- SINGLE = 1
+ SINGLE = 1
+ logModule = "wallet"
)
var walletKey = []byte("walletInfo")
//Wallet is related to storing account unspent outputs
type Wallet struct {
- DB db.DB
- rw sync.RWMutex
- status StatusInfo
- AccountMgr *account.Manager
- AssetReg *asset.Registry
- Hsm *pseudohsm.HSM
- chain *protocol.Chain
- RecoveryMgr *recoveryManager
- rescanCh chan struct{}
+ DB db.DB
+ rw sync.RWMutex
+ status StatusInfo
+ AccountMgr *account.Manager
+ AssetReg *asset.Registry
+ Hsm *pseudohsm.HSM
+ chain *protocol.Chain
+ RecoveryMgr *recoveryManager
+ eventDispatcher *event.Dispatcher
+ txMsgSub *event.Subscription
+
+ rescanCh chan struct{}
}
//NewWallet return a new wallet instance
-func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain) (*Wallet, error) {
+func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry, hsm *pseudohsm.HSM, chain *protocol.Chain, dispatcher *event.Dispatcher) (*Wallet, error) {
w := &Wallet{
- DB: walletDB,
- AccountMgr: account,
- AssetReg: asset,
- chain: chain,
- Hsm: hsm,
- RecoveryMgr: newRecoveryManager(walletDB, account),
- rescanCh: make(chan struct{}, 1),
+ DB: walletDB,
+ AccountMgr: account,
+ AssetReg: asset,
+ chain: chain,
+ Hsm: hsm,
+ RecoveryMgr: newRecoveryManager(walletDB, account),
+ eventDispatcher: dispatcher,
+ rescanCh: make(chan struct{}, 1),
}
if err := w.loadWalletInfo(); err != nil {
return nil, err
}
+ var err error
+ w.txMsgSub, err = w.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
+ if err != nil {
+ return nil, err
+ }
+
go w.walletUpdater()
go w.delUnconfirmedTx()
+ go w.memPoolTxQueryLoop()
return w, nil
}
+// memPoolTxQueryLoop constantly pass a transaction accepted by mempool to the wallet.
+func (w *Wallet) memPoolTxQueryLoop() {
+ for {
+ select {
+ case obj, ok := <-w.txMsgSub.Chan():
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Warning("tx pool tx msg subscription channel closed")
+ return
+ }
+
+ ev, ok := obj.Data.(protocol.TxMsgEvent)
+ if !ok {
+ log.WithFields(log.Fields{"module": logModule}).Error("event type error")
+ continue
+ }
+
+ switch ev.TxMsg.MsgType {
+ case protocol.MsgNewTx:
+ w.AddUnconfirmedTx(ev.TxMsg.TxDesc)
+ case protocol.MsgRemoveTx:
+ w.RemoveUnconfirmedTx(ev.TxMsg.TxDesc)
+ default:
+ log.WithFields(log.Fields{"module": logModule}).Warn("got unknow message type from the txPool channel")
+ }
+ }
+ }
+}
+
//GetWalletInfo return stored wallet info and nil,if error,
//return initial wallet info and err
func (w *Wallet) loadWalletInfo() error {
"github.com/bytom/consensus"
"github.com/bytom/crypto/ed25519/chainkd"
"github.com/bytom/database/leveldb"
+ "github.com/bytom/event"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
defer os.RemoveAll("temp")
store := leveldb.NewStore(testDB)
- txPool := protocol.NewTxPool(store)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
chain, err := protocol.NewChain(store, txPool)
if err != nil {
txStatus.SetStatus(0, false)
store.SaveBlock(block, txStatus)
- w := mockWallet(testDB, accountManager, reg, chain)
+ w := mockWallet(testDB, accountManager, reg, chain, dispatcher)
err = w.AttachBlock(block)
if err != nil {
t.Fatal(err)
}
}
+func TestMemPoolTxQueryLoop(t *testing.T) {
+ dirPath, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer os.RemoveAll(dirPath)
+
+ testDB := dbm.NewDB("testdb", "leveldb", dirPath)
+
+ store := leveldb.NewStore(testDB)
+ dispatcher := event.NewDispatcher()
+ txPool := protocol.NewTxPool(store, dispatcher)
+
+ chain, err := protocol.NewChain(store, txPool)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ accountManager := account.NewManager(testDB, chain)
+ hsm, err := pseudohsm.New(dirPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ xpub1, _, err := hsm.XCreate("test_pub1", "password", "en")
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ testAccount, err := accountManager.Create([]chainkd.XPub{xpub1.XPub}, 1, "testAccount", signers.BIP0044)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ controlProg, err := accountManager.CreateAddress(testAccount.ID, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ controlProg.KeyIndex = 1
+
+ reg := asset.NewRegistry(testDB, chain)
+ asset, err := reg.Define([]chainkd.XPub{xpub1.XPub}, 1, nil, "TESTASSET", nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ utxos := []*account.UTXO{}
+ btmUtxo := mockUTXO(controlProg, consensus.BTMAssetID)
+ utxos = append(utxos, btmUtxo)
+ OtherUtxo := mockUTXO(controlProg, &asset.AssetID)
+ utxos = append(utxos, OtherUtxo)
+
+ _, txData, err := mockTxData(utxos, testAccount)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ tx := types.NewTx(*txData)
+ //block := mockSingleBlock(tx)
+ txStatus := bc.NewTransactionStatus()
+ txStatus.SetStatus(0, false)
+ w, err := NewWallet(testDB, accountManager, reg, hsm, chain, dispatcher)
+ go w.memPoolTxQueryLoop()
+ w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: protocol.MsgNewTx}})
+ time.Sleep(time.Millisecond * 10)
+ if _, err = w.GetUnconfirmedTxByTxID(tx.ID.String()); err != nil {
+ t.Fatal("disaptch new tx msg error:", err)
+ }
+ w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: protocol.MsgRemoveTx}})
+ time.Sleep(time.Millisecond * 10)
+ txs, err := w.GetUnconfirmedTxs(testAccount.ID)
+ if err != nil {
+ t.Fatal("get unconfirmed tx error:", err)
+ }
+
+ if len(txs) != 0 {
+ t.Fatal("disaptch remove tx msg error")
+ }
+
+ w.eventDispatcher.Post(protocol.TxMsgEvent{TxMsg: &protocol.TxPoolMsg{TxDesc: &protocol.TxDesc{Tx: tx}, MsgType: 2}})
+}
+
func mockUTXO(controlProg *account.CtrlProgram, assetID *bc.AssetID) *account.UTXO {
utxo := &account.UTXO{}
utxo.OutputID = bc.Hash{V0: 1}
return tplBuilder.Build()
}
-func mockWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, chain *protocol.Chain) *Wallet {
+func mockWallet(walletDB dbm.DB, account *account.Manager, asset *asset.Registry, chain *protocol.Chain, dispatcher *event.Dispatcher) *Wallet {
wallet := &Wallet{
- DB: walletDB,
- AccountMgr: account,
- AssetReg: asset,
- chain: chain,
- RecoveryMgr: newRecoveryManager(walletDB, account),
+ DB: walletDB,
+ AccountMgr: account,
+ AssetReg: asset,
+ chain: chain,
+ RecoveryMgr: newRecoveryManager(walletDB, account),
+ eventDispatcher: dispatcher,
}
+ wallet.txMsgSub, _ = wallet.eventDispatcher.Subscribe(protocol.TxMsgEvent{})
return wallet
}