OSDN Git Service

fix suggestion
authoroysheng <oysheng@bytom.io>
Tue, 5 Jun 2018 05:29:11 +0000 (13:29 +0800)
committeroysheng <oysheng@bytom.io>
Tue, 5 Jun 2018 07:19:51 +0000 (15:19 +0800)
netsync/block_keeper.go
netsync/handle.go
node/node.go
wallet/indexer.go
wallet/unconfirmed.go
wallet/wallet.go

index 6ea62d6..e24fd94 100644 (file)
@@ -22,7 +22,7 @@ const (
        maxtxsPending    = 32768
        maxQuitReq       = 256
 
-       maxTxChanSize = 1000 // txChanSize is the size of channel listening to Txpool newTxCh
+       maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
 )
 
 var (
index 40d7a47..846590b 100644 (file)
@@ -30,7 +30,7 @@ type SyncManager struct {
        blockKeeper *blockKeeper
        peers       *peerSet
 
-       txCh          chan *types.Tx
+       newTxCh       chan *types.Tx
        newBlockCh    chan *bc.Hash
        newPeerCh     chan struct{}
        txSyncCh      chan *txsync
@@ -48,7 +48,7 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
                chain:      chain,
                privKey:    crypto.GenPrivKeyEd25519(),
                peers:      newPeerSet(),
-               txCh:       make(chan *types.Tx, maxTxChanSize),
+               newTxCh:    make(chan *types.Tx, maxTxChanSize),
                newBlockCh: newBlockCh,
                newPeerCh:  make(chan struct{}),
                txSyncCh:   make(chan *txsync),
@@ -151,7 +151,7 @@ func (sm *SyncManager) Stop() {
 func (sm *SyncManager) txBroadcastLoop() {
        for {
                select {
-               case newTx := <-sm.txCh:
+               case newTx := <-sm.newTxCh:
                        peers, err := sm.peers.BroadcastTx(newTx)
                        if err != nil {
                                log.Errorf("Broadcast new tx error. %v", err)
@@ -219,7 +219,7 @@ func (sm *SyncManager) Switch() *p2p.Switch {
        return sm.sw
 }
 
-//SetTxCh set SyncManager txCh
-func (sm *SyncManager) SetTxCh(txCh *types.Tx) {
-       sm.txCh <- txCh
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+       return sm.newTxCh
 }
index 591c658..a751f27 100644 (file)
@@ -126,19 +126,7 @@ func NewNode(config *cfg.Config) *Node {
        syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
        // get transaction from txPool and send it to syncManager and wallet
-       go func() {
-               newTxCh := txPool.GetNewTxCh()
-               for {
-                       select {
-                       case newTx := <-newTxCh:
-                               syncManager.SetTxCh(newTx)
-                               if wallet != nil {
-                                       wallet.SetTxCh(newTx)
-                               }
-                       default:
-                       }
-               }
-       }()
+       go syncTxPoolTransaction(txPool, syncManager, wallet)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -169,6 +157,23 @@ func NewNode(config *cfg.Config) *Node {
        return node
 }
 
+// syncTxPoolTransaction sync transaction from txPool, and send it to syncManager and wallet
+func syncTxPoolTransaction(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+       newTxCh := txPool.GetNewTxCh()
+       for {
+               select {
+               case newTx := <-newTxCh:
+                       txCh := syncManager.GetNewTxCh()
+                       txCh <- newTx
+                       if wallet != nil {
+                               txCh := wallet.GetNewTxCh()
+                               txCh <- newTx
+                       }
+               default:
+               }
+       }
+}
+
 // Lock data directory after daemonization
 func lockDataDirectory(config *cfg.Config) error {
        _, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
index 0073274..00a52a0 100644 (file)
@@ -204,6 +204,9 @@ func (w *Wallet) indexTransactions(batch db.Batch, b *types.Block, txStatus *bc.
 
                batch.Set(calcAnnotatedKey(formatKey(b.Height, uint32(tx.Position))), rawTx)
                batch.Set(calcTxIndexKey(tx.ID.String()), []byte(formatKey(b.Height, uint32(tx.Position))))
+
+               // delete unconfirmed transaction
+               batch.Delete(calcUnconfirmedTxKey(tx.ID.String()))
        }
        return nil
 }
@@ -509,7 +512,7 @@ func (w *Wallet) GetTransactions(accountID string) ([]*query.AnnotatedTx, error)
 
                annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx})
                annotatedTxs = append(annotatedTxs, annotatedTx)
-               if findTransactionsByAccount(annotatedTx, accountID) {
+               if accountID != "" && findTransactionsByAccount(annotatedTx, accountID) {
                        annotatedAccTxs = append(annotatedAccTxs, annotatedTx)
                }
        }
index 2941023..59f1b65 100644 (file)
@@ -2,7 +2,6 @@ package wallet
 
 import (
        "encoding/json"
-       "strings"
 
        log "github.com/sirupsen/logrus"
 
@@ -16,7 +15,7 @@ const (
        unconfirmedTxPrefix = "UTXS:"
 )
 
-func calcUnconfirmedKey(formatKey string) []byte {
+func calcUnconfirmedTxKey(formatKey string) []byte {
        return []byte(unconfirmedTxPrefix + formatKey)
 }
 
@@ -44,59 +43,18 @@ func (w *Wallet) SaveUnconfirmedTx(tx *types.Tx) error {
 
        rawTx, err := json.Marshal(annotatedTxs[0])
        if err != nil {
-               log.WithField("err", err).Error("inserting unconfirmed annotated transaction to db")
                return err
        }
 
-       w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
-       log.Infof("insert unconfirmed tx=%s into db", tx.ID.String())
+       w.DB.Set(calcUnconfirmedTxKey(tx.ID.String()), rawTx)
+       log.Debugf("insert unconfirmed tx=%s into db", tx.ID.String())
        return nil
 }
 
-// DeleteUnconfirmedTxs delete unconfirmed annotated transactions from the database when these transactions are not existed in txpool
-func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) error {
-       var TxIDsStr string
-       for i, txID := range txIDs {
-               if i == 0 {
-                       TxIDsStr += txID
-               }
-               TxIDsStr = TxIDsStr + ":" + txID
-       }
-
-       txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
-       defer txIter.Release()
-       for txIter.Next() {
-               annotatedTx := &query.AnnotatedTx{}
-               if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
-                       return err
-               }
-
-               if !strings.Contains(TxIDsStr, annotatedTx.ID.String()) {
-                       w.DB.Delete(calcUnconfirmedKey(annotatedTx.ID.String()))
-                       log.Infof("delete unconfirmed tx=%s from db", annotatedTx.ID.String())
-               }
-       }
-
-       return nil
-}
-
-// RescanWalletTxPool rescan txPool
-func (w *Wallet) RescanWalletTxPool() []string {
-       txIDs := []string{}
-
-       txPool := w.chain.GetTxPool()
-       txs := txPool.GetTransactions()
-       for _, txDesc := range txs {
-               txIDs = append(txIDs, txDesc.Tx.ID.String())
-       }
-
-       return txIDs
-}
-
 // GetUnconfirmedTxByTxID get unconfirmed transaction by txID
 func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
        annotatedTx := &query.AnnotatedTx{}
-       txInfo := w.DB.Get(calcUnconfirmedKey(txID))
+       txInfo := w.DB.Get(calcUnconfirmedTxKey(txID))
        if txInfo == nil {
                return nil, errors.WithData(ErrNotFoundTx, "not found tx=%s from txpool", txID)
        }
index 16236fc..028cfac 100644 (file)
@@ -18,7 +18,7 @@ const (
        //SINGLE single sign
        SINGLE = 1
 
-       maxTxChanSize = 1000 // txChanSize is the size of channel listening to Txpool newTxCh
+       maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
 )
 
 var walletKey = []byte("walletInfo")
@@ -40,7 +40,7 @@ type Wallet struct {
        Hsm        *pseudohsm.HSM
        chain      *protocol.Chain
        rescanCh   chan struct{}
-       txCh       chan *types.Tx
+       newTxCh    chan *types.Tx
 }
 
 //NewWallet return a new wallet instance
@@ -52,7 +52,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
                chain:      chain,
                Hsm:        hsm,
                rescanCh:   make(chan struct{}, 1),
-               txCh:       make(chan *types.Tx, maxTxChanSize),
+               newTxCh:    make(chan *types.Tx, maxTxChanSize),
        }
 
        if err := w.loadWalletInfo(); err != nil {
@@ -60,7 +60,7 @@ func NewWallet(walletDB db.DB, account *account.Manager, asset *asset.Registry,
        }
 
        go w.walletUpdater()
-       go w.walletTxPoolUpdater()
+       go w.UnconfirmedTxCollector()
 
        return w, nil
 }
@@ -190,25 +190,14 @@ func (w *Wallet) getRescanNotification() {
        }
 }
 
-//SetTxCh set wallet txCh
-func (w *Wallet) SetTxCh(txCh *types.Tx) {
-       w.txCh <- txCh
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (w *Wallet) GetNewTxCh() chan *types.Tx {
+       return w.newTxCh
 }
 
-func (w *Wallet) walletTxPoolUpdater() {
+func (w *Wallet) UnconfirmedTxCollector() {
        for {
-               // rescan txpool transaction and delete unconfirmed transactions from database
-               txIDs := w.RescanWalletTxPool()
-               if err := w.DeleteUnconfirmedTxs(txIDs); err != nil {
-                       log.WithField("err", err).Error("DeleteUnconfirmedTxs unmarshal error")
-                       return
-               }
-
-               select {
-               case newTx := <-w.txCh:
-                       w.SaveUnconfirmedTx(newTx)
-               default:
-               }
+               w.SaveUnconfirmedTx(<-w.newTxCh)
        }
 }