OSDN Git Service

Merge pull request #970 from Bytom/p2p_test
[bytom/bytom-spv.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "strings"
5
6         log "github.com/sirupsen/logrus"
7         "github.com/tendermint/go-crypto"
8         "github.com/tendermint/go-wire"
9         cmn "github.com/tendermint/tmlibs/common"
10         dbm "github.com/tendermint/tmlibs/db"
11
12         cfg "github.com/bytom/config"
13         "github.com/bytom/p2p"
14         core "github.com/bytom/protocol"
15         "github.com/bytom/protocol/bc"
16         "github.com/bytom/version"
17 )
18
19 //SyncManager Sync Manager is responsible for the business layer information synchronization
20 type SyncManager struct {
21         networkID uint64
22         sw        *p2p.Switch
23
24         privKey     crypto.PrivKeyEd25519 // local node's p2p key
25         chain       *core.Chain
26         txPool      *core.TxPool
27         fetcher     *Fetcher
28         blockKeeper *blockKeeper
29         peers       *peerSet
30
31         newBlockCh    chan *bc.Hash
32         newPeerCh     chan struct{}
33         txSyncCh      chan *txsync
34         dropPeerCh    chan *string
35         quitSync      chan struct{}
36         config        *cfg.Config
37         synchronising int32
38 }
39
40 //NewSyncManager create a sync manager
41 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
42         // Create the protocol manager with the base fields
43         manager := &SyncManager{
44                 txPool:     txPool,
45                 chain:      chain,
46                 privKey:    crypto.GenPrivKeyEd25519(),
47                 config:     config,
48                 quitSync:   make(chan struct{}),
49                 newBlockCh: newBlockCh,
50                 newPeerCh:  make(chan struct{}),
51                 txSyncCh:   make(chan *txsync),
52                 dropPeerCh: make(chan *string, maxQuitReq),
53                 peers:      newPeerSet(),
54         }
55
56         trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
57         manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
58
59         manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
60         manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
61
62         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
63         manager.sw.AddReactor("PROTOCOL", protocolReactor)
64
65         // Create & add listener
66         var listenerStatus bool
67         var l p2p.Listener
68         if !config.VaultMode {
69                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
70                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
71                 manager.sw.AddListener(l)
72         }
73         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
74         manager.sw.SetNodePrivKey(manager.privKey)
75
76         return manager, nil
77 }
78
79 // Defaults to tcp
80 func protocolAndAddress(listenAddr string) (string, string) {
81         p, address := "tcp", listenAddr
82         parts := strings.SplitN(address, "://", 2)
83         if len(parts) == 2 {
84                 p, address = parts[0], parts[1]
85         }
86         return p, address
87 }
88
89 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
90         nodeInfo := &p2p.NodeInfo{
91                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
92                 Moniker: sm.config.Moniker,
93                 Network: sm.config.ChainID,
94                 Version: version.Version,
95                 Other: []string{
96                         cmn.Fmt("wire_version=%v", wire.Version),
97                         cmn.Fmt("p2p_version=%v", p2p.Version),
98                 },
99         }
100
101         if !sm.sw.IsListening() {
102                 return nodeInfo
103         }
104
105         p2pListener := sm.sw.Listeners()[0]
106
107         // We assume that the rpcListener has the same ExternalAddress.
108         // This is probably true because both P2P and RPC listeners use UPnP,
109         // except of course if the rpc is only bound to localhost
110         if listenerStatus {
111                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
112         } else {
113                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
114         }
115         return nodeInfo
116 }
117
118 func (sm *SyncManager) netStart() error {
119         // Start the switch
120         _, err := sm.sw.Start()
121         if err != nil {
122                 return err
123         }
124
125         // If seeds exist, add them to the address book and dial out
126         if sm.config.P2P.Seeds != "" {
127                 // dial out
128                 seeds := strings.Split(sm.config.P2P.Seeds, ",")
129                 if err := sm.DialSeeds(seeds); err != nil {
130                         return err
131                 }
132         }
133         return nil
134 }
135
136 //Start start sync manager service
137 func (sm *SyncManager) Start() {
138         go sm.netStart()
139         // broadcast transactions
140         go sm.txBroadcastLoop()
141
142         // broadcast mined blocks
143         go sm.minedBroadcastLoop()
144
145         // start sync handlers
146         go sm.syncer()
147
148         go sm.txsyncLoop()
149 }
150
151 //Stop stop sync manager
152 func (sm *SyncManager) Stop() {
153         close(sm.quitSync)
154         sm.sw.Stop()
155 }
156
157 func (sm *SyncManager) txBroadcastLoop() {
158         newTxCh := sm.txPool.GetNewTxCh()
159         for {
160                 select {
161                 case newTx := <-newTxCh:
162                         peers, err := sm.peers.BroadcastTx(newTx)
163                         if err != nil {
164                                 log.Errorf("Broadcast new tx error. %v", err)
165                                 return
166                         }
167                         for _, smPeer := range peers {
168                                 if smPeer == nil {
169                                         continue
170                                 }
171                                 swPeer := smPeer.getPeer()
172                                 log.Info("Tx broadcast error. Stop Peer.")
173                                 sm.sw.StopPeerGracefully(swPeer)
174                         }
175                 case <-sm.quitSync:
176                         return
177                 }
178         }
179 }
180
181 func (sm *SyncManager) minedBroadcastLoop() {
182         for {
183                 select {
184                 case blockHash := <-sm.newBlockCh:
185                         block, err := sm.chain.GetBlockByHash(blockHash)
186                         if err != nil {
187                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
188                                 return
189                         }
190                         peers, err := sm.peers.BroadcastMinedBlock(block)
191                         if err != nil {
192                                 log.Errorf("Broadcast mine block error. %v", err)
193                                 return
194                         }
195                         for _, smPeer := range peers {
196                                 if smPeer == nil {
197                                         continue
198                                 }
199                                 swPeer := smPeer.getPeer()
200                                 log.Info("New mined block broadcast error. Stop Peer.")
201                                 sm.sw.StopPeerGracefully(swPeer)
202                         }
203                 case <-sm.quitSync:
204                         return
205                 }
206         }
207 }
208
209 //NodeInfo get P2P peer node info
210 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
211         return sm.sw.NodeInfo()
212 }
213
214 //BlockKeeper get block keeper
215 func (sm *SyncManager) BlockKeeper() *blockKeeper {
216         return sm.blockKeeper
217 }
218
219 //Peers get sync manager peer set
220 func (sm *SyncManager) Peers() *peerSet {
221         return sm.peers
222 }
223
224 //DialSeeds dial seed peers
225 func (sm *SyncManager) DialSeeds(seeds []string) error {
226         return sm.sw.DialSeeds(seeds)
227 }
228
229 //Switch get sync manager switch
230 func (sm *SyncManager) Switch() *p2p.Switch {
231         return sm.sw
232 }