OSDN Git Service

Merge pull request #375 from Bytom/dev
[bytom/bytom-spv.git] / blockchain / account / reserve.go
1 package account
2
3 import (
4         "context"
5         "encoding/json"
6         "sync"
7         "sync/atomic"
8         "time"
9
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "github.com/bytom/errors"
13         "github.com/bytom/protocol"
14         "github.com/bytom/protocol/bc"
15         "github.com/bytom/sync/idempotency"
16 )
17
18 var (
19         // ErrInsufficient indicates the account doesn't contain enough
20         // units of the requested asset to satisfy the reservation.
21         // New units must be deposited into the account in order to
22         // satisfy the request; change will not be sufficient.
23         ErrInsufficient = errors.New("reservation found insufficient funds")
24
25         // ErrReserved indicates that a reservation could not be
26         // satisfied because some of the outputs were already reserved.
27         // When those reservations are finalized into a transaction
28         // (and no other transaction spends funds from the account),
29         // new change outputs will be created
30         // in sufficient amounts to satisfy the request.
31         ErrReserved    = errors.New("reservation found outputs already reserved")
32         ErrMatchUTXO   = errors.New("can't match enough valid utxos")
33         ErrReservation = errors.New("couldn't find reservation")
34 )
35
36 // UTXO describes an individual account utxo.
37 type UTXO struct {
38         OutputID bc.Hash
39         SourceID bc.Hash
40
41         // Avoiding AssetAmount here so that new(utxo) doesn't produce an
42         // AssetAmount with a nil AssetId.
43         AssetID bc.AssetID
44         Amount  uint64
45
46         SourcePos      uint64
47         ControlProgram []byte
48         RefDataHash    bc.Hash
49
50         AccountID           string
51         Address             string
52         ControlProgramIndex uint64
53         ValidHeight         uint64
54 }
55
56 func (u *UTXO) source() source {
57         return source{AssetID: u.AssetID, AccountID: u.AccountID}
58 }
59
60 // source describes the criteria to use when selecting UTXOs.
61 type source struct {
62         AssetID   bc.AssetID
63         AccountID string
64 }
65
66 // reservation describes a reservation of a set of UTXOs belonging
67 // to a particular account. Reservations are immutable.
68 type reservation struct {
69         ID          uint64
70         Source      source
71         UTXOs       []*UTXO
72         Change      uint64
73         Expiry      time.Time
74         ClientToken *string
75 }
76
77 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
78         return &reserver{
79                 c:            c,
80                 db:           walletdb,
81                 reservations: make(map[uint64]*reservation),
82                 sources:      make(map[source]*sourceReserver),
83         }
84 }
85
86 // reserver implements a utxo reserver that stores reservations
87 // in-memory. It relies on the account_utxos table for the source of
88 // truth of valid UTXOs but tracks which of those UTXOs are reserved
89 // in-memory.
90 //
91 // To reduce latency and prevent deadlock, no two mutexes (either on
92 // reserver or sourceReserver) should be held at the same time
93 //
94 // reserver ensures idempotency of reservations until the reservation
95 // expiration.
96 type reserver struct {
97         c                 *protocol.Chain
98         db                dbm.DB
99         nextReservationID uint64
100         idempotency       idempotency.Group
101
102         reservationsMu sync.Mutex
103         reservations   map[uint64]*reservation
104
105         sourcesMu sync.Mutex
106         sources   map[source]*sourceReserver
107 }
108
109 // Reserve selects and reserves UTXOs according to the criteria provided
110 // in source. The resulting reservation expires at exp.
111 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
112
113         if clientToken == nil {
114                 return re.reserve(src, amount, clientToken, exp)
115         }
116
117         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
118                 return re.reserve(src, amount, clientToken, exp)
119         })
120         return untypedRes.(*reservation), err
121 }
122
123 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
124         sourceReserver := re.source(src)
125
126         // Try to reserve the right amount.
127         rid := atomic.AddUint64(&re.nextReservationID, 1)
128         reserved, total, err, isImmature := sourceReserver.reserve(rid, amount)
129         if err != nil {
130                 if isImmature {
131                         return nil, errors.WithDetail(err, "some coinbase utxos are immature")
132                 }
133                 return nil, err
134         }
135
136         res = &reservation{
137                 ID:          rid,
138                 Source:      src,
139                 UTXOs:       reserved,
140                 Expiry:      exp,
141                 ClientToken: clientToken,
142         }
143
144         // Save the successful reservation.
145         re.reservationsMu.Lock()
146         defer re.reservationsMu.Unlock()
147         re.reservations[rid] = res
148
149         // Make change if necessary
150         if total > amount {
151                 res.Change = total - amount
152         }
153         return res, nil
154 }
155
156 // ReserveUTXO reserves a specific utxo for spending. The resulting
157 // reservation expires at exp.
158 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
159         if clientToken == nil {
160                 return re.reserveUTXO(ctx, out, exp, nil)
161         }
162
163         untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
164                 return re.reserveUTXO(ctx, out, exp, clientToken)
165         })
166         return untypedRes.(*reservation), err
167 }
168
169 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
170         u, err := findSpecificUTXO(re.db, out)
171         if err != nil {
172                 return nil, err
173         }
174
175         //u.ValidHeight > 0 means coinbase utxo
176         if u.ValidHeight > 0 && u.ValidHeight > re.c.Height() {
177                 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
178         }
179
180         rid := atomic.AddUint64(&re.nextReservationID, 1)
181         err = re.source(u.source()).reserveUTXO(rid, u)
182         if err != nil {
183                 return nil, err
184         }
185
186         res := &reservation{
187                 ID:          rid,
188                 Source:      u.source(),
189                 UTXOs:       []*UTXO{u},
190                 Expiry:      exp,
191                 ClientToken: clientToken,
192         }
193         re.reservationsMu.Lock()
194         re.reservations[rid] = res
195         re.reservationsMu.Unlock()
196         return res, nil
197 }
198
199 // Cancel makes a best-effort attempt at canceling the reservation with
200 // the provided ID.
201 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
202         re.reservationsMu.Lock()
203         res, ok := re.reservations[rid]
204         delete(re.reservations, rid)
205         re.reservationsMu.Unlock()
206         if !ok {
207                 return errors.Wrapf(ErrReservation, "rid=%d", rid)
208         }
209         re.source(res.Source).cancel(res)
210         /*if res.ClientToken != nil {
211                 re.idempotency.Forget(*res.ClientToken)
212         }*/
213         return nil
214 }
215
216 // ExpireReservations cleans up all reservations that have expired,
217 // making their UTXOs available for reservation again.
218 func (re *reserver) ExpireReservations(ctx context.Context) error {
219         // Remove records of any reservations that have expired.
220         now := time.Now()
221         var canceled []*reservation
222         re.reservationsMu.Lock()
223         for rid, res := range re.reservations {
224                 if res.Expiry.Before(now) {
225                         canceled = append(canceled, res)
226                         delete(re.reservations, rid)
227                 }
228         }
229         re.reservationsMu.Unlock()
230
231         // If we removed any expired reservations, update the corresponding
232         // source reservers.
233         for _, res := range canceled {
234                 re.source(res.Source).cancel(res)
235                 /*if res.ClientToken != nil {
236                         re.idempotency.Forget(*res.ClientToken)
237                 }*/
238         }
239
240         // TODO(jackson): Cleanup any source reservers that don't have
241         // anything reserved. It'll be a little tricky because of our
242         // locking scheme.
243         return nil
244 }
245
246 func (re *reserver) source(src source) *sourceReserver {
247         re.sourcesMu.Lock()
248         defer re.sourcesMu.Unlock()
249
250         sr, ok := re.sources[src]
251         if ok {
252                 return sr
253         }
254
255         sr = &sourceReserver{
256                 db:            re.db,
257                 src:           src,
258                 reserved:      make(map[bc.Hash]uint64),
259                 currentHeight: re.c.Height,
260         }
261         re.sources[src] = sr
262         return sr
263 }
264
265 type sourceReserver struct {
266         db            dbm.DB
267         src           source
268         currentHeight func() uint64
269         mu            sync.Mutex
270         reserved      map[bc.Hash]uint64
271 }
272
273 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, error, bool) {
274         var (
275                 reserved, unavailable uint64
276                 reservedUTXOs         []*UTXO
277         )
278
279         utxos, err, isImmature := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
280         if err != nil {
281                 return nil, 0, errors.Wrap(err), isImmature
282         }
283
284         sr.mu.Lock()
285         defer sr.mu.Unlock()
286         for _, u := range utxos {
287                 // If the UTXO is already reserved, skip it.
288                 if _, ok := sr.reserved[u.OutputID]; ok {
289                         unavailable += u.Amount
290                         continue
291                 }
292
293                 reserved += u.Amount
294                 reservedUTXOs = append(reservedUTXOs, u)
295                 if reserved >= amount {
296                         break
297                 }
298         }
299         if reserved+unavailable < amount {
300                 // Even if everything was available, this account wouldn't have
301                 // enough to satisfy the request.
302                 return nil, 0, ErrInsufficient, isImmature
303         }
304         if reserved < amount {
305                 // The account has enough for the request, but some is tied up in
306                 // other reservations.
307                 return nil, 0, ErrReserved, isImmature
308         }
309
310         // We've found enough to satisfy the request.
311         for _, u := range reservedUTXOs {
312                 sr.reserved[u.OutputID] = rid
313         }
314
315         return reservedUTXOs, reserved, nil, isImmature
316 }
317
318 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
319         sr.mu.Lock()
320         defer sr.mu.Unlock()
321
322         _, isReserved := sr.reserved[utxo.OutputID]
323         if isReserved {
324                 return ErrReserved
325         }
326
327         sr.reserved[utxo.OutputID] = rid
328         return nil
329 }
330
331 func (sr *sourceReserver) cancel(res *reservation) {
332         sr.mu.Lock()
333         defer sr.mu.Unlock()
334         for _, utxo := range res.UTXOs {
335                 delete(sr.reserved, utxo.OutputID)
336         }
337 }
338
339 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, error, bool) {
340         utxos := []*UTXO{}
341         isImmature := false
342         utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
343         defer utxoIter.Release()
344
345         for utxoIter.Next() {
346                 u := &UTXO{}
347                 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
348                         return nil, errors.Wrap(err), false
349                 }
350
351                 //u.ValidHeight > 0 means coinbase utxo
352                 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
353                         isImmature = true
354                         continue
355                 }
356
357                 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
358                         utxos = append(utxos, u)
359                 }
360         }
361
362         if len(utxos) == 0 {
363                 return nil, ErrMatchUTXO, isImmature
364         }
365         return utxos, nil, isImmature
366 }
367
368 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
369         u := &UTXO{}
370         data := db.Get(UTXOKey(outHash))
371         if data == nil {
372                 return nil, errors.Wrapf(ErrMatchUTXO, "utxo_id = %s", outHash.String())
373         }
374         return u, json.Unmarshal(data, u)
375 }