package netsync
import (
+ "encoding/hex"
+ "errors"
+ "net"
+ "path"
+ "reflect"
+ "strconv"
"strings"
+ "sync"
log "github.com/sirupsen/logrus"
"github.com/tendermint/go-crypto"
- "github.com/tendermint/go-wire"
cmn "github.com/tendermint/tmlibs/common"
- dbm "github.com/tendermint/tmlibs/db"
+ "github.com/bytom/account"
cfg "github.com/bytom/config"
+ "github.com/bytom/consensus"
"github.com/bytom/p2p"
- core "github.com/bytom/protocol"
+ "github.com/bytom/p2p/discover"
"github.com/bytom/protocol/bc"
+ "github.com/bytom/protocol/bc/types"
"github.com/bytom/version"
+ "github.com/bytom/wallet"
)
+const (
+ maxTxChanSize = 10000
+)
+
+// Chain is the interface for Bytom core
+type Chain interface {
+ BestBlockHeader() *types.BlockHeader
+ BestBlockHeight() uint64
+ CalcNextSeed(*bc.Hash) (*bc.Hash, error)
+ GetBlockByHash(*bc.Hash) (*types.Block, error)
+ GetBlockByHeight(uint64) (*types.Block, error)
+ GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+ GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+ InMainChain(bc.Hash) bool
+ ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
+}
+
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
- networkID uint64
- sw *p2p.Switch
- addrBook *p2p.AddrBook // known peers
+ sw *p2p.Switch
+ genesisHash bc.Hash
privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain *core.Chain
- txPool *core.TxPool
- fetcher *Fetcher
+ chain Chain
blockKeeper *blockKeeper
peers *peerSet
- newBlockCh chan *bc.Hash
- newPeerCh chan struct{}
- txSyncCh chan *txsync
- dropPeerCh chan *string
- quitSync chan struct{}
- config *cfg.Config
- synchronising int32
+ newTxCh chan *types.Tx
+ txNotifyCh chan *types.Tx
+ newBlockCh chan *bc.Hash
+ newAddrCh chan *account.CtrlProgram
+ spvAddresses []*account.CtrlProgram
+ addrMutex sync.RWMutex
+ txSyncCh chan *txSyncMsg
+ quitSync chan struct{}
+ config *cfg.Config
}
//NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
- // Create the protocol manager with the base fields
- manager := &SyncManager{
- txPool: txPool,
- chain: chain,
- privKey: crypto.GenPrivKeyEd25519(),
- config: config,
- quitSync: make(chan struct{}),
- newBlockCh: newBlockCh,
- newPeerCh: make(chan struct{}),
- txSyncCh: make(chan *txsync),
- dropPeerCh: make(chan *string, maxQuitReq),
- peers: newPeerSet(),
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
+ genesisHeader, err := chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
}
- trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
- manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
-
- manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
- manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
-
- protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+ sw := p2p.NewSwitch(config)
+ peers := newPeerSet(sw)
+ manager := &SyncManager{
+ sw: sw,
+ genesisHash: genesisHeader.Hash(),
+ chain: chain,
+ privKey: crypto.GenPrivKeyEd25519(),
+ blockKeeper: newBlockKeeper(chain, peers),
+ peers: peers,
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
+ txNotifyCh: wallet.GetTxCh(),
+ newBlockCh: newBlockCh,
+ txSyncCh: make(chan *txSyncMsg),
+ quitSync: make(chan struct{}),
+ config: config,
+ newAddrCh: wallet.AccountMgr.NewAddrCh,
+ }
+ manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
+ protocolReactor := NewProtocolReactor(manager, manager.peers)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
// Create & add listener
- p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
- l := p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
- manager.sw.AddListener(l)
- manager.sw.SetNodeInfo(manager.makeNodeInfo())
+ var listenerStatus bool
+ var l p2p.Listener
+ if !config.VaultMode {
+ p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
+ l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
+ //manager.sw.AddListener(l)
+
+ discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
+ if err != nil {
+ return nil, err
+ }
+ manager.sw.SetDiscv(discv)
+ }
+ manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
manager.sw.SetNodePrivKey(manager.privKey)
+ return manager, nil
+}
- // Optionally, start the pex reactor
- //var addrBook *p2p.AddrBook
- if config.P2P.PexReactor {
- manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
- pexReactor := p2p.NewPEXReactor(manager.addrBook)
- manager.sw.AddReactor("PEX", pexReactor)
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+ bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+ if bestPeer != nil {
+ return bestPeer.getPeerInfo()
}
+ return nil
+}
- return manager, nil
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+ return sm.newTxCh
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+ return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+ peer := sm.peers.bestPeer(consensus.SFFullNode)
+ return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+ return sm.sw.NodeInfo()
+}
+
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+ if peer := sm.peers.getPeer(peerID); peer == nil {
+ return errors.New("peerId not exist")
+ }
+ sm.peers.removePeer(peerID)
+ return nil
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+ return sm.sw
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+ sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+}
+
+func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
+ sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+ blocks, err := msg.GetBlocks()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+ return
+ }
+
+ sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+ var block *types.Block
+ var err error
+ if msg.Height != 0 {
+ block, err = sm.chain.GetBlockByHeight(msg.Height)
+ } else {
+ block, err = sm.chain.GetBlockByHash(msg.GetHash())
+ }
+ if err != nil {
+ log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+ return
+ }
+
+ ok, err := peer.sendBlock(block)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+ blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(blocks) == 0 {
+ return
+ }
+
+ totalSize := 0
+ sendBlocks := []*types.Block{}
+ for _, block := range blocks {
+ rawData, err := block.MarshalText()
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+ continue
+ }
+
+ if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+ break
+ }
+ totalSize += len(rawData)
+ sendBlocks = append(sendBlocks, block)
+ }
+
+ ok, err := peer.sendBlocks(sendBlocks)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+ headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(headers) == 0 {
+ log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+ return
+ }
+
+ ok, err := peer.sendHeaders(headers)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+ headers, err := msg.GetHeaders()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+ return
+ }
+
+ sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+ bestHeader := sm.chain.BestBlockHeader()
+ genesisBlock, err := sm.chain.GetBlockByHeight(0)
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+ }
+
+ genesisHash := genesisBlock.Hash()
+ msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+ if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+ peer.setStatus(msg.Height, msg.GetHash())
+ return
+ }
+
+ if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+ log.WithFields(log.Fields{
+ "remote genesis": genesisHash.String(),
+ "local genesis": sm.genesisHash.String(),
+ }).Warn("fail hand shake due to differnt genesis")
+ return
+ }
+ if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
+ log.WithFields(log.Fields{
+ "peer ServiceFlag": basePeer.ServiceFlag(),
+ }).Warn("fail hand shake due to remote peer is not full node support spv proof")
+ return
+ }
+ sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+ tx, err := msg.GetTransaction()
+ if err != nil {
+ sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ return
+ }
+ sm.txNotifyCh <- tx
+}
+
+func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+ peer := sm.peers.getPeer(basePeer.ID())
+ if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
+ return
+ }
+
+ switch msg := msg.(type) {
+ case *StatusRequestMessage:
+ sm.handleStatusRequestMsg(basePeer)
+
+ case *StatusResponseMessage:
+ sm.handleStatusResponseMsg(basePeer, msg)
+
+ case *TransactionMessage:
+ sm.handleTransactionMsg(peer, msg)
+
+ case *HeadersMessage:
+ sm.handleHeadersMsg(peer, msg)
+
+ case *MerkleBlockMessage:
+ sm.handleMerkelBlockMsg(peer, msg)
+
+ default:
+ log.Errorf("unknown message type %v", reflect.TypeOf(msg))
+ }
}
// Defaults to tcp
return p, address
}
-func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
+func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
nodeInfo := &p2p.NodeInfo{
PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
Moniker: sm.config.Moniker,
Network: sm.config.ChainID,
Version: version.Version,
- Other: []string{
- cmn.Fmt("wire_version=%v", wire.Version),
- cmn.Fmt("p2p_version=%v", p2p.Version),
- },
+ Other: []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
}
if !sm.sw.IsListening() {
}
p2pListener := sm.sw.Listeners()[0]
- p2pHost := p2pListener.ExternalAddress().IP.String()
- p2pPort := p2pListener.ExternalAddress().Port
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
- nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
- return nodeInfo
-}
-
-func (sm *SyncManager) netStart() error {
- // Start the switch
- _, err := sm.sw.Start()
- if err != nil {
- return err
+ if listenerStatus {
+ nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
+ } else {
+ nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
}
-
- // If seeds exist, add them to the address book and dial out
- if sm.config.P2P.Seeds != "" {
- // dial out
- seeds := strings.Split(sm.config.P2P.Seeds, ",")
- if err := sm.DialSeeds(seeds); err != nil {
- return err
- }
- }
-
- return nil
+ return nodeInfo
}
//Start start sync manager service
func (sm *SyncManager) Start() {
- go sm.netStart()
+ if _, err := sm.sw.Start(); err != nil {
+ cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
+ }
+ go sm.spvAddressMgr()
// broadcast transactions
go sm.txBroadcastLoop()
-
- // broadcast mined blocks
go sm.minedBroadcastLoop()
-
- // start sync handlers
- go sm.syncer()
-
- go sm.txsyncLoop()
+ go sm.txSyncLoop()
}
//Stop stop sync manager
sm.sw.Stop()
}
-func (sm *SyncManager) txBroadcastLoop() {
- newTxCh := sm.txPool.GetNewTxCh()
- for {
- select {
- case newTx := <-newTxCh:
- peers, err := sm.peers.BroadcastTx(newTx)
- if err != nil {
- log.Errorf("Broadcast new tx error. %v", err)
- return
- }
- for _, peer := range peers {
- if ban := peer.addBanScore(0, 50, "Broadcast new tx error"); ban {
- peer := sm.peers.Peer(peer.id).getPeer()
- sm.sw.AddBannedPeer(peer)
- sm.sw.StopPeerGracefully(peer)
- }
- }
- case <-sm.quitSync:
- return
- }
+func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
+ addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
+ if err != nil {
+ return nil, err
+ }
+
+ conn, err := net.ListenUDP("udp", addr)
+ if err != nil {
+ return nil, err
+ }
+
+ realaddr := conn.LocalAddr().(*net.UDPAddr)
+ ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
+ if err != nil {
+ return nil, err
+ }
+
+ // add the seeds node to the discover table
+ if config.P2P.Seeds == "" {
+ return ntab, nil
+ }
+ nodes := []*discover.Node{}
+ for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+ version.Status.AddSeed(seed)
+ url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
+ nodes = append(nodes, discover.MustParseNode(url))
}
+ if err = ntab.SetFallbackNodes(nodes); err != nil {
+ return nil, err
+ }
+ return ntab, nil
}
func (sm *SyncManager) minedBroadcastLoop() {
log.Errorf("Failed on mined broadcast loop get block %v", err)
return
}
- peers, err := sm.peers.BroadcastMinedBlock(block)
- if err != nil {
+ if err := sm.peers.broadcastMinedBlock(block); err != nil {
log.Errorf("Broadcast mine block error. %v", err)
return
}
- for _, peer := range peers {
- if ban := peer.addBanScore(0, 50, "Broadcast block error"); ban {
- peer := sm.peers.Peer(peer.id).getPeer()
- sm.sw.AddBannedPeer(peer)
- sm.sw.StopPeerGracefully(peer)
- }
- }
case <-sm.quitSync:
return
}
}
}
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
- return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
- return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
- return sm.peers
-}
-
-//DialSeeds dial seed peers
-func (sm *SyncManager) DialSeeds(seeds []string) error {
- return sm.sw.DialSeeds(sm.addrBook, seeds)
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
- return sm.sw
-}