if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && err != core.ErrDustTx && !isOrphan {
sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
}
+ sm.peers.markTx(peer.ID(), tx.ID)
+}
+
+func (sm *SyncManager) handleTransactionsMsg(peer *peer, msg *TransactionsMessage) {
+ txs, err := msg.GetTransactions()
+ if err != nil {
+ sm.peers.addBanScore(peer.ID(), 0, 20, "fail on get txs from message")
+ return
+ }
+
+ if len(txs) > txsMsgMaxTxNum {
+ sm.peers.addBanScore(peer.ID(), 20, 0, "exceeded the maximum tx number limit")
+ return
+ }
+
+ for _, tx := range txs {
+ if isOrphan, err := sm.chain.ValidateTx(tx); err != nil && !isOrphan {
+ sm.peers.addBanScore(peer.ID(), 10, 0, "fail on validate tx transaction")
+ return
+ }
+ sm.peers.markTx(peer.ID(), tx.ID)
+ }
}
func (sm *SyncManager) IsListening() bool {
case *TransactionMessage:
sm.handleTransactionMsg(peer, msg)
+ case *TransactionsMessage:
+ sm.handleTransactionsMsg(peer, msg)
+
case *MineBlockMessage:
sm.handleMineBlockMsg(peer, msg)
BlocksResponseByte = byte(0x15)
StatusByte = byte(0x21)
NewTransactionByte = byte(0x30)
+ NewTransactionsByte = byte(0x31)
NewMineBlockByte = byte(0x40)
FilterLoadByte = byte(0x50)
FilterAddByte = byte(0x51)
MerkleResponseByte = byte(0x61)
maxBlockchainResponseSize = 22020096 + 2
+ txsMsgMaxTxNum = 1024
)
//BlockchainMessage is a generic message for this reactor.
wire.ConcreteType{&BlocksMessage{}, BlocksResponseByte},
wire.ConcreteType{&StatusMessage{}, StatusByte},
wire.ConcreteType{&TransactionMessage{}, NewTransactionByte},
+ wire.ConcreteType{&TransactionsMessage{}, NewTransactionsByte},
wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
wire.ConcreteType{&FilterLoadMessage{}, FilterLoadByte},
wire.ConcreteType{&FilterAddMessage{}, FilterAddByte},
return fmt.Sprintf("{tx_size: %d, tx_hash: %s}", len(m.RawTx), tx.ID.String())
}
+//TransactionsMessage notify new txs msg
+type TransactionsMessage struct {
+ RawTxs [][]byte
+}
+
+//NewTransactionsMessage construct notify new txs msg
+func NewTransactionsMessage(txs []*types.Tx) (*TransactionsMessage, error) {
+ rawTxs := make([][]byte, 0, len(txs))
+ for _, tx := range txs {
+ rawTx, err := tx.TxData.MarshalText()
+ if err != nil {
+ return nil, err
+ }
+
+ rawTxs = append(rawTxs, rawTx)
+ }
+ return &TransactionsMessage{RawTxs: rawTxs}, nil
+}
+
+//GetTransactions get txs from msg
+func (m *TransactionsMessage) GetTransactions() ([]*types.Tx, error) {
+ txs := make([]*types.Tx, 0, len(m.RawTxs))
+ for _, rawTx := range m.RawTxs {
+ tx := &types.Tx{}
+ if err := tx.UnmarshalText(rawTx); err != nil {
+ return nil, err
+ }
+
+ txs = append(txs, tx)
+ }
+ return txs, nil
+}
+
+func (m *TransactionsMessage) String() string {
+ return fmt.Sprintf("{tx_num: %d}", len(m.RawTxs))
+}
+
//MineBlockMessage new mined block msg
type MineBlockMessage struct {
RawBlock []byte
"github.com/davecgh/go-spew/spew"
+ "github.com/vapor/consensus"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
)
+var txs = []*types.Tx{
+ types.NewTx(types.TxData{
+ SerializedSize: uint64(52),
+ Inputs: []*types.TxInput{types.NewCoinbaseInput([]byte{0x01})},
+ Outputs: []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+ }),
+ types.NewTx(types.TxData{
+ SerializedSize: uint64(53),
+ Inputs: []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02})},
+ Outputs: []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+ }),
+ types.NewTx(types.TxData{
+ SerializedSize: uint64(54),
+ Inputs: []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+ Outputs: []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 5000, nil)},
+ }),
+ types.NewTx(types.TxData{
+ SerializedSize: uint64(54),
+ Inputs: []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+ Outputs: []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 2000, nil)},
+ }),
+ types.NewTx(types.TxData{
+ SerializedSize: uint64(54),
+ Inputs: []*types.TxInput{types.NewCoinbaseInput([]byte{0x01, 0x02, 0x03})},
+ Outputs: []*types.TxOutput{types.NewIntraChainOutput(*consensus.BTMAssetID, 10000, nil)},
+ }),
+}
+
+func TestTransactionMessage(t *testing.T) {
+ for _, tx := range txs {
+ txMsg, err := NewTransactionMessage(tx)
+ if err != nil {
+ t.Fatalf("create tx msg err:%s", err)
+ }
+
+ gotTx, err := txMsg.GetTransaction()
+ if err != nil {
+ t.Fatalf("get txs from txsMsg err:%s", err)
+ }
+ if !reflect.DeepEqual(*tx.Tx, *gotTx.Tx) {
+ t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTx.Tx))
+ }
+ }
+}
+
+func TestTransactionsMessage(t *testing.T) {
+ txsMsg, err := NewTransactionsMessage(txs)
+ if err != nil {
+ t.Fatalf("create txs msg err:%s", err)
+ }
+
+ gotTxs, err := txsMsg.GetTransactions()
+ if err != nil {
+ t.Fatalf("get txs from txsMsg err:%s", err)
+ }
+
+ if len(gotTxs) != len(txs) {
+ t.Fatal("txs msg test err: number of txs not match ")
+ }
+
+ for i, tx := range txs {
+ if !reflect.DeepEqual(tx.Tx, gotTxs[i].Tx) {
+ t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(tx.Tx), spew.Sdump(gotTxs[i].Tx))
+ }
+ }
+}
+
var testBlock = &types.Block{
BlockHeader: types.BlockHeader{
Version: 1,
return ok, nil
}
-func (p *peer) sendTransactions(txs []*types.Tx) (bool, error) {
- for _, tx := range txs {
- if p.isSPVNode() && !p.isRelatedTx(tx) {
+func (p *peer) sendTransactions(txs []*types.Tx) error {
+ validTxs := make([]*types.Tx, 0, len(txs))
+ for i, tx := range txs {
+ if p.isSPVNode() && !p.isRelatedTx(tx) || p.knownTxs.Has(tx.ID.String()) {
continue
}
- msg, err := NewTransactionMessage(tx)
- if err != nil {
- return false, errors.Wrap(err, "failed to tx msg")
- }
- if p.knownTxs.Has(tx.ID.String()) {
+ validTxs = append(validTxs, tx)
+ if len(validTxs) != txsMsgMaxTxNum && i != len(txs)-1 {
continue
}
+
+ msg, err := NewTransactionsMessage(validTxs)
+ if err != nil {
+ return err
+ }
+
if ok := p.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg}); !ok {
- return ok, nil
+ return errors.New("failed to send txs msg")
+ }
+
+ for _, validTx := range validTxs {
+ p.knownTxs.Add(validTx.ID.String())
}
- p.knownTxs.Add(tx.ID.String())
+
+ validTxs = make([]*types.Tx, 0, len(txs))
}
- return true, nil
+
+ return nil
}
func (p *peer) sendStatus(header *types.BlockHeader) error {
return result
}
+func (ps *peerSet) markTx(peerID string, txHash bc.Hash) {
+ ps.mtx.Lock()
+ peer := ps.peers[peerID]
+ ps.mtx.Unlock()
+
+ if peer == nil {
+ return
+ }
+ peer.markTransaction(&txHash)
+}
+
func (ps *peerSet) peersWithoutBlock(hash *bc.Hash) []*peer {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
}).Debug("txSyncLoop sending transactions")
sending = true
go func() {
- ok, err := peer.sendTransactions(sendTxs)
- if !ok {
+ err := peer.sendTransactions(sendTxs)
+ if err != nil {
sm.peers.removePeer(msg.peerID)
}
done <- err