OSDN Git Service

fix conflicts
[bytom/bytom.git] / blockchain / reactor.go
1 package blockchain
2
3 import (
4         "bytes"
5         "context"
6         "fmt"
7         "net/http"
8         "reflect"
9         "time"
10
11         "github.com/bytom/blockchain/accesstoken"
12         "github.com/bytom/blockchain/account"
13         "github.com/bytom/blockchain/asset"
14         "github.com/bytom/blockchain/pseudohsm"
15         "github.com/bytom/blockchain/txdb"
16         "github.com/bytom/blockchain/txfeed"
17         "github.com/bytom/encoding/json"
18         "github.com/bytom/log"
19         "github.com/bytom/mining/cpuminer"
20         "github.com/bytom/p2p"
21         "github.com/bytom/protocol"
22         "github.com/bytom/protocol/bc/legacy"
23         "github.com/bytom/types"
24         wire "github.com/tendermint/go-wire"
25         cmn "github.com/tendermint/tmlibs/common"
26         //"github.com/bytom/net/http/gzip"
27         "github.com/bytom/net/http/httpjson"
28         //"github.com/bytom/net/http/limit"
29
30         "github.com/bytom/errors"
31         "github.com/bytom/generated/dashboard"
32         "github.com/bytom/net/http/static"
33 )
34
// BlockchainChannel is the p2p channel ID used for blocks and status updates
// (`BlockStore` height).
const BlockchainChannel byte = 0x40

// Tuning knobs for the reactor's channels and tickers.
const (
	defaultChannelCapacity = 100
	defaultSleepIntervalMS = 500
	trySyncIntervalMS      = 100
	// stop syncing when last block's time is
	// within this much of the system time.
	// stopSyncingDurationMinutes = 10

	// ask for best height every 10s
	statusUpdateIntervalSeconds = 10
	// check if we should switch to consensus reactor
	switchToConsensusIntervalSeconds = 1
)

