OSDN Git Service

netsync: add txs msg (#78)
authoryahtoo <yahtoo.ma@gmail.com>
Tue, 21 May 2019 06:22:26 +0000 (14:22 +0800)
committerPaladz <yzhu101@uottawa.ca>
Tue, 21 May 2019 06:22:26 +0000 (14:22 +0800)
netsync/handle.go
netsync/message.go
netsync/message_test.go
netsync/peer.go
netsync/tx_keeper.go

index ce10e1a..7dc35b9 100644 (file)
@@ -323,6 +323,28 @@ func (sm *SyncManager) handleTransactionMsg(peer *peer, msg *TransactionMessage)
        if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
                sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
        }
+       sm.peers.markTx(peer.ID(), tx.ID)
+}
+
+func (sm *SyncManager) handleTransactionsMsg(peer *peer, msg *TransactionsMessage) {
+       txs, err := msg.GetTransactions()
+       if err != nil {
+               sm.peers.addBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+               return
+       }
+
+       if len(txs) > txsMsgMaxTxNum {
+               sm.peers.addBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+               return
+       }
+
+       for _, tx := range txs {
+               if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
+                       sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+                       return
+               }
+               sm.peers.markTx(peer.ID(), tx.ID)
+       }
 }
 
 func (sm *SyncManager) IsListening() bool {
@@ -365,6 +387,9 @@ func (sm *SyncManager) processMsg(basePeer BasePeer, msgType byte, msg Blockchai
        case *TransactionMessage:
                sm.handleTransactionMsg(peer, msg)
 
+       case *TransactionsMessage:
+               sm.handleTransactionsMsg(peer, msg)
+
        case *MineBlockMessage:
                sm.handleMineBlockMsg(peer, msg)
 
index 1c86d14..b4f702a 100644 (file)
@@ -25,6 +25,7 @@ const (
        BlocksResponseByte  = byte(0x15)
        StatusByte          = byte(0x21)
        NewTransactionByte  = byte(0x30)
+       NewTransactionsByte = byte(0x31)
        NewMineBlockByte    = byte(0x40)
        FilterLoadByte      = byte(0x50)
        FilterAddByte       = byte(0x51)
@@ -33,6 +34,7 @@ const (
        MerkleResponseByte  = byte(0x61)
 
        maxBlockchainResponseSize = 22020096 + 2
+       txsMsgMaxTxNum            = 1024
 )
 
 //BlockchainMessage is a generic message for this reactor.
@@ -50,6 +52,7 @@ var _ = wire.RegisterInterface(
        wire.ConcreteType{&BlocksMessage{}, BlocksResponseByte},
        wire.ConcreteType{&StatusMessage{}, StatusByte},
        wire.ConcreteType{&TransactionMessage{}, NewTransactionByte},
+       wire.ConcreteType{&TransactionsMessage{}, NewTransactionsByte},
        wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
        wire.ConcreteType{&FilterLoadMessage{}, FilterLoadByte},
        wire.ConcreteType{&FilterAddMessage{}, FilterAddByte},
@@ -327,6 +330,43 @@ func (m *TransactionMessage) String() string {
        return fmt.Sprintf("{tx_size: %d, tx_hash: %s}", len(m.RawTx), tx.ID.String())
 }
 
+//TransactionsMessage notify new txs msg
+type TransactionsMessage struct {
+       RawTxs [][]byte
+}
+
+//NewTransactionsMessage construct notify new txs msg
+func NewTransactionsMessage(txs []*types.Tx) (*TransactionsMessage, error) {
+       rawTxs := make([][]byte, 0, len(txs))
+       for _, tx := range txs {
+               rawTx, err := tx.TxData.MarshalText()
+               if err != nil {
+                       return nil, err
+               }
+
+               rawTxs = append(rawTxs, rawTx)
+       }
+       return &TransactionsMessage{RawTxs: rawTxs}, nil
+}
+
+//GetTransactions get txs from msg
+func (m *TransactionsMessage) GetTransactions() ([]*types.Tx, error) {
+       txs := make([]*types.Tx, 0, len(m.RawTxs))
+       for _, rawTx := range m.RawTxs {
+               tx := &types.Tx{}
+               if err := tx.UnmarshalText(rawTx); err != nil {
+                       return nil, err
+               }
+
+               txs = append(txs, tx)
+       }
+       return txs, nil
+}
+
+func (m *TransactionsMessage) String() string {
+       return fmt.Sprintf("{tx_num: %d}", len(m.RawTxs))
+}
+
 //MineBlockMessage new mined block msg
 type MineBlockMessage struct {
        RawBlock []byte
index 5c40a68..14bb71f 100644 (file)
@@ -6,10 +6,78 @@ import (
 
        "github.com/davecgh/go-spew/spew"
 
+       "github.com/vapor/consensus"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
 )
 
+var txs = []*types.Tx{
+       types.NewTx(types.TxData{
+               SerializedSize: uint64(52),
+               Inputs:         []*types.TxInput{types.NewCoinbaseInput([]byte{0x01})},
+               Outputs:        []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+       }),
+       types.NewTx(types.TxData{
+               SerializedSize: uint64(53),
+               Inputs:         []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02})},
+               Outputs:        []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+       }),
+       types.NewTx(types.TxData{
+               SerializedSize: uint64(54),
+               Inputs:         []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+               Outputs:        []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+       }),
+       types.NewTx(types.TxData{
+               SerializedSize: uint64(54),
+               Inputs:         []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+               Outputs:        []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 2000, nil)},
+       }),
+       types.NewTx(types.TxData{
+               SerializedSize: uint64(54),
+               Inputs:         []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+               Outputs:        []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 10000, nil)},
+       }),
+}
+
+func TestTransactionMessage(t *testing.T) {
+       for _, tx := range txs {
+               txMsg, err := NewTransactionMessage(tx)
+               if err != nil {
+                       t.Fatalf("create tx msg err:%s", err)
+               }
+
+               gotTx, err := txMsg.GetTransaction()
+               if err != nil {
+                       t.Fatalf("get txs from txsMsg err:%s", err)
+               }
+               if !reflect.DeepEqual(*tx.Tx, *gotTx.Tx) {
+                       t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTx.Tx))
+               }
+       }
+}
+
+func TestTransactionsMessage(t *testing.T) {
+       txsMsg, err := NewTransactionsMessage(txs)
+       if err != nil {
+               t.Fatalf("create txs msg err:%s", err)
+       }
+
+       gotTxs, err := txsMsg.GetTransactions()
+       if err != nil {
+               t.Fatalf("get txs from txsMsg err:%s", err)
+       }
+
+       if len(gotTxs) != len(txs) {
+               t.Fatal("txs msg test err: number of txs not match ")
+       }
+
+       for i, tx := range txs {
+               if !reflect.DeepEqual(tx.Tx, gotTxs[i].Tx) {
+                       t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTxs[i].Tx))
+               }
+       }
+}
+
 var testBlock = &types.Block{
        BlockHeader: types.BlockHeader{
                Version:   1,
index 246f1bc..794501b 100644 (file)
@@ -297,25 +297,35 @@ func (p *peer) sendMerkleBlock(block *types.Block, txStatuses *bc.TransactionSta
        return ok, nil
 }
 
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
-       for _, tx := range txs {
-               if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+       validTxs := make([]*types.Tx, 0, len(txs))
+       for i, tx := range txs {
+               if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
                        continue
                }
-               msg, err := NewTransactionMessage(tx)
-               if err != nil {
-                       return false, errors.Wrap(err, "failed to tx msg")
-               }
 
-               if p.knownTxs.Has(tx.ID.String()) {
+               validTxs = append(validTxs, tx)
+               if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
                        continue
                }
+
+               msg, err := NewTransactionsMessage(validTxs)
+               if err != nil {
+                       return err
+               }
+
                if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
-                       return ok, nil
+                       return errors.New("failed to send txs msg")
+               }
+
+               for _, validTx := range validTxs {
+                       p.knownTxs.Add(validTx.ID.String())
                }
-               p.knownTxs.Add(tx.ID.String())
+
+               validTxs = make([]*types.Tx, 0, len(txs))
        }
-       return true, nil
+
+       return nil
 }
 
 func (p *peer) sendStatus(header *types.BlockHeader) error {
@@ -481,6 +491,17 @@ func (ps *peerSet) getPeerInfos() []*PeerInfo {
        return result
 }
 
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+       ps.mtx.Lock()
+       peer := ps.peers[peerID]
+       ps.mtx.Unlock()
+
+       if peer == nil {
+               return
+       }
+       peer.markTransaction(&txHash)
+}
+
 func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
index 6c4a8bb..5b95ba9 100644 (file)
@@ -100,8 +100,8 @@ func (sm *SyncManager) txSyncLoop() {
                }).Debug("txSyncLoop sending transactions")
                sending = true
                go func() {
-                       ok, err := peer.sendTransactions(sendTxs)
-                       if !ok {
+                       err := peer.sendTransactions(sendTxs)
+                       if err != nil {
                                sm.peers.removePeer(msg.peerID)
                        }
                        done <- err