10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
14 cfg "github.com/bytom/config"
15 "github.com/bytom/consensus"
16 "github.com/bytom/p2p"
17 "github.com/bytom/p2p/discover"
18 "github.com/bytom/p2p/pex"
19 core "github.com/bytom/protocol"
20 "github.com/bytom/protocol/bc"
21 "github.com/bytom/protocol/bc/types"
22 "github.com/bytom/version"
25 //SyncManager Sync Manager is responsible for the business layer information synchronization
26 type SyncManager struct {
29 privKey crypto.PrivKeyEd25519 // local node's p2p key
33 blockKeeper *blockKeeper
36 newTxCh chan *types.Tx
37 newBlockCh chan *bc.Hash
38 newPeerCh chan struct{}
40 dropPeerCh chan *string
41 quitSync chan struct{}
46 //NewSyncManager create a sync manager
47 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
48 sw := p2p.NewSwitch(config)
50 dropPeerCh := make(chan *string, maxQuitReq)
51 manager := &SyncManager{
55 privKey: crypto.GenPrivKeyEd25519(),
56 fetcher: NewFetcher(chain, sw, peers),
57 blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
59 newTxCh: make(chan *types.Tx, maxTxChanSize),
60 newBlockCh: newBlockCh,
61 newPeerCh: make(chan struct{}),
62 txSyncCh: make(chan *txsync),
63 dropPeerCh: dropPeerCh,
64 quitSync: make(chan struct{}),
68 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
69 manager.sw.AddReactor("PROTOCOL", protocolReactor)
71 // Create & add listener
72 var listenerStatus bool
74 if !config.VaultMode {
75 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
76 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
77 manager.sw.AddListener(l)
79 discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
84 pexReactor := pex.NewPEXReactor(discv)
85 manager.sw.AddReactor("PEX", pexReactor)
87 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
88 manager.sw.SetNodePrivKey(manager.privKey)
93 func protocolAndAddress(listenAddr string) (string, string) {
94 p, address := "tcp", listenAddr
95 parts := strings.SplitN(address, "://", 2)
97 p, address = parts[0], parts[1]
102 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
103 nodeInfo := &p2p.NodeInfo{
104 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
105 Moniker: sm.config.Moniker,
106 Network: sm.config.ChainID,
107 Version: version.Version,
108 Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
111 if !sm.sw.IsListening() {
115 p2pListener := sm.sw.Listeners()[0]
117 // We assume that the rpcListener has the same ExternalAddress.
118 // This is probably true because both P2P and RPC listeners use UPnP,
119 // except of course if the rpc is only bound to localhost
121 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
123 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
128 //Start start sync manager service
129 func (sm *SyncManager) Start() {
130 if _, err := sm.sw.Start(); err != nil {
131 cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
133 // broadcast transactions
134 go sm.txBroadcastLoop()
136 // broadcast mined blocks
137 go sm.minedBroadcastLoop()
139 // start sync handlers
145 //Stop stop sync manager
146 func (sm *SyncManager) Stop() {
151 func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
152 addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
157 conn, err := net.ListenUDP("udp", addr)
162 realaddr := conn.LocalAddr().(*net.UDPAddr)
163 ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
168 // add the seeds node to the discover table
169 if config.P2P.Seeds == "" {
172 nodes := []*discover.Node{}
173 for _, seed := range strings.Split(config.P2P.Seeds, ",") {
174 url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
175 nodes = append(nodes, discover.MustParseNode(url))
177 if err = ntab.SetFallbackNodes(nodes); err != nil {
183 func (sm *SyncManager) txBroadcastLoop() {
186 case newTx := <-sm.newTxCh:
187 peers, err := sm.peers.BroadcastTx(newTx)
189 log.Errorf("Broadcast new tx error. %v", err)
192 for _, smPeer := range peers {
196 swPeer := smPeer.getPeer()
197 log.Info("Tx broadcast error. Stop Peer.")
198 sm.sw.StopPeerGracefully(swPeer)
206 func (sm *SyncManager) minedBroadcastLoop() {
209 case blockHash := <-sm.newBlockCh:
210 block, err := sm.chain.GetBlockByHash(blockHash)
212 log.Errorf("Failed on mined broadcast loop get block %v", err)
215 peers, err := sm.peers.BroadcastMinedBlock(block)
217 log.Errorf("Broadcast mine block error. %v", err)
220 for _, smPeer := range peers {
224 swPeer := smPeer.getPeer()
225 log.Info("New mined block broadcast error. Stop Peer.")
226 sm.sw.StopPeerGracefully(swPeer)
234 //NodeInfo get P2P peer node info
235 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
236 return sm.sw.NodeInfo()
239 //BlockKeeper get block keeper
240 func (sm *SyncManager) BlockKeeper() *blockKeeper {
241 return sm.blockKeeper
244 //Peers get sync manager peer set
245 func (sm *SyncManager) Peers() *peerSet {
249 //Switch get sync manager switch
250 func (sm *SyncManager) Switch() *p2p.Switch {
254 // GetNewTxCh return a unconfirmed transaction feed channel
255 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {