improve net sync test cases (#375)
author    yahtoo <yahtoo.ma@gmail.com>
          Fri, 2 Aug 2019 06:58:17 +0000 (14:58 +0800)
committer Paladz <yzhu101@uottawa.ca>
          Fri, 2 Aug 2019 06:58:17 +0000 (14:58 +0800)
* Add netsync test case

* Optimize code format

12 files changed:
netsync/chainmgr/fast_sync.go
netsync/chainmgr/fast_sync_test.go
netsync/chainmgr/peers_test.go [new file with mode: 0644]
netsync/chainmgr/tool_test.go
netsync/chainmgr/tx_keeper_test.go
netsync/consensusmgr/block_fetcher.go
netsync/consensusmgr/consensus_msg.go
netsync/consensusmgr/handle.go
netsync/consensusmgr/handle_test.go [new file with mode: 0644]
netsync/peers/peer.go
netsync/peers/peer_test.go [new file with mode: 0644]
netsync/sync_manager.go

diff --git a/netsync/chainmgr/fast_sync.go b/netsync/chainmgr/fast_sync.go
index 550d2df..c2e7bc0 100644
@@ -20,8 +20,10 @@ var (
        fastSyncPivotGap       = uint64(64)
        minGapStartFastSync    = uint64(128)
 
-       errNoSyncPeer   = errors.New("can't find sync peer")
-       errSkeletonSize = errors.New("fast sync skeleton size wrong")
+       errNoSyncPeer      = errors.New("can't find sync peer")
+       errSkeletonSize    = errors.New("fast sync skeleton size wrong")
+       errNoMainSkeleton  = errors.New("No main skeleton found")
+       errNoSkeletonFound = errors.New("No skeleton found")
 )
 
 type fastSync struct {
@@ -83,12 +85,12 @@ func (fs *fastSync) createFetchBlocksTasks(stopBlock *types.Block) ([]*fetchBloc
        stopHash := stopBlock.Hash()
        skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1)
        if len(skeletonMap) == 0 {
-               return nil, errors.New("No skeleton found")
+               return nil, errNoSkeletonFound
        }
 
        mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()]
        if !ok {
-               return nil, errors.New("No main skeleton found")
+               return nil, errNoMainSkeleton
        }
 
        if len(mainSkeleton) < minSizeOfSyncSkeleton || len(mainSkeleton) > maxSizeOfSyncSkeleton {
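Promoting the two inline errors.New calls to package-level sentinels is what lets the new table-driven test below compare returned errors by identity. A minimal sketch of that assertion pattern (fs, stopBlock, and t stand in for fixtures from fast_sync_test.go):

	if _, err := fs.createFetchBlocksTasks(stopBlock); err != errNoSkeletonFound {
		t.Errorf("got %v, want %v", err, errNoSkeletonFound)
	}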
diff --git a/netsync/chainmgr/fast_sync_test.go b/netsync/chainmgr/fast_sync_test.go
index ffacb56..dcb7186 100644
@@ -3,12 +3,15 @@ package chainmgr
 import (
        "io/ioutil"
        "os"
+       "reflect"
+       "sync"
        "testing"
        "time"
 
        "github.com/vapor/consensus"
        dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/errors"
+       "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
        "github.com/vapor/test/mock"
@@ -181,3 +184,175 @@ func TestFastBlockSync(t *testing.T) {
                }
        }
 }
+
+type mockFetcher struct {
+       baseChain  []*types.Block
+       peerStatus map[string][]*types.Block
+       peers      []string
+       testType   int
+}
+
+func (mf *mockFetcher) resetParameter() {
+       return
+}
+
+func (mf *mockFetcher) addSyncPeer(peerID string) {
+       return
+}
+
+func (mf *mockFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
+       return nil, nil
+}
+
+func (mf *mockFetcher) parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
+       return
+}
+
+func (mf *mockFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
+       result := make(map[string][]*types.BlockHeader)
+       switch mf.testType {
+       case 1:
+               result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader,
+                       &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader,
+                       &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader,
+               }
+               result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+                       &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+                       &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader,
+               }
+
+       case 2:
+               result["peer1"] = []*types.BlockHeader{}
+       case 3:
+       case 4:
+               result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+                       &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+                       &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader,
+               }
+       case 5:
+               result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader,
+                       &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader,
+                       &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader,
+               }
+               result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+                       &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+                       &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader,
+               }
+       }
+       return result
+}
+
+func TestCreateFetchBlocksTasks(t *testing.T) {
+       baseChain := mockBlocks(nil, 1000)
+       chainX := append(baseChain, mockBlocks(baseChain[1000], 2000)...)
+       chainY := append(baseChain, mockBlocks(baseChain[1000], 1900)...)
+       peerStatus := make(map[string][]*types.Block)
+       peerStatus["peer1"] = chainX
+       peerStatus["peer2"] = chainY
+       type syncPeer struct {
+               peer               *P2PPeer
+               bestHeight         uint64
+               irreversibleHeight uint64
+       }
+
+       cases := []struct {
+               peers        []*syncPeer
+               mainSyncPeer string
+               testType     int
+               wantTasks    []*fetchBlocksWork
+               wantErr      error
+       }{
+               // normal test
+               {
+                       peers: []*syncPeer{
+                               {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+                               {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+                       },
+                       mainSyncPeer: "peer1",
+                       testType:     1,
+                       wantTasks: []*fetchBlocksWork{
+                               {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader},
+                               {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader},
+                               {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader},
+                               {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader},
+                       },
+                       wantErr: nil,
+               },
+               // test no sync peer
+               {
+                       peers:        []*syncPeer{},
+                       mainSyncPeer: "peer1",
+                       testType:     0,
+                       wantTasks:    nil,
+                       wantErr:      errNoSyncPeer,
+               },
+               // primary sync peer skeleton size error
+               {
+                       peers: []*syncPeer{
+                               {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+                               {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+                       },
+                       mainSyncPeer: "peer1",
+                       testType:     2,
+                       wantTasks:    nil,
+                       wantErr:      errSkeletonSize,
+               },
+               // no skeleton return
+               {
+                       peers: []*syncPeer{
+                               {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+                               {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+                       },
+                       mainSyncPeer: "peer1",
+                       testType:     3,
+                       wantTasks:    nil,
+                       wantErr:      errNoSkeletonFound,
+               },
+               // no main skeleton found
+               {
+                       peers: []*syncPeer{
+                               {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+                               {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+                       },
+                       mainSyncPeer: "peer1",
+                       testType:     4,
+                       wantTasks:    nil,
+                       wantErr:      errNoMainSkeleton,
+               },
+               // skeleton length mismatch
+               {
+                       peers: []*syncPeer{
+                               {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+                               {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+                       },
+                       mainSyncPeer: "peer1",
+                       testType:     5,
+                       wantTasks: []*fetchBlocksWork{
+                               {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader},
+                               {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader},
+                               {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader},
+                               {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader},
+                       },
+                       wantErr: nil,
+               },
+       }
+
+       for i, c := range cases {
+               peers := peers.NewPeerSet(NewPeerSet())
+               for _, syncPeer := range c.peers {
+                       peers.AddPeer(syncPeer.peer)
+                       peers.SetStatus(syncPeer.peer.id, syncPeer.bestHeight, nil)
+                       peers.SetIrreversibleStatus(syncPeer.peer.id, syncPeer.irreversibleHeight, nil)
+               }
+               mockChain := mock.NewChain(nil)
+               fs := newFastSync(mockChain, &mockFetcher{baseChain: baseChain, peerStatus: peerStatus, testType: c.testType}, nil, peers)
+               fs.mainSyncPeer = fs.peers.GetPeer(c.mainSyncPeer)
+               tasks, err := fs.createFetchBlocksTasks(baseChain[700])
+               if err != c.wantErr {
+                       t.Errorf("case %d: got %v want %v", i, err, c.wantErr)
+               }
+               if !reflect.DeepEqual(tasks, c.wantTasks) {
+                       t.Errorf("case %d: got %v want %v", i, tasks, c.wantTasks)
+               }
+       }
+}
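In the mockFetcher above, testType selects the scenario: 1 returns a full nine-header skeleton from both peers, 2 returns an empty skeleton from the main peer (errSkeletonSize), 3 returns no skeletons at all (errNoSkeletonFound), 4 omits the main peer (errNoMainSkeleton), and 5 gives peer2 a shorter skeleton than the main peer's. Assuming a standard checkout of the vapor module, the new test can be run in isolation with:

	go test ./netsync/chainmgr -run TestCreateFetchBlocksTasks -v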
diff --git a/netsync/chainmgr/peers_test.go b/netsync/chainmgr/peers_test.go
new file mode 100644
index 0000000..cc159fa
--- /dev/null
+++ b/netsync/chainmgr/peers_test.go
@@ -0,0 +1,116 @@
+package chainmgr
+
+import (
+       "testing"
+)
+
+func TestAddDel(t *testing.T) {
+       syncPeers := newFastSyncPeers()
+       peers := make(map[string]bool)
+       peers["Peer1"] = true
+       peers["Peer2"] = true
+       for k := range peers {
+               syncPeers.add(k)
+               syncPeers.add(k)
+       }
+       if syncPeers.size() != len(peers) {
+               t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers))
+       }
+
+       syncPeers.delete("Peer1")
+       if syncPeers.size() != 1 {
+               t.Errorf("delete peer test err: got %d\nwant %d", syncPeers.size(), 1)
+       }
+
+       syncPeers.delete("Peer1")
+       if syncPeers.size() != 1 {
+               t.Errorf("delete peer test err: got %d\nwant %d", syncPeers.size(), 1)
+       }
+}
+
+func TestIdlePeers(t *testing.T) {
+       syncPeers := newFastSyncPeers()
+       peers := make(map[string]bool)
+       peers["Peer1"] = true
+       peers["Peer2"] = true
+       for k := range peers {
+               syncPeers.add(k)
+               syncPeers.add(k)
+       }
+
+       idlePeers := syncPeers.selectIdlePeers()
+       if len(idlePeers) != len(peers) {
+               t.Errorf("select idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+       }
+
+       for _, peer := range idlePeers {
+               if ok := peers[peer]; !ok {
+                       t.Errorf("select idle peers test err: want peers %v got %v", peers, idlePeers)
+               }
+       }
+
+       idlePeers = syncPeers.selectIdlePeers()
+       if len(idlePeers) != 0 {
+               t.Errorf("select idle peers test err: got %d\nwant %d", len(idlePeers), 0)
+       }
+
+}
+
+func TestIdlePeer(t *testing.T) {
+       syncPeers := newFastSyncPeers()
+       peers := make(map[string]bool)
+       peers["Peer1"] = true
+       peers["Peer2"] = true
+       for k := range peers {
+               syncPeers.add(k)
+               syncPeers.add(k)
+       }
+       idlePeer, err := syncPeers.selectIdlePeer()
+       if err != nil {
+               t.Errorf("select idle peer test err: got %v\nwant %v", err, nil)
+       }
+
+       if ok := peers[idlePeer]; !ok {
+               t.Error("select idle peer test err.")
+       }
+       idlePeer, err = syncPeers.selectIdlePeer()
+       if err != nil {
+               t.Errorf("select idle peer test err: got %v\nwant %v", err, nil)
+       }
+
+       if ok := peers[idlePeer]; !ok {
+               t.Error("select idle peer test err.")
+       }
+       idlePeer, err = syncPeers.selectIdlePeer()
+       if err != errNoValidFastSyncPeer {
+               t.Errorf("select idle peer test err: got %v\nwant %v", err, errNoValidFastSyncPeer)
+       }
+}
+
+func TestSetIdle(t *testing.T) {
+       syncPeers := newFastSyncPeers()
+       peers := make(map[string]bool)
+       peers["Peer2"] = true
+       for k := range peers {
+               syncPeers.add(k)
+       }
+       if syncPeers.size() != len(peers) {
+               t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers))
+       }
+       idlePeers := syncPeers.selectIdlePeers()
+       if len(idlePeers) != len(peers) {
+               t.Errorf("select idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+       }
+
+       syncPeers.setIdle("Peer1")
+       idlePeers = syncPeers.selectIdlePeers()
+       if len(idlePeers) != 0 {
+               t.Errorf("select idle peers test err: got %d\nwant %d", len(idlePeers), 0)
+       }
+
+       syncPeers.setIdle("Peer2")
+       idlePeers = syncPeers.selectIdlePeers()
+       if len(idlePeers) != len(peers) {
+               t.Errorf("select idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+       }
+}
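The tests above pin down the fastSyncPeers contract: selectIdlePeers returns every idle peer and marks each one busy, selectIdlePeer does the same for a single peer (failing with errNoValidFastSyncPeer once none are idle), and setIdle makes a peer selectable again. A sketch of that lifecycle (the implementation itself is not part of this diff):

	sp := newFastSyncPeers()
	sp.add("Peer1")
	_ = sp.selectIdlePeers() // returns ["Peer1"] and marks it busy
	_ = sp.selectIdlePeers() // returns an empty slice: nothing is idle
	sp.setIdle("Peer1")      // "Peer1" becomes selectable again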
diff --git a/netsync/chainmgr/tool_test.go b/netsync/chainmgr/tool_test.go
index 4a3badd..1905e11 100644
@@ -11,6 +11,7 @@ import (
        dbm "github.com/vapor/database/leveldb"
 
        "github.com/vapor/consensus"
+       "github.com/vapor/event"
        "github.com/vapor/netsync/peers"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
@@ -197,11 +198,12 @@ func mockSync(blocks []*types.Block, mempool *mock.Mempool, fastSyncDB dbm.DB) *
        }
 
        return &Manager{
-               chain:       chain,
-               blockKeeper: newBlockKeeper(chain, peers, fastSyncDB),
-               peers:       peers,
-               mempool:     mempool,
-               txSyncCh:    make(chan *txSyncMsg),
+               chain:           chain,
+               blockKeeper:     newBlockKeeper(chain, peers, fastSyncDB),
+               peers:           peers,
+               mempool:         mempool,
+               txSyncCh:        make(chan *txSyncMsg),
+               eventDispatcher: event.NewDispatcher(),
        }
 }
 
diff --git a/netsync/chainmgr/tx_keeper_test.go b/netsync/chainmgr/tx_keeper_test.go
index dd269fd..b5ac991 100644
@@ -12,6 +12,7 @@ import (
        "github.com/vapor/consensus"
        dbm "github.com/vapor/database/leveldb"
        "github.com/vapor/protocol"
+       core "github.com/vapor/protocol"
        "github.com/vapor/protocol/bc"
        "github.com/vapor/protocol/bc/types"
        "github.com/vapor/test/mock"
@@ -19,8 +20,11 @@ import (
 
 const txsNumber = 2000
 
-func getTransactions() []*types.Tx {
-       txs := []*types.Tx{}
+type mempool struct {
+}
+
+func (m *mempool) GetTransactions() []*core.TxDesc {
+       txs := []*core.TxDesc{}
        for i := 0; i < txsNumber; i++ {
                txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51})
                txInput.CommitmentSuffix = []byte{0, 1, 2}
@@ -36,12 +40,13 @@ func getTransactions() []*types.Tx {
                                Outputs: []*types.TxOutput{
                                        types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}),
                                },
+                               SerializedSize: 1000,
                        },
                        Tx: &bc.Tx{
                                ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)},
                        },
                }
-               txs = append(txs, tx)
+               txs = append(txs, &core.TxDesc{Tx: tx})
        }
        return txs
 }
@@ -58,7 +63,7 @@ func TestSyncMempool(t *testing.T) {
        blocks := mockBlocks(nil, 5)
        a := mockSync(blocks, &mock.Mempool{}, testDBA)
        b := mockSync(blocks, &mock.Mempool{}, testDBB)
-
+       a.mempool = &mempool{}
        netWork := NewNetWork()
        netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
        netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
@@ -71,8 +76,7 @@ func TestSyncMempool(t *testing.T) {
 
        go a.syncMempoolLoop()
        a.syncMempool("test node B")
-       wantTxs := getTransactions()
-       a.txSyncCh <- &txSyncMsg{"test node B", wantTxs}
+       wantTxs := a.mempool.GetTransactions()
 
        timeout := time.NewTimer(2 * time.Second)
        defer timeout.Stop()
@@ -99,14 +103,82 @@ out:
 
        for i, gotTx := range gotTxs {
                index := gotTx.Tx.Inputs[0].Amount()
-               if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Inputs[0].Amount()) {
-                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Inputs))
+               if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
+               }
+
+               if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
+               }
+       }
+}
+
+func TestBroadcastTxsLoop(t *testing.T) {
+       tmpDir, err := ioutil.TempDir(".", "")
+       if err != nil {
+               t.Fatalf("failed to create temporary data folder: %v", err)
+       }
+       defer os.RemoveAll(tmpDir)
+       testDBA := dbm.NewDB("testdba", "leveldb", tmpDir)
+       testDBB := dbm.NewDB("testdbb", "leveldb", tmpDir)
+
+       blocks := mockBlocks(nil, 5)
+       a := mockSync(blocks, &mock.Mempool{}, testDBA)
+       b := mockSync(blocks, &mock.Mempool{}, testDBB)
+       a.mempool = &mempool{}
+       netWork := NewNetWork()
+       netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
+       netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
+       if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
+               t.Errorf("fail on peer hands shake %v", err)
+       } else {
+               go B2A.postMan()
+               go A2B.postMan()
+       }
+       a.txMsgSub, err = a.eventDispatcher.Subscribe(core.TxMsgEvent{})
+       if err != nil {
+               t.Fatal("txMsgSub subscribe err", err)
+       }
+       go a.broadcastTxsLoop()
+       wantTxs := a.mempool.GetTransactions()
+       txsNum := 50
+       for i, txD := range wantTxs {
+               if i >= txsNum {
+                       break
                }
+               a.eventDispatcher.Post(core.TxMsgEvent{TxMsg: &core.TxPoolMsg{TxDesc: txD, MsgType: core.MsgNewTx}})
+       }
+       timeout := time.NewTimer(2 * time.Second)
+       defer timeout.Stop()
+       ticker := time.NewTicker(500 * time.Millisecond)
+       defer ticker.Stop()
 
-               if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Outputs[0].AssetAmount()) {
-                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Outputs))
+       gotTxs := []*protocol.TxDesc{}
+       for {
+               select {
+               case <-ticker.C:
+                       gotTxs = b.mempool.GetTransactions()
+                       if len(gotTxs) >= txsNum {
+                               goto out
+                       }
+               case <-timeout.C:
+                       t.Fatalf("mempool sync timeout")
                }
+       }
 
+out:
+       if len(gotTxs) != txsNum {
+               t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNum)
        }
 
+       for i, gotTx := range gotTxs {
+               index := gotTx.Tx.Inputs[0].Amount()
+               if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
+               }
+
+               if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
+                       t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
+               }
+       }
 }
diff --git a/netsync/consensusmgr/block_fetcher.go b/netsync/consensusmgr/block_fetcher.go
index 3c92b38..8c28ff9 100644
@@ -4,7 +4,6 @@ import (
        "github.com/sirupsen/logrus"
        "gopkg.in/karalabe/cookiejar.v2/collections/prque"
 
-       "github.com/vapor/netsync/peers"
        "github.com/vapor/p2p/security"
        "github.com/vapor/protocol/bc"
 )
@@ -19,7 +18,7 @@ const (
 // and scheduling them for retrieval.
 type blockFetcher struct {
        chain Chain
-       peers *peers.PeerSet
+       peers Peers
 
        newBlockCh chan *blockMsg
        queue      *prque.Prque
@@ -27,7 +26,7 @@ type blockFetcher struct {
 }
 
 //newBlockFetcher creates a block fetcher to retrieve newly proposed blocks.
-func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
+func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
        f := &blockFetcher{
                chain:      chain,
                peers:      peers,
diff --git a/netsync/consensusmgr/consensus_msg.go b/netsync/consensusmgr/consensus_msg.go
index 33fee3d..f47cf81 100644
@@ -69,7 +69,7 @@ func (bs *BlockSignatureMsg) BroadcastMarkSendRecord(ps *peers.PeerSet, peers []
 
 // BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
 func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
-       return ps.PeersWithoutSign(bs.Signature)
+       return ps.PeersWithoutSignature(bs.Signature)
 }
 
 // BlockProposeMsg block propose message transferred between nodes.
diff --git a/netsync/consensusmgr/handle.go b/netsync/consensusmgr/handle.go
index 65fb9f2..37f7ee8 100644
@@ -26,6 +26,17 @@ type Chain interface {
        ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
 }
 
+type Peers interface {
+       AddPeer(peer peers.BasePeer)
+       BroadcastMsg(bm peers.BroadcastMsg) error
+       GetPeer(id string) *peers.Peer
+       MarkBlock(peerID string, hash *bc.Hash)
+       MarkBlockSignature(peerID string, signature []byte)
+       ProcessIllegal(peerID string, level byte, reason string)
+       RemovePeer(peerID string)
+       SetStatus(peerID string, height uint64, hash *bc.Hash)
+}
+
 type blockMsg struct {
        block  *types.Block
        peerID string
@@ -35,7 +46,7 @@ type blockMsg struct {
 type Manager struct {
        sw              Switch
        chain           Chain
-       peers           *peers.PeerSet
+       peers           Peers
        blockFetcher    *blockFetcher
        eventDispatcher *event.Dispatcher
 
@@ -43,7 +54,7 @@ type Manager struct {
 }
 
 // NewManager create new manager.
-func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
+func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
        manager := &Manager{
                sw:              sw,
                chain:           chain,
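Extracting the Peers interface decouples consensusmgr from the concrete *peers.PeerSet, which is what allows handle_test.go (below) to substitute a mock. A conventional compile-time check, not included in this diff, would pin both implementations to the interface:

	var (
		_ Peers = (*peers.PeerSet)(nil) // the production peer set
		_ Peers = (*mockPeers)(nil)     // the test double from handle_test.go
	)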
diff --git a/netsync/consensusmgr/handle_test.go b/netsync/consensusmgr/handle_test.go
new file mode 100644
index 0000000..d5e1569
--- /dev/null
+++ b/netsync/consensusmgr/handle_test.go
@@ -0,0 +1,231 @@
+package consensusmgr
+
+import (
+       "math/rand"
+       "net"
+       "reflect"
+       "testing"
+       "time"
+
+       "github.com/tendermint/tmlibs/flowrate"
+
+       "github.com/vapor/consensus"
+       "github.com/vapor/event"
+       "github.com/vapor/netsync/peers"
+       "github.com/vapor/p2p"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+type p2peer struct {
+}
+
+func (p *p2peer) Addr() net.Addr {
+       return nil
+}
+
+func (p *p2peer) ID() string {
+       return ""
+}
+
+func (p *p2peer) RemoteAddrHost() string {
+       return ""
+}
+func (p *p2peer) ServiceFlag() consensus.ServiceFlag {
+       return 0
+}
+func (p *p2peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+       return nil, nil
+}
+func (p *p2peer) TrySend(byte, interface{}) bool {
+       return true
+}
+func (p *p2peer) IsLAN() bool {
+       return false
+}
+
+func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
+       blocks := []*types.Block{}
+       indexBlock := &types.Block{}
+       if startBlock == nil {
+               indexBlock = &types.Block{BlockHeader: types.BlockHeader{Version: uint64(rand.Uint32())}}
+               blocks = append(blocks, indexBlock)
+       } else {
+               indexBlock = startBlock
+       }
+
+       for indexBlock.Height < height {
+               block := &types.Block{
+                       BlockHeader: types.BlockHeader{
+                               Height:            indexBlock.Height + 1,
+                               PreviousBlockHash: indexBlock.Hash(),
+                               Version:           uint64(rand.Uint32()),
+                       },
+               }
+               blocks = append(blocks, block)
+               indexBlock = block
+       }
+       return blocks
+}
+
+type mockSW struct {
+}
+
+func (s *mockSW) AddReactor(name string, reactor p2p.Reactor) p2p.Reactor {
+       return nil
+}
+
+type mockChain struct {
+}
+
+func (c *mockChain) BestBlockHeight() uint64 {
+       return 0
+}
+
+func (c *mockChain) GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) {
+       return nil, nil
+}
+
+func (c *mockChain) ProcessBlock(*types.Block) (bool, error) {
+       return false, nil
+}
+
+func (c *mockChain) ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error {
+       return nil
+}
+
+type mockPeers struct {
+       msgCount       *int
+       knownBlock     *bc.Hash
+       blockHeight    *uint64
+       knownSignature *[]byte
+}
+
+func newMockPeers(msgCount *int, knownBlock *bc.Hash, blockHeight *uint64, signature *[]byte) *mockPeers {
+       return &mockPeers{
+               msgCount:       msgCount,
+               knownBlock:     knownBlock,
+               blockHeight:    blockHeight,
+               knownSignature: signature,
+       }
+}
+
+func (ps *mockPeers) AddPeer(peer peers.BasePeer) {
+
+}
+
+func (ps *mockPeers) BroadcastMsg(bm peers.BroadcastMsg) error {
+       *ps.msgCount++
+       return nil
+}
+func (ps *mockPeers) GetPeer(id string) *peers.Peer {
+       return &peers.Peer{BasePeer: &p2peer{}}
+}
+func (ps *mockPeers) MarkBlock(peerID string, hash *bc.Hash) {
+       *ps.knownBlock = *hash
+}
+
+func (ps *mockPeers) MarkBlockSignature(peerID string, signature []byte) {
+       *ps.knownSignature = append(*ps.knownSignature, signature...)
+}
+
+func (ps *mockPeers) ProcessIllegal(peerID string, level byte, reason string) {
+
+}
+func (ps *mockPeers) RemovePeer(peerID string) {
+
+}
+func (ps *mockPeers) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+       *ps.blockHeight = height
+}
+
+func TestBlockProposeMsgBroadcastLoop(t *testing.T) {
+       dispatcher := event.NewDispatcher()
+       msgCount := 0
+       blockHeight := 100
+       mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher)
+       blocks := mockBlocks(nil, uint64(blockHeight))
+
+       mgr.Start()
+       defer mgr.Stop()
+       time.Sleep(10 * time.Millisecond)
+       for _, block := range blocks {
+               mgr.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block})
+       }
+       time.Sleep(10 * time.Millisecond)
+       if msgCount != blockHeight+1 {
+               t.Fatalf("broadcast propose block msg err. got:%d\n want:%d", msgCount, blockHeight+1)
+       }
+}
+
+func TestBlockSignatureMsgBroadcastLoop(t *testing.T) {
+       dispatcher := event.NewDispatcher()
+       msgCount := 0
+       blockHeight := 100
+       mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher)
+       blocks := mockBlocks(nil, uint64(blockHeight))
+
+       mgr.Start()
+       defer mgr.Stop()
+       time.Sleep(10 * time.Millisecond)
+       for _, block := range blocks {
+               mgr.eventDispatcher.Post(event.BlockSignatureEvent{BlockHash: block.Hash(), Signature: []byte{0x1, 0x2}, XPub: []byte{0x011, 0x022}})
+       }
+       time.Sleep(10 * time.Millisecond)
+       if msgCount != blockHeight+1 {
+               t.Fatalf("broadcast block signature msg err. got:%d\n want:%d", msgCount, blockHeight+1)
+       }
+}
+
+func TestProcessBlockProposeMsg(t *testing.T) {
+       dispatcher := event.NewDispatcher()
+       msgCount := 0
+       var knownBlock bc.Hash
+       blockHeight := uint64(0)
+       peerID := "Peer1"
+       mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, &knownBlock, &blockHeight, nil), dispatcher)
+       block := &types.Block{
+               BlockHeader: types.BlockHeader{
+                       Height:            100,
+                       PreviousBlockHash: bc.NewHash([32]byte{0x1}),
+                       Version:           uint64(rand.Uint32()),
+               },
+       }
+       msg, err := NewBlockProposeMsg(block)
+       if err != nil {
+               t.Fatal("create new block propose msg err", err)
+       }
+
+       mgr.processMsg(peerID, 0, msg)
+       if knownBlock != block.Hash() {
+               t.Fatalf("mark propose block msg err. got:%v\n want:%v", knownBlock, block.Hash())
+       }
+
+       if blockHeight != block.Height {
+               t.Fatalf("set peer status err. got:%d\n want:%d", blockHeight, block.Height)
+       }
+}
+
+func TestProcessBlockSignatureMsg(t *testing.T) {
+       dispatcher := event.NewDispatcher()
+       msgCount := 0
+       knownSignature := []byte{}
+       peerID := "Peer1"
+       mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, &knownSignature), dispatcher)
+       block := &types.Block{
+               BlockHeader: types.BlockHeader{
+                       Height:            100,
+                       PreviousBlockHash: bc.NewHash([32]byte{0x1}),
+                       Version:           uint64(rand.Uint32()),
+               },
+       }
+
+       signature := []byte{0x01, 0x02}
+       msg := NewBlockSignatureMsg(block.Hash(), signature, []byte{0x03, 0x04})
+
+       mgr.processMsg(peerID, 0, msg)
+
+       if !reflect.DeepEqual(knownSignature, signature) {
+               t.Fatalf("mark block signature err. got:%v\n want:%v", knownSignature, signature)
+       }
+}
diff --git a/netsync/peers/peer.go b/netsync/peers/peer.go
index ef90812..e0c52f5 100644
@@ -252,32 +252,6 @@ func (p *Peer) markTransaction(hash *bc.Hash) {
        p.knownTxs.Add(hash.String())
 }
 
-func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       var peers []string
-       for _, peer := range ps.peers {
-               if !peer.knownBlocks.Has(hash.String()) {
-                       peers = append(peers, peer.ID())
-               }
-       }
-       return peers
-}
-
-func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       var peers []string
-       for _, peer := range ps.peers {
-               if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
-                       peers = append(peers, peer.ID())
-               }
-       }
-       return peers
-}
-
 func (p *Peer) SendBlock(block *types.Block) (bool, error) {
        msg, err := msgs.NewBlockMessage(block)
        if err != nil {
@@ -544,14 +518,6 @@ func (ps *PeerSet) BroadcastTx(tx *types.Tx) error {
        return nil
 }
 
-func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
-       if errors.Root(err) == ErrPeerMisbehave {
-               ps.ProcessIllegal(peerID, level, err.Error())
-       } else {
-               ps.RemovePeer(peerID)
-       }
-}
-
 // Peer retrieves the registered peer with the given id.
 func (ps *PeerSet) GetPeer(id string) *Peer {
        ps.mtx.RLock()
@@ -618,14 +584,27 @@ func (ps *PeerSet) MarkTx(peerID string, txHash bc.Hash) {
        peer.markTransaction(&txHash)
 }
 
-func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
        ps.mtx.RLock()
        defer ps.mtx.RUnlock()
 
-       peers := []*Peer{}
+       var peers []string
        for _, peer := range ps.peers {
                if !peer.knownBlocks.Has(hash.String()) {
-                       peers = append(peers, peer)
+                       peers = append(peers, peer.ID())
+               }
+       }
+       return peers
+}
+
+func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
+       ps.mtx.RLock()
+       defer ps.mtx.RUnlock()
+
+       var peers []string
+       for _, peer := range ps.peers {
+               if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
+                       peers = append(peers, peer.ID())
                }
        }
        return peers
@@ -681,10 +660,3 @@ func (ps *PeerSet) SetIrreversibleStatus(peerID string, height uint64, hash *bc.
 
        peer.SetIrreversibleStatus(height, hash)
 }
-
-func (ps *PeerSet) Size() int {
-       ps.mtx.RLock()
-       defer ps.mtx.RUnlock()
-
-       return len(ps.peers)
-}
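PeersWithoutBlock and PeersWithoutSignature are now the exported query methods and return peer IDs instead of *Peer values; the rename from PeersWithoutSign matches the call-site change in consensus_msg.go above. A hedged sketch of the intended broadcast flow (the surrounding send logic is illustrative, not part of this diff):

	// Send the signature only to peers that have not seen it yet,
	// then record it so it is not broadcast to them again.
	for _, id := range ps.PeersWithoutSignature(sig) {
		ps.MarkBlockSignature(id, sig)
	}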
diff --git a/netsync/peers/peer_test.go b/netsync/peers/peer_test.go
new file mode 100644
index 0000000..27a1625
--- /dev/null
+++ b/netsync/peers/peer_test.go
@@ -0,0 +1,417 @@
+package peers
+
+import (
+       "net"
+       "reflect"
+       "testing"
+
+       "github.com/davecgh/go-spew/spew"
+       "github.com/tendermint/tmlibs/flowrate"
+       "github.com/vapor/consensus"
+       "github.com/vapor/p2p/security"
+       "github.com/vapor/protocol/bc"
+       "github.com/vapor/protocol/bc/types"
+)
+
+var (
+       peer1ID = "PEER1"
+       peer2ID = "PEER2"
+       peer3ID = "PEER3"
+       peer4ID = "PEER4"
+
+       block1000Hash = bc.NewHash([32]byte{0x01, 0x02})
+       block2000Hash = bc.NewHash([32]byte{0x02, 0x03})
+       block3000Hash = bc.NewHash([32]byte{0x03, 0x04})
+)
+
+type basePeer struct {
+       id          string
+       serviceFlag consensus.ServiceFlag
+       isLan       bool
+}
+
+func (bp *basePeer) Addr() net.Addr {
+       return nil
+}
+
+func (bp *basePeer) ID() string {
+       return bp.id
+}
+
+func (bp *basePeer) RemoteAddrHost() string {
+       switch bp.ID() {
+       case peer1ID:
+               return peer1ID
+       case peer2ID:
+               return peer2ID
+       case peer3ID:
+               return peer3ID
+       case peer4ID:
+               return peer4ID
+       }
+       return ""
+}
+
+func (bp *basePeer) ServiceFlag() consensus.ServiceFlag {
+       return bp.serviceFlag
+}
+
+func (bp *basePeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+       return nil, nil
+}
+
+func (bp *basePeer) TrySend(byte, interface{}) bool {
+       return true
+}
+
+func (bp *basePeer) IsLAN() bool {
+       return bp.isLan
+}
+
+func TestSetPeerStatus(t *testing.T) {
+       peer := newPeer(&basePeer{})
+       height := uint64(100)
+       hash := bc.NewHash([32]byte{0x1, 0x2})
+       peer.SetBestStatus(height, &hash)
+       if peer.Height() != height {
+               t.Fatalf("test set best status err. got %d want %d", peer.Height(), height)
+       }
+}
+
+func TestSetIrreversibleStatus(t *testing.T) {
+       peer := newPeer(&basePeer{})
+       height := uint64(100)
+       hash := bc.NewHash([32]byte{0x1, 0x2})
+       peer.SetIrreversibleStatus(height, &hash)
+       if peer.IrreversibleHeight() != height {
+               t.Fatalf("test set irreversible status err. got %d want %d", peer.IrreversibleHeight(), height)
+       }
+}
+
+func TestAddFilterAddresses(t *testing.T) {
+       peer := newPeer(&basePeer{})
+       tx := types.NewTx(types.TxData{
+               Inputs: []*types.TxInput{
+                       types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")),
+               },
+               Outputs: []*types.TxOutput{
+                       types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")),
+               },
+       })
+
+       peer.AddFilterAddresses([][]byte{[]byte("spendProgram")})
+       if !peer.isRelatedTx(tx) {
+               t.Fatal("test filter addresses error.")
+       }
+
+       peer.AddFilterAddresses([][]byte{[]byte("testProgram")})
+       if peer.isRelatedTx(tx) {
+               t.Fatal("test filter addresses error.")
+       }
+}
+
+func TestFilterClear(t *testing.T) {
+       peer := newPeer(&basePeer{})
+       tx := types.NewTx(types.TxData{
+               Inputs: []*types.TxInput{
+                       types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")),
+               },
+               Outputs: []*types.TxOutput{
+                       types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")),
+               },
+       })
+
+       peer.AddFilterAddresses([][]byte{[]byte("spendProgram")})
+       if !peer.isRelatedTx(tx) {
+               t.Fatal("test filter addresses error.")
+       }
+
+       peer.FilterClear()
+       if peer.isRelatedTx(tx) {
+               t.Fatal("test filter addresses error.")
+       }
+}
+
+func TestGetRelatedTxAndStatus(t *testing.T) {
+       peer := newPeer(&basePeer{})
+       txs := []*types.Tx{
+               types.NewTx(types.TxData{
+                       Inputs: []*types.TxInput{
+                               types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram1")),
+                       },
+                       Outputs: []*types.TxOutput{
+                               types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram1")),
+                       },
+               }),
+               types.NewTx(types.TxData{
+                       Inputs: []*types.TxInput{
+                               types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram2")),
+                       },
+                       Outputs: []*types.TxOutput{
+                               types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram2")),
+                       },
+               }),
+               types.NewTx(types.TxData{
+                       Inputs: []*types.TxInput{
+                               types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram3")),
+                       },
+                       Outputs: []*types.TxOutput{
+                               types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram3")),
+                       },
+               }),
+       }
+       txStatuses := &bc.TransactionStatus{
+               VerifyStatus: []*bc.TxVerifyResult{{StatusFail: true}, {StatusFail: false}, {StatusFail: false}},
+       }
+       peer.AddFilterAddresses([][]byte{[]byte("spendProgram1"), []byte("outProgram3")})
+       gotTxs, gotStatus := peer.getRelatedTxAndStatus(txs, txStatuses)
+       if len(gotTxs) != 2 {
+               t.Error("TestGetRelatedTxAndStatus txs size error")
+       }
+
+       if !reflect.DeepEqual(*gotTxs[0].Tx, *txs[0].Tx) {
+               t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[0].Tx), spew.Sdump(txs[0].Tx))
+       }
+
+       if !reflect.DeepEqual(*gotTxs[1].Tx, *txs[2].Tx) {
+               t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[1].Tx), spew.Sdump(txs[2].Tx))
+       }
+
+       if gotStatus[0].StatusFail != true || gotStatus[1].StatusFail != false {
+               t.Error("TestGetRelatedTxAndStatus txs status error")
+       }
+}
+
+type basePeerSet struct {
+}
+
+func (bp *basePeerSet) StopPeerGracefully(string) {
+
+}
+
+func (bp *basePeerSet) IsBanned(ip string, level byte, reason string) bool {
+       switch ip {
+       case peer1ID:
+               return true
+       case peer2ID:
+               return false
+       case peer3ID:
+               return true
+       case peer4ID:
+               return false
+       }
+       return false
+}
+
+func TestMarkBlock(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+       ps.AddPeer(&basePeer{id: peer3ID})
+
+       blockHash := bc.NewHash([32]byte{0x01, 0x02})
+       ps.MarkBlock(peer1ID, &blockHash)
+       targetPeers := []string{peer2ID, peer3ID}
+
+       peers := ps.PeersWithoutBlock(blockHash)
+       if len(peers) != len(targetPeers) {
+               t.Fatalf("test mark block err. Number of target peers %d got %d", len(targetPeers), len(peers))
+       }
+
+       for _, targetPeer := range targetPeers {
+               flag := false
+               for _, gotPeer := range peers {
+                       if gotPeer == targetPeer {
+                               flag = true
+                               break
+                       }
+               }
+               if !flag {
+                       t.Errorf("test mark block err. can't find target peer %s", targetPeer)
+               }
+       }
+}
+
+func TestMarkStatus(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+       ps.AddPeer(&basePeer{id: peer3ID})
+
+       height := uint64(1024)
+       ps.MarkStatus(peer1ID, height)
+       targetPeers := []string{peer2ID, peer3ID}
+
+       peers := ps.peersWithoutNewStatus(height)
+       if len(peers) != len(targetPeers) {
+               t.Fatalf("test mark status err. Number of target peers %d got %d", len(targetPeers), len(peers))
+       }
+
+       for _, targetPeer := range targetPeers {
+               flag := false
+               for _, gotPeer := range peers {
+                       if gotPeer.ID() == targetPeer {
+                               flag = true
+                               break
+                       }
+               }
+               if !flag {
+                       t.Errorf("test mark status err. can't find target peer %s", targetPeer)
+               }
+       }
+}
+
+func TestMarkBlockSignature(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+       ps.AddPeer(&basePeer{id: peer3ID})
+
+       signature := []byte{0x01, 0x02}
+       ps.MarkBlockSignature(peer1ID, signature)
+       targetPeers := []string{peer2ID, peer3ID}
+
+       peers := ps.PeersWithoutSignature(signature)
+       if len(peers) != len(targetPeers) {
+               t.Fatalf("test mark block signature err. Number of target peers %d got %d", len(targetPeers), len(peers))
+       }
+
+       for _, targetPeer := range targetPeers {
+               flag := false
+               for _, gotPeer := range peers {
+                       if gotPeer == targetPeer {
+                               flag = true
+                               break
+                       }
+               }
+               if !flag {
+                       t.Errorf("test mark block signature err. can't find target peer %s", targetPeer)
+               }
+       }
+}
+
+func TestMarkTx(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+       ps.AddPeer(&basePeer{id: peer3ID})
+
+       txHash := bc.NewHash([32]byte{0x01, 0x02})
+       ps.MarkTx(peer1ID, txHash)
+       peers := ps.peersWithoutTx(&txHash)
+       targetPeers := []string{peer2ID, peer3ID}
+       if len(peers) != len(targetPeers) {
+               t.Fatalf("test mark tx err. Number of target peers %d got %d", len(targetPeers), len(peers))
+       }
+
+       for _, targetPeer := range targetPeers {
+               flag := false
+               for _, gotPeer := range peers {
+                       if gotPeer.ID() == targetPeer {
+                               flag = true
+                               break
+                       }
+               }
+               if !flag {
+                       t.Errorf("test mark tx err. can't find target peer %s", targetPeer)
+               }
+       }
+}
+
+func TestSetStatus(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+       ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true})
+       ps.SetStatus(peer1ID, 1000, &block1000Hash)
+       ps.SetStatus(peer2ID, 2000, &block2000Hash)
+       ps.SetStatus(peer3ID, 3000, &block3000Hash)
+       ps.SetStatus(peer4ID, 2000, &block2000Hash)
+       targetPeer := peer4ID
+
+       peer := ps.BestPeer(consensus.SFFullNode)
+
+       if peer.ID() != targetPeer {
+               t.Fatalf("test set status err. Name of target peer %s got %s", peer4ID, peer.ID())
+       }
+}
+
+func TestIrreversibleStatus(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+       ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFastSync, isLan: true})
+       ps.SetIrreversibleStatus(peer1ID, 1000, &block1000Hash)
+       ps.SetIrreversibleStatus(peer2ID, 2000, &block2000Hash)
+       ps.SetIrreversibleStatus(peer3ID, 3000, &block3000Hash)
+       ps.SetIrreversibleStatus(peer4ID, 3000, &block3000Hash)
+       targetPeer := peer4ID
+       peer := ps.BestIrreversiblePeer(consensus.SFFastSync)
+
+       if peer.ID() != targetPeer {
+               t.Fatalf("test irreversible status err. Name of target peer %s got %s", targetPeer, peer.ID())
+       }
+}
+
+func TestGetPeersByHeight(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+       ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+       ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true})
+       ps.SetStatus(peer1ID, 1000, &block1000Hash)
+       ps.SetStatus(peer2ID, 2000, &block2000Hash)
+       ps.SetStatus(peer3ID, 3000, &block3000Hash)
+       ps.SetStatus(peer4ID, 2000, &block2000Hash)
+       peers := ps.GetPeersByHeight(2000)
+       targetPeers := []string{peer2ID, peer3ID, peer4ID}
+       if len(peers) != len(targetPeers) {
+               t.Fatalf("test get peers by height err. Number of target peers %d got %d", 3, len(peers))
+       }
+
+       for _, targetPeer := range targetPeers {
+               flag := false
+               for _, gotPeer := range peers {
+                       if gotPeer.ID() == targetPeer {
+                               flag = true
+                               break
+                       }
+               }
+               if !flag {
+                       t.Errorf("test get peers by height err. can't find target peer %s", targetPeer)
+               }
+       }
+}
+
+func TestRemovePeer(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+
+       ps.RemovePeer(peer1ID)
+       if peer := ps.GetPeer(peer1ID); peer != nil {
+               t.Fatalf("remove peer test err: peer %s should be removed", peer1ID)
+       }
+
+       if peer := ps.GetPeer(peer2ID); peer == nil {
+               t.Fatalf("remove peer test err: peer %s should not be removed", peer2ID)
+       }
+}
+
+func TestProcessIllegal(t *testing.T) {
+       ps := NewPeerSet(&basePeerSet{})
+       ps.AddPeer(&basePeer{id: peer1ID})
+       ps.AddPeer(&basePeer{id: peer2ID})
+
+       ps.ProcessIllegal(peer1ID, security.LevelMsgIllegal, "test")
+       if peer := ps.GetPeer(peer1ID); peer != nil {
+               t.Fatalf("process illegal test err: peer %s should be removed", peer1ID)
+       }
+
+       ps.ProcessIllegal(peer2ID, security.LevelMsgIllegal, "test")
+       if peer := ps.GetPeer(peer2ID); peer == nil {
+               t.Fatalf("process illegal test err: peer %s should not be removed", peer2ID)
+       }
+}
diff --git a/netsync/sync_manager.go b/netsync/sync_manager.go
index 95ffbb4..88cf9d4 100644
@@ -67,7 +67,7 @@ func NewSyncManager(config *config.Config, chain *protocol.Chain, txPool *protoc
        if err != nil {
                return nil, err
        }
-       consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers)
+       consensusMgr := consensusmgr.NewManager(sw, chain, peers, dispatcher)
        return &SyncManager{
                config:       config,
                sw:           sw,