OSDN Git Service

Merge dev branch
[bytom/bytom-spv.git] / netsync / handle.go
index 2438e09..8c4c84f 100644 (file)
 package netsync
 
 import (
+       "encoding/hex"
+       "errors"
+       "net"
+       "path"
+       "reflect"
+       "strconv"
        "strings"
+       "sync"
 
        log "github.com/sirupsen/logrus"
        "github.com/tendermint/go-crypto"
-       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
-       dbm "github.com/tendermint/tmlibs/db"
 
+       "github.com/bytom/account"
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/p2p"
-       core "github.com/bytom/protocol"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
        "github.com/bytom/version"
+       "github.com/bytom/wallet"
 )
 
+const (
+       maxTxChanSize = 10000
+)
+
+// Chain is the interface for Bytom core
+type Chain interface {
+       BestBlockHeader() *types.BlockHeader
+       BestBlockHeight() uint64
+       CalcNextSeed(*bc.Hash) (*bc.Hash, error)
+       GetBlockByHash(*bc.Hash) (*types.Block, error)
+       GetBlockByHeight(uint64) (*types.Block, error)
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+       GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+       InMainChain(bc.Hash) bool
+       ProcessBlock(*types.Block, *bc.TransactionStatus) (bool, error)
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
-       networkID uint64
-       sw        *p2p.Switch
-       addrBook  *p2p.AddrBook // known peers
+       sw          *p2p.Switch
+       genesisHash bc.Hash
 
        privKey     crypto.PrivKeyEd25519 // local node's p2p key
-       chain       *core.Chain
-       txPool      *core.TxPool
-       fetcher     *Fetcher
+       chain       Chain
        blockKeeper *blockKeeper
        peers       *peerSet
 
-       newBlockCh    chan *bc.Hash
-       newPeerCh     chan struct{}
-       txSyncCh      chan *txsync
-       dropPeerCh    chan *string
-       quitSync      chan struct{}
-       config        *cfg.Config
-       synchronising int32
+       newTxCh      chan *types.Tx
+       txNotifyCh   chan *types.Tx
+       newBlockCh   chan *bc.Hash
+       newAddrCh    chan *account.CtrlProgram
+       spvAddresses []*account.CtrlProgram
+       addrMutex    sync.RWMutex
+       txSyncCh     chan *txSyncMsg
+       quitSync     chan struct{}
+       config       *cfg.Config
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
-       // Create the protocol manager with the base fields
+func NewSyncManager(config *cfg.Config, chain Chain, newBlockCh chan *bc.Hash, wallet *wallet.Wallet) (*SyncManager, error) {
+       genesisHeader, err := chain.GetHeaderByHeight(0)
+       if err != nil {
+               return nil, err
+       }
+
+       sw := p2p.NewSwitch(config)
+       peers := newPeerSet(sw)
        manager := &SyncManager{
-               txPool:     txPool,
-               chain:      chain,
-               privKey:    crypto.GenPrivKeyEd25519(),
-               config:     config,
-               quitSync:   make(chan struct{}),
-               newBlockCh: newBlockCh,
-               newPeerCh:  make(chan struct{}),
-               txSyncCh:   make(chan *txsync),
-               dropPeerCh: make(chan *string, maxQuitReq),
-               peers:      newPeerSet(),
+               sw:          sw,
+               genesisHash: genesisHeader.Hash(),
+               chain:       chain,
+               privKey:     crypto.GenPrivKeyEd25519(),
+               blockKeeper: newBlockKeeper(chain, peers),
+               peers:       peers,
+               newTxCh:     make(chan *types.Tx, maxTxChanSize),
+               txNotifyCh:  wallet.GetTxCh(),
+               newBlockCh:  newBlockCh,
+               txSyncCh:    make(chan *txSyncMsg),
+               quitSync:    make(chan struct{}),
+               config:      config,
+               newAddrCh:   wallet.AccountMgr.NewAddrCh,
        }
+       manager.spvAddresses, _ = wallet.AccountMgr.ListControlProgram()
+       protocolReactor := NewProtocolReactor(manager, manager.peers)
+       manager.sw.AddReactor("PROTOCOL", protocolReactor)
 
-       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-       manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+       // Create & add listener
+       var listenerStatus bool
+       var l p2p.Listener
+       if !config.VaultMode {
+               p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
+               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP)
+               //manager.sw.AddListener(l)
+
+               discv, err := initDiscover(config, &manager.privKey, l.ExternalAddress().Port)
+               if err != nil {
+                       return nil, err
+               }
+               manager.sw.SetDiscv(discv)
+       }
+       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
+       manager.sw.SetNodePrivKey(manager.privKey)
+       return manager, nil
+}
 
