OSDN Git Service

Merge dev branch
[bytom/bytom-spv.git] / netsync / handle.go
index 0793a6a..8c4c84f 100644 (file)
@@ -8,20 +8,21 @@ import (
        "reflect"
        "strconv"
        "strings"
+       "sync"
 
        log "github.com/sirupsen/logrus"
        "github.com/tendermint/go-crypto"
        cmn "github.com/tendermint/tmlibs/common"
 
+       "github.com/bytom/account"
        cfg "github.com/bytom/config"
        "github.com/bytom/consensus"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/discover"
-       "github.com/bytom/p2p/pex"
-       core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
        "github.com/bytom/version"
+       "github.com/bytom/wallet"
 )
 
 const (
@@ -38,8 +39,7 @@ type Chain interface {
        GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
        GetHeaderByHeight(uint64) (*types.BlockHeader, error)
        InMainChain(bc.Hash) bool
-       ProcessBlock(*types.Block) (bool, error)
-       ValidateTx(*types.Tx) (bool, error)
+       ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
 }
 
 //SyncManager Sync Manager is responsible for the business layer information synchronization
@@ -47,22 +47,24 @@ type SyncManager struct {
        sw          *p2p.Switch
        genesisHash bc.Hash
 
-       privKey      crypto.PrivKeyEd25519 // local node's p2p key
-       chain        Chain
-       txPool       *core.TxPool
-       blockFetcher *blockFetcher
-       blockKeeper  *blockKeeper
-       peers        *peerSet
-
-       newTxCh    chan *types.Tx
-       newBlockCh chan *bc.Hash
-       txSyncCh   chan *txSyncMsg
-       quitSync   chan struct{}
-       config     *cfg.Config
+       privKey     crypto.PrivKeyEd25519 // local node's p2p key
+       chain       Chain
+       blockKeeper *blockKeeper
+       peers       *peerSet
+
+       newTxCh      chan *types.Tx
+       txNotifyCh   chan *types.Tx
+       newBlockCh   chan *bc.Hash
+       newAddrCh    chan *account.CtrlProgram
+       spvAddresses []*account.CtrlProgram
+       addrMutex    sync.RWMutex
+       txSyncCh     chan *txSyncMsg
+       quitSync     chan struct{}
+       config       *cfg.Config
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
@@ -71,21 +73,21 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo
        sw := p2p.NewSwitch(config)
        peers := newPeerSet(sw)
        manager := &SyncManager{
-               sw:           sw,
-               genesisHash:  genesisHeader.Hash(),
-               txPool:       txPool,
-               chain:        chain,
-               privKey:      crypto.GenPrivKeyEd25519(),
-               blockFetcher: newBlockFetcher(chain, peers),
-               blockKeeper:  newBlockKeeper(chain, peers),
-               peers:        peers,
-               newTxCh:      make(chan *types.Tx, maxTxChanSize),
-               newBlockCh:   newBlockCh,
-               txSyncCh:     make(chan *txSyncMsg),
-               quitSync:     make(chan struct{}),
-               config:       config,
-       }
-
+               sw:          sw,
+               genesisHash: genesisHeader.Hash(),
+               chain:       chain,
+               privKey:     crypto.GenPrivKeyEd25519(),
+               blockKeeper: newBlockKeeper(chain, peers),
+               peers:       peers,
+               newTxCh:     make(chan *types.Tx, maxTxChanSize),
+               txNotifyCh:  wallet.GetTxCh(),
+               newBlockCh:  newBlockCh,
+               txSyncCh:    make(chan *txSyncMsg),
+               quitSync:    make(chan struct{}),
+               config:      config,
+               newAddrCh:   wallet.AccountMgr.NewAddrCh,
+       }
+       manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
        protocolReactor := NewProtocolReactor(manager, manager.peers)
        manager.sw.AddReactor("PROTOCOL", protocolReactor)
 
@@ -95,15 +97,13 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo
        if !config.VaultMode {
                p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
                l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
-               manager.sw.AddListener(l)
+               //manager.sw.AddListener(l)
 
                discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
                if err != nil {
                        return nil, err
                }
-
-               pexReactor := pex.NewPEXReactor(discv)
-               manager.sw.AddReactor("PEX", pexReactor)
+               manager.sw.SetDiscv(discv)
        }
        manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
@@ -158,6 +158,10 @@ func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
        sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
 }
 
+func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
+       sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
+}
+
 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
        blocks, err := msg.GetBlocks()
        if err != nil {
@@ -247,19 +251,6 @@ func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
        sm.blockKeeper.processHeaders(peer.ID(), headers)
 }
 
-func (sm *SyncManager) handleMineBlockMsg(peer *peer, msg *MineBlockMessage) {
-       block, err := msg.GetMineBlock()
-       if err != nil {
-               log.WithField("err", err).Warning("fail on handleMineBlockMsg GetMineBlock")
-               return
-       }
-
-       hash := block.Hash()
-       peer.markBlock(&hash)
-       sm.blockFetcher.processNewBlock(&blockMsg{peerID: peer.ID(), block: block})
-       peer.setStatus(block.Height, &hash)
-}
-
 func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
        bestHeader := sm.chain.BestBlockHeader()
        genesisBlock, err := sm.chain.GetBlockByHeight(0)
@@ -287,7 +278,12 @@ func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusRes
                }).Warn("fail hand shake due to differnt genesis")
                return
        }
-
+       if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
+               log.WithFields(log.Fields{
+                       "peer ServiceFlag": basePeer.ServiceFlag(),
+               }).Warn("fail hand shake due to remote peer is not full node support spv proof")
+               return
+       }
        sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
 }
 
@@ -297,10 +293,7 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
                sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
                return
        }
-
-       if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && isOrphan == false {
-               sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
-       }
+       sm.txNotifyCh <- tx
 }
 
 func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
@@ -310,12 +303,6 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        }
 
        switch msg := msg.(type) {
-       case *GetBlockMessage:
-               sm.handleGetBlockMsg(peer, msg)
-
-       case *BlockMessage:
-               sm.handleBlockMsg(peer, msg)
-
        case *StatusRequestMessage:
                sm.handleStatusRequestMsg(basePeer)
 
@@ -325,20 +312,11 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *TransactionMessage:
                sm.handleTransactionMsg(peer, msg)
 
-       case *MineBlockMessage:
-               sm.handleMineBlockMsg(peer, msg)
-
-       case *GetHeadersMessage:
-               sm.handleGetHeadersMsg(peer, msg)
-
        case *HeadersMessage:
                sm.handleHeadersMsg(peer, msg)
 
-       case *GetBlocksMessage:
-               sm.handleGetBlocksMsg(peer, msg)
-
-       case *BlocksMessage:
-               sm.handleBlocksMsg(peer, msg)
+       case *MerkleBlockMessage:
+               sm.handleMerkelBlockMsg(peer, msg)
 
        default:
                log.Errorf("unknown message type %v", reflect.TypeOf(msg))
@@ -386,6 +364,7 @@ func (sm *SyncManager) Start() {
        if _, err := sm.sw.Start(); err != nil {
                cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
        }
+       go sm.spvAddressMgr()
        // broadcast transactions
        go sm.txBroadcastLoop()
        go sm.minedBroadcastLoop()
@@ -421,6 +400,7 @@ func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16)
        }
        nodes := []*discover.Node{}
        for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+               version.Status.AddSeed(seed)
                url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
                nodes = append(nodes, discover.MustParseNode(url))
        }