OSDN Git Service

Merge pull request #1386 from Bytom/dev
[bytom/bytom.git] / netsync / handle.go
index 6a4526b..eea401b 100644 (file)
@@ -17,7 +17,6 @@ import (
        "github.com/bytom/consensus"
        "github.com/bytom/p2p"
        "github.com/bytom/p2p/discover"
-       "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -25,7 +24,9 @@ import (
 )
 
 const (
-       maxTxChanSize = 10000
+       maxTxChanSize         = 10000
+       maxFilterAddressSize  = 50
+       maxFilterAddressCount = 1000
 )
 
 // Chain is the interface for Bytom core
@@ -37,6 +38,7 @@ type Chain interface {
        GetBlockByHeight(uint64) (*types.Block, error)
        GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error)
        GetHeaderByHeight(uint64) (*types.BlockHeader, error)
+       GetTransactionStatus(*bc.Hash) (*bc.TransactionStatus, error)
        InMainChain(bc.Hash) bool
        ProcessBlock(*types.Block) (bool, error)
        ValidateTx(*types.Tx) (bool, error)
@@ -101,9 +103,7 @@ func NewSyncManager(config *cfg.Config, chain Chain, txPool *core.TxPool, newBlo
                if err != nil {
                        return nil, err
                }
-
-               pexReactor := pex.NewPEXReactor(discv)
-               manager.sw.AddReactor("PEX", pexReactor)
+               manager.sw.SetDiscv(discv)
        }
        manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
@@ -155,7 +155,11 @@ func (sm *SyncManager) Switch() *p2p.Switch {
 }
 
 func (sm *SyncManager) handleBlockMsg(peer *peer, msg *BlockMessage) {
-       sm.blockKeeper.processBlock(peer.ID(), msg.GetBlock())
+       block, err := msg.GetBlock()
+       if err != nil {
+               return
+       }
+       sm.blockKeeper.processBlock(peer.ID(), block)
 }
 
 func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
@@ -168,6 +172,18 @@ func (sm *SyncManager) handleBlocksMsg(peer *peer, msg *BlocksMessage) {
        sm.blockKeeper.processBlocks(peer.ID(), blocks)
 }
 
+func (sm *SyncManager) handleFilterAddMsg(peer *peer, msg *FilterAddMessage) {
+       peer.addFilterAddress(msg.Address)
+}
+
+func (sm *SyncManager) handleFilterClearMsg(peer *peer) {
+       peer.filterAdds.Clear()
+}
+
+func (sm *SyncManager) handleFilterLoadMsg(peer *peer, msg *FilterLoadMessage) {
+       peer.addFilterAddresses(msg.Addresses)
+}
+
 func (sm *SyncManager) handleGetBlockMsg(peer *peer, msg *GetBlockMessage) {
        var block *types.Block
        var err error
@@ -205,7 +221,7 @@ func (sm *SyncManager) handleGetBlocksMsg(peer *peer, msg *GetBlocksMessage) {
                        continue
                }
 
-               if totalSize+len(rawData) > maxBlockchainResponseSize-16 {
+               if totalSize+len(rawData) > maxBlockchainResponseSize/2 {
                        break
                }
                totalSize += len(rawData)
@@ -237,6 +253,37 @@ func (sm *SyncManager) handleGetHeadersMsg(peer *peer, msg *GetHeadersMessage) {
        }
 }
 
+func (sm *SyncManager) handleGetMerkleBlockMsg(peer *peer, msg *GetMerkleBlockMessage) {
+       var err error
+       var block *types.Block
+       if msg.Height != 0 {
+               block, err = sm.chain.GetBlockByHeight(msg.Height)
+       } else {
+               block, err = sm.chain.GetBlockByHash(msg.GetHash())
+       }
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get block from chain")
+               return
+       }
+
+       blockHash := block.Hash()
+       txStatus, err := sm.chain.GetTransactionStatus(&blockHash)
+       if err != nil {
+               log.WithField("err", err).Warning("fail on handleGetMerkleBlockMsg get transaction status")
+               return
+       }
+
+       ok, err := peer.sendMerkleBlock(block, txStatus)
+       if err != nil {
+               log.WithField("err", err).Error("fail on handleGetMerkleBlockMsg sentMerkleBlock")
+               return
+       }
+
+       if !ok {
+               sm.peers.removePeer(peer.ID())
+       }
+}
+
 func (sm *SyncManager) handleHeadersMsg(peer *peer, msg *HeadersMessage) {
        headers, err := msg.GetHeaders()
        if err != nil {
@@ -340,6 +387,18 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *BlocksMessage:
                sm.handleBlocksMsg(peer, msg)
 
+       case *FilterLoadMessage:
+               sm.handleFilterLoadMsg(peer, msg)
+
+       case *FilterAddMessage:
+               sm.handleFilterAddMsg(peer, msg)
+
+       case *FilterClearMessage:
+               sm.handleFilterClearMsg(peer)
+
+       case *GetMerkleBlockMessage:
+               sm.handleGetMerkleBlockMsg(peer, msg)
+
        default:
                log.Errorf("unknown message type %v", reflect.TypeOf(msg))
        }
@@ -421,6 +480,7 @@ func initDiscover(config *cfg.Config, priv *crypto.PrivKeyEd25519, port uint16)
        }
        nodes := []*discover.Node{}
        for _, seed := range strings.Split(config.P2P.Seeds, ",") {
+               version.Status.AddSeed(seed)
                url := "enode://" + hex.EncodeToString(crypto.Sha256([]byte(seed))) + "@" + seed
                nodes = append(nodes, discover.MustParseNode(url))
        }