fastSyncPivotGap = uint64(64)
minGapStartFastSync = uint64(128)
- errNoSyncPeer = errors.New("can't find sync peer")
- errSkeletonSize = errors.New("fast sync skeleton size wrong")
+ errNoSyncPeer = errors.New("can't find sync peer")
+ errSkeletonSize = errors.New("fast sync skeleton size wrong")
+ errNoMainSkeleton = errors.New("No main skeleton found")
+ errNoSkeletonFound = errors.New("No skeleton found")
)
type fastSync struct {
stopHash := stopBlock.Hash()
skeletonMap := fs.msgFetcher.parallelFetchHeaders(peers, fs.blockLocator(), &stopHash, numOfBlocksSkeletonGap-1)
if len(skeletonMap) == 0 {
- return nil, errors.New("No skeleton found")
+ return nil, errNoSkeletonFound
}
mainSkeleton, ok := skeletonMap[fs.mainSyncPeer.ID()]
if !ok {
- return nil, errors.New("No main skeleton found")
+ return nil, errNoMainSkeleton
}
if len(mainSkeleton) < minSizeOfSyncSkeleton || len(mainSkeleton) > maxSizeOfSyncSkeleton {
import (
"io/ioutil"
"os"
+ "reflect"
+ "sync"
"testing"
"time"
"github.com/vapor/consensus"
dbm "github.com/vapor/database/leveldb"
"github.com/vapor/errors"
+ "github.com/vapor/netsync/peers"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
"github.com/vapor/test/mock"
}
}
}
+
+type mockFetcher struct {
+ baseChain []*types.Block
+ peerStatus map[string][]*types.Block
+ peers []string
+ testType int
+}
+
+func (mf *mockFetcher) resetParameter() {
+ return
+}
+
+func (mf *mockFetcher) addSyncPeer(peerID string) {
+ return
+}
+
+func (mf *mockFetcher) requireBlock(peerID string, height uint64) (*types.Block, error) {
+ return nil, nil
+}
+
+func (mf *mockFetcher) parallelFetchBlocks(work []*fetchBlocksWork, downloadNotifyCh chan struct{}, ProcessStopCh chan struct{}, wg *sync.WaitGroup) {
+ return
+}
+
+func (mf *mockFetcher) parallelFetchHeaders(peers []*peers.Peer, locator []*bc.Hash, stopHash *bc.Hash, skip uint64) map[string][]*types.BlockHeader {
+ result := make(map[string][]*types.BlockHeader)
+ switch mf.testType {
+ case 1:
+ result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader,
+ &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader,
+ &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader,
+ }
+ result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+ &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+ &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader,
+ }
+
+ case 2:
+ result["peer1"] = []*types.BlockHeader{}
+ case 3:
+ case 4:
+ result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+ &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+ &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader, &mf.peerStatus["peer2"][1800].BlockHeader,
+ }
+ case 5:
+ result["peer1"] = []*types.BlockHeader{&mf.peerStatus["peer1"][1000].BlockHeader, &mf.peerStatus["peer1"][1100].BlockHeader, &mf.peerStatus["peer1"][1200].BlockHeader,
+ &mf.peerStatus["peer1"][1300].BlockHeader, &mf.peerStatus["peer1"][1400].BlockHeader, &mf.peerStatus["peer1"][1500].BlockHeader,
+ &mf.peerStatus["peer1"][1600].BlockHeader, &mf.peerStatus["peer1"][1700].BlockHeader, &mf.peerStatus["peer1"][1800].BlockHeader,
+ }
+ result["peer2"] = []*types.BlockHeader{&mf.peerStatus["peer2"][1000].BlockHeader, &mf.peerStatus["peer2"][1100].BlockHeader, &mf.peerStatus["peer2"][1200].BlockHeader,
+ &mf.peerStatus["peer2"][1300].BlockHeader, &mf.peerStatus["peer2"][1400].BlockHeader, &mf.peerStatus["peer2"][1500].BlockHeader,
+ &mf.peerStatus["peer2"][1600].BlockHeader, &mf.peerStatus["peer2"][1700].BlockHeader,
+ }
+ }
+ return result
+}
+
+func TestCreateFetchBlocksTasks(t *testing.T) {
+ baseChain := mockBlocks(nil, 1000)
+ chainX := append(baseChain, mockBlocks(baseChain[1000], 2000)...)
+ chainY := append(baseChain, mockBlocks(baseChain[1000], 1900)...)
+ peerStatus := make(map[string][]*types.Block)
+ peerStatus["peer1"] = chainX
+ peerStatus["peer2"] = chainY
+ type syncPeer struct {
+ peer *P2PPeer
+ bestHeight uint64
+ irreversibleHeight uint64
+ }
+
+ cases := []struct {
+ peers []*syncPeer
+ mainSyncPeer string
+ testType int
+ wantTasks []*fetchBlocksWork
+ wantErr error
+ }{
+ // normal test
+ {
+ peers: []*syncPeer{
+ {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+ {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+ },
+ mainSyncPeer: "peer1",
+ testType: 1,
+ wantTasks: []*fetchBlocksWork{
+ {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader},
+ {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader},
+ {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader},
+ {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader},
+ },
+ wantErr: nil,
+ },
+ // test no sync peer
+ {
+ peers: []*syncPeer{},
+ mainSyncPeer: "peer1",
+ testType: 0,
+ wantTasks: nil,
+ wantErr: errNoSyncPeer,
+ },
+ // primary sync peer skeleton size error
+ {
+ peers: []*syncPeer{
+ {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+ {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+ },
+ mainSyncPeer: "peer1",
+ testType: 2,
+ wantTasks: nil,
+ wantErr: errSkeletonSize,
+ },
+ // no skeleton return
+ {
+ peers: []*syncPeer{
+ {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+ {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+ },
+ mainSyncPeer: "peer1",
+ testType: 3,
+ wantTasks: nil,
+ wantErr: errNoSkeletonFound,
+ },
+ // no main skeleton found
+ {
+ peers: []*syncPeer{
+ {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+ {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+ },
+ mainSyncPeer: "peer1",
+ testType: 4,
+ wantTasks: nil,
+ wantErr: errNoMainSkeleton,
+ },
+ // skeleton length mismatch
+ {
+ peers: []*syncPeer{
+ {peer: &P2PPeer{id: "peer1", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 1000, irreversibleHeight: 1000},
+ {peer: &P2PPeer{id: "peer2", flag: consensus.SFFullNode | consensus.SFFastSync}, bestHeight: 800, irreversibleHeight: 800},
+ },
+ mainSyncPeer: "peer1",
+ testType: 5,
+ wantTasks: []*fetchBlocksWork{
+ {&chainX[1000].BlockHeader, &chainX[1100].BlockHeader}, {&chainX[1100].BlockHeader, &chainX[1200].BlockHeader},
+ {&chainX[1200].BlockHeader, &chainX[1300].BlockHeader}, {&chainX[1300].BlockHeader, &chainX[1400].BlockHeader},
+ {&chainX[1400].BlockHeader, &chainX[1500].BlockHeader}, {&chainX[1500].BlockHeader, &chainX[1600].BlockHeader},
+ {&chainX[1600].BlockHeader, &chainX[1700].BlockHeader}, {&chainX[1700].BlockHeader, &chainX[1800].BlockHeader},
+ },
+ wantErr: nil,
+ },
+ }
+
+ for i, c := range cases {
+ peers := peers.NewPeerSet(NewPeerSet())
+ for _, syncPeer := range c.peers {
+ peers.AddPeer(syncPeer.peer)
+ peers.SetStatus(syncPeer.peer.id, syncPeer.bestHeight, nil)
+ peers.SetIrreversibleStatus(syncPeer.peer.id, syncPeer.irreversibleHeight, nil)
+ }
+ mockChain := mock.NewChain(nil)
+ fs := newFastSync(mockChain, &mockFetcher{baseChain: baseChain, peerStatus: peerStatus, testType: c.testType}, nil, peers)
+ fs.mainSyncPeer = fs.peers.GetPeer(c.mainSyncPeer)
+ tasks, err := fs.createFetchBlocksTasks(baseChain[700])
+ if err != c.wantErr {
+ t.Errorf("case %d: got %v want %v", i, err, c.wantErr)
+ }
+ if !reflect.DeepEqual(tasks, c.wantTasks) {
+ t.Errorf("case %d: got %v want %v", i, tasks, c.wantTasks)
+ }
+ }
+}
--- /dev/null
+package chainmgr
+
+import (
+ "testing"
+)
+
+func TestAddDel(t *testing.T) {
+ syncPeers := newFastSyncPeers()
+ peers := make(map[string]bool)
+ peers["Peer1"] = true
+ peers["Peer2"] = true
+ for k := range peers {
+ syncPeers.add(k)
+ syncPeers.add(k)
+ }
+ if syncPeers.size() != len(peers) {
+ t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers))
+ }
+
+ syncPeers.delete("Peer1")
+ if syncPeers.size() != 1 {
+ t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), 1)
+ }
+
+ syncPeers.delete("Peer1")
+ if syncPeers.size() != 1 {
+ t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), 1)
+ }
+}
+
+func TestIdlePeers(t *testing.T) {
+ syncPeers := newFastSyncPeers()
+ peers := make(map[string]bool)
+ peers["Peer1"] = true
+ peers["Peer2"] = true
+ for k := range peers {
+ syncPeers.add(k)
+ syncPeers.add(k)
+ }
+
+ idlePeers := syncPeers.selectIdlePeers()
+ if len(idlePeers) != len(peers) {
+ t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+ }
+
+ for _, peer := range idlePeers {
+ if ok := peers[peer]; !ok {
+ t.Errorf("selcet idle peers test err: want peers %v got %v", peers, idlePeers)
+ }
+ }
+
+ idlePeers = syncPeers.selectIdlePeers()
+ if len(idlePeers) != 0 {
+ t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), 0)
+ }
+
+}
+
+func TestIdlePeer(t *testing.T) {
+ syncPeers := newFastSyncPeers()
+ peers := make(map[string]bool)
+ peers["Peer1"] = true
+ peers["Peer2"] = true
+ for k := range peers {
+ syncPeers.add(k)
+ syncPeers.add(k)
+ }
+ idlePeer, err := syncPeers.selectIdlePeer()
+ if err != nil {
+ t.Errorf("selcet idle peers test err: got %v\nwant %v", err, nil)
+ }
+
+ if ok := peers[idlePeer]; !ok {
+ t.Error("selcet idle peers test err.")
+ }
+ idlePeer, err = syncPeers.selectIdlePeer()
+ if err != nil {
+ t.Errorf("selcet idle peers test err: got %v\nwant %v", err, nil)
+ }
+
+ if ok := peers[idlePeer]; !ok {
+ t.Error("selcet idle peers test err.")
+ }
+ idlePeer, err = syncPeers.selectIdlePeer()
+ if err != errNoValidFastSyncPeer {
+ t.Errorf("selcet idle peers test err: got %v\nwant %v", err, errNoValidFastSyncPeer)
+ }
+}
+
+func TestSetIdle(t *testing.T) {
+ syncPeers := newFastSyncPeers()
+ peers := make(map[string]bool)
+ peers["Peer2"] = true
+ for k := range peers {
+ syncPeers.add(k)
+ }
+ if syncPeers.size() != len(peers) {
+ t.Errorf("add peer test err: got %d\nwant %d", syncPeers.size(), len(peers))
+ }
+ idlePeers := syncPeers.selectIdlePeers()
+ if len(idlePeers) != len(peers) {
+ t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+ }
+
+ syncPeers.setIdle("Peer1")
+ idlePeers = syncPeers.selectIdlePeers()
+ if len(idlePeers) != 0 {
+ t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), 0)
+ }
+
+ syncPeers.setIdle("Peer2")
+ idlePeers = syncPeers.selectIdlePeers()
+ if len(idlePeers) != len(peers) {
+ t.Errorf("selcet idle peers test err: got %d\nwant %d", len(idlePeers), len(peers))
+ }
+}
dbm "github.com/vapor/database/leveldb"
"github.com/vapor/consensus"
+ "github.com/vapor/event"
"github.com/vapor/netsync/peers"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
}
return &Manager{
- chain: chain,
- blockKeeper: newBlockKeeper(chain, peers, fastSyncDB),
- peers: peers,
- mempool: mempool,
- txSyncCh: make(chan *txSyncMsg),
+ chain: chain,
+ blockKeeper: newBlockKeeper(chain, peers, fastSyncDB),
+ peers: peers,
+ mempool: mempool,
+ txSyncCh: make(chan *txSyncMsg),
+ eventDispatcher: event.NewDispatcher(),
}
}
"github.com/vapor/consensus"
dbm "github.com/vapor/database/leveldb"
"github.com/vapor/protocol"
+ core "github.com/vapor/protocol"
"github.com/vapor/protocol/bc"
"github.com/vapor/protocol/bc/types"
"github.com/vapor/test/mock"
const txsNumber = 2000
-func getTransactions() []*types.Tx {
- txs := []*types.Tx{}
+type mempool struct {
+}
+
+func (m *mempool) GetTransactions() []*core.TxDesc {
+ txs := []*core.TxDesc{}
for i := 0; i < txsNumber; i++ {
txInput := types.NewSpendInput(nil, bc.NewHash([32]byte{0x01}), *consensus.BTMAssetID, uint64(i), 1, []byte{0x51})
txInput.CommitmentSuffix = []byte{0, 1, 2}
Outputs: []*types.TxOutput{
types.NewIntraChainOutput(*consensus.BTMAssetID, uint64(i), []byte{0x6a}),
},
+ SerializedSize: 1000,
},
Tx: &bc.Tx{
ID: bc.Hash{V0: uint64(i), V1: uint64(i), V2: uint64(i), V3: uint64(i)},
},
}
- txs = append(txs, tx)
+ txs = append(txs, &core.TxDesc{Tx: tx})
}
return txs
}
blocks := mockBlocks(nil, 5)
a := mockSync(blocks, &mock.Mempool{}, testDBA)
b := mockSync(blocks, &mock.Mempool{}, testDBB)
-
+ a.mempool = &mempool{}
netWork := NewNetWork()
netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
go a.syncMempoolLoop()
a.syncMempool("test node B")
- wantTxs := getTransactions()
- a.txSyncCh <- &txSyncMsg{"test node B", wantTxs}
+ wantTxs := a.mempool.GetTransactions()
timeout := time.NewTimer(2 * time.Second)
defer timeout.Stop()
for i, gotTx := range gotTxs {
index := gotTx.Tx.Inputs[0].Amount()
- if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Inputs[0].Amount()) {
- t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Inputs))
+ if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
+ }
+
+ if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
+ }
+ }
+}
+
+func TestBroadcastTxsLoop(t *testing.T) {
+ tmpDir, err := ioutil.TempDir(".", "")
+ if err != nil {
+ t.Fatalf("failed to create temporary data folder: %v", err)
+ }
+ defer os.RemoveAll(tmpDir)
+ testDBA := dbm.NewDB("testdba", "leveldb", tmpDir)
+ testDBB := dbm.NewDB("testdbb", "leveldb", tmpDir)
+
+ blocks := mockBlocks(nil, 5)
+ a := mockSync(blocks, &mock.Mempool{}, testDBA)
+ b := mockSync(blocks, &mock.Mempool{}, testDBB)
+ a.mempool = &mempool{}
+ netWork := NewNetWork()
+ netWork.Register(a, "192.168.0.1", "test node A", consensus.SFFullNode)
+ netWork.Register(b, "192.168.0.2", "test node B", consensus.SFFullNode)
+ if B2A, A2B, err := netWork.HandsShake(a, b); err != nil {
+ t.Errorf("fail on peer hands shake %v", err)
+ } else {
+ go B2A.postMan()
+ go A2B.postMan()
+ }
+ a.txMsgSub, err = a.eventDispatcher.Subscribe(core.TxMsgEvent{})
+ if err != nil {
+ t.Fatal("txMsgSub subscribe err", err)
+ }
+ go a.broadcastTxsLoop()
+ wantTxs := a.mempool.GetTransactions()
+ txsNum := 50
+ for i, txD := range wantTxs {
+ if i >= txsNum {
+ break
}
+ a.eventDispatcher.Post(core.TxMsgEvent{TxMsg: &core.TxPoolMsg{TxDesc: txD, MsgType: core.MsgNewTx}})
+ }
+ timeout := time.NewTimer(2 * time.Second)
+ defer timeout.Stop()
+ ticker := time.NewTicker(500 * time.Millisecond)
+ defer ticker.Stop()
- if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Outputs[0].AssetAmount()) {
- t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Outputs))
+ gotTxs := []*protocol.TxDesc{}
+ for {
+ select {
+ case <-ticker.C:
+ gotTxs = b.mempool.GetTransactions()
+ if len(gotTxs) >= txsNum {
+ goto out
+ }
+ case <-timeout.C:
+ t.Fatalf("mempool sync timeout")
}
+ }
+out:
+ if len(gotTxs) != txsNum {
+ t.Fatalf("mempool sync txs num err. got:%d want:%d", len(gotTxs), txsNumber)
}
+ for i, gotTx := range gotTxs {
+ index := gotTx.Tx.Inputs[0].Amount()
+ if !reflect.DeepEqual(gotTx.Tx.Inputs[0].Amount(), wantTxs[index].Tx.Inputs[0].Amount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Inputs), spew.Sdump(wantTxs[0].Tx.Inputs))
+ }
+
+ if !reflect.DeepEqual(gotTx.Tx.Outputs[0].AssetAmount(), wantTxs[index].Tx.Outputs[0].AssetAmount()) {
+ t.Fatalf("mempool tx err. index:%d\n,gotTx:%s\n,wantTx:%s", i, spew.Sdump(gotTx.Tx.Outputs), spew.Sdump(wantTxs[0].Tx.Outputs))
+ }
+ }
}
"github.com/sirupsen/logrus"
"gopkg.in/karalabe/cookiejar.v2/collections/prque"
- "github.com/vapor/netsync/peers"
"github.com/vapor/p2p/security"
"github.com/vapor/protocol/bc"
)
// and scheduling them for retrieval.
type blockFetcher struct {
chain Chain
- peers *peers.PeerSet
+ peers Peers
newBlockCh chan *blockMsg
queue *prque.Prque
}
//NewBlockFetcher creates a block fetcher to retrieve blocks of the new propose.
-func newBlockFetcher(chain Chain, peers *peers.PeerSet) *blockFetcher {
+func newBlockFetcher(chain Chain, peers Peers) *blockFetcher {
f := &blockFetcher{
chain: chain,
peers: peers,
// BroadcastFilterTargetPeers filter target peers to filter the nodes that need to send messages.
func (bs *BlockSignatureMsg) BroadcastFilterTargetPeers(ps *peers.PeerSet) []string {
- return ps.PeersWithoutSign(bs.Signature)
+ return ps.PeersWithoutSignature(bs.Signature)
}
// BlockProposeMsg block propose message transferred between nodes.
ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error
}
+type Peers interface {
+ AddPeer(peer peers.BasePeer)
+ BroadcastMsg(bm peers.BroadcastMsg) error
+ GetPeer(id string) *peers.Peer
+ MarkBlock(peerID string, hash *bc.Hash)
+ MarkBlockSignature(peerID string, signature []byte)
+ ProcessIllegal(peerID string, level byte, reason string)
+ RemovePeer(peerID string)
+ SetStatus(peerID string, height uint64, hash *bc.Hash)
+}
+
type blockMsg struct {
block *types.Block
peerID string
type Manager struct {
sw Switch
chain Chain
- peers *peers.PeerSet
+ peers Peers
blockFetcher *blockFetcher
eventDispatcher *event.Dispatcher
}
// NewManager create new manager.
-func NewManager(sw Switch, chain Chain, dispatcher *event.Dispatcher, peers *peers.PeerSet) *Manager {
+func NewManager(sw Switch, chain Chain, peers Peers, dispatcher *event.Dispatcher) *Manager {
manager := &Manager{
sw: sw,
chain: chain,
--- /dev/null
+package consensusmgr
+
+import (
+ "math/rand"
+ "net"
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/tendermint/tmlibs/flowrate"
+
+ "github.com/vapor/consensus"
+ "github.com/vapor/event"
+ "github.com/vapor/netsync/peers"
+ "github.com/vapor/p2p"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+type p2peer struct {
+}
+
+func (p *p2peer) Addr() net.Addr {
+ return nil
+}
+
+func (p *p2peer) ID() string {
+ return ""
+}
+
+func (p *p2peer) RemoteAddrHost() string {
+ return ""
+}
+func (p *p2peer) ServiceFlag() consensus.ServiceFlag {
+ return 0
+}
+func (p *p2peer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+ return nil, nil
+}
+func (p *p2peer) TrySend(byte, interface{}) bool {
+ return true
+}
+func (p *p2peer) IsLAN() bool {
+ return false
+}
+
+func mockBlocks(startBlock *types.Block, height uint64) []*types.Block {
+ blocks := []*types.Block{}
+ indexBlock := &types.Block{}
+ if startBlock == nil {
+ indexBlock = &types.Block{BlockHeader: types.BlockHeader{Version: uint64(rand.Uint32())}}
+ blocks = append(blocks, indexBlock)
+ } else {
+ indexBlock = startBlock
+ }
+
+ for indexBlock.Height < height {
+ block := &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: indexBlock.Height + 1,
+ PreviousBlockHash: indexBlock.Hash(),
+ Version: uint64(rand.Uint32()),
+ },
+ }
+ blocks = append(blocks, block)
+ indexBlock = block
+ }
+ return blocks
+}
+
+type mockSW struct {
+}
+
+func (s *mockSW) AddReactor(name string, reactor p2p.Reactor) p2p.Reactor {
+ return nil
+}
+
+type mockChain struct {
+}
+
+func (c *mockChain) BestBlockHeight() uint64 {
+ return 0
+}
+
+func (c *mockChain) GetHeaderByHash(*bc.Hash) (*types.BlockHeader, error) {
+ return nil, nil
+}
+
+func (c *mockChain) ProcessBlock(*types.Block) (bool, error) {
+ return false, nil
+}
+
+func (c *mockChain) ProcessBlockSignature(signature, pubkey []byte, blockHash *bc.Hash) error {
+ return nil
+}
+
+type mockPeers struct {
+ msgCount *int
+ knownBlock *bc.Hash
+ blockHeight *uint64
+ knownSignature *[]byte
+}
+
+func newMockPeers(msgCount *int, knownBlock *bc.Hash, blockHeight *uint64, signature *[]byte) *mockPeers {
+ return &mockPeers{
+ msgCount: msgCount,
+ knownBlock: knownBlock,
+ blockHeight: blockHeight,
+ knownSignature: signature,
+ }
+}
+
+func (ps *mockPeers) AddPeer(peer peers.BasePeer) {
+
+}
+
+func (ps *mockPeers) BroadcastMsg(bm peers.BroadcastMsg) error {
+ *ps.msgCount++
+ return nil
+}
+func (ps *mockPeers) GetPeer(id string) *peers.Peer {
+ return &peers.Peer{BasePeer: &p2peer{}}
+}
+func (ps *mockPeers) MarkBlock(peerID string, hash *bc.Hash) {
+ *ps.knownBlock = *hash
+}
+
+func (ps *mockPeers) MarkBlockSignature(peerID string, signature []byte) {
+ *ps.knownSignature = append(*ps.knownSignature, signature...)
+}
+
+func (ps *mockPeers) ProcessIllegal(peerID string, level byte, reason string) {
+
+}
+func (p *mockPeers) RemovePeer(peerID string) {
+
+}
+func (ps *mockPeers) SetStatus(peerID string, height uint64, hash *bc.Hash) {
+ *ps.blockHeight = height
+}
+
+func TestBlockProposeMsgBroadcastLoop(t *testing.T) {
+ dispatcher := event.NewDispatcher()
+ msgCount := 0
+ blockHeight := 100
+ mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher)
+ blocks := mockBlocks(nil, uint64(blockHeight))
+
+ mgr.Start()
+ defer mgr.Stop()
+ time.Sleep(10 * time.Millisecond)
+ for _, block := range blocks {
+ mgr.eventDispatcher.Post(event.NewProposedBlockEvent{Block: *block})
+ }
+ time.Sleep(10 * time.Millisecond)
+ if msgCount != blockHeight+1 {
+ t.Fatalf("broad propose block msg err. got:%d\n want:%d", msgCount, blockHeight+1)
+ }
+}
+
+func TestBlockSignatureMsgBroadcastLoop(t *testing.T) {
+ dispatcher := event.NewDispatcher()
+ msgCount := 0
+ blockHeight := 100
+ mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, nil), dispatcher)
+ blocks := mockBlocks(nil, uint64(blockHeight))
+
+ mgr.Start()
+ defer mgr.Stop()
+ time.Sleep(10 * time.Millisecond)
+ for _, block := range blocks {
+ mgr.eventDispatcher.Post(event.BlockSignatureEvent{BlockHash: block.Hash(), Signature: []byte{0x1, 0x2}, XPub: []byte{0x011, 0x022}})
+ }
+ time.Sleep(10 * time.Millisecond)
+ if msgCount != blockHeight+1 {
+ t.Fatalf("broad propose block msg err. got:%d\n want:%d", msgCount, blockHeight+1)
+ }
+}
+
+func TestProcessBlockProposeMsg(t *testing.T) {
+ dispatcher := event.NewDispatcher()
+ msgCount := 0
+ var knownBlock bc.Hash
+ blockHeight := uint64(0)
+ peerID := "Peer1"
+ mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, &knownBlock, &blockHeight, nil), dispatcher)
+ block := &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: 100,
+ PreviousBlockHash: bc.NewHash([32]byte{0x1}),
+ Version: uint64(rand.Uint32()),
+ },
+ }
+ msg, err := NewBlockProposeMsg(block)
+ if err != nil {
+ t.Fatal("create new block propose msg err", err)
+ }
+
+ mgr.processMsg(peerID, 0, msg)
+ if knownBlock != block.Hash() {
+ t.Fatalf("mark propose block msg err. got:%d\n want:%d", knownBlock, block.Hash())
+ }
+
+ if blockHeight != block.Height {
+ t.Fatalf("set peer status err. got:%d\n want:%d", blockHeight, block.Height)
+ }
+}
+
+func TestProcessBlockSignatureMsg(t *testing.T) {
+ dispatcher := event.NewDispatcher()
+ msgCount := 0
+ knownSignature := []byte{}
+ peerID := "Peer1"
+ mgr := NewManager(&mockSW{}, &mockChain{}, newMockPeers(&msgCount, nil, nil, &knownSignature), dispatcher)
+ block := &types.Block{
+ BlockHeader: types.BlockHeader{
+ Height: 100,
+ PreviousBlockHash: bc.NewHash([32]byte{0x1}),
+ Version: uint64(rand.Uint32()),
+ },
+ }
+
+ signature := []byte{0x01, 0x02}
+ msg := NewBlockSignatureMsg(block.Hash(), signature, []byte{0x03, 0x04})
+
+ mgr.processMsg(peerID, 0, msg)
+
+ if !reflect.DeepEqual(knownSignature, signature) {
+ t.Fatalf("set peer status err. got:%d\n want:%d", knownSignature, signature)
+ }
+}
p.knownTxs.Add(hash.String())
}
-func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var peers []string
- for _, peer := range ps.peers {
- if !peer.knownBlocks.Has(hash.String()) {
- peers = append(peers, peer.ID())
- }
- }
- return peers
-}
-
-func (ps *PeerSet) PeersWithoutSign(signature []byte) []string {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- var peers []string
- for _, peer := range ps.peers {
- if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
- peers = append(peers, peer.ID())
- }
- }
- return peers
-}
-
func (p *Peer) SendBlock(block *types.Block) (bool, error) {
msg, err := msgs.NewBlockMessage(block)
if err != nil {
return nil
}
-func (ps *PeerSet) ErrorHandler(peerID string, level byte, err error) {
- if errors.Root(err) == ErrPeerMisbehave {
- ps.ProcessIllegal(peerID, level, err.Error())
- } else {
- ps.RemovePeer(peerID)
- }
-}
-
// Peer retrieves the registered peer with the given id.
func (ps *PeerSet) GetPeer(id string) *Peer {
ps.mtx.RLock()
peer.markTransaction(&txHash)
}
-func (ps *PeerSet) peersWithoutBlock(hash *bc.Hash) []*Peer {
+func (ps *PeerSet) PeersWithoutBlock(hash bc.Hash) []string {
ps.mtx.RLock()
defer ps.mtx.RUnlock()
- peers := []*Peer{}
+ var peers []string
for _, peer := range ps.peers {
if !peer.knownBlocks.Has(hash.String()) {
- peers = append(peers, peer)
+ peers = append(peers, peer.ID())
+ }
+ }
+ return peers
+}
+
+func (ps *PeerSet) PeersWithoutSignature(signature []byte) []string {
+ ps.mtx.RLock()
+ defer ps.mtx.RUnlock()
+
+ var peers []string
+ for _, peer := range ps.peers {
+ if !peer.knownSignatures.Has(hex.EncodeToString(signature)) {
+ peers = append(peers, peer.ID())
}
}
return peers
peer.SetIrreversibleStatus(height, hash)
}
-
-func (ps *PeerSet) Size() int {
- ps.mtx.RLock()
- defer ps.mtx.RUnlock()
-
- return len(ps.peers)
-}
--- /dev/null
+package peers
+
+import (
+ "net"
+ "reflect"
+ "testing"
+
+ "github.com/davecgh/go-spew/spew"
+ "github.com/tendermint/tmlibs/flowrate"
+ "github.com/vapor/consensus"
+ "github.com/vapor/p2p/security"
+ "github.com/vapor/protocol/bc"
+ "github.com/vapor/protocol/bc/types"
+)
+
+var (
+ peer1ID = "PEER1"
+ peer2ID = "PEER2"
+ peer3ID = "PEER3"
+ peer4ID = "PEER4"
+
+ block1000Hash = bc.NewHash([32]byte{0x01, 0x02})
+ block2000Hash = bc.NewHash([32]byte{0x02, 0x03})
+ block3000Hash = bc.NewHash([32]byte{0x03, 0x04})
+)
+
+type basePeer struct {
+ id string
+ serviceFlag consensus.ServiceFlag
+ isLan bool
+}
+
+func (bp *basePeer) Addr() net.Addr {
+ return nil
+}
+
+func (bp *basePeer) ID() string {
+ return bp.id
+}
+
+func (bp *basePeer) RemoteAddrHost() string {
+ switch bp.ID() {
+ case peer1ID:
+ return peer1ID
+ case peer2ID:
+ return peer2ID
+ case peer3ID:
+ return peer3ID
+ case peer4ID:
+ return peer4ID
+ }
+ return ""
+}
+
+func (bp *basePeer) ServiceFlag() consensus.ServiceFlag {
+ return bp.serviceFlag
+}
+
+func (bp *basePeer) TrafficStatus() (*flowrate.Status, *flowrate.Status) {
+ return nil, nil
+}
+
+func (bp *basePeer) TrySend(byte, interface{}) bool {
+ return true
+}
+
+func (bp *basePeer) IsLAN() bool {
+ return bp.isLan
+}
+
+func TestSetPeerStatus(t *testing.T) {
+ peer := newPeer(&basePeer{})
+ height := uint64(100)
+ hash := bc.NewHash([32]byte{0x1, 0x2})
+ peer.SetBestStatus(height, &hash)
+ if peer.Height() != height {
+ t.Fatalf("test set best status err. got %d want %d", peer.Height(), height)
+ }
+}
+
+func TestSetIrreversibleStatus(t *testing.T) {
+ peer := newPeer(&basePeer{})
+ height := uint64(100)
+ hash := bc.NewHash([32]byte{0x1, 0x2})
+ peer.SetIrreversibleStatus(height, &hash)
+ if peer.IrreversibleHeight() != height {
+ t.Fatalf("test set Irreversible status err. got %d want %d", peer.Height(), height)
+ }
+}
+
+func TestAddFilterAddresses(t *testing.T) {
+ peer := newPeer(&basePeer{})
+ tx := types.NewTx(types.TxData{
+ Inputs: []*types.TxInput{
+ types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")),
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")),
+ },
+ })
+
+ peer.AddFilterAddresses([][]byte{[]byte("spendProgram")})
+ if !peer.isRelatedTx(tx) {
+ t.Fatal("test filter addresses error.")
+ }
+
+ peer.AddFilterAddresses([][]byte{[]byte("testProgram")})
+ if peer.isRelatedTx(tx) {
+ t.Fatal("test filter addresses error.")
+ }
+}
+
+func TestFilterClear(t *testing.T) {
+ peer := newPeer(&basePeer{})
+ tx := types.NewTx(types.TxData{
+ Inputs: []*types.TxInput{
+ types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram")),
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram")),
+ },
+ })
+
+ peer.AddFilterAddresses([][]byte{[]byte("spendProgram")})
+ if !peer.isRelatedTx(tx) {
+ t.Fatal("test filter addresses error.")
+ }
+
+ peer.FilterClear()
+ if peer.isRelatedTx(tx) {
+ t.Fatal("test filter addresses error.")
+ }
+}
+
+func TestGetRelatedTxAndStatus(t *testing.T) {
+ peer := newPeer(&basePeer{})
+ txs := []*types.Tx{
+ types.NewTx(types.TxData{
+ Inputs: []*types.TxInput{
+ types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram1")),
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram1")),
+ },
+ }),
+ types.NewTx(types.TxData{
+ Inputs: []*types.TxInput{
+ types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram2")),
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram2")),
+ },
+ }),
+ types.NewTx(types.TxData{
+ Inputs: []*types.TxInput{
+ types.NewSpendInput(nil, bc.Hash{}, bc.NewAssetID([32]byte{1}), 5, 1, []byte("spendProgram3")),
+ },
+ Outputs: []*types.TxOutput{
+ types.NewIntraChainOutput(bc.NewAssetID([32]byte{3}), 8, []byte("outProgram3")),
+ },
+ }),
+ }
+ txStatuses := &bc.TransactionStatus{
+ VerifyStatus: []*bc.TxVerifyResult{{StatusFail: true}, {StatusFail: false}, {StatusFail: false}},
+ }
+ peer.AddFilterAddresses([][]byte{[]byte("spendProgram1"), []byte("outProgram3")})
+ gotTxs, gotStatus := peer.getRelatedTxAndStatus(txs, txStatuses)
+ if len(gotTxs) != 2 {
+ t.Error("TestGetRelatedTxAndStatus txs size error")
+ }
+
+ if !reflect.DeepEqual(*gotTxs[0].Tx, *txs[0].Tx) {
+ t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[0].Tx), spew.Sdump(txs[0].Tx))
+ }
+
+ if !reflect.DeepEqual(*gotTxs[1].Tx, *txs[2].Tx) {
+ t.Errorf("txs msg test err: got %s\nwant %s", spew.Sdump(gotTxs[1].Tx), spew.Sdump(txs[2].Tx))
+ }
+
+ if gotStatus[0].StatusFail != true || gotStatus[1].StatusFail != false {
+ t.Error("TestGetRelatedTxAndStatus txs status error")
+ }
+}
+
+type basePeerSet struct {
+}
+
+func (bp *basePeerSet) StopPeerGracefully(string) {
+
+}
+
+func (bp *basePeerSet) IsBanned(ip string, level byte, reason string) bool {
+ switch ip {
+ case peer1ID:
+ return true
+ case peer2ID:
+ return false
+ case peer3ID:
+ return true
+ case peer4ID:
+ return false
+ }
+ return false
+}
+
+func TestMarkBlock(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+ ps.AddPeer(&basePeer{id: peer3ID})
+
+ blockHash := bc.NewHash([32]byte{0x01, 0x02})
+ ps.MarkBlock(peer1ID, &blockHash)
+ targetPeers := []string{peer2ID, peer3ID}
+
+ peers := ps.PeersWithoutBlock(blockHash)
+ if len(peers) != len(targetPeers) {
+ t.Fatalf("test mark block err. Number of target peers %d got %d", 1, len(peers))
+ }
+
+ for _, targetPeer := range targetPeers {
+ flag := false
+ for _, gotPeer := range peers {
+ if gotPeer == targetPeer {
+ flag = true
+ break
+ }
+ }
+ if !flag {
+ t.Errorf("test mark block err. can't found target peer %s ", targetPeer)
+ }
+ }
+}
+
+func TestMarkStatus(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+ ps.AddPeer(&basePeer{id: peer3ID})
+
+ height := uint64(1024)
+ ps.MarkStatus(peer1ID, height)
+ targetPeers := []string{peer2ID, peer3ID}
+
+ peers := ps.peersWithoutNewStatus(height)
+ if len(peers) != len(targetPeers) {
+ t.Fatalf("test mark status err. Number of target peers %d got %d", 1, len(peers))
+ }
+
+ for _, targetPeer := range targetPeers {
+ flag := false
+ for _, gotPeer := range peers {
+ if gotPeer.ID() == targetPeer {
+ flag = true
+ break
+ }
+ }
+ if !flag {
+ t.Errorf("test mark status err. can't found target peer %s ", targetPeer)
+ }
+ }
+}
+
+func TestMarkBlockSignature(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+ ps.AddPeer(&basePeer{id: peer3ID})
+
+ signature := []byte{0x01, 0x02}
+ ps.MarkBlockSignature(peer1ID, signature)
+ targetPeers := []string{peer2ID, peer3ID}
+
+ peers := ps.PeersWithoutSignature(signature)
+ if len(peers) != len(targetPeers) {
+ t.Fatalf("test mark block signature err. Number of target peers %d got %d", 1, len(peers))
+ }
+
+ for _, targetPeer := range targetPeers {
+ flag := false
+ for _, gotPeer := range peers {
+ if gotPeer == targetPeer {
+ flag = true
+ break
+ }
+ }
+ if !flag {
+ t.Errorf("test mark block signature err. can't found target peer %s ", targetPeer)
+ }
+ }
+}
+
+func TestMarkTx(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+ ps.AddPeer(&basePeer{id: peer3ID})
+
+ txHash := bc.NewHash([32]byte{0x01, 0x02})
+ ps.MarkTx(peer1ID, txHash)
+ peers := ps.peersWithoutTx(&txHash)
+ targetPeers := []string{peer2ID, peer3ID}
+ if len(peers) != len(targetPeers) {
+ t.Fatalf("test mark tx err. Number of target peers %d got %d", 1, len(peers))
+ }
+
+ for _, targetPeer := range targetPeers {
+ flag := false
+ for _, gotPeer := range peers {
+ if gotPeer.ID() == targetPeer {
+ flag = true
+ break
+ }
+ }
+ if !flag {
+ t.Errorf("test mark tx err. can't found target peer %s ", targetPeer)
+ }
+ }
+}
+
+func TestSetStatus(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+ ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true})
+ ps.SetStatus(peer1ID, 1000, &block1000Hash)
+ ps.SetStatus(peer2ID, 2000, &block2000Hash)
+ ps.SetStatus(peer3ID, 3000, &block3000Hash)
+ ps.SetStatus(peer4ID, 2000, &block2000Hash)
+ targetPeer := peer4ID
+
+ peer := ps.BestPeer(consensus.SFFullNode)
+
+ if peer.ID() != targetPeer {
+ t.Fatalf("test set status err. Name of target peer %s got %s", peer4ID, peer.ID())
+ }
+}
+
+func TestIrreversibleStatus(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+ ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFastSync, isLan: true})
+ ps.SetIrreversibleStatus(peer1ID, 1000, &block1000Hash)
+ ps.SetIrreversibleStatus(peer2ID, 2000, &block2000Hash)
+ ps.SetIrreversibleStatus(peer3ID, 3000, &block3000Hash)
+ ps.SetIrreversibleStatus(peer4ID, 3000, &block3000Hash)
+ targetPeer := peer4ID
+ peer := ps.BestIrreversiblePeer(consensus.SFFastSync)
+
+ if peer.ID() != targetPeer {
+ t.Fatalf("test set status err. Name of target peer %s got %s", peer4ID, peer.ID())
+ }
+}
+
+func TestGetPeersByHeight(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer2ID, serviceFlag: consensus.SFFullNode})
+ ps.AddPeer(&basePeer{id: peer3ID, serviceFlag: consensus.SFFastSync})
+ ps.AddPeer(&basePeer{id: peer4ID, serviceFlag: consensus.SFFullNode, isLan: true})
+ ps.SetStatus(peer1ID, 1000, &block1000Hash)
+ ps.SetStatus(peer2ID, 2000, &block2000Hash)
+ ps.SetStatus(peer3ID, 3000, &block3000Hash)
+ ps.SetStatus(peer4ID, 2000, &block2000Hash)
+ peers := ps.GetPeersByHeight(2000)
+ targetPeers := []string{peer2ID, peer3ID, peer4ID}
+ if len(peers) != len(targetPeers) {
+ t.Fatalf("test get peers by height err. Number of target peers %d got %d", 3, len(peers))
+ }
+
+ for _, targetPeer := range targetPeers {
+ flag := false
+ for _, gotPeer := range peers {
+ if gotPeer.ID() == targetPeer {
+ flag = true
+ break
+ }
+ }
+ if !flag {
+ t.Errorf("test get peers by height err. can't found target peer %s ", targetPeer)
+ }
+ }
+}
+
+func TestRemovePeer(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+
+ ps.RemovePeer(peer1ID)
+ if peer := ps.GetPeer(peer1ID); peer != nil {
+ t.Fatalf("remove peer %s err", peer1ID)
+ }
+
+ if peer := ps.GetPeer(peer2ID); peer == nil {
+ t.Fatalf("Error remove peer %s err", peer2ID)
+ }
+}
+
+func TestProcessIllegal(t *testing.T) {
+ ps := NewPeerSet(&basePeerSet{})
+ ps.AddPeer(&basePeer{id: peer1ID})
+ ps.AddPeer(&basePeer{id: peer2ID})
+
+ ps.ProcessIllegal(peer1ID, security.LevelMsgIllegal, "test")
+ if peer := ps.GetPeer(peer1ID); peer != nil {
+ t.Fatalf("remove peer %s err", peer1ID)
+ }
+
+ ps.ProcessIllegal(peer2ID, security.LevelMsgIllegal, "test")
+ if peer := ps.GetPeer(peer2ID); peer == nil {
+ t.Fatalf("Error remove peer %s err", peer2ID)
+ }
+}
if err != nil {
return nil, err
}
- consensusMgr := consensusmgr.NewManager(sw, chain, dispatcher, peers)
+ consensusMgr := consensusmgr.NewManager(sw, chain, peers, dispatcher)
return &SyncManager{
config: config,
sw: sw,