6 log "github.com/sirupsen/logrus"
7 "github.com/tendermint/go-crypto"
8 "github.com/tendermint/go-wire"
9 cmn "github.com/tendermint/tmlibs/common"
10 dbm "github.com/tendermint/tmlibs/db"
12 cfg "github.com/bytom/config"
13 "github.com/bytom/p2p"
14 core "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/version"
19 // SyncManager is responsible for the business layer information synchronization.
20 type SyncManager struct {
24 privKey crypto.PrivKeyEd25519 // local node's p2p key, generated in NewSyncManager
28 blockKeeper *blockKeeper // built by newBlockKeeper in NewSyncManager
31 newBlockCh chan *bc.Hash // hashes of blocks that minedBroadcastLoop looks up and broadcasts
32 newPeerCh chan struct{} // handed to the protocol reactor; presumably signals a newly added peer - TODO confirm
34 dropPeerCh chan *string // buffered with maxQuitReq; shared with the block keeper and the protocol reactor
35 quitSync chan struct{} // NOTE(review): presumably closed to stop the background loops - confirm against Stop
40 // NewSyncManager creates a sync manager.
//
// It generates a fresh Ed25519 p2p key, builds the p2p switch on top of the
// "trusthistory" database, wires the block keeper, fetcher and protocol
// reactor into that switch and, unless config.VaultMode is set, creates a
// default listener on the configured P2P listen address. The caller supplies
// newBlockCh; every other channel is created here.
41 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
42 // Create the protocol manager with the base fields
43 manager := &SyncManager{
46 privKey: crypto.GenPrivKeyEd25519(),
48 quitSync: make(chan struct{}),
49 newBlockCh: newBlockCh,
50 newPeerCh: make(chan struct{}),
51 txSyncCh: make(chan *txsync),
52 dropPeerCh: make(chan *string, maxQuitReq),
// The switch is created with a database named "trusthistory", opened with
// the configured backend and directory.
56 trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
57 manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
// The block keeper and the fetcher share the switch and the peer set.
59 manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
60 manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
// Register the protocol reactor on the switch under the name "PROTOCOL".
62 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
63 manager.sw.AddReactor("PROTOCOL", protocolReactor)
65 // Create & add listener
// listenerStatus keeps its zero value (false) when VaultMode skips the listener.
66 var listenerStatus bool
68 if !config.VaultMode {
69 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
70 l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
71 manager.sw.AddListener(l)
// Publish the node info and the private key on the switch.
73 manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
74 manager.sw.SetNodePrivKey(manager.privKey)
// protocolAndAddress separates a listen address into a protocol and an
// address. The protocol defaults to "tcp" and the address to listenAddr
// itself; the string is split at most once on "://", giving the protocol
// (parts[0]) and the remaining address (parts[1]).
// NOTE(review): the guard around the parts[0]/parts[1] assignment is not
// visible here; presumably it only applies when a "://" separator was
// found - confirm.
80 func protocolAndAddress(listenAddr string) (string, string) {
81 p, address := "tcp", listenAddr
82 parts := strings.SplitN(address, "://", 2)
84 p, address = parts[0], parts[1]
// makeNodeInfo builds the p2p.NodeInfo for the local node: the public key
// derived from sm.privKey, the moniker and chain ID from the config, the
// software version, and the wire and p2p version strings. When the switch is
// listening, ListenAddr is filled from the first listener's external or
// internal address.
// NOTE(review): the condition choosing between the external and the internal
// address is not visible here; presumably it is listenerStatus - confirm.
89 func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
90 nodeInfo := &p2p.NodeInfo{
91 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
92 Moniker: sm.config.Moniker,
93 Network: sm.config.ChainID,
94 Version: version.Version,
96 cmn.Fmt("wire_version=%v", wire.Version),
97 cmn.Fmt("p2p_version=%v", p2p.Version),
// Without an active listener there is no address to take from the switch.
101 if !sm.sw.IsListening() {
// Only the first registered listener is consulted.
105 p2pListener := sm.sw.Listeners()[0]
107 // We assume that the rpcListener has the same ExternalAddress.
108 // This is probably true because both P2P and RPC listeners use UPnP,
109 // except of course if the rpc is only bound to localhost
111 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
113 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
// netStart starts the p2p switch and, when config.P2P.Seeds is non-empty,
// dials the comma-separated seed addresses through DialSeeds.
// NOTE(review): the error-handling lines are not visible here; presumably
// errors from the switch start and from DialSeeds are returned - confirm.
118 func (sm *SyncManager) netStart() error {
120 _, err := sm.sw.Start()
125 // If seeds exist, add them to the address book and dial out
126 if sm.config.P2P.Seeds != "" {
// Seeds is a single comma-separated string of seed addresses.
128 seeds := strings.Split(sm.config.P2P.Seeds, ",")
129 if err := sm.DialSeeds(seeds); err != nil {
136 // Start starts the sync manager service. It launches the transaction
// broadcast loop and the mined-block broadcast loop, each in its own
// goroutine.
// NOTE(review): further startup steps (the "sync handlers" mentioned below)
// are not visible here - confirm what else is started.
137 func (sm *SyncManager) Start() {
139 // broadcast transactions
140 go sm.txBroadcastLoop()
142 // broadcast mined blocks
143 go sm.minedBroadcastLoop()
145 // start sync handlers
151 // Stop stops the sync manager.
152 func (sm *SyncManager) Stop() {
// txBroadcastLoop receives new transactions from the transaction pool's
// channel and broadcasts each one to the peer set. A broadcast error is
// logged, and the peers returned by BroadcastTx are stopped gracefully on
// the switch.
// NOTE(review): the loop/select header, the conditions guarding the log and
// the peer shutdown, and any exit case are not visible here - confirm.
157 func (sm *SyncManager) txBroadcastLoop() {
158 newTxCh := sm.txPool.GetNewTxCh()
161 case newTx := <-newTxCh:
162 peers, err := sm.peers.BroadcastTx(newTx)
164 log.Errorf("Broadcast new tx error. %v", err)
// Judging by the log message below, these are presumably the peers whose
// broadcast failed - TODO confirm against BroadcastTx.
167 for _, smPeer := range peers {
171 swPeer := smPeer.getPeer()
172 log.Info("Tx broadcast error. Stop Peer.")
173 sm.sw.StopPeerGracefully(swPeer)
// minedBroadcastLoop receives block hashes from sm.newBlockCh, loads the
// matching block from the chain and broadcasts it to the peer set. Lookup
// and broadcast errors are logged, and the peers returned by
// BroadcastMinedBlock are stopped gracefully on the switch.
// NOTE(review): the loop/select header, the conditions guarding the logs and
// the peer shutdown, and any exit case are not visible here - confirm.
181 func (sm *SyncManager) minedBroadcastLoop() {
184 case blockHash := <-sm.newBlockCh:
185 block, err := sm.chain.GetBlockByHash(blockHash)
187 log.Errorf("Failed on mined broadcast loop get block %v", err)
190 peers, err := sm.peers.BroadcastMinedBlock(block)
192 log.Errorf("Broadcast mine block error. %v", err)
// Judging by the log message below, these are presumably the peers whose
// broadcast failed - TODO confirm against BroadcastMinedBlock.
195 for _, smPeer := range peers {
199 swPeer := smPeer.getPeer()
200 log.Info("New mined block broadcast error. Stop Peer.")
201 sm.sw.StopPeerGracefully(swPeer)
209 // NodeInfo returns the node info held by the p2p switch.
210 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
211 return sm.sw.NodeInfo()
214 // BlockKeeper returns the sync manager's block keeper.
215 func (sm *SyncManager) BlockKeeper() *blockKeeper {
216 return sm.blockKeeper
219 // Peers returns the sync manager's peer set.
220 func (sm *SyncManager) Peers() *peerSet {
224 // DialSeeds dials the given seed peers through the p2p switch and returns
// the switch's error, if any.
225 func (sm *SyncManager) DialSeeds(seeds []string) error {
226 return sm.sw.DialSeeds(seeds)
229 // Switch returns the sync manager's p2p switch.
230 func (sm *SyncManager) Switch() *p2p.Switch {