10 dbm "github.com/tendermint/tmlibs/db"
12 "github.com/bytom/errors"
13 "github.com/bytom/protocol"
14 "github.com/bytom/protocol/bc"
15 "github.com/bytom/sync/idempotency"
19 // ErrInsufficient indicates the account doesn't contain enough
20 // units of the requested asset to satisfy the reservation.
21 // New units must be deposited into the account in order to
22 // satisfy the request; change will not be sufficient.
23 ErrInsufficient = errors.New("reservation found insufficient funds")
25 // ErrReserved indicates that a reservation could not be
26 // satisfied because some of the outputs were already reserved.
27 // When those reservations are finalized into a transaction
28 // (and no other transaction spends funds from the account),
29 // new change outputs will be created
30 // in sufficient amounts to satisfy the request.
31 ErrReserved = errors.New("reservation found outputs already reserved")
32 // ErrMatchUTXO indicates the account doesn't contain enough utxo to satisfy the reservation.
33 ErrMatchUTXO = errors.New("can't match enough valid utxos")
34 // ErrReservation indicates the reserver doesn't found the reservation with the provided ID.
35 ErrReservation = errors.New("couldn't find reservation")
38 // UTXO describes an individual account utxo.
43 // Avoiding AssetAmount here so that new(utxo) doesn't produce an
44 // AssetAmount with a nil AssetId.
53 ControlProgramIndex uint64
58 func (u *UTXO) source() source {
59 return source{AssetID: u.AssetID, AccountID: u.AccountID}
62 // source describes the criteria to use when selecting UTXOs.
68 // reservation describes a reservation of a set of UTXOs belonging
69 // to a particular account. Reservations are immutable.
70 type reservation struct {
79 func newReserver(c *protocol.Chain, walletdb dbm.DB) *reserver {
83 reservations: make(map[uint64]*reservation),
84 sources: make(map[source]*sourceReserver),
88 // reserver implements a utxo reserver that stores reservations
89 // in-memory. It relies on the account_utxos table for the source of
90 // truth of valid UTXOs but tracks which of those UTXOs are reserved
93 // To reduce latency and prevent deadlock, no two mutexes (either on
94 // reserver or sourceReserver) should be held at the same time
96 // reserver ensures idempotency of reservations until the reservation
98 type reserver struct {
99 // `sync/atomic` expects the first word in an allocated struct to be 64-bit
100 // aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
101 nextReservationID uint64
104 idempotency idempotency.Group
106 reservationsMu sync.Mutex
107 reservations map[uint64]*reservation
110 sources map[source]*sourceReserver
113 // Reserve selects and reserves UTXOs according to the criteria provided
114 // in source. The resulting reservation expires at exp.
115 func (re *reserver) Reserve(src source, amount uint64, clientToken *string, exp time.Time) (*reservation, error) {
117 if clientToken == nil {
118 return re.reserve(src, amount, clientToken, exp)
121 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
122 return re.reserve(src, amount, clientToken, exp)
124 return untypedRes.(*reservation), err
127 func (re *reserver) reserve(src source, amount uint64, clientToken *string, exp time.Time) (res *reservation, err error) {
128 sourceReserver := re.source(src)
130 // Try to reserve the right amount.
131 rid := atomic.AddUint64(&re.nextReservationID, 1)
132 reserved, total, isImmature, err := sourceReserver.reserve(rid, amount)
135 return nil, errors.WithDetail(err, "some coinbase utxos are immature")
145 ClientToken: clientToken,
148 // Save the successful reservation.
149 re.reservationsMu.Lock()
150 defer re.reservationsMu.Unlock()
151 re.reservations[rid] = res
153 // Make change if necessary
155 res.Change = total - amount
160 // ReserveUTXO reserves a specific utxo for spending. The resulting
161 // reservation expires at exp.
162 func (re *reserver) ReserveUTXO(ctx context.Context, out bc.Hash, clientToken *string, exp time.Time) (*reservation, error) {
163 if clientToken == nil {
164 return re.reserveUTXO(ctx, out, exp, nil)
167 untypedRes, err := re.idempotency.Once(*clientToken, func() (interface{}, error) {
168 return re.reserveUTXO(ctx, out, exp, clientToken)
170 return untypedRes.(*reservation), err
173 func (re *reserver) reserveUTXO(ctx context.Context, out bc.Hash, exp time.Time, clientToken *string) (*reservation, error) {
174 u, err := findSpecificUTXO(re.db, out)
179 //u.ValidHeight > 0 means coinbase utxo
180 if u.ValidHeight > 0 && u.ValidHeight > re.c.BestBlockHeight() {
181 return nil, errors.WithDetail(ErrMatchUTXO, "this coinbase utxo is immature")
184 rid := atomic.AddUint64(&re.nextReservationID, 1)
185 err = re.source(u.source()).reserveUTXO(rid, u)
195 ClientToken: clientToken,
197 re.reservationsMu.Lock()
198 re.reservations[rid] = res
199 re.reservationsMu.Unlock()
203 // Cancel makes a best-effort attempt at canceling the reservation with
205 func (re *reserver) Cancel(ctx context.Context, rid uint64) error {
206 re.reservationsMu.Lock()
207 res, ok := re.reservations[rid]
208 delete(re.reservations, rid)
209 re.reservationsMu.Unlock()
211 return errors.Wrapf(ErrReservation, "rid=%d", rid)
213 re.source(res.Source).cancel(res)
214 /*if res.ClientToken != nil {
215 re.idempotency.Forget(*res.ClientToken)
220 // ExpireReservations cleans up all reservations that have expired,
221 // making their UTXOs available for reservation again.
222 func (re *reserver) ExpireReservations(ctx context.Context) error {
223 // Remove records of any reservations that have expired.
225 var canceled []*reservation
226 re.reservationsMu.Lock()
227 for rid, res := range re.reservations {
228 if res.Expiry.Before(now) {
229 canceled = append(canceled, res)
230 delete(re.reservations, rid)
233 re.reservationsMu.Unlock()
235 // If we removed any expired reservations, update the corresponding
237 for _, res := range canceled {
238 re.source(res.Source).cancel(res)
239 /*if res.ClientToken != nil {
240 re.idempotency.Forget(*res.ClientToken)
244 // TODO(jackson): Cleanup any source reservers that don't have
245 // anything reserved. It'll be a little tricky because of our
250 func (re *reserver) source(src source) *sourceReserver {
252 defer re.sourcesMu.Unlock()
254 sr, ok := re.sources[src]
259 sr = &sourceReserver{
262 reserved: make(map[bc.Hash]uint64),
263 currentHeight: re.c.BestBlockHeight,
269 type sourceReserver struct {
272 currentHeight func() uint64
274 reserved map[bc.Hash]uint64
277 func (sr *sourceReserver) reserve(rid uint64, amount uint64) ([]*UTXO, uint64, bool, error) {
279 reserved, unavailable uint64
280 reservedUTXOs []*UTXO
283 utxos, isImmature, err := findMatchingUTXOs(sr.db, sr.src, sr.currentHeight)
285 return nil, 0, isImmature, errors.Wrap(err)
290 for _, u := range utxos {
291 // If the UTXO is already reserved, skip it.
292 if _, ok := sr.reserved[u.OutputID]; ok {
293 unavailable += u.Amount
298 reservedUTXOs = append(reservedUTXOs, u)
299 if reserved >= amount {
303 if reserved+unavailable < amount {
304 // Even if everything was available, this account wouldn't have
305 // enough to satisfy the request.
306 return nil, 0, isImmature, ErrInsufficient
308 if reserved < amount {
309 // The account has enough for the request, but some is tied up in
310 // other reservations.
311 return nil, 0, isImmature, ErrReserved
314 // We've found enough to satisfy the request.
315 for _, u := range reservedUTXOs {
316 sr.reserved[u.OutputID] = rid
319 return reservedUTXOs, reserved, isImmature, nil
322 func (sr *sourceReserver) reserveUTXO(rid uint64, utxo *UTXO) error {
326 _, isReserved := sr.reserved[utxo.OutputID]
331 sr.reserved[utxo.OutputID] = rid
335 func (sr *sourceReserver) cancel(res *reservation) {
338 for _, utxo := range res.UTXOs {
339 delete(sr.reserved, utxo.OutputID)
343 func findMatchingUTXOs(db dbm.DB, src source, currentHeight func() uint64) ([]*UTXO, bool, error) {
346 utxoIter := db.IteratorPrefix([]byte(UTXOPreFix))
347 defer utxoIter.Release()
349 for utxoIter.Next() {
351 if err := json.Unmarshal(utxoIter.Value(), u); err != nil {
352 return nil, false, errors.Wrap(err)
355 //u.ValidHeight > 0 means coinbase utxo
356 if u.ValidHeight > 0 && u.ValidHeight > currentHeight() {
361 if u.AccountID == src.AccountID && u.AssetID == src.AssetID {
362 utxos = append(utxos, u)
367 return nil, isImmature, ErrMatchUTXO
369 return utxos, isImmature, nil
372 func findSpecificUTXO(db dbm.DB, outHash bc.Hash) (*UTXO, error) {
375 data := db.Get(StandardUTXOKey(outHash))
377 if data = db.Get(ContractUTXOKey(outHash)); data == nil {
378 return nil, errors.Wrapf(ErrMatchUTXO, "output_id = %s", outHash.String())
381 return u, json.Unmarshal(data, u)