maxtxsPending = 32768
maxQuitReq = 256
- maxTxChanSize = 1000 // txChanSize is the size of channel listening to Txpool newTxCh
+ maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
)
var (
blockKeeper *blockKeeper
peers *peerSet
- txCh chan *types.Tx
+ newTxCh chan *types.Tx
newBlockCh chan *bc.Hash
newPeerCh chan struct{}
txSyncCh chan *txsync
chain: chain,
privKey: crypto.GenPrivKeyEd25519(),
peers: newPeerSet(),
- txCh: make(chan *types.Tx, maxTxChanSize),
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
newBlockCh: newBlockCh,
newPeerCh: make(chan struct{}),
txSyncCh: make(chan *txsync),
func (sm *SyncManager) txBroadcastLoop() {
for {
select {
- case newTx := <-sm.txCh:
+ case newTx := <-sm.newTxCh:
peers, err := sm.peers.BroadcastTx(newTx)
if err != nil {
log.Errorf("Broadcast new tx error. %v", err)
return sm.sw
}
-//SetTxCh set SyncManager txCh
-func (sm *SyncManager) SetTxCh(txCh *types.Tx) {
- sm.txCh <- txCh
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+ return sm.newTxCh
}
syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
// get transaction from txPool and send it to syncManager and wallet
- go func() {
- newTxCh := txPool.GetNewTxCh()
- for {
- select {
- case newTx := <-newTxCh:
- syncManager.SetTxCh(newTx)
- if wallet != nil {
- wallet.SetTxCh(newTx)
- }
- default:
- }
- }
- }()
+ go syncTxPoolTransaction(txPool, syncManager, wallet)
// run the profile server
profileHost := config.ProfListenAddress
return node
}
+// syncTxPoolTransaction sync transaction from txPool, and send it to syncManager and wallet
+func syncTxPoolTransaction(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
+ newTxCh := txPool.GetNewTxCh()
+ for {
+ select {
+ case newTx := <-newTxCh:
+ txCh := syncManager.GetNewTxCh()
+ txCh <- newTx
+ if wallet != nil {
+ txCh := wallet.GetNewTxCh()
+ txCh <- newTx
+ }
+ default:
+ }
+ }
+}
+
// Lock data directory after daemonization
func lockDataDirectory(config *cfg.Config) error {
_, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
batch.Set(calcAnnotatedKey(formatKey(b.Height, uint32(tx.Position))), rawTx)
batch.Set(calcTxIndexKey(tx.ID.String()), []byte(formatKey(b.Height, uint32(tx.Position))))
+
+ // delete unconfirmed transaction
+ batch.Delete(calcUnconfirmedTxKey(tx.ID.String()))
}
return nil
}
annotateTxsAsset(w, []*query.AnnotatedTx{annotatedTx})
annotatedTxs = append(annotatedTxs, annotatedTx)
- if findTransactionsByAccount(annotatedTx, accountID) {
+ if accountID != "" && findTransactionsByAccount(annotatedTx, accountID) {
annotatedAccTxs = append(annotatedAccTxs, annotatedTx)
}
}
import (
"encoding/json"
- "strings"
log "github.com/sirupsen/logrus"
unconfirmedTxPrefix = "UTXS:"
)
-func calcUnconfirmedKey(formatKey string) []byte {
+func calcUnconfirmedTxKey(formatKey string) []byte {
return []byte(unconfirmedTxPrefix + formatKey)
}
rawTx, err := json.Marshal(annotatedTxs[0])
if err != nil {
- log.WithField("err", err).Error("inserting unconfirmed annotated transaction to db")
return err
}
- w.DB.Set(calcUnconfirmedKey(tx.ID.String()), rawTx)
- log.Infof("insert unconfirmed tx=%s into db", tx.ID.String())
+ w.DB.Set(calcUnconfirmedTxKey(tx.ID.String()), rawTx)
+ log.Debugf("insert unconfirmed tx=%s into db", tx.ID.String())
return nil
}
-// DeleteUnconfirmedTxs delete unconfirmed annotated transactions from the database when these transactions are not existed in txpool
-func (w *Wallet) DeleteUnconfirmedTxs(txIDs []string) error {
- var TxIDsStr string
- for i, txID := range txIDs {
- if i == 0 {
- TxIDsStr += txID
- }
- TxIDsStr = TxIDsStr + ":" + txID
- }
-
- txIter := w.DB.IteratorPrefix([]byte(unconfirmedTxPrefix))
- defer txIter.Release()
- for txIter.Next() {
- annotatedTx := &query.AnnotatedTx{}
- if err := json.Unmarshal(txIter.Value(), &annotatedTx); err != nil {
- return err
- }
-
- if !strings.Contains(TxIDsStr, annotatedTx.ID.String()) {
- w.DB.Delete(calcUnconfirmedKey(annotatedTx.ID.String()))
- log.Infof("delete unconfirmed tx=%s from db", annotatedTx.ID.String())
- }
- }
-
- return nil
-}
-
-// RescanWalletTxPool rescan txPool
-func (w *Wallet) RescanWalletTxPool() []string {
- txIDs := []string{}
-
- txPool := w.chain.GetTxPool()
- txs := txPool.GetTransactions()
- for _, txDesc := range txs {
- txIDs = append(txIDs, txDesc.Tx.ID.String())
- }
-
- return txIDs
-}
-
// GetUnconfirmedTxByTxID get unconfirmed transaction by txID
func (w *Wallet) GetUnconfirmedTxByTxID(txID string) (*query.AnnotatedTx, error) {
annotatedTx := &query.AnnotatedTx{}
- txInfo := w.DB.Get(calcUnconfirmedKey(txID))
+ txInfo := w.DB.Get(calcUnconfirmedTxKey(txID))
if txInfo == nil {
return nil, errors.WithData(ErrNotFoundTx, "not found tx=%s from txpool", txID)
}
//SINGLE single sign
SINGLE = 1
- maxTxChanSize = 1000 // txChanSize is the size of channel listening to Txpool newTxCh
+ maxTxChanSize = 10000 // txChanSize is the size of channel listening to Txpool newTxCh
)
var walletKey = []byte("walletInfo")
Hsm *pseudohsm.HSM
chain *protocol.Chain
rescanCh chan struct{}
- txCh chan *types.Tx
+ newTxCh chan *types.Tx
}
//NewWallet return a new wallet instance
chain: chain,
Hsm: hsm,
rescanCh: make(chan struct{}, 1),
- txCh: make(chan *types.Tx, maxTxChanSize),
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
}
if err := w.loadWalletInfo(); err != nil {
}
go w.walletUpdater()
- go w.walletTxPoolUpdater()
+ go w.UnconfirmedTxCollector()
return w, nil
}
}
}
-//SetTxCh set wallet txCh
-func (w *Wallet) SetTxCh(txCh *types.Tx) {
- w.txCh <- txCh
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (w *Wallet) GetNewTxCh() chan *types.Tx {
+ return w.newTxCh
}
-func (w *Wallet) walletTxPoolUpdater() {
+func (w *Wallet) UnconfirmedTxCollector() {
for {
- // rescan txpool transaction and delete unconfirmed transactions from database
- txIDs := w.RescanWalletTxPool()
- if err := w.DeleteUnconfirmedTxs(txIDs); err != nil {
- log.WithField("err", err).Error("DeleteUnconfirmedTxs unmarshal error")
- return
- }
-
- select {
- case newTx := <-w.txCh:
- w.SaveUnconfirmedTx(newTx)
- default:
- }
+ w.SaveUnconfirmedTx(<-w.newTxCh)
}
}