OSDN Git Service

fix suggestion
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strings"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-crypto"
8         "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         cfg "github.com/bytom/config"
13         "github.com/bytom/p2p"
14         "github.com/bytom/p2p/pex"
15         core "github.com/bytom/protocol"
16         "github.com/bytom/protocol/bc"
17         "github.com/bytom/protocol/bc/types"
18         "github.com/bytom/version"
19 )
20
21 //SyncManager Sync Manager is responsible for the business layer information synchronization
22 type SyncManager struct {
23         networkID uint64
24         sw        *p2p.Switch
25
26         privKey     crypto.PrivKeyEd25519 // local node's p2p key
27         chain       *core.Chain
28         txPool      *core.TxPool
29         fetcher     *Fetcher
30         blockKeeper *blockKeeper
31         peers       *peerSet
32
33         newTxCh       chan *types.Tx
34         newBlockCh    chan *bc.Hash
35         newPeerCh     chan struct{}
36         txSyncCh      chan *txsync
37         dropPeerCh    chan *string
38         quitSync      chan struct{}
39         config        *cfg.Config
40         synchronising int32
41 }
42
43 //NewSyncManager create a sync manager
44 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
45         // Create the protocol manager with the base fields
46         manager := &SyncManager{
47                 txPool:     txPool,
48                 chain:      chain,
49                 privKey:    crypto.GenPrivKeyEd25519(),
50                 peers:      newPeerSet(),
51                 newTxCh:    make(chan *types.Tx, maxTxChanSize),
52                 newBlockCh: newBlockCh,
53                 newPeerCh:  make(chan struct{}),
54                 txSyncCh:   make(chan *txsync),
55                 dropPeerCh: make(chan *string, maxQuitReq),
56                 quitSync:   make(chan struct{}),
57                 config:     config,
58         }
59
60         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
61         addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
62         manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
63
64         pexReactor := pex.NewPEXReactor(addrBook)
65         manager.sw.AddReactor("PEX", pexReactor)
66
67         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
68         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
69         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
70         manager.sw.AddReactor("PROTOCOL", protocolReactor)
71
72         // Create & add listener
73         var listenerStatus bool
74         var l p2p.Listener
75         if !config.VaultMode {
76                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
77                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
78                 manager.sw.AddListener(l)
79         }
80         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
81         manager.sw.SetNodePrivKey(manager.privKey)
82
83         return manager, nil
84 }
85
86 // Defaults to tcp
87 func protocolAndAddress(listenAddr string) (string, string) {
88         p, address := "tcp", listenAddr
89         parts := strings.SplitN(address, "://", 2)
90         if len(parts) == 2 {
91                 p, address = parts[0], parts[1]
92         }
93         return p, address
94 }
95
96 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
97         nodeInfo := &p2p.NodeInfo{
98                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
99                 Moniker: sm.config.Moniker,
100                 Network: sm.config.ChainID,
101                 Version: version.Version,
102                 Other: []string{
103                         cmn.Fmt("wire_version=%v", wire.Version),
104                         cmn.Fmt("p2p_version=%v", p2p.Version),
105                 },
106         }
107
108         if !sm.sw.IsListening() {
109                 return nodeInfo
110         }
111
112         p2pListener := sm.sw.Listeners()[0]
113
114         // We assume that the rpcListener has the same ExternalAddress.
115         // This is probably true because both P2P and RPC listeners use UPnP,
116         // except of course if the rpc is only bound to localhost
117         if listenerStatus {
118                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
119         } else {
120                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
121         }
122         return nodeInfo
123 }
124
125 func (sm *SyncManager) netStart() error {
126         _, err := sm.sw.Start()
127         return err
128 }
129
130 //Start start sync manager service
131 func (sm *SyncManager) Start() {
132         go sm.netStart()
133         // broadcast transactions
134         go sm.txBroadcastLoop()
135
136         // broadcast mined blocks
137         go sm.minedBroadcastLoop()
138
139         // start sync handlers
140         go sm.syncer()
141
142         go sm.txsyncLoop()
143 }
144
145 //Stop stop sync manager
146 func (sm *SyncManager) Stop() {
147         close(sm.quitSync)
148         sm.sw.Stop()
149 }
150
151 func (sm *SyncManager) txBroadcastLoop() {
152         for {
153                 select {
154                 case newTx := <-sm.newTxCh:
155                         peers, err := sm.peers.BroadcastTx(newTx)
156                         if err != nil {
157                                 log.Errorf("Broadcast new tx error. %v", err)
158                                 return
159                         }
160                         for _, smPeer := range peers {
161                                 if smPeer == nil {
162                                         continue
163                                 }
164                                 swPeer := smPeer.getPeer()
165                                 log.Info("Tx broadcast error. Stop Peer.")
166                                 sm.sw.StopPeerGracefully(swPeer)
167                         }
168                 case <-sm.quitSync:
169                         return
170                 }
171         }
172 }
173
174 func (sm *SyncManager) minedBroadcastLoop() {
175         for {
176                 select {
177                 case blockHash := <-sm.newBlockCh:
178                         block, err := sm.chain.GetBlockByHash(blockHash)
179                         if err != nil {
180                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
181                                 return
182                         }
183                         peers, err := sm.peers.BroadcastMinedBlock(block)
184                         if err != nil {
185                                 log.Errorf("Broadcast mine block error. %v", err)
186                                 return
187                         }
188                         for _, smPeer := range peers {
189                                 if smPeer == nil {
190                                         continue
191                                 }
192                                 swPeer := smPeer.getPeer()
193                                 log.Info("New mined block broadcast error. Stop Peer.")
194                                 sm.sw.StopPeerGracefully(swPeer)
195                         }
196                 case <-sm.quitSync:
197                         return
198                 }
199         }
200 }
201
202 //NodeInfo get P2P peer node info
203 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
204         return sm.sw.NodeInfo()
205 }
206
207 //BlockKeeper get block keeper
208 func (sm *SyncManager) BlockKeeper() *blockKeeper {
209         return sm.blockKeeper
210 }
211
212 //Peers get sync manager peer set
213 func (sm *SyncManager) Peers() *peerSet {
214         return sm.peers
215 }
216
217 //Switch get sync manager switch
218 func (sm *SyncManager) Switch() *p2p.Switch {
219         return sm.sw
220 }
221
222 // GetNewTxCh return a unconfirmed transaction feed channel
223 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
224         return sm.newTxCh
225 }