OSDN Git Service

Merge pull request #1090 from Bytom/add_fastsync_flag
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "path"
7         "strconv"
8         "strings"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         cfg "github.com/bytom/config"
15         "github.com/bytom/consensus"
16         "github.com/bytom/p2p"
17         "github.com/bytom/p2p/discover"
18         "github.com/bytom/p2p/pex"
19         core "github.com/bytom/protocol"
20         "github.com/bytom/protocol/bc"
21         "github.com/bytom/protocol/bc/types"
22         "github.com/bytom/version"
23 )
24
25 //SyncManager Sync Manager is responsible for the business layer information synchronization
26 type SyncManager struct {
27         sw *p2p.Switch
28
29         privKey     crypto.PrivKeyEd25519 // local node's p2p key
30         chain       *core.Chain
31         txPool      *core.TxPool
32         fetcher     *Fetcher
33         blockKeeper *blockKeeper
34         peers       *peerSet
35
36         newTxCh       chan *types.Tx
37         newBlockCh    chan *bc.Hash
38         newPeerCh     chan struct{}
39         txSyncCh      chan *txsync
40         dropPeerCh    chan *string
41         quitSync      chan struct{}
42         config        *cfg.Config
43         synchronising int32
44 }
45
46 //NewSyncManager create a sync manager
47 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
48         sw := p2p.NewSwitch(config)
49         peers := newPeerSet()
50         dropPeerCh := make(chan *string, maxQuitReq)
51         manager := &SyncManager{
52                 sw:          sw,
53                 txPool:      txPool,
54                 chain:       chain,
55                 privKey:     crypto.GenPrivKeyEd25519(),
56                 fetcher:     NewFetcher(chain, sw, peers),
57                 blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
58                 peers:       peers,
59                 newTxCh:     make(chan *types.Tx, maxTxChanSize),
60                 newBlockCh:  newBlockCh,
61                 newPeerCh:   make(chan struct{}),
62                 txSyncCh:    make(chan *txsync),
63                 dropPeerCh:  dropPeerCh,
64                 quitSync:    make(chan struct{}),
65                 config:      config,
66         }
67
68         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
69         manager.sw.AddReactor("PROTOCOL", protocolReactor)
70
71         // Create & add listener
72         var listenerStatus bool
73         var l p2p.Listener
74         if !config.VaultMode {
75                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
76                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
77                 manager.sw.AddListener(l)
78
79                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
80                 if err != nil {
81                         return nil, err
82                 }
83
84                 pexReactor := pex.NewPEXReactor(discv)
85                 manager.sw.AddReactor("PEX", pexReactor)
86         }
87         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
88         manager.sw.SetNodePrivKey(manager.privKey)
89         return manager, nil
90 }
91
92 // Defaults to tcp
93 func protocolAndAddress(listenAddr string) (string, string) {
94         p, address := "tcp", listenAddr
95         parts := strings.SplitN(address, "://", 2)
96         if len(parts) == 2 {
97                 p, address = parts[0], parts[1]
98         }
99         return p, address
100 }
101
102 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
103         nodeInfo := &p2p.NodeInfo{
104                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
105                 Moniker: sm.config.Moniker,
106                 Network: sm.config.ChainID,
107                 Version: version.Version,
108                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
109         }
110
111         if !sm.sw.IsListening() {
112                 return nodeInfo
113         }
114
115         p2pListener := sm.sw.Listeners()[0]
116
117         // We assume that the rpcListener has the same ExternalAddress.
118         // This is probably true because both P2P and RPC listeners use UPnP,
119         // except of course if the rpc is only bound to localhost
120         if listenerStatus {
121                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
122         } else {
123                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
124         }
125         return nodeInfo
126 }
127
128 //Start start sync manager service
129 func (sm *SyncManager) Start() {
130         if _, err := sm.sw.Start(); err != nil {
131                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
132         }
133         // broadcast transactions
134         go sm.txBroadcastLoop()
135
136         // broadcast mined blocks
137         go sm.minedBroadcastLoop()
138
139         // start sync handlers
140         go sm.syncer()
141
142         go sm.txsyncLoop()
143 }
144
145 //Stop stop sync manager
146 func (sm *SyncManager) Stop() {
147         close(sm.quitSync)
148         sm.sw.Stop()
149 }
150
151 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
152         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
153         if err != nil {
154                 return nil, err
155         }
156
157         conn, err := net.ListenUDP("udp", addr)
158         if err != nil {
159                 return nil, err
160         }
161
162         realaddr := conn.LocalAddr().(*net.UDPAddr)
163         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
164         if err != nil {
165                 return nil, err
166         }
167
168         // add the seeds node to the discover table
169         if config.P2P.Seeds == "" {
170                 return ntab, nil
171         }
172         nodes := []*discover.Node{}
173         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
174                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
175                 nodes = append(nodes, discover.MustParseNode(url))
176         }
177         if err = ntab.SetFallbackNodes(nodes); err != nil {
178                 return nil, err
179         }
180         return ntab, nil
181 }
182
183 func (sm *SyncManager) txBroadcastLoop() {
184         for {
185                 select {
186                 case newTx := <-sm.newTxCh:
187                         peers, err := sm.peers.BroadcastTx(newTx)
188                         if err != nil {
189                                 log.Errorf("Broadcast new tx error. %v", err)
190                                 return
191                         }
192                         for _, smPeer := range peers {
193                                 if smPeer == nil {
194                                         continue
195                                 }
196                                 swPeer := smPeer.getPeer()
197                                 log.Info("Tx broadcast error. Stop Peer.")
198                                 sm.sw.StopPeerGracefully(swPeer)
199                         }
200                 case <-sm.quitSync:
201                         return
202                 }
203         }
204 }
205
206 func (sm *SyncManager) minedBroadcastLoop() {
207         for {
208                 select {
209                 case blockHash := <-sm.newBlockCh:
210                         block, err := sm.chain.GetBlockByHash(blockHash)
211                         if err != nil {
212                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
213                                 return
214                         }
215                         peers, err := sm.peers.BroadcastMinedBlock(block)
216                         if err != nil {
217                                 log.Errorf("Broadcast mine block error. %v", err)
218                                 return
219                         }
220                         for _, smPeer := range peers {
221                                 if smPeer == nil {
222                                         continue
223                                 }
224                                 swPeer := smPeer.getPeer()
225                                 log.Info("New mined block broadcast error. Stop Peer.")
226                                 sm.sw.StopPeerGracefully(swPeer)
227                         }
228                 case <-sm.quitSync:
229                         return
230                 }
231         }
232 }
233
234 //NodeInfo get P2P peer node info
235 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
236         return sm.sw.NodeInfo()
237 }
238
239 //BlockKeeper get block keeper
240 func (sm *SyncManager) BlockKeeper() *blockKeeper {
241         return sm.blockKeeper
242 }
243
244 //Peers get sync manager peer set
245 func (sm *SyncManager) Peers() *peerSet {
246         return sm.peers
247 }
248
249 //Switch get sync manager switch
250 func (sm *SyncManager) Switch() *p2p.Switch {
251         return sm.sw
252 }
253
254 // GetNewTxCh return a unconfirmed transaction feed channel
255 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
256         return sm.newTxCh
257 }