11 "github.com/bytom/blockchain/accesstoken"
12 "github.com/bytom/blockchain/account"
13 "github.com/bytom/blockchain/asset"
14 "github.com/bytom/blockchain/pseudohsm"
15 "github.com/bytom/blockchain/txdb"
16 "github.com/bytom/blockchain/txfeed"
17 "github.com/bytom/encoding/json"
18 "github.com/bytom/log"
19 "github.com/bytom/mining/cpuminer"
20 "github.com/bytom/p2p"
21 "github.com/bytom/protocol"
22 "github.com/bytom/protocol/bc/legacy"
23 "github.com/bytom/types"
24 wire "github.com/tendermint/go-wire"
25 cmn "github.com/tendermint/tmlibs/common"
26 //"github.com/bytom/net/http/gzip"
27 "github.com/bytom/net/http/httpjson"
28 //"github.com/bytom/net/http/limit"
30 "github.com/bytom/errors"
31 "github.com/bytom/generated/dashboard"
32 "github.com/bytom/net/http/static"
36 // BlockchainChannel is a channel for blocks and status updates (`BlockStore` height)
// Transactions are broadcast on this channel as well (see BroadcastTransaction).
37 BlockchainChannel = byte(0x40)
// defaultChannelCapacity is the buffer size NewBlockchainReactor gives to
// requestsCh and timeoutsCh.
39 defaultChannelCapacity = 100
// NOTE(review): no use of defaultSleepIntervalMS is visible in this part of
// the file -- confirm it is still referenced.
40 defaultSleepIntervalMS = 500
// trySyncIntervalMS is the period, in milliseconds, of the sync ticker
// created in poolRoutine.
41 trySyncIntervalMS = 100
42 // stop syncing when last block's time is
43 // within this much of the system time.
44 // stopSyncingDurationMinutes = 10
46 // ask for best height every 10s
47 statusUpdateIntervalSeconds = 10
48 // check if we should switch to consensus reactor
// (only referenced by a commented-out ticker in poolRoutine)
49 switchToConsensusIntervalSeconds = 1
// maxBlockchainResponseSize is the size limit DecodeMessage passes to
// wire.ReadBinary.
50 maxBlockchainResponseSize = 22020096 + 2
// crosscoreRPCPrefix is the path prefix maxBytes uses to recognise the
// signer/sign-block endpoint, which is exempt from the request size cap.
51 crosscoreRPCPrefix = "/rpc/"
54 // BlockchainReactor handles long-term catchup syncing.
// In this file it additionally serves the HTTP API (see ServeHTTP and
// BuildHander) and relays new transactions to peers (see poolRoutine).
55 type BlockchainReactor struct {
60 accounts *account.Manager
61 assets *asset.Registry
62 accesstoken *accesstoken.Token
63 txFeeds *txfeed.TxFeed
// txPool supplies the new-transaction channel that poolRoutine drains and
// broadcasts.
65 txPool *protocol.TxPool
// mining is the CPU miner built in NewBlockchainReactor; poolRoutine checks
// IsMining on it once the block pool reports it has caught up.
67 mining *cpuminer.CPUMiner
// requestsCh carries block requests that poolRoutine forwards to the
// requested peer.
71 requestsCh chan BlockRequest
// timeoutsCh carries IDs of peers that poolRoutine stops with a timeout
// error.
72 timeoutsCh chan string
73 evsw types.EventSwitch
// batchRecover recovers from a panic and converts an error held in *v into a
// formatted error response. Because it calls recover, it only has an effect
// when it is run as a deferred call.
// A recovered value that is not an error is turned into one with the text
// "panic with <type>"; the resulting error is passed through errors.Wrap.
76 func batchRecover(ctx context.Context, v *interface{}) {
77 if r := recover(); r != nil {
79 if recoveredErr, ok := r.(error); ok {
82 err = fmt.Errorf("panic with %T", r)
84 err = errors.Wrap(err)
91 // Convert errors into error responses (including errors
92 // from recovered panics above).
93 if err, ok := (*v).(error); ok {
// Log the error first, then replace *v with its formatted form.
94 errorFormatter.Log(ctx, err)
95 *v = errorFormatter.Format(err)
// jsonHandler builds an http.Handler from f via httpjson.Handler, passing
// errorFormatter.Write as the second argument.
// NOTE(review): how a non-nil err from httpjson.Handler is dealt with is not
// visible here -- confirm before relying on it.
99 func jsonHandler(f interface{}) http.Handler {
100 h, err := httpjson.Handler(f, errorFormatter.Write)
// alwaysError returns a JSON handler whose function unconditionally returns
// err, regardless of the request.
107 func alwaysError(err error) http.Handler {
108 return jsonHandler(func() error { return err })
// ServeHTTP forwards the request to bcr.handler, the handler chain that
// BuildHander assigns.
111 func (bcr *BlockchainReactor) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
112 bcr.handler.ServeHTTP(rw, req)
// info is the handler registered for /info. It logs a marker line and returns
// a map of placeholder values: "is_configured" is always false and the build
// fields are dashes.
115 func (bcr *BlockchainReactor) info(ctx context.Context) (map[string]interface{}, error) {
116 //if a.config == nil {
118 log.Printf(ctx, "-------info-----")
119 return map[string]interface{}{
120 "is_configured": false,
122 "build_commit": "----",
123 "build_date": "------",
124 "build_config": "---------",
// createblockkey is the handler registered for /create-block-key. The only
// statement visible here writes a log line.
129 func (bcr *BlockchainReactor) createblockkey(ctx context.Context) {
130 log.Printf(ctx, "creat-block-key")
// webAssetsHandler builds a mux that serves dashboard.Files under
// /dashboard/ (with the prefix stripped and "index.html" configured as
// Default) and routes every other path to next.
133 func webAssetsHandler(next http.Handler) http.Handler {
134 mux := http.NewServeMux()
135 mux.Handle("/dashboard/", http.StripPrefix("/dashboard/", static.Handler{
136 Assets: dashboard.Files,
137 Default: "index.html",
// Everything that is not a dashboard asset falls through to next.
139 mux.Handle("/", next)
// maxBytes returns a handler that caps the request body at maxReqSize bytes
// using http.MaxBytesReader, except for the
// crosscoreRPCPrefix+"signer/sign-block" path, which is left uncapped.
// NOTE(review): the call that hands the request on to h is not visible in
// this view -- presumably it follows the size check.
143 func maxBytes(h http.Handler) http.Handler {
144 const maxReqSize = 1e7 // 10MB
145 return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
146 // A block can easily be bigger than maxReqSize, but everything
147 // else should be pretty small.
148 if req.URL.Path != crosscoreRPCPrefix+"signer/sign-block" {
149 req.Body = http.MaxBytesReader(w, req.Body, maxReqSize)
// BuildHander registers the JSON API routes on m and stores the wrapped
// handler chain in bcr.handler, which ServeHTTP uses. The chain wraps the
// latency-recording handler with maxBytes and then webAssetsHandler; the
// remaining middleware below is commented out.
// NOTE(review): the line that defines m is not visible here -- presumably it
// is the reactor's mux; confirm.
155 func (bcr *BlockchainReactor) BuildHander() {
157 m.Handle("/create-account", jsonHandler(bcr.createAccount))
158 m.Handle("/create-asset", jsonHandler(bcr.createAsset))
159 m.Handle("/update-account-tags", jsonHandler(bcr.updateAccountTags))
160 m.Handle("/update-asset-tags", jsonHandler(bcr.updateAssetTags))
161 m.Handle("/build-transaction", jsonHandler(bcr.build))
162 m.Handle("/create-control-program", jsonHandler(bcr.createControlProgram))
163 m.Handle("/create-account-receiver", jsonHandler(bcr.createAccountReceiver))
164 m.Handle("/create-transaction-feed", jsonHandler(bcr.createTxFeed))
165 m.Handle("/get-transaction-feed", jsonHandler(bcr.getTxFeed))
166 m.Handle("/update-transaction-feed", jsonHandler(bcr.updateTxFeed))
167 m.Handle("/delete-transaction-feed", jsonHandler(bcr.deleteTxFeed))
168 m.Handle("/list-accounts", jsonHandler(bcr.listAccounts))
169 m.Handle("/list-assets", jsonHandler(bcr.listAssets))
170 m.Handle("/list-transaction-feeds", jsonHandler(bcr.listTxFeeds))
171 m.Handle("/list-transactions", jsonHandler(bcr.listTransactions))
172 m.Handle("/list-balances", jsonHandler(bcr.listBalances))
173 m.Handle("/list-unspent-outputs", jsonHandler(bcr.listUnspentOutputs))
// Catch-all: any path without a more specific route gets this error.
174 m.Handle("/", alwaysError(errors.New("not Found")))
175 m.Handle("/info", jsonHandler(bcr.info))
176 m.Handle("/create-block-key", jsonHandler(bcr.createblockkey))
177 m.Handle("/submit-transaction", jsonHandler(bcr.submit))
178 m.Handle("/create-access-token", jsonHandler(bcr.createAccessToken))
179 m.Handle("/list-access-tokens", jsonHandler(bcr.listAccessTokens))
180 m.Handle("/delete-access-token", jsonHandler(bcr.deleteAccessToken))
// Key management routes, served by the pseudohsm* handlers.
182 m.Handle("/create-key", jsonHandler(bcr.pseudohsmCreateKey))
183 m.Handle("/list-keys", jsonHandler(bcr.pseudohsmListKeys))
184 m.Handle("/delete-key", jsonHandler(bcr.pseudohsmDeleteKey))
185 m.Handle("/sign-transactions", jsonHandler(bcr.pseudohsmSignTemplates))
186 m.Handle("/reset-password", jsonHandler(bcr.pseudohsmResetPassword))
187 m.Handle("/update-alias", jsonHandler(bcr.pseudohsmUpdateAlias))
// Record request latency when latency() yields a recorder for this request.
189 latencyHandler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
190 if l := latency(m, req); l != nil {
191 defer l.RecordSince(time.Now())
195 handler := maxBytes(latencyHandler) // TODO(tessr): consider moving this to non-core specific mux
196 handler = webAssetsHandler(handler)
197 /* handler = healthHandler(handler)
198 for _, l := range a.requestLimits {
199 handler = limit.Handler(handler, alwaysError(errRateLimited), l.perSecond, l.burst, l.key)
201 handler = gzip.Handler{Handler: handler}
202 handler = coreCounter(handler)
203 handler = timeoutContextHandler(handler)
204 if a.config != nil && a.config.BlockchainId != nil {
205 handler = blockchainIDHandler(handler, a.config.BlockchainId.String())
209 bcr.handler = handler
212 // requestQuery is the request object for API list queries.
213 type requestQuery struct {
214 Filter string `json:"filter,omitempty"`
215 FilterParams []interface{} `json:"filter_params,omitempty"`
216 SumBy []string `json:"sum_by,omitempty"`
217 PageSize int `json:"page_size"`
219 // AscLongPoll and Timeout are used by /list-transactions
220 // to facilitate notifications.
221 AscLongPoll bool `json:"ascending_with_long_poll,omitempty"`
222 Timeout json.Duration `json:"timeout"`
224 // After is a completely opaque cursor, indicating that only
225 // items in the result set after the one identified by `After`
226 // should be included. It has no relationship to time.
227 After string `json:"after"`
229 // These two are used for time-range queries like /list-transactions
230 StartTimeMS uint64 `json:"start_time,omitempty"`
231 EndTimeMS uint64 `json:"end_time,omitempty"`
233 // This is used for point-in-time queries like /list-balances
234 // TODO(bobg): Different request structs for endpoints with different needs
235 TimestampMS uint64 `json:"timestamp,omitempty"`
237 // This is used for filtering results from /list-access-tokens
238 // Value must be "client" or "network"
239 Type string `json:"type"`
241 // Aliases is used to filter results from /list-keys
242 Aliases []string `json:"aliases,omitempty"`
245 // Used as a response object for api queries
247 Items interface{} `json:"items"`
248 Next requestQuery `json:"next"`
249 LastPage bool `json:"last_page"`
// NewBlockchainReactor constructs a BlockchainReactor. It creates the
// buffered request and timeout channels (capacity defaultChannelCapacity), a
// block pool, a CPU miner over chain and txPool, and a fresh HTTP mux, then
// installs the p2p base reactor named "BlockchainReactor".
252 func NewBlockchainReactor(store *txdb.Store, chain *protocol.Chain, txPool *protocol.TxPool, accounts *account.Manager, assets *asset.Registry, hsm *pseudohsm.HSM, fastSync bool) *BlockchainReactor {
253 requestsCh := make(chan BlockRequest, defaultChannelCapacity)
254 timeoutsCh := make(chan string, defaultChannelCapacity)
255 pool := NewBlockPool(
260 mining := cpuminer.NewCPUMiner(chain, txPool)
261 bcR := &BlockchainReactor{
269 mux: http.NewServeMux(),
272 requestsCh: requestsCh,
273 timeoutsCh: timeoutsCh,
// The base reactor holds a reference back to bcR.
275 bcR.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcR)
279 // OnStart implements BaseService
// It starts the embedded base reactor and then the block pool.
280 func (bcR *BlockchainReactor) OnStart() error {
// NOTE(review): whatever BaseReactor.OnStart returns is discarded here --
// confirm that is intended.
281 bcR.BaseReactor.OnStart()
284 _, err := bcR.pool.Start()
293 // OnStop implements BaseService
// It stops the embedded base reactor.
294 func (bcR *BlockchainReactor) OnStop() {
295 bcR.BaseReactor.OnStop()
298 // GetChannels implements Reactor
// It declares a single channel, BlockchainChannel, with a send queue
// capacity of 100.
299 func (bcR *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
300 return []*p2p.ChannelDescriptor{
301 &p2p.ChannelDescriptor{
302 ID: BlockchainChannel,
304 SendQueueCapacity: 100,
309 // AddPeer implements Reactor by sending our state to peer.
// The state is a bcStatusResponseMessage carrying the current chain height.
310 func (bcR *BlockchainReactor) AddPeer(peer *p2p.Peer) {
311 if !peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}}) {
312 // doing nothing, will try later in `poolRoutine`
316 // RemovePeer implements Reactor by removing peer from the pool.
// The reason argument is not used by the statement visible here.
317 func (bcR *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
318 bcR.pool.RemovePeer(peer.Key)
321 // Receive implements Reactor by handling 5 types of messages (look below).
// Block requests are answered from the store, block responses and peer
// status are fed to the pool, status requests are answered with the chain
// height, and transactions are validated against the chain.
322 func (bcR *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
323 _, msg, err := DecodeMessage(msgBytes)
325 bcR.Logger.Error("Error decoding message", "error", err)
329 bcR.Logger.Info("Receive", "src", src, "chID", chID, "msg", msg)
331 switch msg := msg.(type) {
332 case *bcBlockRequestMessage:
333 rawBlock, err := bcR.store.GetRawBlock(msg.Height)
// Note: this msg shadows the switch variable within the branch.
335 msg := &bcBlockResponseMessage{RawBlock: rawBlock}
336 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
338 // queue is full, just ignore.
341 fmt.Println("skip sent the block response due to block is nil")
342 // TODO peer is asking for things we don't have.
344 case *bcBlockResponseMessage:
346 //fmt.Printf("receive block %v \n", msg.Block)
// The raw message length is passed along with the decoded block.
347 bcR.pool.AddBlock(src.Key, msg.GetBlock(), len(msgBytes))
348 case *bcStatusRequestMessage:
349 // Send peer our state.
350 queued := src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusResponseMessage{bcR.chain.Height()}})
354 case *bcStatusResponseMessage:
355 // Got a peer status. Unverified.
356 //fmt.Printf("reveive peer high is %d \n", msg.Height)
357 bcR.pool.SetPeerHeight(src.Key, msg.Height)
358 case *bcTransactionMessage:
359 tx := msg.GetTransaction()
361 if err := bcR.chain.ValidateTx(tx); err != nil {
365 bcR.Logger.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
369 // Handle messages from the poolReactor telling the reactor what to do.
370 // NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
371 // (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
// The loop also relays new transactions from the tx pool and periodically
// asks peers for their status.
372 func (bcR *BlockchainReactor) poolRoutine() {
// NOTE(review): no Stop call for these two tickers is visible in this view --
// confirm they are released when the routine exits.
374 trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
375 statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
376 newTxCh := bcR.txPool.GetNewTxCh()
377 //switchToConsensusTicker := time.NewTicker(switchToConsensusIntervalSeconds * time.Second)
383 case request := <-bcR.requestsCh: // chan BlockRequest
384 peer := bcR.Switch.Peers().Get(request.PeerID)
386 continue FOR_LOOP // Peer has since been disconnected.
388 msg := &bcBlockRequestMessage{request.Height}
389 queued := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
391 // We couldn't make the request, send-queue full.
392 // The pool handles timeouts, just let it go.
395 case peerID := <-bcR.timeoutsCh: // chan string
397 peer := bcR.Switch.Peers().Get(peerID)
399 bcR.Switch.StopPeerForError(peer, errors.New("BlockchainReactor Timeout"))
401 case newTx := <-newTxCh:
// Broadcast in a separate goroutine so the loop is not held up.
402 go bcR.BroadcastTransaction(newTx)
403 case _ = <-statusUpdateTicker.C:
404 // ask for status updates
405 go bcR.BroadcastStatusRequest()
406 case _ = <-trySyncTicker.C: // chan time
// Attempt up to 10 blocks per tick.
408 for i := 0; i < 10; i++ {
409 // See if there are any blocks to sync.
// Only the first of the two peeked blocks is used.
410 block, _ := bcR.pool.PeekTwoBlocks()
415 bcR.pool.PopRequest()
416 snap, err := bcR.chain.ApplyValidBlock(block)
418 fmt.Printf("Failed to apply valid block: %v \n", err)
421 err = bcR.chain.CommitAppliedBlock(nil, block, snap)
423 fmt.Printf("Failed to commit block: %v \n", err)
// NOTE(review): the other Logger calls in this file pass key/value pairs
// after the message; here a lone value follows it -- confirm the logger
// accepts that.
426 bcR.Logger.Info("finish to sync commit block", block.BlockHeader.Height)
// Mining is considered only once the pool has caught up and the miner is
// not already running.
432 if bcR.pool.IsCaughtUp() && !bcR.mining.IsMining() {
433 bcR.Logger.Info("start to mining")
439 // BroadcastStatusRequest broadcasts `BlockStore` height.
// It sends a bcStatusRequestMessage carrying the current chain height to all
// peers on BlockchainChannel.
440 func (bcR *BlockchainReactor) BroadcastStatusRequest() error {
441 bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcStatusRequestMessage{bcR.chain.Height()}})
// BroadcastTransaction serialises tx.TxData with MarshalText and broadcasts
// the result to all peers as a bcTransactionMessage on BlockchainChannel.
445 func (bcR *BlockchainReactor) BroadcastTransaction(tx *legacy.Tx) error {
446 rawTx, err := tx.TxData.MarshalText()
450 bcR.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{&bcTransactionMessage{rawTx}})
454 //-----------------------------------------------------------------------------
// Type bytes paired with the concrete message types in the
// wire.RegisterInterface call below.
458 msgTypeBlockRequest = byte(0x10)
459 msgTypeBlockResponse = byte(0x11)
460 msgTypeStatusResponse = byte(0x20)
461 msgTypeStatusRequest = byte(0x21)
462 msgTypeTransactionRequest = byte(0x30)
465 // BlockchainMessage is a generic message for this reactor.
// Messages are sent wrapped as struct{ BlockchainMessage }, and DecodeMessage
// unwraps them the same way.
466 type BlockchainMessage interface{}
// Register each concrete message type with go-wire under its type byte, for
// the struct{ BlockchainMessage } wrapper used on the wire.
468 var _ = wire.RegisterInterface(
469 struct{ BlockchainMessage }{},
470 wire.ConcreteType{&bcBlockRequestMessage{}, msgTypeBlockRequest},
471 wire.ConcreteType{&bcBlockResponseMessage{}, msgTypeBlockResponse},
472 wire.ConcreteType{&bcStatusResponseMessage{}, msgTypeStatusResponse},
473 wire.ConcreteType{&bcStatusRequestMessage{}, msgTypeStatusRequest},
474 wire.ConcreteType{&bcTransactionMessage{}, msgTypeTransactionRequest},
477 // DecodeMessage decodes BlockchainMessage.
478 // TODO: ensure that bz is completely read.
// The payload is read with wire.ReadBinary, limited to
// maxBlockchainResponseSize, and unwrapped from struct{ BlockchainMessage }.
479 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
482 r := bytes.NewReader(bz)
483 msg = wire.ReadBinary(struct{ BlockchainMessage }{}, r, maxBlockchainResponseSize, &n, &err).(struct{ BlockchainMessage }).BlockchainMessage
// NOTE(review): this branch only runs when err is already non-nil, so it
// replaces a real decode error with the "bytes left over" text and never
// reports leftover bytes after a successful read. `err == nil && n != len(bz)`
// looks like what was meant (assuming n is the number of bytes consumed) --
// confirm before changing.
484 if err != nil && n != len(bz) {
485 err = errors.New("DecodeMessage() had bytes left over")
490 //-----------------------------------
// bcBlockRequestMessage asks a peer for the block at a given height; Receive
// answers it with the raw block read from the store.
492 type bcBlockRequestMessage struct {
// String renders the message with its Height for logging.
496 func (m *bcBlockRequestMessage) String() string {
497 return cmn.Fmt("[bcBlockRequestMessage %v]", m.Height)
500 //-------------------------------------
// bcTransactionMessage carries a transaction in the text form produced by
// TxData.MarshalText (see BroadcastTransaction).
502 type bcTransactionMessage struct {
// GetTransaction decodes RawTx into a legacy.Tx.
506 func (m *bcTransactionMessage) GetTransaction() *legacy.Tx {
// NOTE(review): the result of UnmarshalText is discarded, so a malformed
// RawTx would presumably go unnoticed here -- confirm and consider handling it.
508 tx.UnmarshalText(m.RawTx)
512 //-------------------------------------
514 //-------------------------------------
516 // NOTE: keep up-to-date with maxBlockchainResponseSize
// bcBlockResponseMessage carries a raw block in reply to a block request.
517 type bcBlockResponseMessage struct {
// GetBlock decodes RawBlock into a legacy.Block, starting from an empty
// header and an empty transaction list.
521 func (m *bcBlockResponseMessage) GetBlock() *legacy.Block {
522 block := &legacy.Block{
523 BlockHeader: legacy.BlockHeader{},
524 Transactions: []*legacy.Tx{},
// NOTE(review): the result of UnmarshalText is discarded, so a malformed
// RawBlock would presumably go unnoticed here -- confirm and consider
// handling it.
526 block.UnmarshalText(m.RawBlock)
// String decodes the block on every call in order to print its height.
530 func (m *bcBlockResponseMessage) String() string {
531 block := m.GetBlock()
532 return cmn.Fmt("[bcBlockResponseMessage %v]", block.BlockHeader.Height)
535 //-------------------------------------
// bcStatusRequestMessage asks peers for their status; BroadcastStatusRequest
// fills it with the sender's chain height.
537 type bcStatusRequestMessage struct {
// String renders the message with its Height for logging.
541 func (m *bcStatusRequestMessage) String() string {
542 return cmn.Fmt("[bcStatusRequestMessage %v]", m.Height)
545 //-------------------------------------
// bcStatusResponseMessage reports the sender's chain height; Receive records
// it as the peer's height in the pool.
547 type bcStatusResponseMessage struct {
// String renders the message with its Height for logging.
551 func (m *bcStatusResponseMessage) String() string {
552 return cmn.Fmt("[bcStatusResponseMessage %v]", m.Height)