const (
	// maxBlockchainResponseSize is the read limit handed to go-wire when
	// decoding a message: 21 MiB (22020096 bytes) plus 2.
	maxBlockchainResponseSize = 21*1024*1024 + 2
	// crosscoreRPCPrefix is the path prefix of the cross-core RPC endpoints.
	crosscoreRPCPrefix = "/rpc/"
)
53
54 // BlockchainReactor handles long-term catchup syncing.
55 type BlockchainReactor struct {
56         p2p.BaseReactor
57
58         chain       *protocol.Chain
59         store       *txdb.Store
60         accounts    *account.Manager
61         assets      *asset.Registry
62         accesstoken *accesstoken.Token
63         txFeeds     *txfeed.TxFeed
64         pool        *BlockPool
65         txPool      *protocol.TxPool
66         hsm         *pseudohsm.HSM
67         mining      *cpuminer.CPUMiner
68         mux         *http.ServeMux
69         handler     http.Handler
70         fastSync    bool
71         requestsCh  chan BlockRequest
72         timeoutsCh  chan string
73         evsw        types.EventSwitch
74 }
75
76 func batchRecover(ctx context.Context, v *interface{}) {
77         if r := recover(); r != nil {
78                 var err error
79                 if recoveredErr, ok := r.(error); ok {
80                         err = recoveredErr
81                 } else {
82                         err = fmt.Errorf("panic with %T", r)
83                 }
84                 err = errors.Wrap(err)
85                 *v = err
86         }
87
88         if *v == nil {
89                 return
90         }
91         // Convert errors into error responses (including errors
92         // from recovered panics above).
93         if err, ok := (*v).(error); ok {
94                 errorFormatter.Log(ctx, err)
95                 *v = errorFormatter.Format(err)
96         }
97 }
98
99 func jsonHandler(f interface{}) http.Handler {
100         h, err := httpjson.Handler(f, errorFormatter.Write)
101         if err != nil {
102                 panic(err)
103         }
104         return h
105 }
106
107 func alwaysError(err error) http.Handler {
108         return jsonHandler(func() error { return err })
109 }
110
111 func (bcr *BlockchainReactor) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
112         bcr.handler.ServeHTTP(rw, req)
113 }
114
115 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
116         //if a.config == nil {
117         // never configured
118         log.Printf(ctx, "-------info-----")
119         return map[string]interface{}{
120                 "is_configured": false,
121                 "version":       "0.001",
122                 "build_commit":  "----",
123                 "build_date":    "------",
124                 "build_config":  "---------",
125         }, nil
126         //}
127 }
128
129 func (bcr *BlockchainReactor) createblockkey(ctx context.Context) {
130         log.Printf(ctx, "creat-block-key")
131 }
132
133 func webAssetsHandler(next http.Handler) http.Handler {
134         mux := http.NewServeMux()
135         mux.Handle("/dashboard/", http.StripPrefix("/dashboard/", static.Handler{
136                 Assets:  dashboard.Files,
137                 Default: "index.html",
138         }))
139         mux.Handle("/", next)
140         return mux
141 }
142
143 func maxBytes(h http.Handler) http.Handler {
144         const maxReqSize = 1e7 // 10MB
145         return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
146                 // A block can easily be bigger than maxReqSize, but everything
147                 // else should be pretty small.
148                 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
149                         req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
150                 }
151                 h.ServeHTTP(w, req)
152         })
153 }
154
155 func (bcr *BlockchainReactor) BuildHander() {
156         m := bcr.mux
157         m.Handle("/create-account", jsonHandler(bcr.createAccount))
158         m.Handle("/create-asset", jsonHandler(bcr.createAsset))
159         m.Handle("/update-account-tags", jsonHandler(bcr.updateAccountTags))
160         m.Handle("/update-asset-tags", jsonHandler(bcr.updateAssetTags))
161         m.Handle("/build-transaction", jsonHandler(bcr.build))
162         m.Handle("/create-control-program", jsonHandler(bcr.createControlProgram))
163         m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver))
164         m.Handle("/create-transaction-feed", jsonHandler(bcr.createTxFeed))
165         m.Handle("/get-transaction-feed", jsonHandler(bcr.getTxFeed))
166         m.Handle("/update-transaction-feed", jsonHandler(bcr.updateTxFeed))
167         m.Handle("/delete-transaction-feed", jsonHandler(bcr.deleteTxFeed))
168         m.Handle("/list-accounts", jsonHandler(bcr.listAccounts))
169         m.Handle("/list-assets", jsonHandler(bcr.listAssets))
170         m.Handle("/list-transaction-feeds", jsonHandler(bcr.listTxFeeds))
171         m.Handle("/list-transactions", jsonHandler(bcr.listTransactions))
172         m.Handle("/list-balances", jsonHandler(bcr.listBalances))
173         m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs))
174         m.Handle("/", alwaysError(errors.New("not Found")))
175         m.Handle("/info", jsonHandler(bcr.info))
176         m.Handle("/create-block-key", jsonHandler(bcr.createblockkey))
177         m.Handle("/submit-transaction", jsonHandler(bcr.submit))
178         m.Handle("/create-access-token", jsonHandler(bcr.createAccessToken))
179         m.Handle("/list-access-tokens", jsonHandler(bcr.listAccessTokens))
180         m.Handle("/delete-access-token", jsonHandler(bcr.deleteAccessToken))
181         //hsm api
182         m.Handle("/create-key", jsonHandler(bcr.pseudohsmCreateKey))
183         m.Handle("/list-keys", jsonHandler(bcr.pseudohsmListKeys))
184         m.Handle("/delete-key", jsonHandler(bcr.pseudohsmDeleteKey))
185         m.Handle("/sign-transactions", jsonHandler(bcr.pseudohsmSignTemplates))
186         m.Handle("/reset-password", jsonHandler(bcr.pseudohsmResetPassword))
187         m.Handle("/update-alias", jsonHandler(bcr.pseudohsmUpdateAlias))
188
189         latencyHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
190                 if l := latency(m, req); l != nil {
191                         defer l.RecordSince(time.Now())
192                 }
193                 m.ServeHTTP(w, req)
194         })
195         handler := maxBytes(latencyHandler) // TODO(tessr): consider moving this to non-core specific mux
196         handler = webAssetsHandler(handler)
197         /*      handler = healthHandler(handler)
198                 for _, l := range a.requestLimits {
199                         handler = limit.Handler(handler, alwaysError(errRateLimited), l.perSecond, l.burst, l.key)
200                 }
201                 handler = gzip.Handler{Handler: handler}
202                 handler = coreCounter(handler)
203                 handler = timeoutContextHandler(handler)
204                 if a.config != nil && a.config.BlockchainId != nil {
205                         handler = blockchainIDHandler(handler, a.config.BlockchainId.String())
206                 }
207         */
208
209         bcr.handler = handler
210 }
211
// requestQuery is the request object for API queries. It is also embedded in
// page as the `next` field. Field order is kept as-is because it determines
// the order of keys in the encoded JSON.
type requestQuery struct {
	Filter       string        `json:"filter,omitempty"`
	FilterParams []interface{} `json:"filter_params,omitempty"`
	SumBy        []string      `json:"sum_by,omitempty"`
	PageSize     int           `json:"page_size"`

	// AscLongPoll and Timeout are used by /list-transactions
	// to facilitate notifications.
	AscLongPoll bool          `json:"ascending_with_long_poll,omitempty"`
	Timeout     json.Duration `json:"timeout"`

	// After is a completely opaque cursor, indicating that only
	// items in the result set after the one identified by `After`
	// should be included. It has no relationship to time.
	After string `json:"after"`

	// These two are used for time-range queries like /list-transactions.
	// NOTE(review): the MS suffix suggests milliseconds — confirm the unit.
	StartTimeMS uint64 `json:"start_time,omitempty"`
	EndTimeMS   uint64 `json:"end_time,omitempty"`

	// This is used for point-in-time queries like /list-balances
	// TODO(bobg): Different request structs for endpoints with different needs
	TimestampMS uint64 `json:"timestamp,omitempty"`

	// This is used for filtering results from /list-access-tokens
	// Value must be "client" or "network"
	Type string `json:"type"`

	// Aliases is used to filter results from the list-keys endpoint
	// (written as /mockshm/list-keys in the original comment).
	Aliases []string `json:"aliases,omitempty"`
}
244
// page is the response object for API queries: one batch of results plus the
// query needed to continue from it.
type page struct {
	// Items holds the results of this page; its concrete type is not fixed here.
	Items interface{} `json:"items"`
	// Next is a requestQuery returned alongside the items.
	// NOTE(review): presumably the query that fetches the following page — confirm against the handlers.
	Next requestQuery `json:"next"`
	// LastPage is serialized as `last_page`.
	LastPage bool `json:"last_page"`
}
251
252 func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, hsm *pseudohsm.HSM, fastSync bool) *BlockchainReactor {
253         requestsCh := make(chan BlockRequest, defaultChannelCapacity)
254         timeoutsCh := make(chan string, defaultChannelCapacity)
255         pool := NewBlockPool(
256                 store.Height()+1,
257                 requestsCh,
258                 timeoutsCh,
259         )
260         mining := cpuminer.NewCPUMiner(chain, txPool)
261         bcR := &BlockchainReactor{
262                 chain:      chain,
263                 store:      store,
264                 accounts:   accounts,
265                 assets:     assets,
266                 pool:       pool,
267                 txPool:     txPool,
268                 mining:     mining,
269                 mux:        http.NewServeMux(),
270                 hsm:        hsm,
271                 fastSync:   fastSync,
272                 requestsCh: requestsCh,
273                 timeoutsCh: timeoutsCh,
274         }
275         bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
276         return bcR
277 }
278
279 // OnStart implements BaseService
280 func (bcR *BlockchainReactor) OnStart() error {
281         bcR.BaseReactor.OnStart()
282         bcR.BuildHander()
283         if bcR.fastSync {
284                 _, err := bcR.pool.Start()
285                 if err != nil {
286                         return err
287                 }
288                 go bcR.poolRoutine()
289         }
290         return nil
291 }
292
293 // OnStop implements BaseService
294 func (bcR *BlockchainReactor) OnStop() {
295         bcR.BaseReactor.OnStop()
296 }
297
298 // GetChannels implements Reactor
299 func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
300         return []*p2p.ChannelDescriptor{
301                 &p2p.ChannelDescriptor{
302                         ID:                BlockchainChannel,
303                         Priority:          5,
304                         SendQueueCapacity: 100,
305                 },
306         }
307 }
308
309 // AddPeer implements Reactor by sending our state to peer.
310 func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
311         if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}}) {
312                 // doing nothing, will try later in `poolRoutine`
313         }
314 }
315
316 // RemovePeer implements Reactor by removing peer from the pool.
317 func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
318         bcR.pool.RemovePeer(peer.Key)
319 }
320
321 // Receive implements Reactor by handling 4 types of messages (look below).
322 func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
323         _, msg, err := DecodeMessage(msgBytes)
324         if err != nil {
325                 bcR.Logger.Error("Error decoding message", "error", err)
326                 return
327         }
328
329         bcR.Logger.Info("Receive", "src", src, "chID", chID, "msg", msg)
330
331         switch msg := msg.(type) {
332         case *bcBlockRequestMessage:
333                 rawBlock, err := bcR.store.GetRawBlock(msg.Height)
334                 if err == nil {
335                         msg := &bcBlockResponseMessage{RawBlock: rawBlock}
336                         queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
337                         if !queued {
338                                 // queue is full, just ignore.
339                         }
340                 } else {
341                         fmt.Println("skip sent the block response due to block is nil")
342                         // TODO peer is asking for things we don't have.
343                 }
344         case *bcBlockResponseMessage:
345                 // Got a block.
346                 //fmt.Printf("receive block %v \n", msg.Block)
347                 bcR.pool.AddBlock(src.Key, msg.GetBlock(), len(msgBytes))
348         case *bcStatusRequestMessage:
349                 // Send peer our state.
350                 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}})
351                 if !queued {
352                         // sorry
353                 }
354         case *bcStatusResponseMessage:
355                 // Got a peer status. Unverified.
356                 //fmt.Printf("reveive peer high is %d \n", msg.Height)
357                 bcR.pool.SetPeerHeight(src.Key, msg.Height)
358         case *bcTransactionMessage:
359                 tx := msg.GetTransaction()
360
361                 if err := bcR.chain.ValidateTx(tx); err != nil {
362                         return
363                 }
364         default:
365                 bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
366         }
367 }
368
369 // Handle messages from the poolReactor telling the reactor what to do.
370 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
371 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
372 func (bcR *BlockchainReactor) poolRoutine() {
373
374         trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
375         statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
376         newTxCh := bcR.txPool.GetNewTxCh()
377         //switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
378
379 FOR_LOOP:
380         for {
381
382                 select {
383                 case request := <-bcR.requestsCh: // chan BlockRequest
384                         peer := bcR.Switch.Peers().Get(request.PeerID)
385                         if peer == nil {
386                                 continue FOR_LOOP // Peer has since been disconnected.
387                         }
388                         msg := &bcBlockRequestMessage{request.Height}
389                         queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
390                         if !queued {
391                                 // We couldn't make the request, send-queue full.
392                                 // The pool handles timeouts, just let it go.
393                                 continue FOR_LOOP
394                         }
395                 case peerID := <-bcR.timeoutsCh: // chan string
396                         // Peer timed out.
397                         peer := bcR.Switch.Peers().Get(peerID)
398                         if peer != nil {
399                                 bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
400                         }
401                 case newTx := <-newTxCh:
402                         go bcR.BroadcastTransaction(newTx)
403                 case _ = <-statusUpdateTicker.C:
404                         // ask for status updates
405                         go bcR.BroadcastStatusRequest()
406                 case _ = <-trySyncTicker.C: // chan time
407                 SYNC_LOOP:
408                         for i := 0; i < 10; i++ {
409                                 // See if there are any blocks to sync.
410                                 block, _ := bcR.pool.PeekTwoBlocks()
411                                 if block == nil {
412                                         break SYNC_LOOP
413                                 }
414
415                                 bcR.pool.PopRequest()
416                                 snap, err := bcR.chain.ApplyValidBlock(block)
417                                 if err != nil {
418                                         fmt.Printf("Failed to apply valid block: %v \n", err)
419                                         break SYNC_LOOP
420                                 }
421                                 err = bcR.chain.CommitAppliedBlock(nil, block, snap)
422                                 if err != nil {
423                                         fmt.Printf("Failed to commit block: %v \n", err)
424                                         break SYNC_LOOP
425                                 }
426                                 bcR.Logger.Info("finish to sync commit block", block.BlockHeader.Height)
427                         }
428                         continue FOR_LOOP
429                 case <-bcR.Quit:
430                         break FOR_LOOP
431                 }
432                 if bcR.pool.IsCaughtUp() && !bcR.mining.IsMining() {
433                         bcR.Logger.Info("start to mining")
434                         bcR.mining.Start()
435                 }
436         }
437 }
438
439 // BroadcastStatusRequest broadcasts `BlockStore` height.
440 func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
441         bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.chain.Height()}})
442         return nil
443 }
444
445 func (bcR *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error {
446         rawTx, err := tx.TxData.MarshalText()
447         if err == nil {
448                 return err
449         }
450         bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcTransactionMessage{rawTx}})
451         return nil
452 }
453
454 //-----------------------------------------------------------------------------
455 // Messages
456
// Wire type bytes identifying each concrete BlockchainMessage.
const (
	msgTypeBlockRequest       byte = 0x10
	msgTypeBlockResponse      byte = 0x11
	msgTypeStatusResponse     byte = 0x20
	msgTypeStatusRequest      byte = 0x21
	msgTypeTransactionRequest byte = 0x30
)
464
// BlockchainMessage is a generic message for this reactor.
// It is an empty interface; on the wire it always travels wrapped in
// struct{ BlockchainMessage }.
type BlockchainMessage interface{}

