9 //"github.com/bytom/blockchain/fetch"
10 "github.com/bytom/blockchain/txbuilder"
11 chainjson "github.com/bytom/encoding/json"
12 "github.com/bytom/errors"
13 //"github.com/bytom/log"
14 "github.com/bytom/net/http/httperror"
15 "github.com/bytom/net/http/reqid"
16 //"github.com/bytom/protocol/bc"
17 "github.com/bytom/protocol/bc/legacy"
20 var defaultTxTTL = 5 * time.Minute
22 func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
23 var decoder func([]byte) (txbuilder.Action, error)
25 case "control_account":
26 decoder = a.accounts.DecodeControlAction
27 case "control_program":
28 decoder = txbuilder.DecodeControlProgramAction
29 case "control_receiver":
30 decoder = txbuilder.DecodeControlReceiverAction
32 decoder = a.assets.DecodeIssueAction
34 decoder = txbuilder.DecodeRetireAction
36 decoder = a.accounts.DecodeSpendAction
37 case "spend_account_unspent_output":
38 decoder = a.accounts.DecodeSpendUTXOAction
39 case "set_transaction_reference_data":
40 decoder = txbuilder.DecodeSetTxRefDataAction
47 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
48 err := a.filterAliases(ctx, req)
52 actions := make([]txbuilder.Action, 0, len(req.Actions))
53 for i, act := range req.Actions {
54 typ, ok := act["type"].(string)
56 return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
58 decoder, ok := a.actionDecoder(typ)
60 return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
63 // Remarshal to JSON, the action may have been modified when we
65 b, err := json.Marshal(act)
71 return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
73 actions = append(actions, a)
76 ttl := req.TTL.Duration
80 maxTime := time.Now().Add(ttl)
81 tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
82 if errors.Root(err) == txbuilder.ErrAction {
83 // Format each of the inner errors contained in the data.
84 var formattedErrs []httperror.Response
85 for _, innerErr := range errors.Data(err)["actions"].([]error) {
86 resp := errorFormatter.Format(innerErr)
87 formattedErrs = append(formattedErrs, resp)
89 err = errors.WithData(err, "actions", formattedErrs)
95 // ensure null is never returned for signing instructions
96 if tpl.SigningInstructions == nil {
97 tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
102 // POST /build-transaction
103 func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*BuildRequest) (interface{}, error) {
104 responses := make([]interface{}, len(buildReqs))
105 var wg sync.WaitGroup
106 wg.Add(len(responses))
108 for i := 0; i < len(responses); i++ {
110 subctx := reqid.NewSubContext(ctx, reqid.New())
112 defer batchRecover(subctx, &responses[i])
114 tmpl, err := a.buildSingle(subctx, buildReqs[i])
124 return responses, nil
127 func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
128 if tpl.Transaction == nil {
129 return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
132 err := a.finalizeTxWait(ctx, tpl, waitUntil)
134 return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
137 return map[string]string{"id": tpl.Transaction.ID.String()}, nil
141 // recordSubmittedTx records a lower bound height at which the tx
142 // was first submitted to the tx pool. If this request fails for
143 // some reason, a retry will know to look for the transaction in
144 // blocks starting at this height.
146 // If the tx has already been submitted, it returns the existing
148 func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
150 INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
151 ON CONFLICT DO NOTHING
153 res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
157 inserted, err := res.RowsAffected()
162 return currentHeight, nil
165 // The insert didn't affect any rows, meaning there was already an entry
166 // for this transaction hash.
168 SELECT height FROM submitted_txs WHERE tx_hash = $1
171 err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
177 // cleanUpSubmittedTxs will periodically delete records of submitted txs
178 // older than a day. This function blocks and only exits when its context
180 func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
181 ticker := time.NewTicker(15 * time.Minute)
185 // TODO(jackson): We could avoid expensive bulk deletes by partitioning
186 // the table and DROP-ing tables of expired rows. Partitioning doesn't
187 // play well with ON CONFLICT clauses though, so we would need to rework
188 // how we guarantee uniqueness.
189 const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
190 _, err := db.Exec(ctx, q)
202 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
203 // the transaction. A nil error return means the transaction is
204 // confirmed on the blockchain. ErrRejected means a conflicting tx is
205 // on the blockchain. context.DeadlineExceeded means ctx is an
206 // expiring context that timed out.
207 func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
208 // Use the current generator height as the lower bound of the block height
209 // that the transaction may appear in.
210 localHeight := a.chain.Height()
211 generatorHeight := localHeight
213 // Remember this height in case we retry this submit call.
214 /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
216 return errors.Wrap(err, "saving tx submitted height")
219 err := txbuilder.FinalizeTx(ctx, a.chain, a.submitter, txTemplate.Transaction)
223 if waitUntil == "none" {
227 _, err = a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
231 if waitUntil == "confirmed" {
239 case <-a.pinStore.AllWaiter(height):
246 func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
253 case <-a.chain.BlockWaiter(height):
254 b, err := a.chain.GetBlock(height)
256 return 0, errors.Wrap(err, "getting block that just landed")
258 for _, confirmed := range b.Transactions {
259 if confirmed.ID == tx.ID {
265 if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
266 return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
269 // might still be in pool or might be rejected; we can't
270 // tell definitively until its max time elapses.
272 // Re-insert into the pool in case it was dropped.
273 err = txbuilder.FinalizeTx(ctx, a.chain, a.submitter, tx)
278 // TODO(jackson): Do simple rejection checks like checking if
279 // the tx's blockchain prevouts still exist in the state tree.
284 type submitArg struct {
285 Transactions []txbuilder.Template
286 wait chainjson.Duration
287 WaitUntil string `json:"wait_until"` // values none, confirmed, processed. default: processed
290 // POST /submit-transaction
291 func (a *BlockchainReactor) submit(ctx context.Context, x submitArg) (interface{}, error) {
292 // Setup a timeout for the provided wait duration.
293 timeout := x.wait.Duration
295 timeout = 30 * time.Second
297 ctx, cancel := context.WithTimeout(ctx, timeout)
300 responses := make([]interface{}, len(x.Transactions))
301 var wg sync.WaitGroup
302 wg.Add(len(responses))
303 for i := range responses {
305 subctx := reqid.NewSubContext(ctx, reqid.New())
307 defer batchRecover(subctx, &responses[i])
309 tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
319 return responses, nil