OSDN Git Service

fix log output
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strings"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-crypto"
8         "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         "net"
13         "time"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/p2p"
17         core "github.com/bytom/protocol"
18         "github.com/bytom/protocol/bc"
19         "github.com/bytom/version"
20 )
21
22 //SyncManager Sync Manager is responsible for the business layer information synchronization
23 type SyncManager struct {
24         networkID uint64
25         sw        *p2p.Switch
26         addrBook  *p2p.AddrBook // known peers
27
28         privKey     crypto.PrivKeyEd25519 // local node's p2p key
29         chain       *core.Chain
30         txPool      *core.TxPool
31         fetcher     *Fetcher
32         blockKeeper *blockKeeper
33         peers       *peerSet
34         mapResult   bool
35
36         newBlockCh    chan *bc.Hash
37         newPeerCh     chan struct{}
38         txSyncCh      chan *txsync
39         dropPeerCh    chan *string
40         quitSync      chan struct{}
41         config        *cfg.Config
42         synchronising int32
43 }
44
45 //NewSyncManager create a sync manager
46 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
47         // Create the protocol manager with the base fields
48         manager := &SyncManager{
49                 txPool:     txPool,
50                 chain:      chain,
51                 privKey:    crypto.GenPrivKeyEd25519(),
52                 config:     config,
53                 quitSync:   make(chan struct{}),
54                 newBlockCh: newBlockCh,
55                 newPeerCh:  make(chan struct{}),
56                 txSyncCh:   make(chan *txsync),
57                 dropPeerCh: make(chan *string, maxQuitReq),
58                 peers:      newPeerSet(),
59         }
60
61         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
62         manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
63
64         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
65         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
66
67         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
68         manager.sw.AddReactor("PROTOCOL", protocolReactor)
69
70         // Create & add listener
71         var mapResult bool
72         var l p2p.Listener
73         if !config.VaultMode {
74                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
75                 l, mapResult = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
76                 manager.sw.AddListener(l)
77         }
78         manager.sw.SetNodeInfo(manager.makeNodeInfo(mapResult))
79         manager.sw.SetNodePrivKey(manager.privKey)
80         manager.mapResult = mapResult
81         // Optionally, start the pex reactor
82         //var addrBook *p2p.AddrBook
83         if config.P2P.PexReactor {
84                 manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
85                 pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
86                 manager.sw.AddReactor("PEX", pexReactor)
87         }
88
89         return manager, nil
90 }
91
// protocolAndAddress splits a listen address of the form "proto://addr" into
// its protocol and address parts, cutting at the first "://". When no "://"
// separator is present the protocol defaults to "tcp" and the input is
// returned unchanged as the address.
func protocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"
	if i := strings.Index(listenAddr, sep); i >= 0 {
		return listenAddr[:i], listenAddr[i+len(sep):]
	}
	return "tcp", listenAddr
}
101
102 func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
103         nodeInfo := &p2p.NodeInfo{
104                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
105                 Moniker: sm.config.Moniker,
106                 Network: sm.config.ChainID,
107                 Version: version.Version,
108                 Other: []string{
109                         cmn.Fmt("wire_version=%v", wire.Version),
110                         cmn.Fmt("p2p_version=%v", p2p.Version),
111                 },
112         }
113
114         if !sm.sw.IsListening() {
115                 return nodeInfo
116         }
117
118         p2pListener := sm.sw.Listeners()[0]
119
120         // We assume that the rpcListener has the same ExternalAddress.
121         // This is probably true because both P2P and RPC listeners use UPnP,
122         // except of course if the rpc is only bound to localhost
123         if listenOpen {
124                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
125         } else {
126                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
127         }
128         return nodeInfo
129 }
130
131 func (sm *SyncManager) netStart() error {
132         // Start the switch
133         _, err := sm.sw.Start()
134         if err != nil {
135                 return err
136         }
137
138         if !sm.mapResult {
139                 conn, err := net.DialTimeout("tcp", sm.NodeInfo().ListenAddr, 3*time.Second)
140
141                 if err != nil && conn == nil {
142                         log.Error("Could not open listen port")
143                 }
144
145                 if err == nil && conn != nil {
146                         log.Info("Success open listen port")
147                         conn.Close()
148                         sm.sw.SetNodeInfo(sm.makeNodeInfo(true))
149                 }
150         }
151
152         // If seeds exist, add them to the address book and dial out
153         if sm.config.P2P.Seeds != "" {
154                 // dial out
155                 seeds := strings.Split(sm.config.P2P.Seeds, ",")
156                 if err := sm.DialSeeds(seeds); err != nil {
157                         return err
158                 }
159         }
160         log.WithField("nodeInfo", sm.sw.NodeInfo()).Info("net start")
161         return nil
162 }
163
164 //Start start sync manager service
165 func (sm *SyncManager) Start() {
166         go sm.netStart()
167         // broadcast transactions
168         go sm.txBroadcastLoop()
169
170         // broadcast mined blocks
171         go sm.minedBroadcastLoop()
172
173         // start sync handlers
174         go sm.syncer()
175
176         go sm.txsyncLoop()
177 }
178
179 //Stop stop sync manager
180 func (sm *SyncManager) Stop() {
181         close(sm.quitSync)
182         sm.sw.Stop()
183 }
184
185 func (sm *SyncManager) txBroadcastLoop() {
186         newTxCh := sm.txPool.GetNewTxCh()
187         for {
188                 select {
189                 case newTx := <-newTxCh:
190                         peers, err := sm.peers.BroadcastTx(newTx)
191                         if err != nil {
192                                 log.Errorf("Broadcast new tx error. %v", err)
193                                 return
194                         }
195                         for _, smPeer := range peers {
196                                 if smPeer == nil {
197                                         continue
198                                 }
199                                 swPeer := smPeer.getPeer()
200                                 log.Info("Tx broadcast error. Stop Peer.")
201                                 sm.sw.StopPeerGracefully(swPeer)
202                         }
203                 case <-sm.quitSync:
204                         return
205                 }
206         }
207 }
208
209 func (sm *SyncManager) minedBroadcastLoop() {
210         for {
211                 select {
212                 case blockHash := <-sm.newBlockCh:
213                         block, err := sm.chain.GetBlockByHash(blockHash)
214                         if err != nil {
215                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
216                                 return
217                         }
218                         peers, err := sm.peers.BroadcastMinedBlock(block)
219                         if err != nil {
220                                 log.Errorf("Broadcast mine block error. %v", err)
221                                 return
222                         }
223                         for _, smPeer := range peers {
224                                 if smPeer == nil {
225                                         continue
226                                 }
227                                 swPeer := smPeer.getPeer()
228                                 log.Info("New mined block broadcast error. Stop Peer.")
229                                 sm.sw.StopPeerGracefully(swPeer)
230                         }
231                 case <-sm.quitSync:
232                         return
233                 }
234         }
235 }
236
237 //NodeInfo get P2P peer node info
238 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
239         return sm.sw.NodeInfo()
240 }
241
242 //BlockKeeper get block keeper
243 func (sm *SyncManager) BlockKeeper() *blockKeeper {
244         return sm.blockKeeper
245 }
246
247 //Peers get sync manager peer set
248 func (sm *SyncManager) Peers() *peerSet {
249         return sm.peers
250 }
251
252 //DialSeeds dial seed peers
253 func (sm *SyncManager) DialSeeds(seeds []string) error {
254         return sm.sw.DialSeeds(sm.addrBook, seeds)
255 }
256
257 //Switch get sync manager switch
258 func (sm *SyncManager) Switch() *p2p.Switch {
259         return sm.sw
260 }