10 dbm "github.com/tendermint/tmlibs/db"
12 "github.com/bytom/errors"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/sync/idempotency"
19 // ErrInsufficient indicates the account doesn't contain enough
20 // units of the requested asset to satisfy the reservation.
21 // New units must be deposited into the account in order to
22 // satisfy the request; change will not be sufficient.
23 ErrInsufficient = errors.New("reservation found insufficient funds")
25 // ErrReserved indicates that a reservation could not be
26 // satisfied because some of the outputs were already reserved.
27 // When those reservations are finalized into a transaction
28 // (and no other transaction spends funds from the account),
29 // new change outputs will be created
30 // in sufficient amounts to satisfy the request.
31 ErrReserved = errors.New("reservation found outputs already reserved")
32 ErrMatchUTXO = errors.New("can't match enough valid utxos")
33 ErrReservation = errors.New("couldn't find reservation")
36 // UTXO describes an individual account utxo.
41 // Avoiding AssetAmount here so that new(utxo) doesn't produce an
42 // AssetAmount with a nil AssetId.
52 ControlProgramIndex uint64
56 func (u *UTXO) source() source {
57 return source{AssetID: u.AssetID, AccountID: u.AccountID}
60 // source describes the criteria to use when selecting UTXOs.
66 // reservation describes a reservation of a set of UTXOs belonging
67 // to a particular account. Reservations are immutable.
68 type reservation struct {
77 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
81 reservations: make(map[uint64]*reservation),
82 sources: make(map[source]*sourceReserver),
86 // reserver implements a utxo reserver that stores reservations
87 // in-memory. It relies on the account_utxos table for the source of
88 // truth of valid UTXOs but tracks which of those UTXOs are reserved
91 // To reduce latency and prevent deadlock, no two mutexes (either on
92 // reserver or sourceReserver) should be held at the same time
94 // reserver ensures idempotency of reservations until the reservation
96 type reserver struct {
99 nextReservationID uint64
100 idempotency idempotency.Group
102 reservationsMu sync.Mutex
103 reservations map[uint64]*reservation
106 sources map[source]*sourceReserver
109 // Reserve selects and reserves UTXOs according to the criteria provided
110 // in source. The resulting reservation expires at exp.
111 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
113 if clientToken == nil {
114 return re.reserve(src, amount, clientToken, exp)
117 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
118 return re.reserve(src, amount, clientToken, exp)
120 return untypedRes.(*reservation), err
123 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
124 sourceReserver := re.source(src)
126 // Try to reserve the right amount.
127 rid := atomic.AddUint64(&re.nextReservationID, 1)
128 reserved, total, err, isImmature := sourceReserver.reserve(rid, amount)
131 return nil, errors.WithDetail(err, "some coinbase utxos are immature")
141 ClientToken: clientToken,
144 // Save the successful reservation.
145 re.reservationsMu.Lock()
146 defer re.reservationsMu.Unlock()
147 re.reservations[rid] = res
149 // Make change if necessary
151 res.Change = total - amount
156 // ReserveUTXO reserves a specific utxo for spending. The resulting
157 // reservation expires at exp.
158 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
159 if clientToken == nil {
160 return re.reserveUTXO(ctx, out, exp, nil)
163 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
164 return re.reserveUTXO(ctx, out, exp, clientToken)
166 return untypedRes.(*reservation), err
169 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
170 u, err := findSpecificUTXO(re.db, out)
175 //u.ValidHeight > 0 means coinbase utxo
176 if u.ValidHeight > 0 && u.ValidHeight > re.c.Height() {
177 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
180 rid := atomic.AddUint64(&re.nextReservationID, 1)
181 err = re.source(u.source()).reserveUTXO(rid, u)
191 ClientToken: clientToken,
193 re.reservationsMu.Lock()
194 re.reservations[rid] = res
195 re.reservationsMu.Unlock()
199 // Cancel makes a best-effort attempt at canceling the reservation with
201 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
202 re.reservationsMu.Lock()
203 res, ok := re.reservations[rid]
204 delete(re.reservations, rid)
205 re.reservationsMu.Unlock()
207 return errors.Wrapf(ErrReservation, "rid=%d", rid)
209 re.source(res.Source).cancel(res)
210 /*if res.ClientToken != nil {
211 re.idempotency.Forget(*res.ClientToken)
216 // ExpireReservations cleans up all reservations that have expired,
217 // making their UTXOs available for reservation again.
218 func (re *reserver) ExpireReservations(ctx context.Context) error {
219 // Remove records of any reservations that have expired.
221 var canceled []*reservation
222 re.reservationsMu.Lock()
223 for rid, res := range re.reservations {
224 if res.Expiry.Before(now) {
225 canceled = append(canceled, res)
226 delete(re.reservations, rid)
229 re.reservationsMu.Unlock()
231 // If we removed any expired reservations, update the corresponding
233 for _, res := range canceled {
234 re.source(res.Source).cancel(res)
235 /*if res.ClientToken != nil {
236 re.idempotency.Forget(*res.ClientToken)
240 // TODO(jackson): Cleanup any source reservers that don't have
241 // anything reserved. It'll be a little tricky because of our
246 func (re *reserver) source(src source) *sourceReserver {
248 defer re.sourcesMu.Unlock()
250 sr, ok := re.sources[src]
255 sr = &sourceReserver{
258 reserved: make(map[bc.Hash]uint64),
259 currentHeight: re.c.Height,
265 type sourceReserver struct {
268 currentHeight func() uint64
270 reserved map[bc.Hash]uint64
273 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, error, bool) {
275 reserved, unavailable uint64
276 reservedUTXOs []*UTXO
279 utxos, err, isImmature := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
281 return nil, 0, errors.Wrap(err), isImmature
286 for _, u := range utxos {
287 // If the UTXO is already reserved, skip it.
288 if _, ok := sr.reserved[u.OutputID]; ok {
289 unavailable += u.Amount
294 reservedUTXOs = append(reservedUTXOs, u)
295 if reserved >= amount {
299 if reserved+unavailable < amount {
300 // Even if everything was available, this account wouldn't have
301 // enough to satisfy the request.
302 return nil, 0, ErrInsufficient, isImmature
304 if reserved < amount {
305 // The account has enough for the request, but some is tied up in
306 // other reservations.
307 return nil, 0, ErrReserved, isImmature
310 // We've found enough to satisfy the request.
311 for _, u := range reservedUTXOs {
312 sr.reserved[u.OutputID] = rid
315 return reservedUTXOs, reserved, nil, isImmature
318 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
322 _, isReserved := sr.reserved[utxo.OutputID]
327 sr.reserved[utxo.OutputID] = rid
331 func (sr *sourceReserver) cancel(res *reservation) {
334 for _, utxo := range res.UTXOs {
335 delete(sr.reserved, utxo.OutputID)
339 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, error, bool) {
342 utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
343 defer utxoIter.Release()
345 for utxoIter.Next() {
347 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
348 return nil, errors.Wrap(err), false
351 //u.ValidHeight > 0 means coinbase utxo
352 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
357 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
358 utxos = append(utxos, u)
363 return nil, ErrMatchUTXO, isImmature
365 return utxos, nil, isImmature
368 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
370 data := db.Get(UTXOKey(outHash))
372 return nil, errors.Wrapf(ErrMatchUTXO, "utxo_id = %s", outHash.String())
374 return u, json.Unmarshal(data, u)