OSDN Git Service

Merge pull request #1064 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
1 package netsync
2
3 import (
4         "encoding/hex"
5         "net"
6         "path"
7         "strconv"
8         "strings"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13
14         cfg "github.com/bytom/config"
15         "github.com/bytom/consensus"
16         "github.com/bytom/p2p"
17         "github.com/bytom/p2p/discover"
18         "github.com/bytom/p2p/pex"
19         core "github.com/bytom/protocol"
20         "github.com/bytom/protocol/bc"
21         "github.com/bytom/protocol/bc/types"
22         "github.com/bytom/version"
23 )
24
25 //SyncManager Sync Manager is responsible for the business layer information synchronization
26 type SyncManager struct {
27         networkID uint64
28         sw        *p2p.Switch
29
30         privKey     crypto.PrivKeyEd25519 // local node's p2p key
31         chain       *core.Chain
32         txPool      *core.TxPool
33         fetcher     *Fetcher
34         blockKeeper *blockKeeper
35         peers       *peerSet
36
37         newTxCh       chan *types.Tx
38         newBlockCh    chan *bc.Hash
39         newPeerCh     chan struct{}
40         txSyncCh      chan *txsync
41         dropPeerCh    chan *string
42         quitSync      chan struct{}
43         config        *cfg.Config
44         synchronising int32
45 }
46
47 //NewSyncManager create a sync manager
48 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
49         sw := p2p.NewSwitch(config)
50         peers := newPeerSet()
51         dropPeerCh := make(chan *string, maxQuitReq)
52         manager := &SyncManager{
53                 sw:          sw,
54                 txPool:      txPool,
55                 chain:       chain,
56                 privKey:     crypto.GenPrivKeyEd25519(),
57                 fetcher:     NewFetcher(chain, sw, peers),
58                 blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
59                 peers:       peers,
60                 newTxCh:     make(chan *types.Tx, maxTxChanSize),
61                 newBlockCh:  newBlockCh,
62                 newPeerCh:   make(chan struct{}),
63                 txSyncCh:    make(chan *txsync),
64                 dropPeerCh:  dropPeerCh,
65                 quitSync:    make(chan struct{}),
66                 config:      config,
67         }
68
69         protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
70         manager.sw.AddReactor("PROTOCOL", protocolReactor)
71
72         // Create & add listener
73         var listenerStatus bool
74         var l p2p.Listener
75         if !config.VaultMode {
76                 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
77                 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
78                 manager.sw.AddListener(l)
79
80                 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
81                 if err != nil {
82                         return nil, err
83                 }
84
85                 pexReactor := pex.NewPEXReactor(discv)
86                 manager.sw.AddReactor("PEX", pexReactor)
87         }
88         manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
89         manager.sw.SetNodePrivKey(manager.privKey)
90         return manager, nil
91 }
92
93 // Defaults to tcp
94 func protocolAndAddress(listenAddr string) (string, string) {
95         p, address := "tcp", listenAddr
96         parts := strings.SplitN(address, "://", 2)
97         if len(parts) == 2 {
98                 p, address = parts[0], parts[1]
99         }
100         return p, address
101 }
102
103 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
104         nodeInfo := &p2p.NodeInfo{
105                 PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
106                 Moniker: sm.config.Moniker,
107                 Network: sm.config.ChainID,
108                 Version: version.Version,
109                 Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
110         }
111
112         if !sm.sw.IsListening() {
113                 return nodeInfo
114         }
115
116         p2pListener := sm.sw.Listeners()[0]
117
118         // We assume that the rpcListener has the same ExternalAddress.
119         // This is probably true because both P2P and RPC listeners use UPnP,
120         // except of course if the rpc is only bound to localhost
121         if listenerStatus {
122                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
123         } else {
124                 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
125         }
126         return nodeInfo
127 }
128
129 //Start start sync manager service
130 func (sm *SyncManager) Start() {
131         if _, err := sm.sw.Start(); err != nil {
132                 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
133         }
134         // broadcast transactions
135         go sm.txBroadcastLoop()
136
137         // broadcast mined blocks
138         go sm.minedBroadcastLoop()
139
140         // start sync handlers
141         go sm.syncer()
142
143         go sm.txsyncLoop()
144 }
145
146 //Stop stop sync manager
147 func (sm *SyncManager) Stop() {
148         close(sm.quitSync)
149         sm.sw.Stop()
150 }
151
152 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
153         addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
154         if err != nil {
155                 return nil, err
156         }
157
158         conn, err := net.ListenUDP("udp", addr)
159         if err != nil {
160                 return nil, err
161         }
162
163         realaddr := conn.LocalAddr().(*net.UDPAddr)
164         ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
165         if err != nil {
166                 return nil, err
167         }
168
169         // add the seeds node to the discover table
170         if config.P2P.Seeds == "" {
171                 return ntab, nil
172         }
173         nodes := []*discover.Node{}
174         for _, seed := range strings.Split(config.P2P.Seeds, ",") {
175                 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
176                 nodes = append(nodes, discover.MustParseNode(url))
177         }
178         if err = ntab.SetFallbackNodes(nodes); err != nil {
179                 return nil, err
180         }
181         return ntab, nil
182 }
183
184 func (sm *SyncManager) txBroadcastLoop() {
185         for {
186                 select {
187                 case newTx := <-sm.newTxCh:
188                         peers, err := sm.peers.BroadcastTx(newTx)
189                         if err != nil {
190                                 log.Errorf("Broadcast new tx error. %v", err)
191                                 return
192                         }
193                         for _, smPeer := range peers {
194                                 if smPeer == nil {
195                                         continue
196                                 }
197                                 swPeer := smPeer.getPeer()
198                                 log.Info("Tx broadcast error. Stop Peer.")
199                                 sm.sw.StopPeerGracefully(swPeer)
200                         }
201                 case <-sm.quitSync:
202                         return
203                 }
204         }
205 }
206
207 func (sm *SyncManager) minedBroadcastLoop() {
208         for {
209                 select {
210                 case blockHash := <-sm.newBlockCh:
211                         block, err := sm.chain.GetBlockByHash(blockHash)
212                         if err != nil {
213                                 log.Errorf("Failed on mined broadcast loop get block %v", err)
214                                 return
215                         }
216                         peers, err := sm.peers.BroadcastMinedBlock(block)
217                         if err != nil {
218                                 log.Errorf("Broadcast mine block error. %v", err)
219                                 return
220                         }
221                         for _, smPeer := range peers {
222                                 if smPeer == nil {
223                                         continue
224                                 }
225                                 swPeer := smPeer.getPeer()
226                                 log.Info("New mined block broadcast error. Stop Peer.")
227                                 sm.sw.StopPeerGracefully(swPeer)
228                         }
229                 case <-sm.quitSync:
230                         return
231                 }
232         }
233 }
234
235 //NodeInfo get P2P peer node info
236 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
237         return sm.sw.NodeInfo()
238 }
239
240 //BlockKeeper get block keeper
241 func (sm *SyncManager) BlockKeeper() *blockKeeper {
242         return sm.blockKeeper
243 }
244
245 //Peers get sync manager peer set
246 func (sm *SyncManager) Peers() *peerSet {
247         return sm.peers
248 }
249
250 //Switch get sync manager switch
251 func (sm *SyncManager) Switch() *p2p.Switch {
252         return sm.sw
253 }
254
255 // GetNewTxCh return a unconfirmed transaction feed channel
256 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
257         return sm.newTxCh
258 }