10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
14 cfg "github.com/bytom/config"
15 "github.com/bytom/consensus"
16 "github.com/bytom/p2p"
17 "github.com/bytom/p2p/discover"
18 "github.com/bytom/p2p/pex"
19 core "github.com/bytom/protocol"
20 "github.com/bytom/protocol/bc"
21 "github.com/bytom/protocol/bc/types"
22 "github.com/bytom/version"
25 // SyncManager is responsible for business-layer information synchronization.
// NOTE(review): only part of the field list is visible in this excerpt.
26 type SyncManager struct {
30 privKey crypto.PrivKeyEd25519 // local node's p2p key
34 blockKeeper *blockKeeper
37 newTxCh chan *types.Tx // transactions read by txBroadcastLoop for broadcasting
38 newBlockCh chan *bc.Hash // block hashes read by minedBroadcastLoop; supplied by the caller of NewSyncManager
39 newPeerCh chan struct{} // handed to the protocol reactor in NewSyncManager
41 dropPeerCh chan *string // shared with the block keeper and the protocol reactor
42 quitSync chan struct{}
47 // NewSyncManager creates a sync manager: it builds the p2p switch, registers
// the "PROTOCOL" reactor and, unless config.VaultMode is set, adds a listener
// and a discovery-backed "PEX" reactor.
// newBlockCh is stored as-is on the manager; the other channels are created here.
48 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
49 sw := p2p.NewSwitch(config)
// dropPeerCh is created up front because both the block keeper and the
// protocol reactor receive the same channel.
51 dropPeerCh := make(chan *string, maxQuitReq)
52 manager := &SyncManager{
// The p2p key is generated here rather than loaded from config.
56 privKey: crypto.GenPrivKeyEd25519(),
57 fetcher: NewFetcher(chain, sw, peers),
58 blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
60 newTxCh: make(chan *types.Tx, maxTxChanSize),
61 newBlockCh: newBlockCh,
62 newPeerCh: make(chan struct{}),
63 txSyncCh: make(chan *txsync),
64 dropPeerCh: dropPeerCh,
65 quitSync: make(chan struct{}),
69 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
70 manager.sw.AddReactor("PROTOCOL", protocolReactor)
72 // Create & add listener
73 var listenerStatus bool
// NOTE(review): the end of this if-block is not visible in this excerpt;
// presumably the listener, discovery and PEX setup below are all skipped in vault mode — confirm.
75 if !config.VaultMode {
76 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
77 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
78 manager.sw.AddListener(l)
// Discovery is started on the same port number as the listener's external address.
80 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
85 pexReactor := pex.NewPEXReactor(discv)
86 manager.sw.AddReactor("PEX", pexReactor)
// listenerStatus keeps its zero value (false) if the listener branch did not run.
88 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
89 manager.sw.SetNodePrivKey(manager.privKey)
// protocolAndAddress splits a listen address written as "protocol://address"
// into its protocol and address parts. The protocol starts out as "tcp" and the
// address as the whole input.
94 func protocolAndAddress(listenAddr string) (string, string) {
95 p, address := "tcp", listenAddr
// SplitN with a limit of 2 splits only on the first "://".
96 parts := strings.SplitN(address, "://", 2)
// NOTE(review): the guard around this assignment is not visible in this excerpt;
// presumably it only runs when "://" was found (len(parts) == 2) — confirm.
98 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo for this node from the manager's key,
// the configured moniker and chain ID, the build version and the default
// service flags, and fills in ListenAddr from the switch's first listener.
103 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
104 nodeInfo := &p2p.NodeInfo{
105 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
106 Moniker: sm.config.Moniker,
107 Network: sm.config.ChainID,
108 Version: version.Version,
// The service flags are carried as a base-10 string in Other.
109 Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
// NOTE(review): the body of this branch is not visible; presumably it returns
// early, since Listeners()[0] below needs at least one listener — confirm.
112 if !sm.sw.IsListening() {
116 p2pListener := sm.sw.Listeners()[0]
118 // We assume that the rpcListener has the same ExternalAddress.
119 // This is probably true because both P2P and RPC listeners use UPnP,
120 // except of course if the rpc is only bound to localhost
// NOTE(review): the condition choosing between the external and internal
// address is not visible here; presumably it depends on listenerStatus — confirm.
122 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
124 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
129 // Start starts the sync manager service: it starts the p2p switch and then
// launches the transaction and mined-block broadcast loops as goroutines.
130 func (sm *SyncManager) Start() {
// A switch start failure is passed to cmn.Exit with a formatted message
// instead of being returned to the caller.
131 if _, err := sm.sw.Start(); err != nil {
132 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
134 // broadcast transactions
135 go sm.txBroadcastLoop()
137 // broadcast mined blocks
138 go sm.minedBroadcastLoop()
140 // start sync handlers
146 // Stop stops the sync manager.
// NOTE(review): the body is not visible in this excerpt.
147 func (sm *SyncManager) Stop() {
// initDiscover opens a UDP socket on 0.0.0.0:port, starts the discovery
// network on it using priv, and registers the configured seeds as fallback nodes.
// The discovery database path is "discover.db" under config.DBDir().
152 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
153 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
158 conn, err := net.ListenUDP("udp", addr)
// Use the address the socket actually bound to.
163 realaddr := conn.LocalAddr().(*net.UDPAddr)
164 ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
169 // add the seeds node to the discover table
// NOTE(review): the body of this branch is not visible; presumably it returns
// the network without fallback nodes when no seeds are configured — confirm.
170 if config.P2P.Seeds == "" {
173 nodes := []*discover.Node{}
// Seeds is a comma-separated list.
174 for _, seed := range strings.Split(config.P2P.Seeds, ",") {
// The part of the URL before "@" is the hex-encoded SHA-256 of the seed string itself.
175 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
// NOTE(review): MustParseNode presumably panics on a malformed seed entry — confirm.
176 nodes = append(nodes, discover.MustParseNode(url))
178 if err = ntab.SetFallbackNodes(nodes); err != nil {
// txBroadcastLoop receives transactions from sm.newTxCh and broadcasts each
// one through the peer set. Start runs it as a goroutine.
184 func (sm *SyncManager) txBroadcastLoop() {
187 case newTx := <-sm.newTxCh:
188 peers, err := sm.peers.BroadcastTx(newTx)
190 log.Errorf("Broadcast new tx error. %v", err)
// NOTE(review): the peers returned by BroadcastTx are stopped below; presumably
// they are the peers the send failed for — confirm against peerSet.BroadcastTx.
193 for _, smPeer := range peers {
197 swPeer := smPeer.getPeer()
198 log.Info("Tx broadcast error. Stop Peer.")
199 sm.sw.StopPeerGracefully(swPeer)
// minedBroadcastLoop receives block hashes from sm.newBlockCh, loads the
// matching block from the chain and broadcasts it through the peer set.
// Start runs it as a goroutine.
207 func (sm *SyncManager) minedBroadcastLoop() {
210 case blockHash := <-sm.newBlockCh:
// Only the hash arrives on the channel, so the block is looked up on the chain.
211 block, err := sm.chain.GetBlockByHash(blockHash)
213 log.Errorf("Failed on mined broadcast loop get block %v", err)
216 peers, err := sm.peers.BroadcastMinedBlock(block)
218 log.Errorf("Broadcast mine block error. %v", err)
// NOTE(review): the peers returned by BroadcastMinedBlock are stopped below; presumably
// they are the peers the send failed for — confirm against peerSet.BroadcastMinedBlock.
221 for _, smPeer := range peers {
225 swPeer := smPeer.getPeer()
226 log.Info("New mined block broadcast error. Stop Peer.")
227 sm.sw.StopPeerGracefully(swPeer)
235 // NodeInfo returns the node info held by the p2p switch.
236 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
237 return sm.sw.NodeInfo()
240 // BlockKeeper returns the sync manager's block keeper.
241 func (sm *SyncManager) BlockKeeper() *blockKeeper {
242 return sm.blockKeeper
245 // Peers returns the sync manager's peer set.
246 func (sm *SyncManager) Peers() *peerSet {
250 // Switch returns the sync manager's p2p switch.
251 func (sm *SyncManager) Switch() *p2p.Switch {
255 // GetNewTxCh returns an unconfirmed transaction feed channel
256 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {