9 "github.com/bytom/blockchain/txbuilder"
10 "github.com/bytom/errors"
11 "github.com/bytom/net/http/httperror"
12 "github.com/bytom/net/http/reqid"
13 "github.com/bytom/protocol/bc/legacy"
15 chainjson "github.com/bytom/encoding/json"
16 log "github.com/sirupsen/logrus"
// defaultTxTTL is a five-minute duration.
// NOTE(review): its use site is not visible in this excerpt; presumably it is
// the TTL applied when a build request does not specify one - confirm.
19 var defaultTxTTL = 5 * time.Minute
// actionDecoder selects the decoding function for the given action type
// string. The decoders are either methods on the reactor's accounts and
// assets fields or package-level functions of txbuilder.
// NOTE(review): the switch header, some case labels, the default branch and
// the return statements are not visible in this excerpt. The boolean result
// presumably reports whether the action type was recognised; buildSingle
// reports "unknown action type" based on it.
21 func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
22 var decoder func([]byte) (txbuilder.Action, error)
24 case "control_account":
25 decoder = a.accounts.DecodeControlAction
26 case "control_program":
27 decoder = txbuilder.DecodeControlProgramAction
28 case "control_receiver":
29 decoder = txbuilder.DecodeControlReceiverAction
// NOTE(review): the case labels for the next three assignments are missing
// from this excerpt.
31 decoder = a.assets.DecodeIssueAction
33 decoder = txbuilder.DecodeRetireAction
35 decoder = a.accounts.DecodeSpendAction
36 case "spend_account_unspent_output":
37 decoder = a.accounts.DecodeSpendUTXOAction
38 case "set_transaction_reference_data":
39 decoder = txbuilder.DecodeSetTxRefDataAction
46 {"type": "spend", "asset_id": "%s", "amount": 100},
47 {"type": "control_account", "asset_id": "%s", "amount": 100, "account_id": "%s"}
// buildSingle turns one build request into a transaction template.
// It runs filterAliases on the request, decodes every entry of req.Actions
// with the decoder chosen by the entry's "type" field, and hands the decoded
// actions to txbuilder.Build together with a deadline of now plus the
// request's TTL. Problems with an action are reported as errBadActionType or
// errBadAction, with the index of the offending action in the detail text.
// NOTE(review): the error guards, the handling of a zero TTL and the final
// return are not visible in this excerpt.
50 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
51 err := a.filterAliases(ctx, req)
// Pre-size the result to the number of requested actions.
55 actions := make([]txbuilder.Action, 0, len(req.Actions))
56 for i, act := range req.Actions {
// The "type" entry must be a string; it selects the decoder.
57 typ, ok := act["type"].(string)
59 return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
61 decoder, ok := a.actionDecoder(typ)
63 return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
66 // Remarshal to JSON, the action may have been modified when we
// The decoder consumes the action as raw JSON bytes.
68 b, err := json.Marshal(act)
72 action, err := decoder(b)
74 return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
76 actions = append(actions, action)
// The deadline passed to Build is the current time plus the TTL.
79 ttl := req.TTL.Duration
83 maxTime := time.Now().Add(ttl)
85 tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
// When the root cause is ErrAction, the raw inner errors stored under the
// "actions" data key are replaced with their formatted httperror responses.
86 if errors.Root(err) == txbuilder.ErrAction {
87 // Format each of the inner errors contained in the data.
88 var formattedErrs []httperror.Response
// NOTE(review): this single-value type assertion panics if the "actions"
// entry is absent or is not a []error.
89 for _, innerErr := range errors.Data(err)["actions"].([]error) {
90 resp := errorFormatter.Format(innerErr)
91 formattedErrs = append(formattedErrs, resp)
93 err = errors.WithData(err, "actions", formattedErrs)
99 // ensure null is never returned for signing instructions
// A nil slice is swapped for an empty, non-nil one.
100 if tpl.SigningInstructions == nil {
101 tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
106 // POST /build-transaction
// build handles a batch of build requests. It allocates one response slot per
// request, derives a sub-context with a fresh request ID for each request,
// defers batchRecover with a pointer to that request's slot, and calls
// buildSingle. The responses slice is returned with a nil error.
// NOTE(review): a WaitGroup is sized to the batch, which suggests one
// goroutine per request, but the goroutine launch, the Wait call and the
// storing of buildSingle's result are not visible in this excerpt.
107 func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*BuildRequest) (interface{}, error) {
108 responses := make([]interface{}, len(buildReqs))
109 var wg sync.WaitGroup
110 wg.Add(len(responses))
112 for i := 0; i < len(responses); i++ {
// Each request gets its own request ID.
114 subctx := reqid.NewSubContext(ctx, reqid.New())
116 defer batchRecover(subctx, &responses[i])
118 tmpl, err := a.buildSingle(subctx, buildReqs[i])
128 return responses, nil
// submitSingle submits one transaction template by delegating to
// finalizeTxWait, passing waitUntil through unchanged.
// A template without a transaction yields a wrapped
// txbuilder.ErrMissingRawTx. An error from finalizeTxWait is wrapped with
// the text "tx <id>". On success the result is a map holding the transaction
// ID string under the key "id".
// NOTE(review): the closing braces and the error guard around the Wrapf
// return are not visible in this excerpt.
131 func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
132 if tpl.Transaction == nil {
133 return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
136 err := a.finalizeTxWait(ctx, tpl, waitUntil)
138 return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
141 return map[string]string{"id": tpl.Transaction.ID.String()}, nil
145 // recordSubmittedTx records a lower bound height at which the tx
146 // was first submitted to the tx pool. If this request fails for
147 // some reason, a retry will know to look for the transaction in
148 // blocks starting at this height.
150 // If the tx has already been submitted, it returns the existing
// NOTE(review): the pg and bc packages are not among the imports visible in
// this excerpt, and the only visible call to this function sits inside a
// block comment in finalizeTxWait. This function may itself be disabled in
// the full file - confirm before relying on it.
152 func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
// Insert the (tx_hash, height) pair; ON CONFLICT DO NOTHING leaves an
// already existing row untouched.
154 INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
155 ON CONFLICT DO NOTHING
157 res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
// The affected-row count tells whether the insert created a new row.
161 inserted, err := res.RowsAffected()
166 return currentHeight, nil
169 // The insert didn't affect any rows, meaning there was already an entry
170 // for this transaction hash.
// Read back the height stored for the existing row.
172 SELECT height FROM submitted_txs WHERE tx_hash = $1
175 err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
181 // cleanUpSubmittedTxs will periodically delete records of submitted txs
182 // older than a day. This function blocks and only exits when its context
// A ticker with a 15-minute period is created, and the DELETE statement
// removes rows whose submitted_at is more than one day in the past.
// NOTE(review): the loop around the ticker, the context check and the
// handling of the Exec error are not visible in this excerpt. The pg package
// is not among the visible imports, so this function may be disabled in the
// full file - confirm.
// NOTE(review): the INSERT in recordSubmittedTx does not set submitted_at, so
// the column presumably has a default in the schema - confirm.
184 func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
185 ticker := time.NewTicker(15 * time.Minute)
189 // TODO(jackson): We could avoid expensive bulk deletes by partitioning
190 // the table and DROP-ing tables of expired rows. Partitioning doesn't
191 // play well with ON CONFLICT clauses though, so we would need to rework
192 // how we guarantee uniqueness.
193 const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
194 _, err := db.Exec(ctx, q)
206 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
207 // the transaction. A nil error return means the transaction is
208 // confirmed on the blockchain. ErrRejected means a conflicting tx is
209 // on the blockchain. context.DeadlineExceeded means ctx is an
210 // expiring context that timed out.
// The visible code submits the transaction with txbuilder.FinalizeTx, tests
// waitUntil against "none" and "confirmed", calls waitForTxInBlock starting
// at the height captured on entry, and has a select case that waits on
// a.pinStore.AllWaiter(height).
// NOTE(review): the bodies of the waitUntil branches, the error guards and
// the return statements are not visible in this excerpt.
211 func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
212 // Use the current generator height as the lower bound of the block height
213 // that the transaction may appear in.
// NOTE(review): generatorHeight is simply a copy of the local chain height
// here; no separate generator height is queried.
214 localHeight := a.chain.Height()
215 generatorHeight := localHeight
217 log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
218 // Remember this height in case we retry this submit call.
// NOTE(review): the call to recordSubmittedTx below is commented out; the
// end of that block comment is not visible in this excerpt.
219 /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
221 return errors.Wrap(err, "saving tx submitted height")
224 err := txbuilder.FinalizeTx(ctx, a.chain, txTemplate.Transaction)
228 if waitUntil == "none" {
232 height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
236 if waitUntil == "confirmed" {
243 case <-a.pinStore.AllWaiter(height):
// waitForTxInBlock waits, through the chain's BlockWaiter, for the block at
// the given height, fetches that block and compares the ID of each of its
// transactions with tx.ID. When tx has a non-zero MaxTime that is earlier
// than the block's TimestampMS, txbuilder.ErrRejected is returned with the
// message "transaction max time exceeded". Otherwise the transaction is
// passed to txbuilder.FinalizeTx again.
// NOTE(review): the enclosing loop, the select statement, the handling of a
// match and of context cancellation are not visible in this excerpt; the
// uint64 result is presumably the height of the block that contains tx.
249 func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
// NOTE(review): unconditional trace output on every call; consider removing
// it or lowering its level.
250 log.Printf("waitForTxInBlock function")
257 case <-a.chain.BlockWaiter(height):
258 b, err := a.chain.GetBlockByHeight(height)
260 return 0, errors.Wrap(err, "getting block that just landed")
// Scan the block for the transaction being waited on.
262 for _, confirmed := range b.Transactions {
263 if confirmed.ID == tx.ID {
// A MaxTime of zero skips the expiry check.
269 if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
270 return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
273 // might still be in pool or might be rejected; we can't
274 // tell definitively until its max time elapses.
276 // Re-insert into the pool in case it was dropped.
277 err = txbuilder.FinalizeTx(ctx, a.chain, tx)
282 // TODO(jackson): Do simple rejection checks like checking if
283 // the tx's blockchain prevouts still exist in the state tree.
// SubmitArg is the argument taken by the submit handler.
// Transactions holds the templates to submit; submit produces one response
// per entry. Wait supplies the timeout for the whole call. WaitUntil is
// passed to submitSingle for every transaction.
// NOTE(review): Transactions and Wait carry no json tag, so with
// encoding/json they are matched by field name - confirm the intended wire
// names.
288 type SubmitArg struct {
289 Transactions []txbuilder.Template
290 Wait chainjson.Duration
291 WaitUntil string `json:"wait_until"` // values none, confirmed, processed. default: processed
294 // POST /submit-transaction
// submit handles a batch of transaction templates. It derives a context with
// a timeout taken from x.Wait, allocates one response slot per template,
// gives each template its own sub-context with a fresh request ID, defers
// batchRecover with a pointer to that template's slot, calls submitSingle and
// logs the outcome at Info level. The responses slice is returned with a nil
// error.
// NOTE(review): the condition guarding the 30-second assignment, the use of
// cancel, the goroutine launch, the Wait call and the storing of
// submitSingle's result are not visible in this excerpt.
295 func (a *BlockchainReactor) submit(ctx context.Context, x SubmitArg) (interface{}, error) {
296 // Setup a timeout for the provided wait duration.
297 timeout := x.Wait.Duration
299 timeout = 30 * time.Second
301 ctx, cancel := context.WithTimeout(ctx, timeout)
304 responses := make([]interface{}, len(x.Transactions))
305 var wg sync.WaitGroup
306 wg.Add(len(responses))
307 for i := range responses {
// Each template gets its own request ID.
309 subctx := reqid.NewSubContext(ctx, reqid.New())
311 defer batchRecover(subctx, &responses[i])
// The template is passed by pointer into the slice element.
313 tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
314 log.WithFields(log.Fields{"err": err, "tx": tx}).Info("submit single tx")
324 return responses, nil