// Register each concrete message type with go-wire under its type byte, so
// the wrapper struct can be decoded by wire.ReadBinary (see DecodeMessage).
var _ = wire.RegisterInterface(
	struct{ BlockchainMessage }{},
	wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
	wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
	wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
	wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
	wire.ConcreteType{&bcTransactionMessage{}, msgTypeTransactionRequest},
)
476
477 // DecodeMessage decodes BlockchainMessage.
478 // TODO: ensure that bz is completely read.
479 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
480         msgType = bz[0]
481         n := int(0)
482         r := bytes.NewReader(bz)
483         msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
484         if err != nil && n != len(bz) {
485                 err = errors.New("DecodeMessage() had bytes left over")
486         }
487         return
488 }
489
490 //-----------------------------------
491
492 type bcBlockRequestMessage struct {
493         Height uint64
494 }
495
496 func (m *bcBlockRequestMessage) String() string {
497         return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
498 }
499
500 //-------------------------------------
501
502 type bcTransactionMessage struct {
503         RawTx []byte
504 }
505
506 func (m *bcTransactionMessage) GetTransaction() *legacy.Tx {
507         tx := &legacy.Tx{}
508         tx.UnmarshalText(m.RawTx)
509         return tx
510 }
511
512 //-------------------------------------
513
514 //-------------------------------------
515
516 // NOTE: keep up-to-date with maxBlockchainResponseSize
517 type bcBlockResponseMessage struct {
518         RawBlock []byte
519 }
520
521 func (m *bcBlockResponseMessage) GetBlock() *legacy.Block {
522         block := &legacy.Block{
523                 BlockHeader:  legacy.BlockHeader{},
524                 Transactions: []*legacy.Tx{},
525         }
526         block.UnmarshalText(m.RawBlock)
527         return block
528 }
529
530 func (m *bcBlockResponseMessage) String() string {
531         block := m.GetBlock()
532         return cmn.Fmt("[bcBlockResponseMessage %v]", block.BlockHeader.Height)
533 }
534
535 //-------------------------------------
536
537 type bcStatusRequestMessage struct {
538         Height uint64
539 }
540
541 func (m *bcStatusRequestMessage) String() string {
542         return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
543 }
544
545 //-------------------------------------
546
547 type bcStatusResponseMessage struct {
548         Height uint64
549 }
550
551 func (m *bcStatusResponseMessage) String() string {
552         return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)
553 }