-       manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
-       manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
+//BestPeer return the highest p2p peerInfo
+func (sm *SyncManager) BestPeer() *PeerInfo {
+       bestPeer := sm.peers.bestPeer(consensus.SFFullNode)
+       if bestPeer != nil {
+               return bestPeer.getPeerInfo()
+       }
+       return nil
+}
 
-       protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
-       manager.sw.AddReactor("PROTOCOL", protocolReactor)
+// GetNewTxCh return a unconfirmed transaction feed channel
+func (sm *SyncManager) GetNewTxCh() chan *types.Tx {
+       return sm.newTxCh
+}
+
+//GetPeerInfos return peer info of all peers
+func (sm *SyncManager) GetPeerInfos() []*PeerInfo {
+       return sm.peers.getPeerInfos()
+}
+
+//IsCaughtUp check wheather the peer finish the sync
+func (sm *SyncManager) IsCaughtUp() bool {
+       peer := sm.peers.bestPeer(consensus.SFFullNode)
+       return peer == nil || peer.Height() <= sm.chain.BestBlockHeight()
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       return sm.sw.NodeInfo()
+}
 
-       // Optionally, start the pex reactor
-       //var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(manager.addrBook)
-               manager.sw.AddReactor("PEX", pexReactor)
+//StopPeer try to stop peer by given ID
+func (sm *SyncManager) StopPeer(peerID string) error {
+       if peer := sm.peers.getPeer(peerID); peer == nil {
+               return errors.New("peerId not exist")
        }
+       sm.peers.removePeer(peerID)
+       return nil
+}
 
