OSDN Git Service

Merge pull request #1042 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strconv"
5         "strings"
6
7         log "github.com/sirupsen/logrus"
8         "github.com/tendermint/go-crypto"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         cfg "github.com/bytom/config"
13         "github.com/bytom/consensus"
14         "github.com/bytom/p2p"
15         "github.com/bytom/p2p/pex"
16         core "github.com/bytom/protocol"
17         "github.com/bytom/protocol/bc"
18         "github.com/bytom/protocol/bc/types"
19         "github.com/bytom/version"
20 )
21
22 //SyncManager Sync Manager is responsible for the business layer information synchronization
23 type SyncManager struct {
24         networkID uint64
25         sw        *p2p.Switch
26
27         privKey     crypto.PrivKeyEd25519 // local node's p2p key
28         chain       *core.Chain
29         txPool      *core.TxPool
30         fetcher     *Fetcher
31         blockKeeper *blockKeeper
32         peers       *peerSet
33
34         newTxCh       chan *types.Tx
35         newBlockCh    chan *bc.Hash
36         newPeerCh     chan struct{}
37         txSyncCh      chan *txsync
38         dropPeerCh    chan *string
39         quitSync      chan struct{}
40         config        *cfg.Config
41         synchronising int32
42 }
43
44 //NewSyncManager create a sync manager
45 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
46         // Create the protocol manager with the base fields
47         manager := &SyncManager{
48                 txPool:     txPool,
49                 chain:      chain,
50                 privKey:    crypto.GenPrivKeyEd25519(),
51                 peers:      newPeerSet(),
52                 newTxCh:    make(chan *types.Tx, maxTxChanSize),
53                 newBlockCh: newBlockCh,
54                 newPeerCh:  make(chan struct{}),
55                 txSyncCh:   make(chan *txsync),
56                 dropPeerCh: make(chan *string, maxQuitReq),
57                 quitSync:   make(chan struct{}),
58                 config:     config,
59         }
60
61         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
62         addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
63         manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
64
65         pexReactor := pex.NewPEXReactor(addrBook)
66         manager.sw.AddReactor("PEX", pexReactor)
67
68         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
69         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
70         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
71         manager.sw.AddReactor("PROTOCOL", protocolReactor)
72
73         // Create & add listener
74         var listenerStatus bool
75         var l p2p.Listener
76         if !config.VaultMode {
77                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
78                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
79                 manager.sw.AddListener(l)
80         }
81         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
82         manager.sw.SetNodePrivKey(manager.privKey)
83
84         return manager, nil
85 }
86
// protocolAndAddress splits a listen address of the form "proto://addr" at
// the first "://" into its protocol and address parts. When the separator is
// absent, the protocol defaults to "tcp" and the input is returned unchanged
// as the address.
func protocolAndAddress(listenAddr string) (string, string) {
	const sep = "://"

	idx := strings.Index(listenAddr, sep)
	if idx < 0 {
		return "tcp", listenAddr
	}
	return listenAddr[:idx], listenAddr[idx+len(sep):]
}
96
97 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
98         nodeInfo := &p2p.NodeInfo{
99                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
100                 Moniker: sm.config.Moniker,
101                 Network: sm.config.ChainID,
102                 Version: version.Version,
103                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
104         }
105
106         if !sm.sw.IsListening() {
107                 return nodeInfo
108         }
109
110         p2pListener := sm.sw.Listeners()[0]
111
112         // We assume that the rpcListener has the same ExternalAddress.
113         // This is probably true because both P2P and RPC listeners use UPnP,
114         // except of course if the rpc is only bound to localhost
115         if listenerStatus {
116                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
117         } else {
118                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
119         }
120         return nodeInfo
121 }
122
123 func (sm *SyncManager) netStart() error {
124         _, err := sm.sw.Start()
125         return err
126 }
127
128 //Start start sync manager service
129 func (sm *SyncManager) Start() {
130         go sm.netStart()
131         // broadcast transactions
132         go sm.txBroadcastLoop()
133
134         // broadcast mined blocks
135         go sm.minedBroadcastLoop()
136
137         // start sync handlers
138         go sm.syncer()
139
140         go sm.txsyncLoop()
141 }
142
143 //Stop stop sync manager
144 func (sm *SyncManager) Stop() {
145         close(sm.quitSync)
146         sm.sw.Stop()
147 }
148
149 func (sm *SyncManager) txBroadcastLoop() {
150         for {
151                 select {
152                 case newTx := <-sm.newTxCh:
153                         peers, err := sm.peers.BroadcastTx(newTx)
154                         if err != nil {
155                                 log.Errorf("Broadcast new tx error. %v", err)
156                                 return
157                         }
158                         for _, smPeer := range peers {
159                                 if smPeer == nil {
160                                         continue
161                                 }
162                                 swPeer := smPeer.getPeer()
163                                 log.Info("Tx broadcast error. Stop Peer.")
164                                 sm.sw.StopPeerGracefully(swPeer)
165                         }
166                 case <-sm.quitSync:
167                         return
168                 }
169         }
170 }
171
172 func (sm *SyncManager) minedBroadcastLoop() {
173         for {
174                 select {
175                 case blockHash := <-sm.newBlockCh:
176                         block, err := sm.chain.GetBlockByHash(blockHash)
177                         if err != nil {
178                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
179                                 return
180                         }
181                         peers, err := sm.peers.BroadcastMinedBlock(block)
182                         if err != nil {
183                                 log.Errorf("Broadcast mine block error. %v", err)
184                                 return
185                         }
186                         for _, smPeer := range peers {
187                                 if smPeer == nil {
188                                         continue
189                                 }
190                                 swPeer := smPeer.getPeer()
191                                 log.Info("New mined block broadcast error. Stop Peer.")
192                                 sm.sw.StopPeerGracefully(swPeer)
193                         }
194                 case <-sm.quitSync:
195                         return
196                 }
197         }
198 }
199
200 //NodeInfo get P2P peer node info
201 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
202         return sm.sw.NodeInfo()
203 }
204
205 //BlockKeeper get block keeper
206 func (sm *SyncManager) BlockKeeper() *blockKeeper {
207         return sm.blockKeeper
208 }
209
210 //Peers get sync manager peer set
211 func (sm *SyncManager) Peers() *peerSet {
212         return sm.peers
213 }
214
215 //Switch get sync manager switch
216 func (sm *SyncManager) Switch() *p2p.Switch {
217         return sm.sw
218 }
219
220 // GetNewTxCh return a unconfirmed transaction feed channel
221 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
222         return sm.newTxCh
223 }