OSDN Git Service

Merge pull request #980 from freewind/fix-961
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strings"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-crypto"
8         "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         cfg "github.com/bytom/config"
13         "github.com/bytom/p2p"
14         "github.com/bytom/p2p/pex"
15         core "github.com/bytom/protocol"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/version"
18 )
19
20 //SyncManager Sync Manager is responsible for the business layer information synchronization
21 type SyncManager struct {
22         networkID uint64
23         sw        *p2p.Switch
24
25         privKey     crypto.PrivKeyEd25519 // local node's p2p key
26         chain       *core.Chain
27         txPool      *core.TxPool
28         fetcher     *Fetcher
29         blockKeeper *blockKeeper
30         peers       *peerSet
31
32         newBlockCh    chan *bc.Hash
33         newPeerCh     chan struct{}
34         txSyncCh      chan *txsync
35         dropPeerCh    chan *string
36         quitSync      chan struct{}
37         config        *cfg.Config
38         synchronising int32
39 }
40
41 //NewSyncManager create a sync manager
42 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
43         // Create the protocol manager with the base fields
44         manager := &SyncManager{
45                 txPool:     txPool,
46                 chain:      chain,
47                 privKey:    crypto.GenPrivKeyEd25519(),
48                 config:     config,
49                 quitSync:   make(chan struct{}),
50                 newBlockCh: newBlockCh,
51                 newPeerCh:  make(chan struct{}),
52                 txSyncCh:   make(chan *txsync),
53                 dropPeerCh: make(chan *string, maxQuitReq),
54                 peers:      newPeerSet(),
55         }
56
57         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
58         addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
59         manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
60
61         pexReactor := pex.NewPEXReactor(addrBook)
62         manager.sw.AddReactor("PEX", pexReactor)
63
64         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
65         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
66         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
67         manager.sw.AddReactor("PROTOCOL", protocolReactor)
68
69         // Create & add listener
70         var listenerStatus bool
71         var l p2p.Listener
72         if !config.VaultMode {
73                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
74                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
75                 manager.sw.AddListener(l)
76         }
77         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
78         manager.sw.SetNodePrivKey(manager.privKey)
79
80         return manager, nil
81 }
82
// protocolAndAddress splits a listen address of the form "proto://address"
// at the first "://" into its protocol and address parts. When the separator
// is absent, the protocol defaults to "tcp" and listenAddr is returned
// unchanged as the address.
func protocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"

	idx := strings.Index(listenAddr, sep)
	if idx < 0 {
		return "tcp", listenAddr
	}
	return listenAddr[:idx], listenAddr[idx+len(sep):]
}
92
93 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
94         nodeInfo := &p2p.NodeInfo{
95                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
96                 Moniker: sm.config.Moniker,
97                 Network: sm.config.ChainID,
98                 Version: version.Version,
99                 Other: []string{
100                         cmn.Fmt("wire_version=%v", wire.Version),
101                         cmn.Fmt("p2p_version=%v", p2p.Version),
102                 },
103         }
104
105         if !sm.sw.IsListening() {
106                 return nodeInfo
107         }
108
109         p2pListener := sm.sw.Listeners()[0]
110
111         // We assume that the rpcListener has the same ExternalAddress.
112         // This is probably true because both P2P and RPC listeners use UPnP,
113         // except of course if the rpc is only bound to localhost
114         if listenerStatus {
115                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
116         } else {
117                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
118         }
119         return nodeInfo
120 }
121
122 func (sm *SyncManager) netStart() error {
123         _, err := sm.sw.Start()
124         return err
125 }
126
127 //Start start sync manager service
128 func (sm *SyncManager) Start() {
129         go sm.netStart()
130         // broadcast transactions
131         go sm.txBroadcastLoop()
132
133         // broadcast mined blocks
134         go sm.minedBroadcastLoop()
135
136         // start sync handlers
137         go sm.syncer()
138
139         go sm.txsyncLoop()
140 }
141
142 //Stop stop sync manager
143 func (sm *SyncManager) Stop() {
144         close(sm.quitSync)
145         sm.sw.Stop()
146 }
147
148 func (sm *SyncManager) txBroadcastLoop() {
149         newTxCh := sm.txPool.GetNewTxCh()
150         for {
151                 select {
152                 case newTx := <-newTxCh:
153                         peers, err := sm.peers.BroadcastTx(newTx)
154                         if err != nil {
155                                 log.Errorf("Broadcast new tx error. %v", err)
156                                 return
157                         }
158                         for _, smPeer := range peers {
159                                 if smPeer == nil {
160                                         continue
161                                 }
162                                 swPeer := smPeer.getPeer()
163                                 log.Info("Tx broadcast error. Stop Peer.")
164                                 sm.sw.StopPeerGracefully(swPeer)
165                         }
166                 case <-sm.quitSync:
167                         return
168                 }
169         }
170 }
171
172 func (sm *SyncManager) minedBroadcastLoop() {
173         for {
174                 select {
175                 case blockHash := <-sm.newBlockCh:
176                         block, err := sm.chain.GetBlockByHash(blockHash)
177                         if err != nil {
178                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
179                                 return
180                         }
181                         peers, err := sm.peers.BroadcastMinedBlock(block)
182                         if err != nil {
183                                 log.Errorf("Broadcast mine block error. %v", err)
184                                 return
185                         }
186                         for _, smPeer := range peers {
187                                 if smPeer == nil {
188                                         continue
189                                 }
190                                 swPeer := smPeer.getPeer()
191                                 log.Info("New mined block broadcast error. Stop Peer.")
192                                 sm.sw.StopPeerGracefully(swPeer)
193                         }
194                 case <-sm.quitSync:
195                         return
196                 }
197         }
198 }
199
200 //NodeInfo get P2P peer node info
201 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
202         return sm.sw.NodeInfo()
203 }
204
205 //BlockKeeper get block keeper
206 func (sm *SyncManager) BlockKeeper() *blockKeeper {
207         return sm.blockKeeper
208 }
209
210 //Peers get sync manager peer set
211 func (sm *SyncManager) Peers() *peerSet {
212         return sm.peers
213 }
214
215 //Switch get sync manager switch
216 func (sm *SyncManager) Switch() *p2p.Switch {
217         return sm.sw
218 }