"reflect"
"strconv"
"strings"
+ "sync"
log "github.com/sirupsen/logrus"
"github.com/tendermint/go-crypto"
cmn "github.com/tendermint/tmlibs/common"
+ "github.com/bytom/account"
cfg "github.com/bytom/config"
"github.com/bytom/consensus"
"github.com/bytom/p2p"
"github.com/bytom/p2p/discover"
- "github.com/bytom/p2p/pex"
- core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
"github.com/bytom/version"
+ "github.com/bytom/wallet"
)
const (
GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
GetHeaderByHeight(uint64) (*types.BlockHeader, error)
InMainChain(bc.Hash) bool
- ProcessBlock(*types.Block) (bool, error)
- ValidateTx(*types.Tx) (bool, error)
+ ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
}
//SyncManager Sync Manager is responsible for the business layer information synchronization
sw *p2p.Switch
genesisHash bc.Hash
- privKey crypto.PrivKeyEd25519 // local node's p2p key
- chain Chain
- txPool *core.TxPool
- blockFetcher *blockFetcher
- blockKeeper *blockKeeper
- peers *peerSet
-
- newTxCh chan *types.Tx
- newBlockCh chan *bc.Hash
- txSyncCh chan *txSyncMsg
- quitSync chan struct{}
- config *cfg.Config
+ privKey crypto.PrivKeyEd25519 // local node's p2p key
+ chain Chain
+ blockKeeper *blockKeeper
+ peers *peerSet
+
+ newTxCh chan *types.Tx
+ txNotifyCh chan *types.Tx
+ newBlockCh chan *bc.Hash
+ newAddrCh chan *account.CtrlProgram
+ spvAddresses []*account.CtrlProgram
+ addrMutex sync.RWMutex
+ txSyncCh chan *txSyncMsg
+ quitSync chan struct{}
+ config *cfg.Config
}
//NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
genesisHeader, err := chain.GetHeaderByHeight(0)
if err != nil {
return nil, err
sw := p2p.NewSwitch(config)
peers := newPeerSet(sw)
manager := &SyncManager{
- sw: sw,
- genesisHash: genesisHeader.Hash(),
- txPool: txPool,
- chain: chain,
- privKey: crypto.GenPrivKeyEd25519(),
- blockFetcher: newBlockFetcher(chain, peers),
- blockKeeper: newBlockKeeper(chain, peers),
- peers: peers,
- newTxCh: make(chan *types.Tx, maxTxChanSize),
- newBlockCh: newBlockCh,
- txSyncCh: make(chan *txSyncMsg),
- quitSync: make(chan struct{}),
- config: config,
- }
-
+ sw: sw,
+ genesisHash: genesisHeader.Hash(),
+ chain: chain,
+ privKey: crypto.GenPrivKeyEd25519(),
+ blockKeeper: newBlockKeeper(chain, peers),
+ peers: peers,
+ newTxCh: make(chan *types.Tx, maxTxChanSize),
+ txNotifyCh: wallet.GetTxCh(),
+ newBlockCh: newBlockCh,
+ txSyncCh: make(chan *txSyncMsg),
+ quitSync: make(chan struct{}),
+ config: config,
+ newAddrCh: wallet.AccountMgr.NewAddrCh,
+ }
+ manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
protocolReactor := NewProtocolReactor(manager, manager.peers)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
if !config.VaultMode {
p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
- manager.sw.AddListener(l)
+ //manager.sw.AddListener(l)
discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
if err != nil {
return nil, err
}
-
- pexReactor := pex.NewPEXReactor(discv)
- manager.sw.AddReactor("PEX", pexReactor)
+ manager.sw.SetDiscv(discv)
}
manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
manager.sw.SetNodePrivKey(manager.privKey)
sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
}
+func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
+ sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
+}
+
func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
blocks, err := msg.GetBlocks()
if err != nil {
sm.blockKeeper.processHeaders(peer.ID(), headers)
}
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
- block, err := msg.GetMineBlock()
- if err != nil {
- log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
- return
- }
-
- hash := block.Hash()
- peer.markBlock(&hash)
- sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
- peer.setStatus(block.Height, &hash)
-}
-
func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
bestHeader := sm.chain.BestBlockHeader()
genesisBlock, err := sm.chain.GetBlockByHeight(0)
}).Warn("fail hand shake due to differnt genesis")
return
}
-
+ if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
+ log.WithFields(log.Fields{
+ "peer ServiceFlag": basePeer.ServiceFlag(),
+ }).Warn("fail hand shake due to remote peer is not full node support spv proof")
+ return
+ }
sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
}
sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
return
}
-
- if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
- sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
- }
+ sm.txNotifyCh <- tx
}
func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
}
switch msg := msg.(type) {
- case *GetBlockMessage:
- sm.handleGetBlockMsg(peer, msg)
-
- case *BlockMessage:
- sm.handleBlockMsg(peer, msg)
-
case *StatusRequestMessage:
sm.handleStatusRequestMsg(basePeer)
case *TransactionMessage:
sm.handleTransactionMsg(peer, msg)
- case *MineBlockMessage:
- sm.handleMineBlockMsg(peer, msg)
-
- case *GetHeadersMessage:
- sm.handleGetHeadersMsg(peer, msg)
-
case *HeadersMessage:
sm.handleHeadersMsg(peer, msg)
- case *GetBlocksMessage:
- sm.handleGetBlocksMsg(peer, msg)
-
- case *BlocksMessage:
- sm.handleBlocksMsg(peer, msg)
+ case *MerkleBlockMessage:
+ sm.handleMerkelBlockMsg(peer, msg)
default:
log.Errorf("unknown message type %v", reflect.TypeOf(msg))
if _, err := sm.sw.Start(); err != nil {
cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
}
+ go sm.spvAddressMgr()
// broadcast transactions
go sm.txBroadcastLoop()
go sm.minedBroadcastLoop()
}
nodes := []*discover.Node{}
for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+ version.Status.AddSeed(seed)
url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
nodes = append(nodes, discover.MustParseNode(url))
}