6 log "github.com/sirupsen/logrus"
7 "github.com/tendermint/go-crypto"
8 "github.com/tendermint/go-wire"
9 cmn "github.com/tendermint/tmlibs/common"
10 dbm "github.com/tendermint/tmlibs/db"
12 cfg "github.com/bytom/config"
13 "github.com/bytom/p2p"
14 core "github.com/bytom/protocol"
15 "github.com/bytom/protocol/bc"
16 "github.com/bytom/version"
19 //SyncManager Sync Manager is responsible for the business layer information synchronization
20 type SyncManager struct {
23 addrBook *p2p.AddrBook // known peers
25 privKey crypto.PrivKeyEd25519 // local node's p2p key
29 blockKeeper *blockKeeper
32 newBlockCh chan *bc.Hash
33 newPeerCh chan struct{}
35 dropPeerCh chan *string
36 quitSync chan struct{}
41 //NewSyncManager create a sync manager
42 func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
43 // Create the protocol manager with the base fields
44 manager := &SyncManager{
47 privKey: crypto.GenPrivKeyEd25519(),
49 quitSync: make(chan struct{}),
50 newBlockCh: newBlockCh,
51 newPeerCh: make(chan struct{}),
52 txSyncCh: make(chan *txsync),
53 dropPeerCh: make(chan *string, maxQuitReq),
57 trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
58 manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
60 manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
61 manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
63 protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
64 manager.sw.AddReactor("PROTOCOL", protocolReactor)
66 // Create & add listener
67 p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
68 l := p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
69 manager.sw.AddListener(l)
70 manager.sw.SetNodeInfo(manager.makeNodeInfo())
71 manager.sw.SetNodePrivKey(manager.privKey)
73 // Optionally, start the pex reactor
74 //var addrBook *p2p.AddrBook
75 if config.P2P.PexReactor {
76 manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
77 pexReactor := p2p.NewPEXReactor(manager.addrBook, manager.sw)
78 manager.sw.AddReactor("PEX", pexReactor)
85 func protocolAndAddress(listenAddr string) (string, string) {
86 p, address := "tcp", listenAddr
87 parts := strings.SplitN(address, "://", 2)
89 p, address = parts[0], parts[1]
94 func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
95 nodeInfo := &p2p.NodeInfo{
96 PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
97 Moniker: sm.config.Moniker,
98 Network: sm.config.ChainID,
99 Version: version.Version,
101 cmn.Fmt("wire_version=%v", wire.Version),
102 cmn.Fmt("p2p_version=%v", p2p.Version),
106 if !sm.sw.IsListening() {
110 p2pListener := sm.sw.Listeners()[0]
111 p2pHost := p2pListener.ExternalAddress().IP.String()
112 p2pPort := p2pListener.ExternalAddress().Port
114 // We assume that the rpcListener has the same ExternalAddress.
115 // This is probably true because both P2P and RPC listeners use UPnP,
116 // except of course if the rpc is only bound to localhost
117 nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
121 func (sm *SyncManager) netStart() error {
123 _, err := sm.sw.Start()
128 // If seeds exist, add them to the address book and dial out
129 if sm.config.P2P.Seeds != "" {
131 seeds := strings.Split(sm.config.P2P.Seeds, ",")
132 if err := sm.DialSeeds(seeds); err != nil {
140 //Start start sync manager service
141 func (sm *SyncManager) Start() {
143 // broadcast transactions
144 go sm.txBroadcastLoop()
146 // broadcast mined blocks
147 go sm.minedBroadcastLoop()
149 // start sync handlers
155 //Stop stop sync manager
156 func (sm *SyncManager) Stop() {
161 func (sm *SyncManager) txBroadcastLoop() {
162 newTxCh := sm.txPool.GetNewTxCh()
165 case newTx := <-newTxCh:
166 peers, err := sm.peers.BroadcastTx(newTx)
168 log.Errorf("Broadcast new tx error. %v", err)
171 for _, smPeer := range peers {
175 swPeer := smPeer.getPeer()
176 if ban := smPeer.addBanScore(0, 50, "Broadcast new tx error"); ban {
177 sm.sw.AddBannedPeer(swPeer)
178 sm.sw.StopPeerGracefully(swPeer)
187 func (sm *SyncManager) minedBroadcastLoop() {
190 case blockHash := <-sm.newBlockCh:
191 block, err := sm.chain.GetBlockByHash(blockHash)
193 log.Errorf("Failed on mined broadcast loop get block %v", err)
196 peers, err := sm.peers.BroadcastMinedBlock(block)
198 log.Errorf("Broadcast mine block error. %v", err)
201 for _, smPeer := range peers {
205 swPeer := smPeer.getPeer()
206 if ban := smPeer.addBanScore(0, 50, "Broadcast block error"); ban {
207 sm.sw.AddBannedPeer(swPeer)
208 sm.sw.StopPeerGracefully(swPeer)
217 //NodeInfo get P2P peer node info
218 func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
219 return sm.sw.NodeInfo()
222 //BlockKeeper get block keeper
223 func (sm *SyncManager) BlockKeeper() *blockKeeper {
224 return sm.blockKeeper
227 //Peers get sync manager peer set
228 func (sm *SyncManager) Peers() *peerSet {
232 //DialSeeds dial seed peers
233 func (sm *SyncManager) DialSeeds(seeds []string) error {
234 return sm.sw.DialSeeds(sm.addrBook, seeds)
237 //Switch get sync manager switch
238 func (sm *SyncManager) Switch() *p2p.Switch {