OSDN Git Service

f98f59aad194296f6cd7e68d649cb2fd87e83b49
[bytom/bytom-spv.git] / blockchain / transact.go
1 package blockchain
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "time"
8
9         "github.com/bytom/blockchain/txbuilder"
10         "github.com/bytom/errors"
11         "github.com/bytom/net/http/httperror"
12         "github.com/bytom/net/http/reqid"
13         "github.com/bytom/protocol/bc/legacy"
14
15         chainjson "github.com/bytom/encoding/json"
16         log "github.com/sirupsen/logrus"
17 )
18
19 var defaultTxTTL = 5 * time.Minute
20
21 func (a *BlockchainReactor) actionDecoder(action string) (func([]byte) (txbuilder.Action, error), bool) {
22         var decoder func([]byte) (txbuilder.Action, error)
23         switch action {
24         case "control_account":
25                 decoder = a.accounts.DecodeControlAction
26         case "control_program":
27                 decoder = txbuilder.DecodeControlProgramAction
28         case "control_receiver":
29                 decoder = txbuilder.DecodeControlReceiverAction
30         case "issue":
31                 decoder = a.assets.DecodeIssueAction
32         case "retire":
33                 decoder = txbuilder.DecodeRetireAction
34         case "spend_account":
35                 decoder = a.accounts.DecodeSpendAction
36         case "spend_account_unspent_output":
37                 decoder = a.accounts.DecodeSpendUTXOAction
38         case "set_transaction_reference_data":
39                 decoder = txbuilder.DecodeSetTxRefDataAction
40         default:
41                 return nil, false
42         }
43         return decoder, true
44 }
45 /*              {"actions": [
46                         {"type": "spend", "asset_id": "%s", "amount": 100},
47                         {"type": "control_account", "asset_id": "%s", "amount": 100, "account_id": "%s"}
48                 ]}`
49 */
50 func (a *BlockchainReactor) buildSingle(ctx context.Context, req *BuildRequest) (*txbuilder.Template, error) {
51         err := a.filterAliases(ctx, req)
52         if err != nil {
53                 return nil, err
54         }
55         actions := make([]txbuilder.Action, 0, len(req.Actions))
56         for i, act := range req.Actions {
57                 typ, ok := act["type"].(string)
58                 if !ok {
59                         return nil, errors.WithDetailf(errBadActionType, "no action type provided on action %d", i)
60                 }
61                 decoder, ok := a.actionDecoder(typ)
62                 if !ok {
63                         return nil, errors.WithDetailf(errBadActionType, "unknown action type %q on action %d", typ, i)
64                 }
65
66                 // Remarshal to JSON, the action may have been modified when we
67                 // filtered aliases.
68                 b, err := json.Marshal(act)
69                 if err != nil {
70                         return nil, err
71                 }
72                 action, err := decoder(b)
73                 if err != nil {
74                         return nil, errors.WithDetailf(errBadAction, "%s on action %d", err.Error(), i)
75                 }
76                 actions = append(actions, action)
77         }
78
79         ttl := req.TTL.Duration
80         if ttl == 0 {
81                 ttl = defaultTxTTL
82         }
83         maxTime := time.Now().Add(ttl)
84
85         tpl, err := txbuilder.Build(ctx, req.Tx, actions, maxTime)
86         if errors.Root(err) == txbuilder.ErrAction {
87                 // Format each of the inner errors contained in the data.
88                 var formattedErrs []httperror.Response
89                 for _, innerErr := range errors.Data(err)["actions"].([]error) {
90                         resp := errorFormatter.Format(innerErr)
91                         formattedErrs = append(formattedErrs, resp)
92                 }
93                 err = errors.WithData(err, "actions", formattedErrs)
94         }
95         if err != nil {
96                 return nil, err
97         }
98
99         // ensure null is never returned for signing instructions
100         if tpl.SigningInstructions == nil {
101                 tpl.SigningInstructions = []*txbuilder.SigningInstruction{}
102         }
103         return tpl, nil
104 }
105
106 // POST /build-transaction
107 func (a *BlockchainReactor) build(ctx context.Context, buildReqs []*BuildRequest) (interface{}, error) {
108         responses := make([]interface{}, len(buildReqs))
109         var wg sync.WaitGroup
110         wg.Add(len(responses))
111
112         for i := 0; i < len(responses); i++ {
113                 go func(i int) {
114                         subctx := reqid.NewSubContext(ctx, reqid.New())
115                         defer wg.Done()
116                         defer batchRecover(subctx, &responses[i])
117
118                         tmpl, err := a.buildSingle(subctx, buildReqs[i])
119                         if err != nil {
120                                 responses[i] = err
121                         } else {
122                                 responses[i] = tmpl
123                         }
124                 }(i)
125         }
126
127         wg.Wait()
128         return responses, nil
129 }
130
131 func (a *BlockchainReactor) submitSingle(ctx context.Context, tpl *txbuilder.Template, waitUntil string) (interface{}, error) {
132         if tpl.Transaction == nil {
133                 return nil, errors.Wrap(txbuilder.ErrMissingRawTx)
134         }
135
136         err := a.finalizeTxWait(ctx, tpl, waitUntil)
137         if err != nil {
138                 return nil, errors.Wrapf(err, "tx %s", tpl.Transaction.ID.String())
139         }
140
141         return map[string]string{"id": tpl.Transaction.ID.String()}, nil
142 }
143
144 /*
145 // recordSubmittedTx records a lower bound height at which the tx
146 // was first submitted to the tx pool. If this request fails for
147 // some reason, a retry will know to look for the transaction in
148 // blocks starting at this height.
149 //
150 // If the tx has already been submitted, it returns the existing
151 // height.
152 func recordSubmittedTx(ctx context.Context, db pg.DB, txHash bc.Hash, currentHeight uint64) (uint64, error) {
153         const insertQ = `
154                 INSERT INTO submitted_txs (tx_hash, height) VALUES($1, $2)
155                 ON CONFLICT DO NOTHING
156         `
157         res, err := db.Exec(ctx, insertQ, txHash.Bytes(), currentHeight)
158         if err != nil {
159                 return 0, err
160         }
161         inserted, err := res.RowsAffected()
162         if err != nil {
163                 return 0, err
164         }
165         if inserted == 1 {
166                 return currentHeight, nil
167         }
168
169         // The insert didn't affect any rows, meaning there was already an entry
170         // for this transaction hash.
171         const selectQ = `
172                 SELECT height FROM submitted_txs WHERE tx_hash = $1
173         `
174         var height uint64
175         err = db.QueryRow(ctx, selectQ, txHash.Bytes()).Scan(&height)
176         return height, err
177 }
178 */
179
180 /*
181 // cleanUpSubmittedTxs will periodically delete records of submitted txs
182 // older than a day. This function blocks and only exits when its context
183 // is cancelled.
184 func cleanUpSubmittedTxs(ctx context.Context, db pg.DB) {
185         ticker := time.NewTicker(15 * time.Minute)
186         for {
187                 select {
188                 case <-ticker.C:
189                         // TODO(jackson): We could avoid expensive bulk deletes by partitioning
190                         // the table and DROP-ing tables of expired rows. Partitioning doesn't
191                         // play well with ON CONFLICT clauses though, so we would need to rework
192                         // how we guarantee uniqueness.
193                         const q = `DELETE FROM submitted_txs WHERE submitted_at < now() - interval '1 day'`
194                         _, err := db.Exec(ctx, q)
195                         if err != nil {
196                                 log.Error(ctx, err)
197                         }
198                 case <-ctx.Done():
199                         ticker.Stop()
200                         return
201                 }
202         }
203 }
204 */
205
206 // finalizeTxWait calls FinalizeTx and then waits for confirmation of
207 // the transaction.  A nil error return means the transaction is
208 // confirmed on the blockchain.  ErrRejected means a conflicting tx is
209 // on the blockchain.  context.DeadlineExceeded means ctx is an
210 // expiring context that timed out.
211 func (a *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
212         // Use the current generator height as the lower bound of the block height
213         // that the transaction may appear in.
214         localHeight := a.chain.Height()
215         generatorHeight := localHeight
216
217         log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
218         // Remember this height in case we retry this submit call.
219         /*height, err := recordSubmittedTx(ctx, a.db, txTemplate.Transaction.ID, generatorHeight)
220         if err != nil {
221                 return errors.Wrap(err, "saving tx submitted height")
222         }*/
223
224         err := txbuilder.FinalizeTx(ctx, a.chain, txTemplate.Transaction)
225         if err != nil {
226                 return err
227         }
228         if waitUntil == "none" {
229                 return nil
230         }
231
232         height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
233         if err != nil {
234                 return err
235         }
236         if waitUntil == "confirmed" {
237                 return nil
238         }
239
240         select {
241         case <-ctx.Done():
242                 return ctx.Err()
243         case <-a.pinStore.AllWaiter(height):
244         }
245
246         return nil
247 }
248
249 func (a *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *legacy.Tx, height uint64) (uint64, error) {
250         log.Printf("waitForTxInBlock function")
251         for {
252                 height++
253                 select {
254                 case <-ctx.Done():
255                         return 0, ctx.Err()
256
257                 case <-a.chain.BlockWaiter(height):
258                         b, err := a.chain.GetBlockByHeight(height)
259                         if err != nil {
260                                 return 0, errors.Wrap(err, "getting block that just landed")
261                         }
262                         for _, confirmed := range b.Transactions {
263                                 if confirmed.ID == tx.ID {
264                                         // confirmed
265                                         return height, nil
266                                 }
267                         }
268
269                         if tx.MaxTime > 0 && tx.MaxTime < b.TimestampMS {
270                                 return 0, errors.Wrap(txbuilder.ErrRejected, "transaction max time exceeded")
271                         }
272
273                         // might still be in pool or might be rejected; we can't
274                         // tell definitively until its max time elapses.
275
276                         // Re-insert into the pool in case it was dropped.
277                         err = txbuilder.FinalizeTx(ctx, a.chain, tx)
278                         if err != nil {
279                                 return 0, err
280                         }
281
282                         // TODO(jackson): Do simple rejection checks like checking if
283                         // the tx's blockchain prevouts still exist in the state tree.
284                 }
285         }
286 }
287
288 type SubmitArg struct {
289         Transactions []txbuilder.Template
290         Wait         chainjson.Duration
291         WaitUntil    string `json:"wait_until"` // values none, confirmed, processed. default: processed
292 }
293
294 // POST /submit-transaction
295 func (a *BlockchainReactor) submit(ctx context.Context, x SubmitArg) (interface{}, error) {
296         // Setup a timeout for the provided wait duration.
297         timeout := x.Wait.Duration
298         if timeout <= 0 {
299                 timeout = 30 * time.Second
300         }
301         ctx, cancel := context.WithTimeout(ctx, timeout)
302         defer cancel()
303
304         responses := make([]interface{}, len(x.Transactions))
305         var wg sync.WaitGroup
306         wg.Add(len(responses))
307         for i := range responses {
308                 go func(i int) {
309                         subctx := reqid.NewSubContext(ctx, reqid.New())
310                         defer wg.Done()
311                         defer batchRecover(subctx, &responses[i])
312
313                         tx, err := a.submitSingle(subctx, &x.Transactions[i], x.WaitUntil)
314                         log.WithFields(log.Fields{"err": err, "tx": tx}).Info("submit single tx")
315                         if err != nil {
316                                 responses[i] = err
317                         } else {
318                                 responses[i] = tx
319                         }
320                 }(i)
321         }
322
323         wg.Wait()
324         return responses, nil
325 }