OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
index 033a93b..eea401b 100644 (file)
@@ -5,6 +5,7 @@ import (
        "errors"
        "net"
        "path"
+       "reflect"
        "strconv"
        "strings"
 
@@ -16,7 +17,6 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/discover"
-       "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -24,16 +24,33 @@ import (
 )
 
 const (
-       maxTxChanSize = 10000
+       maxTxChanSize         = 10000
+       maxFilterAddressSize  = 50
+       maxFilterAddressCount = 1000
 )
 
+// Chain is the interface for Bytom core
+type Chain interface {
+       BestBlockHeader() *types.BlockHeader
+       BestBlockHeight() uint64
+       CalcNextSeed(*bc.Hash) (*bc.Hash, error)
+       GetBlockByHash(*bc.Hash) (*types.Block, error)
+       GetBlockByHeight(uint64) (*types.Block, error)
+       GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
+       GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
+       InMainChain(bc.Hash) bool
+       ProcessBlock(*types.Block) (bool, error)
+       ValidateTx(*types.Tx) (bool, error)
+}
+
 //SyncManager Sync Manager is responsible for the business layer information synchronization
 type SyncManager struct {
        sw          *p2p.Switch
        genesisHash bc.Hash
 
        privKey      crypto.PrivKeyEd25519 // local node's p2p key
-       chain        *core.Chain
+       chain        Chain
        txPool       *core.TxPool
        blockFetcher *blockFetcher
        blockKeeper  *blockKeeper
@@ -47,7 +64,7 @@ type SyncManager struct {
 }
 
 //NewSyncManager create a sync manager
-func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
        genesisHeader, err := chain.GetHeaderByHeight(0)
        if err != nil {
                return nil, err
@@ -86,9 +103,7 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
                if err != nil {
                        return nil, err
                }
-
-               pexReactor := pex.NewPEXReactor(discv)
-               manager.sw.AddReactor("PEX", pexReactor)
+               manager.sw.SetDiscv(discv)
        }
        manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
@@ -140,7 +155,11 @@ func (sm *SyncManager) Switch() *p2p.Switch {
 }
 
 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
-       sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+       block, err := msg.GetBlock()
+       if err != nil {
+               return
+       }
+       sm.blockKeeper.processBlock(peer.ID(), block)
 }
 
 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
@@ -153,6 +172,18 @@ func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
        sm.blockKeeper.processBlocks(peer.ID(), blocks)
 }
 
+func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
+       peer.addFilterAddress(msg.Address)
+}
+
+func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
+       peer.filterAdds.Clear()
+}
+
+func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
+       peer.addFilterAddresses(msg.Addresses)
+}
+
 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
        var block *types.Block
        var err error
@@ -190,7 +221,7 @@ func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
                        continue
                }
 
-               if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
+               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
                        break
                }
                totalSize += len(rawData)
@@ -222,6 +253,37 @@ func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
        }
 }
 
+func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
+       var err error
+       var block *types.Block
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
+       } else {
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get block from chain")
+               return
+       }
+
+       blockHash := block.Hash()
+       txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get transaction status")
+               return
+       }
+
+       ok, err := peer.sendMerkleBlock(block, txStatus)
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+               return
+       }
+
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
@@ -288,6 +350,60 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
        }
 }
 
+func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg BlockchainMessage) {
+       peer := sm.peers.getPeer(basePeer.ID())
+       if peer == nil && msgType != StatusResponseByte && msgType != StatusRequestByte {
+               return
+       }
+
+       switch msg := msg.(type) {
+       case *GetBlockMessage:
+               sm.handleGetBlockMsg(peer, msg)
+
+       case *BlockMessage:
+               sm.handleBlockMsg(peer, msg)
+
+       case *StatusRequestMessage:
+               sm.handleStatusRequestMsg(basePeer)
+
+       case *StatusResponseMessage:
+               sm.handleStatusResponseMsg(basePeer, msg)
+
+       case *TransactionMessage:
+               sm.handleTransactionMsg(peer, msg)
+
+       case *MineBlockMessage:
+               sm.handleMineBlockMsg(peer, msg)
+
+       case *GetHeadersMessage:
+               sm.handleGetHeadersMsg(peer, msg)
+
+       case *HeadersMessage:
+               sm.handleHeadersMsg(peer, msg)
+
+       case *GetBlocksMessage:
+               sm.handleGetBlocksMsg(peer, msg)
+
+       case *BlocksMessage:
+               sm.handleBlocksMsg(peer, msg)
+
+       case *FilterLoadMessage:
+               sm.handleFilterLoadMsg(peer, msg)
+
+       case *FilterAddMessage:
+               sm.handleFilterAddMsg(peer, msg)
+
+       case *FilterClearMessage:
+               sm.handleFilterClearMsg(peer)
+
+       case *GetMerkleBlockMessage:
+               sm.handleGetMerkleBlockMsg(peer, msg)
+
+       default:
+               log.Errorf("unknown message type %v", reflect.TypeOf(msg))
+       }
+}
+
 // Defaults to tcp
 func protocolAndAddress(listenAddr string) (string, string) {
        p, address := "tcp", listenAddr
@@ -364,6 +480,7 @@ func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16)
        }
        nodes := []*discover.Node{}
        for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+               version.Status.AddSeed(seed)
                url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
                nodes = append(nodes, discover.MustParseNode(url))
        }