"github.com/bytom/dashboard"
"github.com/bytom/equity"
"github.com/bytom/errors"
- "github.com/bytom/mining/cpuminer"
- "github.com/bytom/mining/miningpool"
"github.com/bytom/net/http/authn"
"github.com/bytom/net/http/gzip"
"github.com/bytom/net/http/httpjson"
server *http.Server
handler http.Handler
txFeedTracker *txfeed.Tracker
- cpuMiner *cpuminer.CPUMiner
- miningPool *miningpool.MiningPool
}
func (a *API) initServer(config *cfg.Config) {
}
// NewAPI create and initialize the API
-func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
api := &API{
sync: sync,
wallet: wallet,
chain: chain,
accessTokens: token,
txFeedTracker: txfeeds,
- cpuMiner: cpuMiner,
- miningPool: miningPool,
}
api.buildHandler()
api.initServer(config)
m.Handle("/get-mining-address", jsonHandler(a.getMiningAddress))
m.Handle("/set-mining-address", jsonHandler(a.setMiningAddress))
- m.Handle("/get-coinbase-arbitrary", jsonHandler(a.getCoinbaseArbitrary))
- m.Handle("/set-coinbase-arbitrary", jsonHandler(a.setCoinbaseArbitrary))
-
m.Handle("/create-asset", jsonHandler(a.createAsset))
m.Handle("/update-asset-alias", jsonHandler(a.updateAssetAlias))
m.Handle("/get-asset", jsonHandler(a.getAsset))
m.Handle("/submit-transaction", jsonHandler(a.submit))
m.Handle("/estimate-transaction-gas", jsonHandler(a.estimateTxGas))
- m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
- m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
+ //m.Handle("/get-unconfirmed-transaction", jsonHandler(a.getUnconfirmedTx))
+ //m.Handle("/list-unconfirmed-transactions", jsonHandler(a.listUnconfirmedTxs))
m.Handle("/decode-raw-transaction", jsonHandler(a.decodeRawTransaction))
m.Handle("/get-block", jsonHandler(a.getBlock))
m.Handle("/get-difficulty", jsonHandler(a.getDifficulty))
m.Handle("/get-hash-rate", jsonHandler(a.getHashRate))
- m.Handle("/is-mining", jsonHandler(a.isMining))
- m.Handle("/set-mining", jsonHandler(a.setMining))
-
- m.Handle("/get-work", jsonHandler(a.getWork))
- m.Handle("/get-work-json", jsonHandler(a.getWorkJSON))
- m.Handle("/submit-work", jsonHandler(a.submitWork))
- m.Handle("/submit-work-json", jsonHandler(a.submitWorkJSON))
-
m.Handle("/verify-message", jsonHandler(a.verifyMessage))
m.Handle("/decode-program", jsonHandler(a.decodeProgram))
m.Handle("/compile", jsonHandler(a.compileEquity))
}
// POST /get-unconfirmed-transaction
-func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
- TxID chainjson.HexBytes `json:"tx_id"`
-}) Response {
- var tmpTxID [32]byte
- copy(tmpTxID[:], filter.TxID[:])
-
- txHash := bc.NewHash(tmpTxID)
- txPool := a.chain.GetTxPool()
- txDesc, err := txPool.GetTransaction(&txHash)
- if err != nil {
- return NewErrorResponse(err)
- }
-
- tx := &BlockTx{
- ID: txDesc.Tx.ID,
- Version: txDesc.Tx.Version,
- Size: txDesc.Tx.SerializedSize,
- TimeRange: txDesc.Tx.TimeRange,
- Inputs: []*query.AnnotatedInput{},
- Outputs: []*query.AnnotatedOutput{},
- StatusFail: false,
- }
-
- for i := range txDesc.Tx.Inputs {
- tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i)))
- }
- for i := range txDesc.Tx.Outputs {
- tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i))
- }
-
- return NewSuccessResponse(tx)
-}
+//func (a *API) getUnconfirmedTx(ctx context.Context, filter struct {
+// TxID chainjson.HexBytes `json:"tx_id"`
+//}) Response {
+// var tmpTxID [32]byte
+// copy(tmpTxID[:], filter.TxID[:])
+//
+// txHash := bc.NewHash(tmpTxID)
+// txPool := a.chain.GetTxPool()
+// txDesc, err := txPool.GetTransaction(&txHash)
+// if err != nil {
+// return NewErrorResponse(err)
+// }
+//
+// tx := &BlockTx{
+// ID: txDesc.Tx.ID,
+// Version: txDesc.Tx.Version,
+// Size: txDesc.Tx.SerializedSize,
+// TimeRange: txDesc.Tx.TimeRange,
+// Inputs: []*query.AnnotatedInput{},
+// Outputs: []*query.AnnotatedOutput{},
+// StatusFail: false,
+// }
+//
+// for i := range txDesc.Tx.Inputs {
+// tx.Inputs = append(tx.Inputs, a.wallet.BuildAnnotatedInput(txDesc.Tx, uint32(i)))
+// }
+// for i := range txDesc.Tx.Outputs {
+// tx.Outputs = append(tx.Outputs, a.wallet.BuildAnnotatedOutput(txDesc.Tx, i))
+// }
+//
+// return NewSuccessResponse(tx)
+//}
type unconfirmedTxsResp struct {
Total uint64 `json:"total"`
}
// POST /list-unconfirmed-transactions
-func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
- txIDs := []bc.Hash{}
-
- txPool := a.chain.GetTxPool()
- txs := txPool.GetTransactions()
- for _, txDesc := range txs {
- txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID))
- }
-
- return NewSuccessResponse(&unconfirmedTxsResp{
- Total: uint64(len(txIDs)),
- TxIDs: txIDs,
- })
-}
+//func (a *API) listUnconfirmedTxs(ctx context.Context) Response {
+// txIDs := []bc.Hash{}
+//
+// txPool := a.chain.GetTxPool()
+// txs := txPool.GetTransactions()
+// for _, txDesc := range txs {
+// txIDs = append(txIDs, bc.Hash(txDesc.Tx.ID))
+// }
+//
+// return NewSuccessResponse(&unconfirmedTxsResp{
+// Total: uint64(len(txIDs)),
+// TxIDs: txIDs,
+// })
+//}
// RawTx is the tx struct for getRawTransaction
type RawTx struct {
"github.com/bytom/consensus"
"github.com/bytom/p2p"
"github.com/bytom/p2p/discover"
- core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/version"
GetHeaderByHeight(uint64) (*types.BlockHeader, error)
InMainChain(bc.Hash) bool
ProcessBlock(*types.Block) (bool, error)
- ValidateTx(*types.Tx) (bool, error)
}
//SyncManager Sync Manager is responsible for the business layer information synchronization
sw *p2p.Switch
genesisHash bc.Hash
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain Chain
- txPool *core.TxPool
- blockKeeper *blockKeeper
- peers *peerSet
+ privKey crypto.PrivKeyEd25519 // local node's p2p key
+ chain Chain
+ blockKeeper *blockKeeper
+ peers *peerSet
newTxCh chan *types.Tx
+ txNotifyCh chan *types.Tx
newBlockCh chan *bc.Hash
newAddrCh chan *account.CtrlProgram
spvAddresses []*account.CtrlProgram
}
//NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
genesisHeader, err := chain.GetHeaderByHeight(0)
if err != nil {
return nil, err
sw := p2p.NewSwitch(config)
peers := newPeerSet(sw)
manager := &SyncManager{
- sw: sw,
- genesisHash: genesisHeader.Hash(),
- txPool: txPool,
- chain: chain,
- privKey: crypto.GenPrivKeyEd25519(),
- blockKeeper: newBlockKeeper(chain, peers),
- peers: peers,
- newTxCh: make(chan *types.Tx, maxTxChanSize),
- newBlockCh: newBlockCh,
- txSyncCh: make(chan *txSyncMsg),
- quitSync: make(chan struct{}),
- config: config,
- newAddrCh: wallet.AccountMgr.NewAddrCh,
+ sw: sw,
+ genesisHash: genesisHeader.Hash(),
+ chain: chain,
+ privKey: crypto.GenPrivKeyEd25519(),
+ blockKeeper: newBlockKeeper(chain, peers),
+ peers: peers,
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
+ txNotifyCh: wallet.GetTxCh(),
+ newBlockCh: newBlockCh,
+ txSyncCh: make(chan *txSyncMsg),
+ quitSync: make(chan struct{}),
+ config: config,
+ newAddrCh: wallet.AccountMgr.NewAddrCh,
}
manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
protocolReactor := NewProtocolReactor(manager, manager.peers)
sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
return
}
-
- if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
- sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
- }
+ sm.txNotifyCh <- tx
}
func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
}
switch msg := msg.(type) {
- //case *GetBlockMessage:
- // sm.handleGetBlockMsg(peer, msg)
- //
- //case *BlockMessage:
- // sm.handleBlockMsg(peer, msg)
-
case *StatusRequestMessage:
sm.handleStatusRequestMsg(basePeer)
case *TransactionMessage:
sm.handleTransactionMsg(peer, msg)
- //case *MineBlockMessage:
- // sm.handleMineBlockMsg(peer, msg)
-
- //case *GetHeadersMessage:
- // sm.handleGetHeadersMsg(peer, msg)
-
case *HeadersMessage:
sm.handleHeadersMsg(peer, msg)
- //case *GetBlocksMessage:
- // sm.handleGetBlocksMsg(peer, msg)
-
- //case *BlocksMessage:
- // sm.handleBlocksMsg(peer, msg)
-
case *MerkleBlockMessage:
sm.handleMerkelBlockMsg(peer, msg)
txs []*types.Tx
}
-func (sm *SyncManager) syncTransactions(peerID string) {
- pending := sm.txPool.GetTransactions()
- if len(pending) == 0 {
- return
- }
-
- txs := make([]*types.Tx, len(pending))
- for i, batch := range pending {
- txs[i] = batch.Tx
- }
- sm.txSyncCh <- &txSyncMsg{peerID, txs}
-}
-
func (sm *SyncManager) txBroadcastLoop() {
for {
select {
case newTx := <-sm.newTxCh:
+ sm.txNotifyCh <- newTx
if err := sm.peers.broadcastTx(newTx); err != nil {
log.Errorf("Broadcast new tx error. %v", err)
return
"github.com/bytom/consensus"
"github.com/bytom/database/leveldb"
"github.com/bytom/env"
- "github.com/bytom/mining/cpuminer"
- "github.com/bytom/mining/miningpool"
"github.com/bytom/mining/tensority"
"github.com/bytom/netsync"
"github.com/bytom/protocol"
syncManager *netsync.SyncManager
- //bcReactor *bc.BlockchainReactor
wallet *w.Wallet
accessTokens *accesstoken.CredentialStore
api *api.API
chain *protocol.Chain
txfeed *txfeed.Tracker
- cpuMiner *cpuminer.CPUMiner
- miningPool *miningpool.MiningPool
miningEnable bool
}
tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
accessTokens := accesstoken.NewStore(tokenDB)
- txPool := protocol.NewTxPool(store)
- chain, err := protocol.NewChain(store, txPool)
+ chain, err := protocol.NewChain(store)
if err != nil {
cmn.Exit(cmn.Fmt("Failed to create chain structure: %v", err))
}
cmn.Exit(cmn.Fmt("initialize HSM failed: %v", err))
}
- if !config.Wallet.Disable {
- walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
- accounts = account.NewManager(walletDB, chain)
- assets = asset.NewRegistry(walletDB, chain)
- wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
- if err != nil {
- log.WithField("error", err).Error("init NewWallet")
- }
+ walletDB := dbm.NewDB("wallet", config.DBBackend, config.DBDir())
+ accounts = account.NewManager(walletDB, chain)
+ assets = asset.NewRegistry(walletDB, chain)
+ wallet, err = w.NewWallet(walletDB, accounts, assets, hsm, chain)
+ if err != nil {
+ log.WithField("error", err).Error("init NewWallet")
+ }
- // trigger rescan wallet
- if config.Wallet.Rescan {
- wallet.RescanBlocks()
- }
+ // trigger rescan wallet
+ if config.Wallet.Rescan {
+ wallet.RescanBlocks()
}
- newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
- syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh, wallet)
+ newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
- // get transaction from txPool and send it to syncManager and wallet
- go newPoolTxListener(txPool, syncManager, wallet)
+ syncManager, _ := netsync.NewSyncManager(config, chain, newBlockCh, wallet)
// run the profile server
profileHost := config.ProfListenAddress
miningEnable: config.Mining,
}
- node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
- node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
-
node.BaseService = *cmn.NewBaseService(nil, "Node", node)
if config.Simd.Enable {
return node
}
-// newPoolTxListener listener transaction from txPool, and send it to syncManager and wallet
-func newPoolTxListener(txPool *protocol.TxPool, syncManager *netsync.SyncManager, wallet *w.Wallet) {
- txMsgCh := txPool.GetMsgCh()
- syncManagerTxCh := syncManager.GetNewTxCh()
-
- for {
- msg := <-txMsgCh
- switch msg.MsgType {
- case protocol.MsgNewTx:
- syncManagerTxCh <- msg.Tx
- if wallet != nil {
- wallet.AddUnconfirmedTx(msg.TxDesc)
- }
- case protocol.MsgRemoveTx:
- if wallet != nil {
- wallet.RemoveUnconfirmedTx(msg.TxDesc)
- }
- default:
- log.Warn("got unknow message type from the txPool channel")
- }
- }
-}
-
// Lock data directory after daemonization
func lockDataDirectory(config *cfg.Config) error {
_, _, err := flock.New(filepath.Join(config.RootDir, "LOCK"))
}
func (n *Node) initAndstartApiServer() {
- n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
+ n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.chain, n.config, n.accessTokens)
listenAddr := env.String("LISTEN", n.config.ApiAddress)
env.Parse()
}
func (n *Node) OnStart() error {
- if n.miningEnable {
- if _, err := n.wallet.AccountMgr.GetMiningAddress(); err != nil {
- n.miningEnable = false
- log.Error(err)
- } else {
- n.cpuMiner.Start()
- }
- }
if !n.config.VaultMode {
n.syncManager.Start()
}
func (n *Node) OnStop() {
n.BaseService.OnStop()
- if n.miningEnable {
- n.cpuMiner.Stop()
- }
if !n.config.VaultMode {
n.syncManager.Stop()
}
func (n *Node) SyncManager() *netsync.SyncManager {
return n.syncManager
}
-
-func (n *Node) MiningPool() *miningpool.MiningPool {
- return n.miningPool
-}
type Chain struct {
index *state.BlockIndex
orphanManage *OrphanManage
- txPool *TxPool
store Store
processBlockCh chan *processBlockMsg
}
// NewChain returns a new Chain using store as the underlying storage.
-func NewChain(store Store, txPool *TxPool) (*Chain, error) {
+func NewChain(store Store) (*Chain, error) {
c := &Chain{
orphanManage: NewOrphanManage(),
- txPool: txPool,
store: store,
processBlockCh: make(chan *processBlockMsg, maxProcessBlockChSize),
}
return ch
}
-// GetTxPool return chain txpool.
-func (c *Chain) GetTxPool() *TxPool {
- return c.txPool
-}
import (
"github.com/bytom/errors"
"github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
- "github.com/bytom/protocol/state"
- "github.com/bytom/protocol/validation"
)
// ErrBadTx is returned for transactions failing validation
func (c *Chain) GetTransactionStatus(hash *bc.Hash) (*bc.TransactionStatus, error) {
return c.store.GetTransactionStatus(hash)
}
-
-// GetTransactionsUtxo return all the utxos that related to the txs' inputs
-func (c *Chain) GetTransactionsUtxo(view *state.UtxoViewpoint, txs []*bc.Tx) error {
- return c.store.GetTransactionsUtxo(view, txs)
-}
-
-// ValidateTx validates the given transaction. A cache holds
-// per-transaction validation results and is consulted before
-// performing full validation.
-func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
- if ok := c.txPool.HaveTransaction(&tx.ID); ok {
- return false, c.txPool.GetErrCache(&tx.ID)
- }
-
- bh := c.BestBlockHeader()
- block := types.MapBlock(&types.Block{BlockHeader: *bh})
- gasStatus, err := validation.ValidateTx(tx.Tx, block)
- if gasStatus.GasValid == false {
- c.txPool.AddErrCache(&tx.ID, err)
- return false, err
- }
-
- return c.txPool.ProcessTransaction(tx, err != nil, block.BlockHeader.Height, gasStatus.BTMValue)
-}
+++ /dev/null
-package protocol
-
-import (
- "errors"
- "sync"
- "sync/atomic"
- "time"
-
- "github.com/golang/groupcache/lru"
- log "github.com/sirupsen/logrus"
-
- "github.com/bytom/consensus"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
- "github.com/bytom/protocol/state"
-)
-
-// msg type
-const (
- MsgNewTx = iota
- MsgRemoveTx
-)
-
-var (
- maxCachedErrTxs = 1000
- maxMsgChSize = 1000
- maxNewTxNum = 10000
- maxOrphanNum = 2000
-
- orphanTTL = 10 * time.Minute
- orphanExpireScanInterval = 3 * time.Minute
-
- // ErrTransactionNotExist is the pre-defined error message
- ErrTransactionNotExist = errors.New("transaction are not existed in the mempool")
- // ErrPoolIsFull indicates the pool is full
- ErrPoolIsFull = errors.New("transaction pool reach the max number")
-)
-
-// TxDesc store tx and related info for mining strategy
-type TxDesc struct {
- Tx *types.Tx
- Added time.Time
- StatusFail bool
- Height uint64
- Weight uint64
- Fee uint64
-}
-
-// TxPoolMsg is use for notify pool changes
-type TxPoolMsg struct {
- *TxDesc
- MsgType int
-}
-
-type orphanTx struct {
- *TxDesc
- expiration time.Time
-}
-
-// TxPool is use for store the unconfirmed transaction
-type TxPool struct {
- lastUpdated int64
- mtx sync.RWMutex
- store Store
- pool map[bc.Hash]*TxDesc
- utxo map[bc.Hash]*types.Tx
- orphans map[bc.Hash]*orphanTx
- orphansByPrev map[bc.Hash]map[bc.Hash]*orphanTx
- errCache *lru.Cache
- msgCh chan *TxPoolMsg
-}
-
-// NewTxPool init a new TxPool
-func NewTxPool(store Store) *TxPool {
- tp := &TxPool{
- lastUpdated: time.Now().Unix(),
- store: store,
- pool: make(map[bc.Hash]*TxDesc),
- utxo: make(map[bc.Hash]*types.Tx),
- orphans: make(map[bc.Hash]*orphanTx),
- orphansByPrev: make(map[bc.Hash]map[bc.Hash]*orphanTx),
- errCache: lru.New(maxCachedErrTxs),
- msgCh: make(chan *TxPoolMsg, maxMsgChSize),
- }
- go tp.orphanExpireWorker()
- return tp
-}
-
-// AddErrCache add a failed transaction record to lru cache
-func (tp *TxPool) AddErrCache(txHash *bc.Hash, err error) {
- tp.mtx.Lock()
- defer tp.mtx.Unlock()
-
- tp.errCache.Add(txHash, err)
-}
-
-// ExpireOrphan expire all the orphans that before the input time range
-func (tp *TxPool) ExpireOrphan(now time.Time) {
- tp.mtx.Lock()
- defer tp.mtx.Unlock()
-
- for hash, orphan := range tp.orphans {
- if orphan.expiration.Before(now) {
- tp.removeOrphan(&hash)
- }
- }
-}
-
-// GetErrCache return the error of the transaction
-func (tp *TxPool) GetErrCache(txHash *bc.Hash) error {
- tp.mtx.Lock()
- defer tp.mtx.Unlock()
-
- v, ok := tp.errCache.Get(txHash)
- if !ok {
- return nil
- }
- return v.(error)
-}
-
-// GetMsgCh return a unconfirmed transaction feed channel
-func (tp *TxPool) GetMsgCh() <-chan *TxPoolMsg {
- return tp.msgCh
-}
-
-// RemoveTransaction remove a transaction from the pool
-func (tp *TxPool) RemoveTransaction(txHash *bc.Hash) {
- tp.mtx.Lock()
- defer tp.mtx.Unlock()
-
- txD, ok := tp.pool[*txHash]
- if !ok {
- return
- }
-
- for _, output := range txD.Tx.ResultIds {
- delete(tp.utxo, *output)
- }
- delete(tp.pool, *txHash)
-
- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgRemoveTx}
- log.WithField("tx_id", txHash).Debug("remove tx from mempool")
-}
-
-// GetTransaction return the TxDesc by hash
-func (tp *TxPool) GetTransaction(txHash *bc.Hash) (*TxDesc, error) {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
-
- if txD, ok := tp.pool[*txHash]; ok {
- return txD, nil
- }
- return nil, ErrTransactionNotExist
-}
-
-// GetTransactions return all the transactions in the pool
-func (tp *TxPool) GetTransactions() []*TxDesc {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
-
- txDs := make([]*TxDesc, len(tp.pool))
- i := 0
- for _, desc := range tp.pool {
- txDs[i] = desc
- i++
- }
- return txDs
-}
-
-// IsTransactionInPool check wheather a transaction in pool or not
-func (tp *TxPool) IsTransactionInPool(txHash *bc.Hash) bool {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
-
- _, ok := tp.pool[*txHash]
- return ok
-}
-
-// IsTransactionInErrCache check wheather a transaction in errCache or not
-func (tp *TxPool) IsTransactionInErrCache(txHash *bc.Hash) bool {
- tp.mtx.RLock()
- defer tp.mtx.RUnlock()
-
- _, ok := tp.errCache.Get(txHash)
- return ok
-}
-
-// HaveTransaction IsTransactionInErrCache check is transaction in errCache or pool
-func (tp *TxPool) HaveTransaction(txHash *bc.Hash) bool {
- return tp.IsTransactionInPool(txHash) || tp.IsTransactionInErrCache(txHash)
-}
-
-// ProcessTransaction is the main entry for txpool handle new tx
-func (tp *TxPool) ProcessTransaction(tx *types.Tx, statusFail bool, height, fee uint64) (bool, error) {
- tp.mtx.Lock()
- defer tp.mtx.Unlock()
-
- txD := &TxDesc{
- Tx: tx,
- StatusFail: statusFail,
- Weight: tx.SerializedSize,
- Height: height,
- Fee: fee,
- }
- requireParents, err := tp.checkOrphanUtxos(tx)
- if err != nil {
- return false, err
- }
-
- if len(requireParents) > 0 {
- return true, tp.addOrphan(txD, requireParents)
- }
-
- if err := tp.addTransaction(txD); err != nil {
- return false, err
- }
-
- tp.processOrphans(txD)
- return false, nil
-}
-
-func (tp *TxPool) addOrphan(txD *TxDesc, requireParents []*bc.Hash) error {
- if len(tp.orphans) >= maxOrphanNum {
- return ErrPoolIsFull
- }
-
- orphan := &orphanTx{txD, time.Now().Add(orphanTTL)}
- tp.orphans[txD.Tx.ID] = orphan
- for _, hash := range requireParents {
- if _, ok := tp.orphansByPrev[*hash]; !ok {
- tp.orphansByPrev[*hash] = make(map[bc.Hash]*orphanTx)
- }
- tp.orphansByPrev[*hash][txD.Tx.ID] = orphan
- }
- return nil
-}
-
-func (tp *TxPool) addTransaction(txD *TxDesc) error {
- if len(tp.pool) >= maxNewTxNum {
- return ErrPoolIsFull
- }
-
- tx := txD.Tx
- txD.Added = time.Now()
- tp.pool[tx.ID] = txD
- for _, id := range tx.ResultIds {
- output, err := tx.Output(*id)
- if err != nil {
- // error due to it's a retirement, utxo doesn't care this output type so skip it
- continue
- }
- if !txD.StatusFail || *output.Source.Value.AssetId == *consensus.BTMAssetID {
- tp.utxo[*id] = tx
- }
- }
-
- atomic.StoreInt64(&tp.lastUpdated, time.Now().Unix())
- tp.msgCh <- &TxPoolMsg{TxDesc: txD, MsgType: MsgNewTx}
- log.WithField("tx_id", tx.ID.String()).Debug("Add tx to mempool")
- return nil
-}
-
-func (tp *TxPool) checkOrphanUtxos(tx *types.Tx) ([]*bc.Hash, error) {
- view := state.NewUtxoViewpoint()
- if err := tp.store.GetTransactionsUtxo(view, []*bc.Tx{tx.Tx}); err != nil {
- return nil, err
- }
-
- hashes := []*bc.Hash{}
- for _, hash := range tx.SpentOutputIDs {
- if !view.CanSpend(&hash) && tp.utxo[hash] == nil {
- hashes = append(hashes, &hash)
- }
- }
- return hashes, nil
-}
-
-func (tp *TxPool) orphanExpireWorker() {
- ticker := time.NewTicker(orphanExpireScanInterval)
- for now := range ticker.C {
- tp.ExpireOrphan(now)
- }
-}
-
-func (tp *TxPool) processOrphans(txD *TxDesc) {
- processOrphans := []*orphanTx{}
- addRely := func(tx *types.Tx) {
- for _, outHash := range tx.ResultIds {
- orphans, ok := tp.orphansByPrev[*outHash]
- if !ok {
- continue
- }
-
- for _, orphan := range orphans {
- processOrphans = append(processOrphans, orphan)
- }
- delete(tp.orphansByPrev, *outHash)
- }
- }
-
- addRely(txD.Tx)
- for ; len(processOrphans) > 0; processOrphans = processOrphans[1:] {
- processOrphan := processOrphans[0]
- requireParents, err := tp.checkOrphanUtxos(processOrphan.Tx)
- if err != nil {
- log.WithField("err", err).Error("processOrphans got unexpect error")
- continue
- }
-
- if len(requireParents) == 0 {
- addRely(processOrphan.Tx)
- tp.removeOrphan(&processOrphan.Tx.ID)
- tp.addTransaction(processOrphan.TxDesc)
- }
- }
-}
-
-func (tp *TxPool) removeOrphan(hash *bc.Hash) {
- orphan, ok := tp.orphans[*hash]
- if !ok {
- return
- }
-
- for _, spend := range orphan.Tx.SpentOutputIDs {
- orphans, ok := tp.orphansByPrev[spend]
- if !ok {
- continue
- }
-
- if delete(orphans, *hash); len(orphans) == 0 {
- delete(tp.orphansByPrev, spend)
- }
- }
- delete(tp.orphans, *hash)
-}
"github.com/bytom/account"
"github.com/bytom/blockchain/query"
"github.com/bytom/crypto/sha3pool"
- "github.com/bytom/protocol"
"github.com/bytom/protocol/bc/types"
)
func (a SortByTimestamp) Less(i, j int) bool { return a[i].Timestamp > a[j].Timestamp }
// AddUnconfirmedTx handle wallet status update when tx add into txpool
-func (w *Wallet) AddUnconfirmedTx(txD *protocol.TxDesc) {
- if err := w.saveUnconfirmedTx(txD.Tx); err != nil {
+func (w *Wallet) AddUnconfirmedTx(tx *types.Tx) {
+ if err := w.saveUnconfirmedTx(tx); err != nil {
log.WithField("err", err).Error("wallet fail on saveUnconfirmedTx")
}
-
- utxos := txOutToUtxos(txD.Tx, txD.StatusFail, 0)
- utxos = w.filterAccountUtxo(utxos)
- w.AccountMgr.AddUnconfirmedUtxo(utxos)
}
// GetUnconfirmedTxs get account unconfirmed transactions, filter transactions by accountID when accountID is not empty
}
// RemoveUnconfirmedTx handle wallet status update when tx removed from txpool
-func (w *Wallet) RemoveUnconfirmedTx(txD *protocol.TxDesc) {
- w.AccountMgr.RemoveUnconfirmedUtxo(txD.Tx.ResultIds)
+func (w *Wallet) RemoveUnconfirmedTx(tx *types.Tx) {
+ w.AccountMgr.RemoveUnconfirmedUtxo(tx.ResultIds)
}
func (w *Wallet) buildAnnotatedUnconfirmedTx(tx *types.Tx) *query.AnnotatedTx {
}
}
}
+
+// newTxListener listener transaction from txPool, and send it to syncManager and wallet
+func (w *Wallet) newTxListener() {
+ for {
+ tx := <-w.newTxCh
+ w.AddUnconfirmedTx(tx)
+ }
+}
const (
//SINGLE single sign
SINGLE = 1
+ //channel size for notifying tx msg
+ NewTxChSize = 1024
)
var walletKey = []byte("walletInfo")
Hsm *pseudohsm.HSM
chain *protocol.Chain
rescanCh chan struct{}
+ newTxCh chan *types.Tx
}
//NewWallet return a new wallet instance
chain: chain,
Hsm: hsm,
rescanCh: make(chan struct{}, 1),
+ newTxCh: make(chan *types.Tx, NewTxChSize),
}
if err := w.loadWalletInfo(); err != nil {
}
go w.walletUpdater()
+ go w.newTxListener()
go w.delUnconfirmedTx()
return w, nil
}
return w.status
}
+
+func (w *Wallet) GetTxCh() chan *types.Tx {
+ return w.newTxCh
+}