OSDN Git Service

Optimize p2p transfer business layer logic (#501)
[bytom/bytom.git] / netsync / fetcher.go
1 package netsync
2
3 import (
4         "errors"
5
6         log "github.com/sirupsen/logrus"
7         "gopkg.in/karalabe/cookiejar.v2/collections/prque"
8
9         "github.com/bytom/p2p"
10         core "github.com/bytom/protocol"
11         "github.com/bytom/protocol/bc"
12         "github.com/bytom/protocol/bc/types"
13         "strings"
14 )
15
16 const (
17         maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
18 )
19
20 var (
21         errTerminated = errors.New("terminated")
22 )
23
24 // Fetcher is responsible for accumulating block announcements from various peers
25 // and scheduling them for retrieval.
26 type Fetcher struct {
27         chain *core.Chain
28         sw    *p2p.Switch
29         peers *peerSet
30
31         // Various event channels
32         newMinedBlock chan *blockPending
33         quit          chan struct{}
34
35         // Block cache
36         queue  *prque.Prque              // Queue containing the import operations (block number sorted)
37         queues map[string]int            // Per peer block counts to prevent memory exhaustion
38         queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
39 }
40
41 //NewFetcher New creates a block fetcher to retrieve blocks of the new mined.
42 func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
43         return &Fetcher{
44                 chain:         chain,
45                 sw:            sw,
46                 peers:         peers,
47                 newMinedBlock: make(chan *blockPending),
48                 quit:          make(chan struct{}),
49                 queue:         prque.New(),
50                 queues:        make(map[string]int),
51                 queued:        make(map[bc.Hash]*blockPending),
52         }
53 }
54
55 // Start boots up the announcement based synchroniser, accepting and processing
56 // hash notifications and block fetches until termination requested.
57 func (f *Fetcher) Start() {
58         go f.loop()
59 }
60
61 // Stop terminates the announcement based synchroniser, canceling all pending
62 // operations.
63 func (f *Fetcher) Stop() {
64         close(f.quit)
65 }
66
67 // Enqueue tries to fill gaps the the fetcher's future import queue.
68 func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
69         op := &blockPending{
70                 peerID: peer,
71                 block:  block,
72         }
73         select {
74         case f.newMinedBlock <- op:
75                 return nil
76         case <-f.quit:
77                 return errTerminated
78         }
79 }
80
81 // Loop is the main fetcher loop, checking and processing various notification
82 // events.
83 func (f *Fetcher) loop() {
84         for {
85                 // Import any queued blocks that could potentially fit
86                 height := f.chain.Height()
87                 for !f.queue.Empty() {
88                         op := f.queue.PopItem().(*blockPending)
89                         // If too high up the chain or phase, continue later
90                         number := op.block.Height
91                         if number > height+1 {
92                                 f.queue.Push(op, -float32(op.block.Height))
93                                 break
94                         }
95                         // Otherwise if fresh and still unknown, try and import
96                         hash := op.block.Hash()
97                         block, _ := f.chain.GetBlockByHash(&hash)
98                         if block != nil {
99                                 f.forgetBlock(hash)
100                                 continue
101                         }
102                         if strings.Compare(op.block.PreviousBlockHash.String(), f.chain.BestBlockHash().String()) != 0 {
103                                 f.forgetBlock(hash)
104                                 continue
105                         }
106                         f.insert(op.peerID, op.block)
107                 }
108                 // Wait for an outside event to occur
109                 select {
110                 case <-f.quit:
111                         // Fetcher terminating, abort all operations
112                         return
113
114                 case op := <-f.newMinedBlock:
115                         // A direct block insertion was requested, try and fill any pending gaps
116                         f.enqueue(op.peerID, op.block)
117                 }
118         }
119 }
120
121 // enqueue schedules a new future import operation, if the block to be imported
122 // has not yet been seen.
123 func (f *Fetcher) enqueue(peer string, block *types.Block) {
124         hash := block.Hash()
125
126         //TODO: Ensure the peer isn't DOSing us
127         // Discard any past or too distant blocks
128         if dist := int64(block.Height) - int64(f.chain.Height()); dist < 0 || dist > maxQueueDist {
129                 log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
130                 return
131         }
132         // Schedule the block for future importing
133         if _, ok := f.queued[hash]; !ok {
134                 op := &blockPending{
135                         peerID: peer,
136                         block:  block,
137                 }
138                 f.queued[hash] = op
139                 f.queue.Push(op, -float32(block.Height))
140                 log.Info("Queued propagated block.", " peer: ", peer, "number: ", block.Height, "queued: ", f.queue.Size())
141         }
142 }
143
144 // insert spawns a new goroutine to run a block insertion into the chain. If the
145 // block's number is at the same height as the current import phase, it updates
146 // the phase states accordingly.
147 func (f *Fetcher) insert(peer string, block *types.Block) {
148         // Run the import on a new thread
149         log.Info("Importing propagated block", " from peer: ", peer, " height: ", block.Height)
150         // Run the actual import and log any issues
151         if _, err := f.chain.ProcessBlock(block); err != nil {
152                 log.Info("Propagated block import failed", " from peer: ", peer, " height: ", block.Height, "err: ", err)
153                 return
154         }
155         // If import succeeded, broadcast the block
156         log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
157         go f.peers.BroadcastMinedBlock(block)
158 }
159
160 // forgetBlock removes all traces of a queued block from the fetcher's internal
161 // state.
162 func (f *Fetcher) forgetBlock(hash bc.Hash) {
163         if insert := f.queued[hash]; insert != nil {
164                 f.queues[insert.peerID]--
165                 if f.queues[insert.peerID] == 0 {
166                         delete(f.queues, insert.peerID)
167                 }
168                 delete(f.queued, hash)
169         }
170 }