OSDN Git Service

Merge pull request #1025 from Bytom/dev
[bytom/bytom-spv.git] / account / reserve.go
1 package account
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "sync/atomic"
8         "time"
9
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/sync/idempotency"
16 )
17
18 var (
19         // ErrInsufficient indicates the account doesn't contain enough
20         // units of the requested asset to satisfy the reservation.
21         // New units must be deposited into the account in order to
22         // satisfy the request; change will not be sufficient.
23         ErrInsufficient = errors.New("reservation found insufficient funds")
24
25         // ErrReserved indicates that a reservation could not be
26         // satisfied because some of the outputs were already reserved.
27         // When those reservations are finalized into a transaction
28         // (and no other transaction spends funds from the account),
29         // new change outputs will be created
30         // in sufficient amounts to satisfy the request.
31         ErrReserved = errors.New("reservation found outputs already reserved")
32         // ErrMatchUTXO indicates the account doesn't contain enough utxo to satisfy the reservation.
33         ErrMatchUTXO = errors.New("can't match enough valid utxos")
34         // ErrReservation indicates the reserver doesn't found the reservation with the provided ID.
35         ErrReservation = errors.New("couldn't find reservation")
36 )
37
38 // UTXO describes an individual account utxo.
39 type UTXO struct {
40         OutputID bc.Hash
41         SourceID bc.Hash
42
43         // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44         // AssetAmount with a nil AssetId.
45         AssetID bc.AssetID
46         Amount  uint64
47
48         SourcePos      uint64
49         ControlProgram []byte
50
51         AccountID           string
52         Address             string
53         ControlProgramIndex uint64
54         ValidHeight         uint64
55         Change              bool
56 }
57
58 func (u *UTXO) source() source {
59         return source{AssetID: u.AssetID, AccountID: u.AccountID}
60 }
61
62 // source describes the criteria to use when selecting UTXOs.
63 type source struct {
64         AssetID   bc.AssetID
65         AccountID string
66 }
67
68 // reservation describes a reservation of a set of UTXOs belonging
69 // to a particular account. Reservations are immutable.
70 type reservation struct {
71         ID          uint64
72         Source      source
73         UTXOs       []*UTXO
74         Change      uint64
75         Expiry      time.Time
76         ClientToken *string
77 }
78
79 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
80         return &reserver{
81                 c:            c,
82                 db:           walletdb,
83                 reservations: make(map[uint64]*reservation),
84                 sources:      make(map[source]*sourceReserver),
85         }
86 }
87
88 // reserver implements a utxo reserver that stores reservations
89 // in-memory. It relies on the account_utxos table for the source of
90 // truth of valid UTXOs but tracks which of those UTXOs are reserved
91 // in-memory.
92 //
93 // To reduce latency and prevent deadlock, no two mutexes (either on
94 // reserver or sourceReserver) should be held at the same time
95 //
96 // reserver ensures idempotency of reservations until the reservation
97 // expiration.
98 type reserver struct {
99         // `sync/atomic` expects the first word in an allocated struct to be 64-bit
100         // aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
101         nextReservationID uint64
102         c                 *protocol.Chain
103         db                dbm.DB
104         idempotency       idempotency.Group
105
106         reservationsMu sync.Mutex
107         reservations   map[uint64]*reservation
108
109         sourcesMu sync.Mutex
110         sources   map[source]*sourceReserver
111 }
112
113 // Reserve selects and reserves UTXOs according to the criteria provided
114 // in source. The resulting reservation expires at exp.
115 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
116
117         if clientToken == nil {
118                 return re.reserve(src, amount, clientToken, exp)
119         }
120
121         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
122                 return re.reserve(src, amount, clientToken, exp)
123         })
124         return untypedRes.(*reservation), err
125 }
126
127 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
128         sourceReserver := re.source(src)
129
130         // Try to reserve the right amount.
131         rid := atomic.AddUint64(&re.nextReservationID, 1)
132         reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
133         if err != nil {
134                 if isImmature {
135                         return nil, errors.WithDetail(err, "some coinbase utxos are immature")
136                 }
137                 return nil, err
138         }
139
140         res = &reservation{
141                 ID:          rid,
142                 Source:      src,
143                 UTXOs:       reserved,
144                 Expiry:      exp,
145                 ClientToken: clientToken,
146         }
147
148         // Save the successful reservation.
149         re.reservationsMu.Lock()
150         defer re.reservationsMu.Unlock()
151         re.reservations[rid] = res
152
153         // Make change if necessary
154         if total > amount {
155                 res.Change = total - amount
156         }
157         return res, nil
158 }
159
160 // ReserveUTXO reserves a specific utxo for spending. The resulting
161 // reservation expires at exp.
162 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
163         if clientToken == nil {
164                 return re.reserveUTXO(ctx, out, exp, nil)
165         }
166
167         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
168                 return re.reserveUTXO(ctx, out, exp, clientToken)
169         })
170         return untypedRes.(*reservation), err
171 }
172
173 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
174         u, err := findSpecificUTXO(re.db, out)
175         if err != nil {
176                 return nil, err
177         }
178
179         //u.ValidHeight > 0 means coinbase utxo
180         if u.ValidHeight > 0 && u.ValidHeight > re.c.BestBlockHeight() {
181                 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
182         }
183
184         rid := atomic.AddUint64(&re.nextReservationID, 1)
185         err = re.source(u.source()).reserveUTXO(rid, u)
186         if err != nil {
187                 return nil, err
188         }
189
190         res := &reservation{
191                 ID:          rid,
192                 Source:      u.source(),
193                 UTXOs:       []*UTXO{u},
194                 Expiry:      exp,
195                 ClientToken: clientToken,
196         }
197         re.reservationsMu.Lock()
198         re.reservations[rid] = res
199         re.reservationsMu.Unlock()
200         return res, nil
201 }
202
203 // Cancel makes a best-effort attempt at canceling the reservation with
204 // the provided ID.
205 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
206         re.reservationsMu.Lock()
207         res, ok := re.reservations[rid]
208         delete(re.reservations, rid)
209         re.reservationsMu.Unlock()
210         if !ok {
211                 return errors.Wrapf(ErrReservation, "rid=%d", rid)
212         }
213         re.source(res.Source).cancel(res)
214         /*if res.ClientToken != nil {
215                 re.idempotency.Forget(*res.ClientToken)
216         }*/
217         return nil
218 }
219
220 // ExpireReservations cleans up all reservations that have expired,
221 // making their UTXOs available for reservation again.
222 func (re *reserver) ExpireReservations(ctx context.Context) error {
223         // Remove records of any reservations that have expired.
224         now := time.Now()
225         var canceled []*reservation
226         re.reservationsMu.Lock()
227         for rid, res := range re.reservations {
228                 if res.Expiry.Before(now) {
229                         canceled = append(canceled, res)
230                         delete(re.reservations, rid)
231                 }
232         }
233         re.reservationsMu.Unlock()
234
235         // If we removed any expired reservations, update the corresponding
236         // source reservers.
237         for _, res := range canceled {
238                 re.source(res.Source).cancel(res)
239                 /*if res.ClientToken != nil {
240                         re.idempotency.Forget(*res.ClientToken)
241                 }*/
242         }
243
244         // TODO(jackson): Cleanup any source reservers that don't have
245         // anything reserved. It'll be a little tricky because of our
246         // locking scheme.
247         return nil
248 }
249
250 func (re *reserver) source(src source) *sourceReserver {
251         re.sourcesMu.Lock()
252         defer re.sourcesMu.Unlock()
253
254         sr, ok := re.sources[src]
255         if ok {
256                 return sr
257         }
258
259         sr = &sourceReserver{
260                 db:            re.db,
261                 src:           src,
262                 reserved:      make(map[bc.Hash]uint64),
263                 currentHeight: re.c.BestBlockHeight,
264         }
265         re.sources[src] = sr
266         return sr
267 }
268
269 type sourceReserver struct {
270         db            dbm.DB
271         src           source
272         currentHeight func() uint64
273         mu            sync.Mutex
274         reserved      map[bc.Hash]uint64
275 }
276
277 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
278         var (
279                 reserved, unavailable uint64
280                 reservedUTXOs         []*UTXO
281         )
282
283         utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
284         if err != nil {
285                 return nil, 0, isImmature, errors.Wrap(err)
286         }
287
288         sr.mu.Lock()
289         defer sr.mu.Unlock()
290         for _, u := range utxos {
291                 // If the UTXO is already reserved, skip it.
292                 if _, ok := sr.reserved[u.OutputID]; ok {
293                         unavailable += u.Amount
294                         continue
295                 }
296
297                 reserved += u.Amount
298                 reservedUTXOs = append(reservedUTXOs, u)
299                 if reserved >= amount {
300                         break
301                 }
302         }
303         if reserved+unavailable < amount {
304                 // Even if everything was available, this account wouldn't have
305                 // enough to satisfy the request.
306                 return nil, 0, isImmature, ErrInsufficient
307         }
308         if reserved < amount {
309                 // The account has enough for the request, but some is tied up in
310                 // other reservations.
311                 return nil, 0, isImmature, ErrReserved
312         }
313
314         // We've found enough to satisfy the request.
315         for _, u := range reservedUTXOs {
316                 sr.reserved[u.OutputID] = rid
317         }
318
319         return reservedUTXOs, reserved, isImmature, nil
320 }
321
322 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
323         sr.mu.Lock()
324         defer sr.mu.Unlock()
325
326         _, isReserved := sr.reserved[utxo.OutputID]
327         if isReserved {
328                 return ErrReserved
329         }
330
331         sr.reserved[utxo.OutputID] = rid
332         return nil
333 }
334
335 func (sr *sourceReserver) cancel(res *reservation) {
336         sr.mu.Lock()
337         defer sr.mu.Unlock()
338         for _, utxo := range res.UTXOs {
339                 delete(sr.reserved, utxo.OutputID)
340         }
341 }
342
343 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
344         utxos := []*UTXO{}
345         isImmature := false
346         utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
347         defer utxoIter.Release()
348
349         for utxoIter.Next() {
350                 u := &UTXO{}
351                 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
352                         return nil, false, errors.Wrap(err)
353                 }
354
355                 //u.ValidHeight > 0 means coinbase utxo
356                 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
357                         isImmature = true
358                         continue
359                 }
360
361                 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
362                         utxos = append(utxos, u)
363                 }
364         }
365
366         if len(utxos) == 0 {
367                 return nil, isImmature, ErrMatchUTXO
368         }
369         return utxos, isImmature, nil
370 }
371
372 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
373         u := &UTXO{}
374
375         data := db.Get(StandardUTXOKey(outHash))
376         if data == nil {
377                 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
378                         return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
379                 }
380         }
381         return u, json.Unmarshal(data, u)
382 }