6 log "github.com/sirupsen/logrus"
7 "github.com/tendermint/go-crypto"
8 "github.com/tendermint/go-wire"
9 cmn "github.com/tendermint/tmlibs/common"
10 dbm "github.com/tendermint/tmlibs/db"
12 cfg "github.com/bytom/config"
13 "github.com/bytom/p2p"
14 "github.com/bytom/p2p/pex"
15 core "github.com/bytom/protocol"
16 "github.com/bytom/protocol/bc"
17 "github.com/bytom/protocol/bc/types"
18 "github.com/bytom/version"
21 // SyncManager is responsible for business-layer information synchronization.
22 type SyncManager struct {
26 privKey crypto.PrivKeyEd25519 // local node's p2p key
30 blockKeeper *blockKeeper
33 newTxCh chan *types.Tx // buffered (maxTxChanSize); received from by txBroadcastLoop
34 newBlockCh chan *bc.Hash // supplied by the caller of NewSyncManager; received from by minedBroadcastLoop
35 newPeerCh chan struct{} // unbuffered; passed to the protocol reactor in NewSyncManager
37 dropPeerCh chan *string // buffered (maxQuitReq); passed to the block keeper and the protocol reactor
38 quitSync chan struct{}
43 // NewSyncManager creates a sync manager, builds its p2p switch and registers the "PEX" and "PROTOCOL" reactors on it.
44 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
45 // Create the sync manager with the base fields
46 manager := &SyncManager{
49 privKey: crypto.GenPrivKeyEd25519(), // a new key is generated on each NewSyncManager call
51 newTxCh: make(chan *types.Tx, maxTxChanSize),
52 newBlockCh: newBlockCh, // supplied by the caller, not created here
53 newPeerCh: make(chan struct{}),
54 txSyncCh: make(chan *txsync),
55 dropPeerCh: make(chan *string, maxQuitReq),
56 quitSync: make(chan struct{}),
60 trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
61 addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
62 manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
64 pexReactor := pex.NewPEXReactor(addrBook)
65 manager.sw.AddReactor("PEX", pexReactor)
67 manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
68 manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
69 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
70 manager.sw.AddReactor("PROTOCOL", protocolReactor)
72 // Create & add listener
73 var listenerStatus bool
75 if !config.VaultMode { // listener setup is guarded by the vault-mode flag
76 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
77 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
78 manager.sw.AddListener(l)
80 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
81 manager.sw.SetNodePrivKey(manager.privKey)
// protocolAndAddress derives a protocol and an address from listenAddr.
// It starts from the defaults "tcp" and listenAddr itself, then splits listenAddr
// on the first "://" into at most two parts.
// NOTE(review): indexing parts[1] is only safe when the separator was found — confirm
// the assignment below is guarded by a len(parts) == 2 check.
87 func protocolAndAddress(listenAddr string) (string, string) {
88 p, address := "tcp", listenAddr
89 parts := strings.SplitN(address, "://", 2)
91 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo for the local node from the node key, the
// configured moniker and chain ID, and the software/wire/p2p versions. ListenAddr is
// taken from the switch's first listener.
// NOTE(review): presumably listenerStatus selects between the listener's external and
// internal address — confirm against the branch condition.
96 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
97 nodeInfo := &p2p.NodeInfo{
98 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
99 Moniker: sm.config.Moniker,
100 Network: sm.config.ChainID,
101 Version: version.Version,
103 cmn.Fmt("wire_version=%v", wire.Version),
104 cmn.Fmt("p2p_version=%v", p2p.Version),
108 if !sm.sw.IsListening() {
112 p2pListener := sm.sw.Listeners()[0]
114 // We assume that the rpcListener has the same ExternalAddress.
115 // This is probably true because both P2P and RPC listeners use UPnP,
116 // except, of course, if the RPC is only bound to localhost.
118 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
120 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
// netStart starts the p2p switch; the first result of Start is discarded and only the error is kept.
125 func (sm *SyncManager) netStart() error {
126 _, err := sm.sw.Start()
130 // Start starts the sync manager service, launching the transaction and mined-block broadcast loops as goroutines.
131 func (sm *SyncManager) Start() {
133 // broadcast transactions
134 go sm.txBroadcastLoop()
136 // broadcast mined blocks
137 go sm.minedBroadcastLoop()
139 // start sync handlers
145 // Stop stops the sync manager.
146 func (sm *SyncManager) Stop() {
// txBroadcastLoop receives transactions from newTxCh and broadcasts each one through
// the peer set. A broadcast error is logged. Peers returned by BroadcastTx are iterated
// and may be stopped gracefully through the switch.
// NOTE(review): the per-peer condition evaluated before StopPeerGracefully is not
// visible — confirm which of the returned peers are actually stopped.
151 func (sm *SyncManager) txBroadcastLoop() {
154 case newTx := <-sm.newTxCh:
155 peers, err := sm.peers.BroadcastTx(newTx)
157 log.Errorf("Broadcast new tx error. %v", err)
160 for _, smPeer := range peers {
164 swPeer := smPeer.getPeer()
165 log.Info("Tx broadcast error. Stop Peer.")
166 sm.sw.StopPeerGracefully(swPeer)
// minedBroadcastLoop receives block hashes from newBlockCh, loads the matching block
// from the chain and broadcasts it through the peer set. Lookup and broadcast errors
// are logged. Peers returned by BroadcastMinedBlock are iterated and may be stopped
// gracefully through the switch.
// NOTE(review): the per-peer condition evaluated before StopPeerGracefully is not
// visible — confirm which of the returned peers are actually stopped.
174 func (sm *SyncManager) minedBroadcastLoop() {
177 case blockHash := <-sm.newBlockCh:
178 block, err := sm.chain.GetBlockByHash(blockHash)
180 log.Errorf("Failed on mined broadcast loop get block %v", err)
183 peers, err := sm.peers.BroadcastMinedBlock(block)
185 log.Errorf("Broadcast mine block error. %v", err)
188 for _, smPeer := range peers {
192 swPeer := smPeer.getPeer()
193 log.Info("New mined block broadcast error. Stop Peer.")
194 sm.sw.StopPeerGracefully(swPeer)
202 // NodeInfo returns the P2P node info held by the switch.
203 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
204 return sm.sw.NodeInfo()
207 // BlockKeeper returns the sync manager's block keeper.
208 func (sm *SyncManager) BlockKeeper() *blockKeeper {
209 return sm.blockKeeper
212 // Peers returns the sync manager's peer set.
213 func (sm *SyncManager) Peers() *peerSet {
217 // Switch returns the sync manager's p2p switch.
218 func (sm *SyncManager) Switch() *p2p.Switch {
222 // GetNewTxCh returns an unconfirmed transaction feed channel.
223 func (sm *SyncManager) GetNewTxCh() chan *types.Tx {