-       return manager, nil
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+       return sm.sw
+}
+
+func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
+       sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+}
+
+func (sm *SyncManager) handleMerkelBlockMsg(peer *peer, msg *MerkleBlockMessage) {
+       sm.blockKeeper.processMerkleBlock(peer.ID(), msg.GetMerkleBlock())
+}
+
+func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
+       blocks, err := msg.GetBlocks()
+       if err != nil {
+               log.WithField("err", err).Debug("fail on handleBlocksMsg GetBlocks")
+               return
+       }
+
+       sm.blockKeeper.processBlocks(peer.ID(), blocks)
+}
+
+func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
+       var block *types.Block
+       var err error
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
+       } else {
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetBlockMsg get block from chain")
+               return
+       }
+
+       ok, err := peer.sendBlock(block)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetBlockMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
+       blocks, err := sm.blockKeeper.locateBlocks(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(blocks) == 0 {
+               return
+       }
+
+       totalSize := 0
+       sendBlocks := []*types.Block{}
+       for _, block := range blocks {
+               rawData, err := block.MarshalText()
+               if err != nil {
+                       log.WithField("err", err).Error("fail on handleGetBlocksMsg marshal block")
+                       continue
+               }
+
+               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
+                       break
+               }
+               totalSize += len(rawData)
+               sendBlocks = append(sendBlocks, block)
+       }
+
+       ok, err := peer.sendBlocks(sendBlocks)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetBlocksMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
+       headers, err := sm.blockKeeper.locateHeaders(msg.GetBlockLocator(), msg.GetStopHash())
+       if err != nil || len(headers) == 0 {
+               log.WithField("err", err).Debug("fail on handleGetHeadersMsg locateHeaders")
+               return
+       }
+
+       ok, err := peer.sendHeaders(headers)
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetHeadersMsg sentBlock")
+       }
+}
+
+func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
+       headers, err := msg.GetHeaders()
+       if err != nil {
+               log.WithField("err", err).Debug("fail on handleHeadersMsg GetHeaders")
+               return
+       }
+
+       sm.blockKeeper.processHeaders(peer.ID(), headers)
+}
+
+func (sm *SyncManager) handleStatusRequestMsg(peer BasePeer) {
+       bestHeader := sm.chain.BestBlockHeader()
+       genesisBlock, err := sm.chain.GetBlockByHeight(0)
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleStatusRequestMsg get genesis")
+       }
+
+       genesisHash := genesisBlock.Hash()
+       msg := NewStatusResponseMessage(bestHeader, &genesisHash)
+       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
+func (sm *SyncManager) handleStatusResponseMsg(basePeer BasePeer, msg *StatusResponseMessage) {
+       if peer := sm.peers.getPeer(basePeer.ID()); peer != nil {
+               peer.setStatus(msg.Height, msg.GetHash())
+               return
+       }
+
+       if genesisHash := msg.GetGenesisHash(); sm.genesisHash != *genesisHash {
+               log.WithFields(log.Fields{
+                       "remote genesis": genesisHash.String(),
+                       "local genesis":  sm.genesisHash.String(),
+               }).Warn("fail hand shake due to differnt genesis")
+               return
+       }
+       if basePeer.ServiceFlag().IsEnable(consensus.SFFullNode|consensus.SFSpvProof) == false {
+               log.WithFields(log.Fields{
+                       "peer ServiceFlag": basePeer.ServiceFlag(),
+               }).Warn("fail hand shake due to remote peer is not full node support spv proof")
+               return
+       }
+       sm.peers.addPeer(basePeer, msg.Height, msg.GetHash())
+}
+
+func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage) {
+       tx, err := msg.GetTransaction()
+       if err != nil {
+               sm.peers.addBanScore(peer.ID(), 0, 10, "fail on get tx from message")
+               return
+       }
+       sm.txNotifyCh <- tx
+}
+
+func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+       peer := sm.peers.getPeer(basePeer.ID())
+       if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
+               return
+       }
+
+       switch msg := msg.(type) {
+       case *StatusRequestMessage:
+               sm.handleStatusRequestMsg(basePeer)
+
+       case *StatusResponseMessage:
+               sm.handleStatusResponseMsg(basePeer, msg)
+
+       case *TransactionMessage:
+               sm.handleTransactionMsg(peer, msg)
+
+       case *HeadersMessage:
+               sm.handleHeadersMsg(peer, msg)
+
+       case *MerkleBlockMessage:
+               sm.handleMerkelBlockMsg(peer, msg)
+
+       default:
+               log.Errorf("unknown message type %v", reflect.TypeOf(msg))
+       }
 }
 
 // Defaults to tcp
@@ -84,16 +333,13 @@ func protocolAndAddress(listenAddr string) (string, string) {
        return p, address
 }
 
-func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
+func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
        nodeInfo := &p2p.NodeInfo{
                PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
                Moniker: sm.config.Moniker,
-               Network: "bytom",
+               Network: sm.config.ChainID,
                Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
+               Other:   []string{strconv.FormatUint(uint64(consensus.DefaultServices), 10)},
        }
 
        if !sm.sw.IsListening() {
@@ -101,57 +347,28 @@ func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
        }
 
        p2pListener := sm.sw.Listeners()[0]
-       p2pHost := p2pListener.ExternalAddress().IP.String()
-       p2pPort := p2pListener.ExternalAddress().Port
 
        // We assume that the rpcListener has the same ExternalAddress.
        // This is probably true because both P2P and RPC listeners use UPnP,
        // except of course if the rpc is only bound to localhost
