import (
"encoding/hex"
+ "errors"
"net"
"path"
+ "reflect"
"strconv"
"strings"
+ "sync"
log "github.com/sirupsen/logrus"
"github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
+ "github.com/bytom/account"
cfg "github.com/bytom/config"
"github.com/bytom/consensus"
"github.com/bytom/p2p"
"github.com/bytom/p2p/discover"
- "github.com/bytom/p2p/pex"
- core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/version"
+ "github.com/bytom/wallet"
)
+const (
+ maxTxChanSize = 10000
+)
+
+// Chain is the interface for Bytom core
+type Chain interface {
+ BestBlockHeader() *types.BlockHeader
+ BestBlockHeight() uint64
+ CalcNextSeed(*bc.Hash) (*bc.Hash, error)
+ GetBlockByHash(*bc.Hash) (*types.Block, error)
+ GetBlockByHeight(uint64) (*types.Block, error)
+ GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+ GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+ InMainChain(bc.Hash) bool
+ ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
+}
+
//SyncManager Sync Manager is responsible for the business layer information synchronization
type SyncManager struct {
- networkID uint64
- sw *p2p.Switch
+ sw *p2p.Switch
+ genesisHash bc.Hash
privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain *core.Chain
- txPool *core.TxPool
- fetcher *Fetcher
+ chain Chain
blockKeeper *blockKeeper
peers *peerSet
- newTxCh chan *types.Tx
- newBlockCh chan *bc.Hash
- newPeerCh chan struct{}
- txSyncCh chan *txsync
- dropPeerCh chan *string
- quitSync chan struct{}
- config *cfg.Config
- synchronising int32
+ newTxCh chan *types.Tx
+ txNotifyCh chan *types.Tx
+ newBlockCh chan *bc.Hash
+ newAddrCh chan *account.CtrlProgram
+ spvAddresses []*account.CtrlProgram
+ addrMutex sync.RWMutex
+ txSyncCh chan *txSyncMsg
+ quitSync chan struct{}
+ config *cfg.Config
}
//NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
+ genesisHeader, err := chain.GetHeaderByHeight(0)
+ if err != nil {
+ return nil, err
+ }
+
sw := p2p.NewSwitch(config)
- peers := newPeerSet()
- dropPeerCh := make(chan *string, maxQuitReq)
+ peers := newPeerSet(sw)
manager := &SyncManager{
sw: sw,
- txPool: txPool,
+ genesisHash: genesisHeader.Hash(),
chain: chain,
privKey: crypto.GenPrivKeyEd25519(),
- fetcher: NewFetcher(chain, sw, peers),
- blockKeeper: newBlockKeeper(chain, sw, peers, dropPeerCh),
+ blockKeeper: newBlockKeeper(chain, peers),
peers: peers,
newTxCh: make(chan *types.Tx, maxTxChanSize),
+ txNotifyCh: wallet.GetTxCh(),
newBlockCh: newBlockCh,
- newPeerCh: make(chan struct{}),
- txSyncCh: make(chan *txsync),
- dropPeerCh: dropPeerCh,
+ txSyncCh: make(chan *txSyncMsg),
quitSync: make(chan struct{}),
config: config,
+ newAddrCh: wallet.AccountMgr.NewAddrCh,
}
-
- protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+ manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
+ protocolReactor := NewProtocolReactor(manager, manager.peers)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
// Create & add listener
if !config.VaultMode {
p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
- manager.sw.AddListener(l)
+ //manager.sw.AddListener(l)
discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
if err != nil {
return nil, err
}
-
- pexReactor := pex.NewPEXReactor(discv)
- manager.sw.AddReactor("PEX", pexReactor)
+ manager.sw.SetDiscv(discv)
}
manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
manager.sw.SetNodePrivKey(manager.privKey)
return manager, nil
}
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+ bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+ if bestPeer != nil {
+ return bestPeer.getPeerInfo()
+ }
+ return nil
+}
+
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+ return sm.newTxCh
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+ return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+ peer := sm.peers.bestPeer(consensus.SFFullNode)
+ return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+ return sm.sw.NodeInfo()
+}
+
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+ if peer := sm.peers.getPeer(peerID); peer == nil {
+ return errors.New("peerId not exist")
+ }
+ sm.peers.removePeer(peerID)
+ return nil
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+ return sm.sw
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+ sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+}
+
+func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
+ sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+ blocks, err := msg.GetBlocks()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+ return
+ }
+
+ sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+ var block *types.Block
+ var err error
+ if msg.Height != 0 {
+ block, err = sm.chain.GetBlockByHeight(msg.Height)
+ } else {
+ block, err = sm.chain.GetBlockByHash(msg.GetHash())
+ }
+ if err != nil {
+ log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+ return
+ }
+
+ ok, err := peer.sendBlock(block)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+ blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(blocks) == 0 {
+ return
+ }
+
+ totalSize := 0
+ sendBlocks := []*types.Block{}
+ for _, block := range blocks {
+ rawData, err := block.MarshalText()
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+ continue
+ }
+
+ if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+ break
+ }
+ totalSize += len(rawData)
+ sendBlocks = append(sendBlocks, block)
+ }
+
+ ok, err := peer.sendBlocks(sendBlocks)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+ headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+ if err != nil || len(headers) == 0 {
+ log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+ return
+ }
+
+ ok, err := peer.sendHeaders(headers)
+ if !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+ }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+ headers, err := msg.GetHeaders()
+ if err != nil {
+ log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+ return
+ }
+
+ sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+ bestHeader := sm.chain.BestBlockHeader()
+ genesisBlock, err := sm.chain.GetBlockByHeight(0)
+ if err != nil {
+ log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+ }
+
+ genesisHash := genesisBlock.Hash()
+ msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+ if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+ sm.peers.removePeer(peer.ID())
+ }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+ if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+ peer.setStatus(msg.Height, msg.GetHash())
+ return
+ }
+
+ if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+ log.WithFields(log.Fields{
+ "remote genesis": genesisHash.String(),
+ "local genesis": sm.genesisHash.String(),
+ }).Warn("fail hand shake due to differnt genesis")
+ return
+ }
+ if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
+ log.WithFields(log.Fields{
+ "peer ServiceFlag": basePeer.ServiceFlag(),
+ }).Warn("fail hand shake due to remote peer is not full node support spv proof")
+ return
+ }
+ sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+ tx, err := msg.GetTransaction()
+ if err != nil {
+ sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+ return
+ }
+ sm.txNotifyCh <- tx
+}
+
+func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+ peer := sm.peers.getPeer(basePeer.ID())
+ if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
+ return
+ }
+
+ switch msg := msg.(type) {
+ case *StatusRequestMessage:
+ sm.handleStatusRequestMsg(basePeer)
+
+ case *StatusResponseMessage:
+ sm.handleStatusResponseMsg(basePeer, msg)
+
+ case *TransactionMessage:
+ sm.handleTransactionMsg(peer, msg)
+
+ case *HeadersMessage:
+ sm.handleHeadersMsg(peer, msg)
+
+ case *MerkleBlockMessage:
+ sm.handleMerkelBlockMsg(peer, msg)
+
+ default:
+ log.Errorf("unknown message type %v", reflect.TypeOf(msg))
+ }
+}
+
// Defaults to tcp
func protocolAndAddress(listenAddr string) (string, string) {
p, address := "tcp", listenAddr
if _, err := sm.sw.Start(); err != nil {
cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
}
+ go sm.spvAddressMgr()
// broadcast transactions
go sm.txBroadcastLoop()
-
- // broadcast mined blocks
go sm.minedBroadcastLoop()
-
- // start sync handlers
- go sm.syncer()
-
- go sm.txsyncLoop()
+ go sm.txSyncLoop()
}
//Stop stop sync manager
}
nodes := []*discover.Node{}
for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+ version.Status.AddSeed(seed)
url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
nodes = append(nodes, discover.MustParseNode(url))
}
return ntab, nil
}
-func (sm *SyncManager) txBroadcastLoop() {
- for {
- select {
- case newTx := <-sm.newTxCh:
- peers, err := sm.peers.BroadcastTx(newTx)
- if err != nil {
- log.Errorf("Broadcast new tx error. %v", err)
- return
- }
- for _, smPeer := range peers {
- if smPeer == nil {
- continue
- }
- swPeer := smPeer.getPeer()
- log.Info("Tx broadcast error. Stop Peer.")
- sm.sw.StopPeerGracefully(swPeer)
- }
- case <-sm.quitSync:
- return
- }
- }
-}
-
func (sm *SyncManager) minedBroadcastLoop() {
for {
select {
log.Errorf("Failed on mined broadcast loop get block %v", err)
return
}
- peers, err := sm.peers.BroadcastMinedBlock(block)
- if err != nil {
+ if err := sm.peers.broadcastMinedBlock(block); err != nil {
log.Errorf("Broadcast mine block error. %v", err)
return
}
- for _, smPeer := range peers {
- if smPeer == nil {
- continue
- }
- swPeer := smPeer.getPeer()
- log.Info("New mined block broadcast error. Stop Peer.")
- sm.sw.StopPeerGracefully(swPeer)
- }
case <-sm.quitSync:
return
}
}
}
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
- return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
- return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
- return sm.peers
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
- return sm.sw
-}
-
-// GetNewTxCh return a unconfirmed transaction feed channel
-func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
- return sm.newTxCh
-}