6 log "github.com/sirupsen/logrus"
7 "github.com/tendermint/go-crypto"
8 "github.com/tendermint/go-wire"
9 cmn "github.com/tendermint/tmlibs/common"
10 dbm "github.com/tendermint/tmlibs/db"
15 cfg "github.com/bytom/config"
16 "github.com/bytom/p2p"
17 core "github.com/bytom/protocol"
18 "github.com/bytom/protocol/bc"
19 "github.com/bytom/version"
22 // SyncManager is responsible for business-layer information synchronization.
// NOTE(review): this listing elides some struct fields; the code below also
// reads or writes config, sw, chain, txPool, peers, fetcher, txSyncCh and
// mapResult on this type.
23 type SyncManager struct {
26 addrBook *p2p.AddrBook // known peers
28 privKey crypto.PrivKeyEd25519 // local node's p2p key
32 blockKeeper *blockKeeper
// newBlockCh carries block hashes consumed by minedBroadcastLoop; the channel
// itself is supplied by the caller of NewSyncManager.
36 newBlockCh chan *bc.Hash
// newPeerCh is created in NewSyncManager and handed to the protocol reactor.
37 newPeerCh chan struct{}
// dropPeerCh is shared with the block keeper and the protocol reactor.
39 dropPeerCh chan *string
// quitSync is created in NewSyncManager; its consumers are not visible in this
// listing (presumably used to signal shutdown; confirm).
40 quitSync chan struct{}
45 // NewSyncManager creates a SyncManager and wires up its p2p switch.
//
// Visible steps: build the switch on top of a "trusthistory" database, create
// the block keeper and the fetcher, register the "PROTOCOL" reactor, add a
// default listener unless config.VaultMode is set, publish the node info and
// private key to the switch, and register the "PEX" reactor when
// config.P2P.PexReactor is enabled.
//
// NOTE(review): several lines of this function (including the declarations of
// l and mapResult and every return statement) are not shown in this listing,
// so the error conditions cannot be documented from here.
46 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
47 // Create the protocol manager with the base fields
48 manager := &SyncManager{
51 privKey: crypto.GenPrivKeyEd25519(), // key is produced here, not read from config
53 quitSync: make(chan struct{}),
54 newBlockCh: newBlockCh, // caller-owned channel
55 newPeerCh: make(chan struct{}),
56 txSyncCh: make(chan *txsync),
57 dropPeerCh: make(chan *string, maxQuitReq), // buffered; maxQuitReq is defined elsewhere
// The switch is backed by a database named "trusthistory".
61 trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
62 manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
// The block keeper and the fetcher share the switch and the peer set.
64 manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
65 manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
67 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
68 manager.sw.AddReactor("PROTOCOL", protocolReactor)
70 // Create & add listener
// No listener is created when the node runs in vault mode.
73 if !config.VaultMode {
74 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
// Plain assignment: l and mapResult are declared on lines elided from this listing.
75 l, mapResult = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
76 manager.sw.AddListener(l)
78 manager.sw.SetNodeInfo(manager.makeNodeInfo(mapResult))
79 manager.sw.SetNodePrivKey(manager.privKey)
80 manager.mapResult = mapResult
81 // Optionally, start the pex reactor
82 //var addrBook *p2p.AddrBook
// addrBook is only assigned inside this branch.
83 if config.P2P.PexReactor {
84 manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
85 pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
86 manager.sw.AddReactor("PEX", pexReactor)
// protocolAndAddress splits a listen address on the first "://" separator into
// a protocol part and an address part. The protocol defaults to "tcp" and the
// address defaults to the whole input.
// NOTE(review): the guard around the parts[0]/parts[1] assignment and the
// return statement are elided from this listing; presumably the assignment
// only runs when the separator was found, and (p, address) is returned.
93 func protocolAndAddress(listenAddr string) (string, string) {
94 p, address := "tcp", listenAddr
95 parts := strings.SplitN(address, "://", 2) // yields at most two pieces
97 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo describing this node: the public key
// derived from sm.privKey, the moniker and chain ID from the config, the
// package version, and the wire and p2p versions as formatted strings.
// ListenAddr is filled in from the first listener of the switch.
// NOTE(review): the lines that read listenOpen are elided from this listing;
// presumably it selects between the external and internal address below.
102 func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
103 nodeInfo := &p2p.NodeInfo{
104 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
105 Moniker: sm.config.Moniker,
106 Network: sm.config.ChainID,
107 Version: version.Version,
// The field that receives the two version strings below is on an elided line.
109 cmn.Fmt("wire_version=%v", wire.Version),
110 cmn.Fmt("p2p_version=%v", p2p.Version),
// The body of this branch is elided; presumably it returns early when the
// switch has no listener (confirm).
114 if !sm.sw.IsListening() {
// Only the first listener is consulted.
118 p2pListener := sm.sw.Listeners()[0]
120 // We assume that the rpcListener has the same ExternalAddress.
121 // This is probably true because both P2P and RPC listeners use UPnP,
122 // except of course if the rpc is only bound to localhost
// Advertise the listener's external address ...
124 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
// ... or its internal address (the selecting condition is on an elided line).
126 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
// netStart starts the p2p switch, probes the node's own advertised listen
// address with a TCP dial (3 second timeout), and dials the configured seeds.
// On a successful probe the node info is rebuilt with listenOpen set to true.
// NOTE(review): the error-handling and return lines are elided from this
// listing, so the returned error values cannot be documented from here. Also
// confirm that the probe connection is closed on an elided line.
131 func (sm *SyncManager) netStart() error {
133 _, err := sm.sw.Start()
// Self-dial to check whether the advertised listen port is reachable.
139 conn, err := net.DialTimeout("tcp", sm.NodeInfo().ListenAddr, 3*time.Second)
141 if err != nil && conn == nil {
142 log.Error("Could not open listen port")
145 if err == nil && conn != nil {
146 log.Info("Success open listen port")
148 sm.sw.SetNodeInfo(sm.makeNodeInfo(true))
152 // If seeds exist, add them to the address book and dial out
153 if sm.config.P2P.Seeds != "" {
// Seeds is a comma-separated list of addresses.
155 seeds := strings.Split(sm.config.P2P.Seeds, ",")
156 if err := sm.DialSeeds(seeds); err != nil {
160 log.WithField("nodeInfo", sm.sw.NodeInfo()).Info("net start")
164 // Start starts the sync manager service by launching its background
// goroutines: the transaction broadcast loop and the mined-block broadcast loop.
// NOTE(review): other statements of this function (for example whatever
// follows the "start sync handlers" comment) are elided from this listing.
165 func (sm *SyncManager) Start() {
167 // broadcast transactions
168 go sm.txBroadcastLoop()
170 // broadcast mined blocks
171 go sm.minedBroadcastLoop()
173 // start sync handlers
179 // Stop stops the sync manager.
// NOTE(review): the body is not visible in this listing; what exactly is shut
// down must be confirmed against the full file.
180 func (sm *SyncManager) Stop() {
// txBroadcastLoop receives new transactions from the tx pool's channel and
// broadcasts each one to the peer set. Peers returned by BroadcastTx are
// stopped gracefully through the switch; judging by the log message these are
// presumably the peers the broadcast failed for.
// NOTE(review): the enclosing loop/select, the err check and any exit case are
// elided from this listing.
185 func (sm *SyncManager) txBroadcastLoop() {
186 newTxCh := sm.txPool.GetNewTxCh()
189 case newTx := <-newTxCh:
190 peers, err := sm.peers.BroadcastTx(newTx)
192 log.Errorf("Broadcast new tx error. %v", err)
// Some lines inside this loop are elided (possibly a nil check on smPeer).
195 for _, smPeer := range peers {
199 swPeer := smPeer.getPeer()
200 log.Info("Tx broadcast error. Stop Peer.")
201 sm.sw.StopPeerGracefully(swPeer)
// minedBroadcastLoop receives block hashes from sm.newBlockCh, loads the
// corresponding block from the chain and broadcasts it to the peer set. Peers
// returned by BroadcastMinedBlock are stopped gracefully through the switch;
// judging by the log message these are presumably the peers the broadcast
// failed for.
// NOTE(review): the enclosing loop/select, the err checks and any exit case
// are elided from this listing.
209 func (sm *SyncManager) minedBroadcastLoop() {
212 case blockHash := <-sm.newBlockCh:
213 block, err := sm.chain.GetBlockByHash(blockHash)
215 log.Errorf("Failed on mined broadcast loop get block %v", err)
218 peers, err := sm.peers.BroadcastMinedBlock(block)
220 log.Errorf("Broadcast mine block error. %v", err)
// Some lines inside this loop are elided (possibly a nil check on smPeer).
223 for _, smPeer := range peers {
227 swPeer := smPeer.getPeer()
228 log.Info("New mined block broadcast error. Stop Peer.")
229 sm.sw.StopPeerGracefully(swPeer)
237 // NodeInfo returns the P2P node info currently held by the switch.
238 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
239 return sm.sw.NodeInfo()
242 // BlockKeeper returns the sync manager's block keeper.
243 func (sm *SyncManager) BlockKeeper() *blockKeeper {
244 return sm.blockKeeper
247 // Peers returns the sync manager's peer set.
// NOTE(review): the body is elided from this listing; presumably it returns
// sm.peers (confirm).
248 func (sm *SyncManager) Peers() *peerSet {
252 // DialSeeds dials the given seed peers through the switch, passing along
// the sync manager's address book, and returns the switch's error.
// NOTE(review): in the visible code addrBook is only assigned when
// config.P2P.PexReactor is enabled, so it may be nil here; confirm that
// Switch.DialSeeds accepts a nil address book.
253 func (sm *SyncManager) DialSeeds(seeds []string) error {
254 return sm.sw.DialSeeds(sm.addrBook, seeds)
257 // Switch returns the sync manager's p2p switch.
258 func (sm *SyncManager) Switch() *p2p.Switch {