import (
"context"
- "encoding/json"
- "sync"
- "time"
- //"github.com/bytom/blockchain/fetch"
+ log "github.com/sirupsen/logrus"
+
"github.com/bytom/blockchain/txbuilder"
- chainjson "github.com/bytom/encoding/json"
"github.com/bytom/errors"
- //"github.com/bytom/log"
- "github.com/bytom/net/http/httperror"
- "github.com/bytom/net/http/reqid"
- //"github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/legacy"
+ "github.com/bytom/protocol/bc/types"
)
-var defaultTxTTL = 5 * time.Minute
-
-func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
- var decoder func([]byte) (txbuilder.Action, error)
- switch action {
- case "control_account":
- decoder = a.accounts.DecodeControlAction
- case "control_program":
- decoder = txbuilder.DecodeControlProgramAction
- case "control_receiver":
- decoder = txbuilder.DecodeControlReceiverAction
- case "issue":
- decoder = a.assets.DecodeIssueAction
- case "retire":
- decoder = txbuilder.DecodeRetireAction
- case "spend_account":
- decoder = a.accounts.DecodeSpendAction
- case "spend_account_unspent_output":
- decoder = a.accounts.DecodeSpendUTXOAction
- case "set_transaction_reference_data":
- decoder = txbuilder.DecodeSetTxRefDataAction
- default:
- return nil, false
- }
- return decoder, true
-}
-
-func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
- err := a.filterAliases(ctx, req)
- if err != nil {
- return nil, err
- }
- actions := make([]txbuilder.Action, 0, len(req.Actions))
- for i, act := range req.Actions {
- typ, ok := act["type"].(string)
- if !ok {
- return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
- }
- decoder, ok := a.actionDecoder(typ)
- if !ok {
- return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
- }
-
- // Remarshal to JSON, the action may have been modified when we
- // filtered aliases.
- b, err := json.Marshal(act)
- if err != nil {
- return nil, err
- }
- a, err := decoder(b)
- if err != nil {
- return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
- }
- actions = append(actions, a)
- }
-
- ttl := req.TTL.Duration
- if ttl == 0 {
- ttl = defaultTxTTL
- }
- maxTime := time.Now().Add(ttl)
- tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
- if errors.Root(err) == txbuilder.ErrAction {
- // Format each of the inner errors contained in the data.
- var formattedErrs []httperror.Response
- for _, innerErr := range errors.Data(err)["actions"].([]error) {
- resp := errorFormatter.Format(innerErr)
- formattedErrs = append(formattedErrs, resp)
- }
- err = errors.WithData(err, "actions", formattedErrs)
- }
- if err != nil {
- return nil, err
- }
-
- // ensure null is never returned for signing instructions
- if tpl.SigningInstructions == nil {
- tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
- }
- return tpl, nil
-}
-
-// POST /build-transaction
-func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*BuildRequest) (interface{}, error) {
- responses := make([]interface{}, len(buildReqs))
- var wg sync.WaitGroup
- wg.Add(len(responses))
-
- for i := 0; i < len(responses); i++ {
- go func(i int) {
- subctx := reqid.NewSubContext(ctx, reqid.New())
- defer wg.Done()
- defer batchRecover(subctx, &responses[i])
-
- tmpl, err := a.buildSingle(subctx, buildReqs[i])
- if err != nil {
- responses[i] = err
- } else {
- responses[i] = tmpl
- }
- }(i)
- }
-
- wg.Wait()
- return responses, nil
-}
-
-func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
- if tpl.Transaction == nil {
- return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
- }
-
- err := a.finalizeTxWait(ctx, tpl, waitUntil)
- if err != nil {
- return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
- }
-
- return map[string]string{"id": tpl.Transaction.ID.String()}, nil
-}
-
-/*
-// recordSubmittedTx records a lower bound height at which the tx
-// was first submitted to the tx pool. If this request fails for
-// some reason, a retry will know to look for the transaction in
-// blocks starting at this height.
-//
-// If the tx has already been submitted, it returns the existing
-// height.
-func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
- const insertQ = `
- INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
- ON CONFLICT DO NOTHING
- `
- res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
- if err != nil {
- return 0, err
- }
- inserted, err := res.RowsAffected()
- if err != nil {
- return 0, err
- }
- if inserted == 1 {
- return currentHeight, nil
- }
-
- // The insert didn't affect any rows, meaning there was already an entry
- // for this transaction hash.
- const selectQ = `
- SELECT height FROM submitted_txs WHERE tx_hash = $1
- `
- var height uint64
- err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
- return height, err
-}
-*/
-
-/*
-// cleanUpSubmittedTxs will periodically delete records of submitted txs
-// older than a day. This function blocks and only exits when its context
-// is cancelled.
-func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
- ticker := time.NewTicker(15 * time.Minute)
- for {
- select {
- case <-ticker.C:
- // TODO(jackson): We could avoid expensive bulk deletes by partitioning
- // the table and DROP-ing tables of expired rows. Partitioning doesn't
- // play well with ON CONFLICT clauses though, so we would need to rework
- // how we guarantee uniqueness.
- const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
- _, err := db.Exec(ctx, q)
- if err != nil {
- log.Error(ctx, err)
- }
- case <-ctx.Done():
- ticker.Stop()
- return
- }
- }
-}
-*/
-
// finalizeTxWait calls FinalizeTx and then waits for confirmation of
// the transaction. A nil error return means the transaction is
// confirmed on the blockchain. ErrRejected means a conflicting tx is
// on the blockchain. context.DeadlineExceeded means ctx is an
// expiring context that timed out.
-func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
+func (bcr *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
// Use the current generator height as the lower bound of the block height
// that the transaction may appear in.
- localHeight := a.chain.Height()
- generatorHeight := localHeight
+ localHeight := bcr.chain.Height()
+ //generatorHeight := localHeight
- // Remember this height in case we retry this submit call.
- /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
- if err != nil {
- return errors.Wrap(err, "saving tx submitted height")
- }*/
+ log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
- err := txbuilder.FinalizeTx(ctx, a.chain, a.submitter, txTemplate.Transaction)
+ err := txbuilder.FinalizeTx(ctx, bcr.chain, txTemplate.Transaction)
if err != nil {
return err
}
return nil
}
- _, err = a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
+ //TODO:complete finalizeTxWait
+ //height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
if err != nil {
return err
}
if waitUntil == "confirmed" {
return nil
}
- /*
-
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-a.pinStore.AllWaiter(height):
- }
- */
return nil
}
-func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
+func (bcr *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *types.Tx, height uint64) (uint64, error) {
+ log.Printf("waitForTxInBlock function")
for {
height++
select {
case <-ctx.Done():
return 0, ctx.Err()
- case <-a.chain.BlockWaiter(height):
- b, err := a.chain.GetBlock(height)
+ case <-bcr.chain.BlockWaiter(height):
+ b, err := bcr.chain.GetBlockByHeight(height)
if err != nil {
return 0, errors.Wrap(err, "getting block that just landed")
}
}
}
- if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
- return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
- }
-
// might still be in pool or might be rejected; we can't
// tell definitively until its max time elapses.
-
// Re-insert into the pool in case it was dropped.
- err = txbuilder.FinalizeTx(ctx, a.chain, a.submitter, tx)
+ err = txbuilder.FinalizeTx(ctx, bcr.chain, tx)
if err != nil {
return 0, err
}
}
}
}
-
-type submitArg struct {
- Transactions []txbuilder.Template
- wait chainjson.Duration
- WaitUntil string `json:"wait_until"` // values none, confirmed, processed. default: processed
-}
-
-// POST /submit-transaction
-func (a *BlockchainReactor) submit(ctx context.Context, x submitArg) (interface{}, error) {
- // Setup a timeout for the provided wait duration.
- timeout := x.wait.Duration
- if timeout <= 0 {
- timeout = 30 * time.Second
- }
- ctx, cancel := context.WithTimeout(ctx, timeout)
- defer cancel()
-
- responses := make([]interface{}, len(x.Transactions))
- var wg sync.WaitGroup
- wg.Add(len(responses))
- for i := range responses {
- go func(i int) {
- subctx := reqid.NewSubContext(ctx, reqid.New())
- defer wg.Done()
- defer batchRecover(subctx, &responses[i])
-
- tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
- if err != nil {
- responses[i] = err
- } else {
- responses[i] = tx
- }
- }(i)
- }
-
- wg.Wait()
- return responses, nil
-}