-       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       return nodeInfo
-}
-
-func (sm *SyncManager) netStart() error {
-       // Create & add listener
-       p, address := protocolAndAddress(sm.config.P2P.ListenAddress)
-
-       l := p2p.NewDefaultListener(p, address, sm.config.P2P.SkipUPNP, nil)
-
-       sm.sw.AddListener(l)
-
-       // Start the switch
-       sm.sw.SetNodeInfo(sm.makeNodeInfo())
-       sm.sw.SetNodePrivKey(sm.privKey)
-       _, err := sm.sw.Start()
-       if err != nil {
-               return err
+       if listenerStatus {
+               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
+       } else {
+               nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
        }
-
-       // If seeds exist, add them to the address book and dial out
-       if sm.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(sm.config.P2P.Seeds, ",")
-               if err := sm.DialSeeds(seeds); err != nil {
-                       return err
-               }
-       }
-
-       return nil
+       return nodeInfo
 }
 
 //Start start sync manager service
 func (sm *SyncManager) Start() {
-       sm.netStart()
+       if _, err := sm.sw.Start(); err != nil {
+               cmn.Exit(cmn.Fmt("fail on start SyncManager: %v", err))
+       }
+       go sm.spvAddressMgr()
        // broadcast transactions
        go sm.txBroadcastLoop()
-
-       // broadcast mined blocks
        go sm.minedBroadcastLoop()
-
-       // start sync handlers
-       go sm.syncer()
-
-       go sm.txsyncLoop()
+       go sm.txSyncLoop()
 }
 
 //Stop stop sync manager
@@ -160,17 +377,37 @@ func (sm *SyncManager) Stop() {
        sm.sw.Stop()
 }
 
-func (sm *SyncManager) txBroadcastLoop() {
-       newTxCh := sm.txPool.GetNewTxCh()
-       for {
-               select {
-               case newTx := <-newTxCh:
-                       sm.peers.BroadcastTx(newTx)
+func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16) (*discover.Network, error) {
+       addr, err := net.ResolveUDPAddr("udp", net.JoinHostPort("0.0.0.0", strconv.FormatUint(uint64(port), 10)))
+       if err != nil {
+               return nil, err
+       }
 
-               case <-sm.quitSync:
-                       return
-               }
+       conn, err := net.ListenUDP("udp", addr)
+       if err != nil {
+               return nil, err
+       }
+
+       realaddr := conn.LocalAddr().(*net.UDPAddr)
+       ntab, err := discover.ListenUDP(priv, conn, realaddr, path.Join(config.DBDir(), "discover.db"), nil)
+       if err != nil {
+               return nil, err
+       }
+
+       // add the seeds node to the discover table
+       if config.P2P.Seeds == "" {
+               return ntab, nil
+       }
+       nodes := []*discover.Node{}
+       for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+               version.Status.AddSeed(seed)
+               url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
+               nodes = append(nodes, discover.MustParseNode(url))
        }
+       if err = ntab.SetFallbackNodes(nodes); err != nil {
+               return nil, err
+       }
+       return ntab, nil
 }
 
 func (sm *SyncManager) minedBroadcastLoop() {
@@ -182,34 +419,12 @@ func (sm *SyncManager) minedBroadcastLoop() {
                                log.Errorf("Failed on mined broadcast loop get block %v", err)
                                return
                        }
-                       sm.peers.BroadcastMinedBlock(block)
+                       if err := sm.peers.broadcastMinedBlock(block); err != nil {
+                               log.Errorf("Broadcast mine block error. %v", err)
+                               return
+                       }
                case <-sm.quitSync:
                        return
                }
        }
 }
-
-//NodeInfo get P2P peer node info
-func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
-       return sm.sw.NodeInfo()
-}
-
-//BlockKeeper get block keeper
-func (sm *SyncManager) BlockKeeper() *blockKeeper {
-       return sm.blockKeeper
-}
-
-//Peers get sync manager peer set
-func (sm *SyncManager) Peers() *peerSet {
-       return sm.peers
-}
-
-//DialSeeds dial seed peers
-func (sm *SyncManager) DialSeeds(seeds []string) error {
-       return sm.sw.DialSeeds(sm.addrBook, seeds)
-}
-
-//Switch get sync manager switch
-func (sm *SyncManager) Switch() *p2p.Switch {
-       return sm.sw
-}