10 dbm "github.com/tendermint/tmlibs/db"
12 "github.com/bytom/errors"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/sync/idempotency"
19 // ErrInsufficient indicates the account doesn't contain enough
20 // units of the requested asset to satisfy the reservation.
21 // New units must be deposited into the account in order to
22 // satisfy the request; change will not be sufficient.
23 ErrInsufficient = errors.New("reservation found insufficient funds")
25 // ErrReserved indicates that a reservation could not be
26 // satisfied because some of the outputs were already reserved.
27 // When those reservations are finalized into a transaction
28 // (and no other transaction spends funds from the account),
29 // new change outputs will be created
30 // in sufficient amounts to satisfy the request.
31 ErrReserved = errors.New("reservation found outputs already reserved")
32 // ErrMatchUTXO indicates that no usable UTXO could be matched for a
// request. In this file it is returned by findMatchingUTXOs, by
// findSpecificUTXO when the output is not found, and by reserveUTXO when a
// coinbase output is still immature.
33 ErrMatchUTXO = errors.New("can't match enough valid utxos")
34 // ErrReservation indicates that the reserver could not find a
// reservation with the provided ID.
35 ErrReservation = errors.New("couldn't find reservation")
38 // UTXO describes an individual account utxo.
//
// Code in this file reads its OutputID, AssetID, AccountID, Amount and
// ValidHeight fields; ValidHeight > 0 is treated as marking a coinbase
// output.
43 // Avoiding AssetAmount here so that new(UTXO) doesn't produce an
44 // AssetAmount with a nil AssetId.
// NOTE(review): ControlProgramIndex is not read anywhere in this file;
// presumably it identifies which of the account's control programs the
// output pays to — confirm against the code that populates it.
53 ControlProgramIndex uint64
// source returns the selection criteria this UTXO falls under, built from
// its AssetID and AccountID. It is used to find the sourceReserver that
// tracks the UTXO.
57 func (u *UTXO) source() source {
58 return source{AssetID: u.AssetID, AccountID: u.AccountID}
61 // source describes the criteria to use when selecting UTXOs.
67 // reservation describes a reservation of a set of UTXOs belonging
68 // to a particular account. Reservations are immutable.
//
// Within this file a reservation is stored in reserver.reservations under
// its reservation ID. It carries the source it was taken from, the UTXOs it
// holds, the Expiry checked by ExpireReservations and the caller's optional
// ClientToken.
// NOTE(review): reserver.reserve assigns Change after the reservation has
// been stored in the map (still holding reservationsMu), so "immutable"
// holds only once construction has finished — confirm this is the intent.
69 type reservation struct {
// newReserver builds a reserver whose reservation and per-source maps start
// out empty.
// NOTE(review): c and walletdb are presumably stored as the reserver's chain
// handle and database (other methods read re.c and re.db); the assignments
// are not visible here — confirm.
78 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
82 reservations: make(map[uint64]*reservation),
83 sources: make(map[source]*sourceReserver),
87 // reserver implements a utxo reserver that stores reservations
88 // in-memory. It relies on the account_utxos table for the source of
89 // truth of valid UTXOs but tracks which of those UTXOs are reserved
92 // To reduce latency and prevent deadlock, no two mutexes (either on
93 // reserver or sourceReserver) should be held at the same time
95 // reserver ensures idempotency of reservations until the reservation
//
// NOTE(review): the lookups in this file read UTXOs from a key-value
// database (IteratorPrefix / Get), not a SQL table, so the "account_utxos
// table" wording above may be stale — confirm.
97 type reserver struct {
// nextReservationID is advanced with atomic.AddUint64 to allocate a new ID
// for every reservation attempt.
100 nextReservationID uint64
// idempotency routes Reserve/ReserveUTXO calls that carry a client token
// through Once, keyed by that token.
101 idempotency idempotency.Group
// reservationsMu guards reservations.
103 reservationsMu sync.Mutex
// reservations maps a reservation ID to its live reservation.
104 reservations map[uint64]*reservation
// sources holds the sourceReserver for each source; entries are looked up
// and created in the source method, which unlocks sourcesMu on return.
107 sources map[source]*sourceReserver
110 // Reserve selects and reserves UTXOs according to the criteria provided
111 // in source. The resulting reservation expires at exp.
//
// With a nil clientToken the reservation is attempted directly. Otherwise
// the attempt is routed through re.idempotency.Once, keyed by *clientToken.
// NOTE(review): presumably Once replays the first result for repeated
// tokens — confirm against the idempotency package.
112 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
114 if clientToken == nil {
115 return re.reserve(src, amount, clientToken, exp)
118 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
119 return re.reserve(src, amount, clientToken, exp)
// NOTE(review): unchecked type assertion. It panics if Once ever yields a
// value that is not a *reservation (an untyped nil included) — confirm that
// Once always hands back the closure's own return value.
121 return untypedRes.(*reservation), err
// reserve performs one reservation attempt against the sourceReserver for
// src. It allocates a new reservation ID, asks the source to reserve amount,
// and on success records the reservation in re.reservations under that ID.
124 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
// The local name shadows the sourceReserver type for the rest of the
// function.
125 sourceReserver := re.source(src)
127 // Try to reserve the right amount.
// The ID is taken before the attempt, so it is consumed even when the
// attempt fails.
128 rid := atomic.AddUint64(&re.nextReservationID, 1)
129 reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
// NOTE(review): the condition guarding this return is not visible here;
// presumably a failed attempt with isImmature set — confirm.
132 return nil, errors.WithDetail(err, "some coinbase utxos are immature")
142 ClientToken: clientToken,
145 // Save the successful reservation.
146 re.reservationsMu.Lock()
147 defer re.reservationsMu.Unlock()
148 re.reservations[rid] = res
150 // Make change if necessary
// total is the reserved amount reported by sourceReserver.reserve.
// NOTE(review): presumably guarded by total > amount (guard not visible
// here); without it the unsigned subtraction would wrap — confirm.
152 res.Change = total - amount
157 // ReserveUTXO reserves a specific utxo for spending. The resulting
158 // reservation expires at exp.
//
// With a nil clientToken the reservation is attempted directly. Otherwise
// the attempt is routed through re.idempotency.Once, keyed by *clientToken.
159 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
160 if clientToken == nil {
161 return re.reserveUTXO(ctx, out, exp, nil)
164 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
165 return re.reserveUTXO(ctx, out, exp, clientToken)
// NOTE(review): unchecked type assertion. It panics if Once ever yields a
// value that is not a *reservation (an untyped nil included) — confirm that
// Once always hands back the closure's own return value.
167 return untypedRes.(*reservation), err
// reserveUTXO looks up the output identified by out, rejects a coinbase
// output that is still immature, and reserves the output under a newly
// allocated reservation ID. The resulting reservation is recorded in
// re.reservations.
// NOTE(review): ctx is not used by any statement visible here.
170 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
171 u, err := findSpecificUTXO(re.db, out)
176 // u.ValidHeight > 0 marks a coinbase utxo; it is immature while its
// ValidHeight is above the chain's best block height.
177 if u.ValidHeight > 0 && u.ValidHeight > re.c.BestBlockHeight() {
178 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
181 rid := atomic.AddUint64(&re.nextReservationID, 1)
// The sourceReserver is chosen from the UTXO's own asset and account.
182 err = re.source(u.source()).reserveUTXO(rid, u)
192 ClientToken: clientToken,
// The lock covers only the map write and is released explicitly.
194 re.reservationsMu.Lock()
195 re.reservations[rid] = res
196 re.reservationsMu.Unlock()
200 // Cancel makes a best-effort attempt at canceling the reservation with
//
// The reservation is removed from re.reservations while reservationsMu is
// held; the lock is released before the source's cancel method is called.
// NOTE(review): ctx is not used by any statement visible here.
202 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
203 re.reservationsMu.Lock()
204 res, ok := re.reservations[rid]
// delete on a missing key is a no-op, so it is safe to run before ok is
// inspected.
205 delete(re.reservations, rid)
206 re.reservationsMu.Unlock()
// NOTE(review): presumably guarded by !ok (the guard line is not visible
// here) — confirm.
208 return errors.Wrapf(ErrReservation, "rid=%d", rid)
210 re.source(res.Source).cancel(res)
// Forgetting the client token is currently disabled: the call below is
// commented out.
211 /*if res.ClientToken != nil {
212 re.idempotency.Forget(*res.ClientToken)
217 // ExpireReservations cleans up all reservations that have expired,
218 // making their UTXOs available for reservation again.
//
// Expired entries are collected and removed from re.reservations while
// reservationsMu is held. Their UTXOs are released through the owning
// sourceReserver afterwards, once the lock has been dropped.
// NOTE(review): ctx is not used by any statement visible here.
219 func (re *reserver) ExpireReservations(ctx context.Context) error {
220 // Remove records of any reservations that have expired.
222 var canceled []*reservation
223 re.reservationsMu.Lock()
// Deleting the current key while ranging over a map is permitted in Go.
224 for rid, res := range re.reservations {
// NOTE(review): now is assigned on a line not visible here — presumably
// time.Now(); confirm.
225 if res.Expiry.Before(now) {
226 canceled = append(canceled, res)
227 delete(re.reservations, rid)
230 re.reservationsMu.Unlock()
232 // If we removed any expired reservations, update the corresponding
234 for _, res := range canceled {
235 re.source(res.Source).cancel(res)
// Forgetting the client token is currently disabled: the call below is
// commented out.
236 /*if res.ClientToken != nil {
237 re.idempotency.Forget(*res.ClientToken)
241 // TODO(jackson): Cleanup any source reservers that don't have
242 // anything reserved. It'll be a little tricky because of our
// source returns the sourceReserver responsible for src. sourcesMu is
// released on return via defer.
// NOTE(review): the visible lines look up re.sources[src] and build a new
// sourceReserver; presumably the new one is built and stored only when the
// lookup misses — confirm.
247 func (re *reserver) source(src source) *sourceReserver {
249 defer re.sourcesMu.Unlock()
251 sr, ok := re.sources[src]
256 sr = &sourceReserver{
// Starts with nothing reserved.
259 reserved: make(map[bc.Hash]uint64),
// The method value is stored, not called, so the height is read anew each
// time currentHeight is invoked.
260 currentHeight: re.c.BestBlockHeight,
// sourceReserver tracks which UTXOs of one source are currently reserved.
266 type sourceReserver struct {
// currentHeight reports a block height; reserver.source sets it to the
// chain's BestBlockHeight method, and it is passed to findMatchingUTXOs for
// the coinbase maturity check.
269 currentHeight func() uint64
// reserved maps the OutputID of each reserved UTXO to the ID of the
// reservation holding it.
271 reserved map[bc.Hash]uint64
// reserve tries to reserve at least amount from this source's UTXOs on
// behalf of reservation rid.
//
// It returns the UTXOs it reserved, the reserved total, the isImmature flag
// reported by findMatchingUTXOs, and an error:
//   - a wrapped error from findMatchingUTXOs when the lookup fails;
//   - ErrInsufficient when the available and already-reserved amounts
//     together fall short of amount;
//   - ErrReserved when the shortfall is only due to other reservations.
//
// UTXOs are marked as reserved only after enough have been found, so a
// failed attempt leaves sr.reserved unchanged.
274 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
// reserved is the running total picked for this request; unavailable is the
// total of matching UTXOs already held by other reservations.
276 reserved, unavailable uint64
277 reservedUTXOs []*UTXO
280 utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
282 return nil, 0, isImmature, errors.Wrap(err)
287 for _, u := range utxos {
288 // If the UTXO is already reserved, skip it.
289 if _, ok := sr.reserved[u.OutputID]; ok {
290 unavailable += u.Amount
// NOTE(review): the line that adds u.Amount to reserved is not visible
// here — confirm it precedes the threshold check below.
295 reservedUTXOs = append(reservedUTXOs, u)
296 if reserved >= amount {
300 if reserved+unavailable < amount {
301 // Even if everything was available, this account wouldn't have
302 // enough to satisfy the request.
303 return nil, 0, isImmature, ErrInsufficient
305 if reserved < amount {
306 // The account has enough for the request, but some is tied up in
307 // other reservations.
308 return nil, 0, isImmature, ErrReserved
311 // We've found enough to satisfy the request.
312 for _, u := range reservedUTXOs {
313 sr.reserved[u.OutputID] = rid
316 return reservedUTXOs, reserved, isImmature, nil
// reserveUTXO records utxo as held by reservation rid after checking
// whether its OutputID is already present in sr.reserved.
// NOTE(review): the branch taken when the UTXO is already reserved is not
// visible here; presumably it returns ErrReserved — confirm.
319 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
323 _, isReserved := sr.reserved[utxo.OutputID]
328 sr.reserved[utxo.OutputID] = rid
// cancel releases every UTXO held by res by deleting its OutputID from
// sr.reserved. The entry is removed whatever reservation ID it maps to; the
// stored ID is not compared with res.
332 func (sr *sourceReserver) cancel(res *reservation) {
335 for _, utxo := range res.UTXOs {
336 delete(sr.reserved, utxo.OutputID)
// findMatchingUTXOs iterates over every entry stored under UTXOPreFix in db,
// JSON-decodes each one, and collects the UTXOs whose AccountID and AssetID
// match src. The bool result reports the isImmature flag; it is false when
// decoding fails.
//
// Filtering happens after decoding, so each call decodes every entry under
// the prefix regardless of account or asset.
340 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
343 utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
344 defer utxoIter.Release()
346 for utxoIter.Next() {
348 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
349 return nil, false, errors.Wrap(err)
352 // u.ValidHeight > 0 marks a coinbase utxo; it is immature while its
// ValidHeight is above the current height.
// NOTE(review): the body of this branch is not visible here; presumably it
// sets isImmature and skips the UTXO — confirm.
353 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
358 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
359 utxos = append(utxos, u)
// NOTE(review): presumably guarded by an "no UTXO matched" check (guard not
// visible here) — confirm.
364 return nil, isImmature, ErrMatchUTXO
366 return utxos, isImmature, nil
// findSpecificUTXO loads the UTXO stored for outHash. It reads the entry
// under StandardUTXOKey(outHash) and the one under ContractUTXOKey(outHash),
// and returns ErrMatchUTXO, wrapped with the output ID, when the contract-key
// lookup yields nil.
369 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
372 data := db.Get(StandardUTXOKey(outHash))
// NOTE(review): presumably reached only when the standard-key lookup
// returned nil (the guard is not visible here) — confirm.
374 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
375 return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
// On a decode failure u is still returned, possibly partially populated,
// together with the error.
378 return u, json.Unmarshal(data, u)