OSDN Git Service

Merge pull request #973 from freewind/refactoring-complete-missing-ids
authorPaladz <yzhu101@uottawa.ca>
Wed, 23 May 2018 07:53:57 +0000 (15:53 +0800)
committerGitHub <noreply@github.com>
Wed, 23 May 2018 07:53:57 +0000 (15:53 +0800)
refactoring code by extracting some meaningful methods

23 files changed:
blockchain/query/annotated.go
crypto/crypto.go
mining/cpuminer/cpuminer.go
netsync/fetcher.go
netsync/handle.go
p2p/addrbook.go [deleted file]
p2p/connection.go
p2p/listener.go
p2p/pex/addrbook.go [new file with mode: 0644]
p2p/pex/addrbook_test.go [moved from p2p/addrbook_test.go with 76% similarity]
p2p/pex/file.go [new file with mode: 0644]
p2p/pex/file_test.go [new file with mode: 0644]
p2p/pex/know_address.go [new file with mode: 0644]
p2p/pex/params.go [new file with mode: 0644]
p2p/pex/pex_message.go [new file with mode: 0644]
p2p/pex/pex_reactor.go [new file with mode: 0644]
p2p/pex_reactor.go [deleted file]
p2p/pex_reactor_test.go [deleted file]
p2p/switch.go
p2p/test_util.go
p2p/util.go [deleted file]
protocol/block.go
wallet/annotated.go

index 4af2e23..6e94c21 100644 (file)
@@ -19,6 +19,7 @@ type AnnotatedTx struct {
        Inputs                 []*AnnotatedInput  `json:"inputs"`
        Outputs                []*AnnotatedOutput `json:"outputs"`
        StatusFail             bool               `json:"status_fail"`
+       Size                   uint64             `json:"size"`
 }
 
 //AnnotatedInput means an annotated transaction input.
@@ -55,11 +56,11 @@ type AnnotatedOutput struct {
 
 //AnnotatedAccount means an annotated account.
 type AnnotatedAccount struct {
-       ID       string           `json:"id"`
-       Alias    string           `json:"alias,omitempty"`
-       XPubs    []chainkd.XPub   `json:"xpubs"`
-       Quorum   int              `json:"quorum"`
-       KeyIndex uint64           `json:"key_index"`
+       ID       string         `json:"id"`
+       Alias    string         `json:"alias,omitempty"`
+       XPubs    []chainkd.XPub `json:"xpubs"`
+       Quorum   int            `json:"quorum"`
+       KeyIndex uint64         `json:"key_index"`
 }
 
 //AnnotatedAsset means an annotated asset.
@@ -79,6 +80,7 @@ type AssetKey struct {
        AssetDerivationPath []chainjson.HexBytes `json:"asset_derivation_path"`
 }
 
+//AnnotatedUTXO means an annotated utxo.
 type AnnotatedUTXO struct {
        Alias               string `json:"account_alias"`
        OutputID            string `json:"id"`
index cd33037..1cfc690 100644 (file)
@@ -22,6 +22,15 @@ import (
        "golang.org/x/crypto/sha3"
 )
 
+func DoubleSha256(b []byte) []byte {
+       hasher := sha3.New256()
+       hasher.Write(b)
+       sum := hasher.Sum(nil)
+       hasher.Reset()
+       hasher.Write(sum)
+       return hasher.Sum(nil)
+}
+
 func Sha256(data ...[]byte) []byte {
        d := sha3.New256()
        for _, b := range data {
index 5d43ece..71f8ba6 100644 (file)
@@ -34,12 +34,8 @@ type CPUMiner struct {
        numWorkers        uint64
        started           bool
        discreteMining    bool
-       wg                sync.WaitGroup
        workerWg          sync.WaitGroup
        updateNumWorkers  chan struct{}
-       queryHashesPerSec chan float64
-       updateHashes      chan uint64
-       speedMonitorQuit  chan struct{}
        quit              chan struct{}
        newBlockCh        chan *bc.Hash
 }
@@ -173,11 +169,7 @@ out:
                }
        }
 
-       // Wait until all workers shut down to stop the speed monitor since
-       // they rely on being able to send updates to it.
        m.workerWg.Wait()
-       close(m.speedMonitorQuit)
-       m.wg.Done()
 }
 
 // Start begins the CPU mining process as well as the speed monitor used to
@@ -189,15 +181,12 @@ func (m *CPUMiner) Start() {
        m.Lock()
        defer m.Unlock()
 
-       // Nothing to do if the miner is already running or if running in
-       // discrete mode (using GenerateNBlocks).
-       if m.started || m.discreteMining {
+       // Nothing to do if the miner is already running
+       if m.started {
                return
        }
 
        m.quit = make(chan struct{})
-       m.speedMonitorQuit = make(chan struct{})
-       m.wg.Add(1)
        go m.miningWorkerController()
 
        m.started = true
@@ -213,14 +202,12 @@ func (m *CPUMiner) Stop() {
        m.Lock()
        defer m.Unlock()
 
-       // Nothing to do if the miner is not currently running or if running in
-       // discrete mode (using GenerateNBlocks).
-       if !m.started || m.discreteMining {
+       // Nothing to do if the miner is not currently running
+       if !m.started {
                return
        }
 
        close(m.quit)
-       m.wg.Wait()
        m.started = false
        log.Info("CPU miner stopped")
 }
@@ -286,8 +273,6 @@ func NewCPUMiner(c *protocol.Chain, accountManager *account.Manager, txPool *pro
                txPool:            txPool,
                numWorkers:        defaultNumWorkers,
                updateNumWorkers:  make(chan struct{}),
-               queryHashesPerSec: make(chan float64),
-               updateHashes:      make(chan uint64),
                newBlockCh:        newBlockCh,
        }
 }
index 34aa1ef..0fe3e80 100644 (file)
@@ -46,7 +46,6 @@ func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
                newMinedBlock: make(chan *blockPending),
                quit:          make(chan struct{}),
                queue:         prque.New(),
-               queues:        make(map[string]int),
                queued:        make(map[bc.Hash]*blockPending),
        }
 }
@@ -181,10 +180,6 @@ func (f *Fetcher) insert(peerID string, block *types.Block) {
 // state.
 func (f *Fetcher) forgetBlock(hash bc.Hash) {
        if insert := f.queued[hash]; insert != nil {
-               f.queues[insert.peerID]--
-               if f.queues[insert.peerID] == 0 {
-                       delete(f.queues, insert.peerID)
-               }
                delete(f.queued, hash)
        }
 }
index 2fdc03b..ac1619b 100644 (file)
@@ -11,6 +11,7 @@ import (
 
        cfg "github.com/bytom/config"
        "github.com/bytom/p2p"
+       "github.com/bytom/p2p/pex"
        core "github.com/bytom/protocol"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/version"
@@ -54,11 +55,14 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
        }
 
        trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-       manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+       addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
+       manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
+
+       pexReactor := pex.NewPEXReactor(addrBook)
+       manager.sw.AddReactor("PEX", pexReactor)
 
        manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
        manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
-
        protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
        manager.sw.AddReactor("PROTOCOL", protocolReactor)
 
@@ -116,21 +120,8 @@ func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
 }
 
 func (sm *SyncManager) netStart() error {
-       // Start the switch
        _, err := sm.sw.Start()
-       if err != nil {
-               return err
-       }
-
-       // If seeds exist, add them to the address book and dial out
-       if sm.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(sm.config.P2P.Seeds, ",")
-               if err := sm.DialSeeds(seeds); err != nil {
-                       return err
-               }
-       }
-       return nil
+       return err
 }
 
 //Start start sync manager service
@@ -221,11 +212,6 @@ func (sm *SyncManager) Peers() *peerSet {
        return sm.peers
 }
 
-//DialSeeds dial seed peers
-func (sm *SyncManager) DialSeeds(seeds []string) error {
-       return sm.sw.DialSeeds(seeds)
-}
-
 //Switch get sync manager switch
 func (sm *SyncManager) Switch() *p2p.Switch {
        return sm.sw
diff --git a/p2p/addrbook.go b/p2p/addrbook.go
deleted file mode 100644 (file)
index ed82a75..0000000
+++ /dev/null
@@ -1,861 +0,0 @@
-// Modified for Tendermint
-// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
-// https://github.com/conformal/btcd/blob/master/LICENSE
-
-package p2p
-
-import (
-       "encoding/binary"
-       "encoding/json"
-       "math"
-       "math/rand"
-       "net"
-       "os"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       cmn "github.com/tendermint/tmlibs/common"
-)
-
-const (
-       // addresses under which the address manager will claim to need more addresses.
-       needAddressThreshold = 1000
-
-       // interval used to dump the address cache to disk for future use.
-       dumpAddressInterval = time.Minute * 2
-
-       // max addresses in each old address bucket.
-       oldBucketSize = 64
-
-       // buckets we split old addresses over.
-       oldBucketCount = 64
-
-       // max addresses in each new address bucket.
-       newBucketSize = 64
-
-       // buckets that we spread new addresses over.
-       newBucketCount = 256
-
-       // old buckets over which an address group will be spread.
-       oldBucketsPerGroup = 4
-
-       // new buckets over which an source address group will be spread.
-       newBucketsPerGroup = 32
-
-       // buckets a frequently seen new address may end up in.
-       maxNewBucketsPerAddress = 4
-
-       // days before which we assume an address has vanished
-       // if we have not seen it announced in that long.
-       numMissingDays = 30
-
-       // tries without a single success before we assume an address is bad.
-       numRetries = 3
-
-       // max failures we will accept without a success before considering an address bad.
-       maxFailures = 10
-
-       // days since the last success before we will consider evicting an address.
-       minBadDays = 7
-
-       // % of total addresses known returned by GetSelection.
-       getSelectionPercent = 23
-
-       // min addresses that must be returned by GetSelection. Useful for bootstrapping.
-       minGetSelection = 32
-
-       // max addresses returned by GetSelection
-       // NOTE: this must match "maxPexMessageSize"
-       maxGetSelection = 250
-)
-
-const (
-       bucketTypeNew = 0x01
-       bucketTypeOld = 0x02
-)
-
-// AddrBook - concurrency safe peer address manager.
-type AddrBook struct {
-       cmn.BaseService
-
-       // immutable after creation
-       filePath          string
-       routabilityStrict bool
-       key               string
-
-       mtx        sync.Mutex
-       rand       *rand.Rand
-       ourAddrs   map[string]*NetAddress
-       addrLookup map[string]*knownAddress // new & old
-       addrNew    []map[string]*knownAddress
-       addrOld    []map[string]*knownAddress
-       nOld       int
-       nNew       int
-
-       wg sync.WaitGroup
-}
-
-// NewAddrBook creates a new address book.
-// Use Start to begin processing asynchronous address updates.
-func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
-       am := &AddrBook{
-               rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
-               ourAddrs:          make(map[string]*NetAddress),
-               addrLookup:        make(map[string]*knownAddress),
-               filePath:          filePath,
-               routabilityStrict: routabilityStrict,
-       }
-       am.init()
-       am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
-       return am
-}
-
-// When modifying this, don't forget to update loadFromFile()
-func (a *AddrBook) init() {
-       a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
-       // New addr buckets
-       a.addrNew = make([]map[string]*knownAddress, newBucketCount)
-       for i := range a.addrNew {
-               a.addrNew[i] = make(map[string]*knownAddress)
-       }
-       // Old addr buckets
-       a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
-       for i := range a.addrOld {
-               a.addrOld[i] = make(map[string]*knownAddress)
-       }
-}
-
-// OnStart implements Service.
-func (a *AddrBook) OnStart() error {
-       a.BaseService.OnStart()
-       a.loadFromFile(a.filePath)
-       a.wg.Add(1)
-       go a.saveRoutine()
-       return nil
-}
-
-// OnStop implements Service.
-func (a *AddrBook) OnStop() {
-       a.BaseService.OnStop()
-}
-
-func (a *AddrBook) Wait() {
-       a.wg.Wait()
-}
-
-func (a *AddrBook) AddOurAddress(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       log.WithField("addr", addr).Info("Add our address to book")
-
-       a.ourAddrs[addr.String()] = addr
-}
-
-func (a *AddrBook) OurAddresses() []*NetAddress {
-       addrs := []*NetAddress{}
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       for _, addr := range a.ourAddrs {
-               addrs = append(addrs, addr)
-       }
-       return addrs
-}
-
-// NOTE: addr must not be nil
-func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       log.WithFields(log.Fields{
-               "addr": addr,
-               "src":  src,
-       }).Debug("Add address to book")
-       a.addAddress(addr, src)
-}
-
-func (a *AddrBook) NeedMoreAddrs() bool {
-       return a.Size() < needAddressThreshold
-}
-
-func (a *AddrBook) Size() int {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       return a.size()
-}
-
-func (a *AddrBook) size() int {
-       return a.nNew + a.nOld
-}
-
-// Pick an address to connect to with new/old bias.
-func (a *AddrBook) PickAddress(newBias int) *NetAddress {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       if a.size() == 0 {
-               return nil
-       }
-       if newBias > 100 {
-               newBias = 100
-       }
-       if newBias < 0 {
-               newBias = 0
-       }
-
-       // Bias between new and old addresses.
-       oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
-       newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
-
-       if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
-               // pick random Old bucket.
-               var bucket map[string]*knownAddress = nil
-               num := 0
-               for len(bucket) == 0 && num < oldBucketCount {
-                       bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
-                       num++
-               }
-               if num == oldBucketCount {
-                       return nil
-               }
-               // pick a random ka from bucket.
-               randIndex := a.rand.Intn(len(bucket))
-               for _, ka := range bucket {
-                       if randIndex == 0 {
-                               return ka.Addr
-                       }
-                       randIndex--
-               }
-               cmn.PanicSanity("Should not happen")
-       } else {
-               // pick random New bucket.
-               var bucket map[string]*knownAddress = nil
-               num := 0
-               for len(bucket) == 0 && num < newBucketCount {
-                       bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
-                       num++
-               }
-               if num == newBucketCount {
-                       return nil
-               }
-               // pick a random ka from bucket.
-               randIndex := a.rand.Intn(len(bucket))
-               for _, ka := range bucket {
-                       if randIndex == 0 {
-                               return ka.Addr
-                       }
-                       randIndex--
-               }
-               cmn.PanicSanity("Should not happen")
-       }
-       return nil
-}
-
-func (a *AddrBook) MarkGood(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       ka := a.addrLookup[addr.String()]
-       if ka == nil {
-               return
-       }
-       ka.markGood()
-       if ka.isNew() {
-               a.moveToOld(ka)
-       }
-}
-
-func (a *AddrBook) MarkAttempt(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       ka := a.addrLookup[addr.String()]
-       if ka == nil {
-               return
-       }
-       ka.markAttempt()
-}
-
-// MarkBad currently just ejects the address. In the future, consider
-// blacklisting.
-func (a *AddrBook) MarkBad(addr *NetAddress) {
-       a.RemoveAddress(addr)
-}
-
-// RemoveAddress removes the address from the book.
-func (a *AddrBook) RemoveAddress(addr *NetAddress) {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       ka := a.addrLookup[addr.String()]
-       if ka == nil {
-               return
-       }
-       log.WithField("addr", addr).Info("Remove address from book")
-       a.removeFromAllBuckets(ka)
-}
-
-/* Peer exchange */
-
-// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
-func (a *AddrBook) GetSelection() []*NetAddress {
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-
-       if a.size() == 0 {
-               return nil
-       }
-
-       allAddr := make([]*NetAddress, a.size())
-       i := 0
-       for _, v := range a.addrLookup {
-               allAddr[i] = v.Addr
-               i++
-       }
-
-       numAddresses := cmn.MaxInt(
-               cmn.MinInt(minGetSelection, len(allAddr)),
-               len(allAddr)*getSelectionPercent/100)
-       numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
-       // Fisher-Yates shuffle the array. We only need to do the first
-       // `numAddresses' since we are throwing the rest.
-       for i := 0; i < numAddresses; i++ {
-               // pick a number between current index and the end
-               j := rand.Intn(len(allAddr)-i) + i
-               allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
-       }
-
-       // slice off the limit we are willing to share.
-       return allAddr[:numAddresses]
-}
-
-/* Loading & Saving */
-
-type addrBookJSON struct {
-       Key   string
-       Addrs []*knownAddress
-}
-
-func (a *AddrBook) saveToFile(filePath string) {
-       log.WithField("size", a.Size()).Info("Saving AddrBook to file")
-
-       a.mtx.Lock()
-       defer a.mtx.Unlock()
-       // Compile Addrs
-       addrs := []*knownAddress{}
-       for _, ka := range a.addrLookup {
-               addrs = append(addrs, ka)
-       }
-
-       aJSON := &addrBookJSON{
-               Key:   a.key,
-               Addrs: addrs,
-       }
-
-       jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
-       if err != nil {
-               log.WithField("err", err).Error("Failed to save AddrBook to file")
-               return
-       }
-       err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
-       if err != nil {
-               log.WithFields(log.Fields{
-                       "file": filePath,
-                       "err":  err,
-               }).Error("Failed to save AddrBook to file")
-       }
-}
-
-// Returns false if file does not exist.
-// cmn.Panics if file is corrupt.
-func (a *AddrBook) loadFromFile(filePath string) bool {
-       // If doesn't exist, do nothing.
-       _, err := os.Stat(filePath)
-       if os.IsNotExist(err) {
-               return false
-       }
-
-       // Load addrBookJSON{}
-       r, err := os.Open(filePath)
-       if err != nil {
-               cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
-       }
-       defer r.Close()
-       aJSON := &addrBookJSON{}
-       dec := json.NewDecoder(r)
-       err = dec.Decode(aJSON)
-       if err != nil {
-               cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
-       }
-
-       // Restore all the fields...
-       // Restore the key
-       a.key = aJSON.Key
-       // Restore .addrNew & .addrOld
-       for _, ka := range aJSON.Addrs {
-               for _, bucketIndex := range ka.Buckets {
-                       bucket := a.getBucket(ka.BucketType, bucketIndex)
-                       bucket[ka.Addr.String()] = ka
-               }
-               a.addrLookup[ka.Addr.String()] = ka
-               if ka.BucketType == bucketTypeNew {
-                       a.nNew++
-               } else {
-                       a.nOld++
-               }
-       }
-       return true
-}
-
-// Save saves the book.
-func (a *AddrBook) Save() {
-       log.WithField("size", a.Size()).Info("Saving AddrBook to file")
-       a.saveToFile(a.filePath)
-}
-
-/* Private methods */
-
-func (a *AddrBook) saveRoutine() {
-       dumpAddressTicker := time.NewTicker(dumpAddressInterval)
-out:
-       for {
-               select {
-               case <-dumpAddressTicker.C:
-                       a.saveToFile(a.filePath)
-               case <-a.Quit:
-                       break out
-               }
-       }
-       dumpAddressTicker.Stop()
-       a.saveToFile(a.filePath)
-       a.wg.Done()
-       log.Info("Address handler done")
-}
-
-func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
-       switch bucketType {
-       case bucketTypeNew:
-               return a.addrNew[bucketIdx]
-       case bucketTypeOld:
-               return a.addrOld[bucketIdx]
-       default:
-               cmn.PanicSanity("Should not happen")
-               return nil
-       }
-}
-
-// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
-// NOTE: currently it always returns true.
-func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
-       // Sanity check
-       if ka.isOld() {
-               log.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka))
-               return false
-       }
-
-       addrStr := ka.Addr.String()
-       bucket := a.getBucket(bucketTypeNew, bucketIdx)
-
-       // Already exists?
-       if _, ok := bucket[addrStr]; ok {
-               return true
-       }
-
-       // Enforce max addresses.
-       if len(bucket) > newBucketSize {
-               log.Info("new bucket is full, expiring old ")
-               a.expireNew(bucketIdx)
-       }
-
-       // Add to bucket.
-       bucket[addrStr] = ka
-       if ka.addBucketRef(bucketIdx) == 1 {
-               a.nNew++
-       }
-
-       // Ensure in addrLookup
-       a.addrLookup[addrStr] = ka
-
-       return true
-}
-
-// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
-func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
-       // Sanity check
-       if ka.isNew() {
-               log.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka))
-               return false
-       }
-       if len(ka.Buckets) != 0 {
-               log.Error(cmn.Fmt("Cannot add already old address to another old bucket: %v", ka))
-               return false
-       }
-
-       addrStr := ka.Addr.String()
-       bucket := a.getBucket(bucketTypeOld, bucketIdx)
-
-       // Already exists?
-       if _, ok := bucket[addrStr]; ok {
-               return true
-       }
-
-       // Enforce max addresses.
-       if len(bucket) > oldBucketSize {
-               return false
-       }
-
-       // Add to bucket.
-       bucket[addrStr] = ka
-       if ka.addBucketRef(bucketIdx) == 1 {
-               a.nOld++
-       }
-
-       // Ensure in addrLookup
-       a.addrLookup[addrStr] = ka
-
-       return true
-}
-
-func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
-       if ka.BucketType != bucketType {
-               log.Error(cmn.Fmt("Bucket type mismatch: %v", ka))
-               return
-       }
-       bucket := a.getBucket(bucketType, bucketIdx)
-       delete(bucket, ka.Addr.String())
-       if ka.removeBucketRef(bucketIdx) == 0 {
-               if bucketType == bucketTypeNew {
-                       a.nNew--
-               } else {
-                       a.nOld--
-               }
-               delete(a.addrLookup, ka.Addr.String())
-       }
-}
-
-func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
-       for _, bucketIdx := range ka.Buckets {
-               bucket := a.getBucket(ka.BucketType, bucketIdx)
-               delete(bucket, ka.Addr.String())
-       }
-       ka.Buckets = nil
-       if ka.BucketType == bucketTypeNew {
-               a.nNew--
-       } else {
-               a.nOld--
-       }
-       delete(a.addrLookup, ka.Addr.String())
-}
-
-func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
-       bucket := a.getBucket(bucketType, bucketIdx)
-       var oldest *knownAddress
-       for _, ka := range bucket {
-               if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
-                       oldest = ka
-               }
-       }
-       return oldest
-}
-
-func (a *AddrBook) addAddress(addr, src *NetAddress) {
-       if a.routabilityStrict && !addr.Routable() {
-               log.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
-               return
-       }
-       if _, ok := a.ourAddrs[addr.String()]; ok {
-               // Ignore our own listener address.
-               return
-       }
-
-       ka := a.addrLookup[addr.String()]
-
-       if ka != nil {
-               // Already old.
-               if ka.isOld() {
-                       return
-               }
-               // Already in max new buckets.
-               if len(ka.Buckets) == maxNewBucketsPerAddress {
-                       return
-               }
-               // The more entries we have, the less likely we are to add more.
-               factor := int32(2 * len(ka.Buckets))
-               if a.rand.Int31n(factor) != 0 {
-                       return
-               }
-       } else {
-               ka = newKnownAddress(addr, src)
-       }
-
-       bucket := a.calcNewBucket(addr, src)
-       a.addToNewBucket(ka, bucket)
-
-       log.Debug("Added new address ", "address:", addr, " total:", a.size())
-}
-
-// Make space in the new buckets by expiring the really bad entries.
-// If no bad entries are available we remove the oldest.
-func (a *AddrBook) expireNew(bucketIdx int) {
-       for addrStr, ka := range a.addrNew[bucketIdx] {
-               // If an entry is bad, throw it away
-               if ka.isBad() {
-                       log.Info(cmn.Fmt("expiring bad address %v", addrStr))
-                       a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
-                       return
-               }
-       }
-
-       // If we haven't thrown out a bad entry, throw out the oldest entry
-       oldest := a.pickOldest(bucketTypeNew, bucketIdx)
-       a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
-}
-
-// Promotes an address from new to old.
-// TODO: Move to old probabilistically.
-// The better a node is, the less likely it should be evicted from an old bucket.
-func (a *AddrBook) moveToOld(ka *knownAddress) {
-       // Sanity check
-       if ka.isOld() {
-               log.Error(cmn.Fmt("Cannot promote address that is already old %v", ka))
-               return
-       }
-       if len(ka.Buckets) == 0 {
-               log.Error(cmn.Fmt("Cannot promote address that isn't in any new buckets %v", ka))
-               return
-       }
-
-       // Remember one of the buckets in which ka is in.
-       freedBucket := ka.Buckets[0]
-       // Remove from all (new) buckets.
-       a.removeFromAllBuckets(ka)
-       // It's officially old now.
-       ka.BucketType = bucketTypeOld
-
-       // Try to add it to its oldBucket destination.
-       oldBucketIdx := a.calcOldBucket(ka.Addr)
-       added := a.addToOldBucket(ka, oldBucketIdx)
-       if !added {
-               // No room, must evict something
-               oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
-               a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
-               // Find new bucket to put oldest in
-               newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
-               added := a.addToNewBucket(oldest, newBucketIdx)
-               // No space in newBucket either, just put it in freedBucket from above.
-               if !added {
-                       added := a.addToNewBucket(oldest, freedBucket)
-                       if !added {
-                               log.Error(cmn.Fmt("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket))
-                       }
-               }
-               // Finally, add to bucket again.
-               added = a.addToOldBucket(ka, oldBucketIdx)
-               if !added {
-                       log.Error(cmn.Fmt("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx))
-               }
-       }
-}
-
-// doublesha256(  key + sourcegroup +
-//                int64(doublesha256(key + group + sourcegroup))%bucket_per_group  ) % num_new_buckets
-func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
-       data1 := []byte{}
-       data1 = append(data1, []byte(a.key)...)
-       data1 = append(data1, []byte(a.groupKey(addr))...)
-       data1 = append(data1, []byte(a.groupKey(src))...)
-       hash1 := doubleSha256(data1)
-       hash64 := binary.BigEndian.Uint64(hash1)
-       hash64 %= newBucketsPerGroup
-       var hashbuf [8]byte
-       binary.BigEndian.PutUint64(hashbuf[:], hash64)
-       data2 := []byte{}
-       data2 = append(data2, []byte(a.key)...)
-       data2 = append(data2, a.groupKey(src)...)
-       data2 = append(data2, hashbuf[:]...)
-
-       hash2 := doubleSha256(data2)
-       return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
-}
-
-// doublesha256(  key + group +
-//                int64(doublesha256(key + addr))%buckets_per_group  ) % num_old_buckets
-func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
-       data1 := []byte{}
-       data1 = append(data1, []byte(a.key)...)
-       data1 = append(data1, []byte(addr.String())...)
-       hash1 := doubleSha256(data1)
-       hash64 := binary.BigEndian.Uint64(hash1)
-       hash64 %= oldBucketsPerGroup
-       var hashbuf [8]byte
-       binary.BigEndian.PutUint64(hashbuf[:], hash64)
-       data2 := []byte{}
-       data2 = append(data2, []byte(a.key)...)
-       data2 = append(data2, a.groupKey(addr)...)
-       data2 = append(data2, hashbuf[:]...)
-
-       hash2 := doubleSha256(data2)
-       return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
-}
-
-// Return a string representing the network group of this address.
-// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string
-// "local" for a local address and the string "unroutable for an unroutable
-// address.
-func (a *AddrBook) groupKey(na *NetAddress) string {
-       if a.routabilityStrict && na.Local() {
-               return "local"
-       }
-       if a.routabilityStrict && !na.Routable() {
-               return "unroutable"
-       }
-
-       if ipv4 := na.IP.To4(); ipv4 != nil {
-               return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
-       }
-       if na.RFC6145() || na.RFC6052() {
-               // last four bytes are the ip address
-               ip := net.IP(na.IP[12:16])
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-       }
-
-       if na.RFC3964() {
-               ip := net.IP(na.IP[2:7])
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-
-       }
-       if na.RFC4380() {
-               // teredo tunnels have the last 4 bytes as the v4 address XOR
-               // 0xff.
-               ip := net.IP(make([]byte, 4))
-               for i, byte := range na.IP[12:16] {
-                       ip[i] = byte ^ 0xff
-               }
-               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-       }
-
-       // OK, so now we know ourselves to be a IPv6 address.
-       // bitcoind uses /32 for everything, except for Hurricane Electric's
-       // (he.net) IP range, which it uses /36 for.
-       bits := 32
-       heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
-               Mask: net.CIDRMask(32, 128)}
-       if heNet.Contains(na.IP) {
-               bits = 36
-       }
-
-       return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
-}
-
-//-----------------------------------------------------------------------------
-
-/*
-   knownAddress
-
-   tracks information about a known network address that is used
-   to determine how viable an address is.
-*/
-type knownAddress struct {
-       Addr        *NetAddress
-       Src         *NetAddress
-       Attempts    int32
-       LastAttempt time.Time
-       LastSuccess time.Time
-       BucketType  byte
-       Buckets     []int
-}
-
-func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress {
-       return &knownAddress{
-               Addr:        addr,
-               Src:         src,
-               Attempts:    0,
-               LastAttempt: time.Now(),
-               BucketType:  bucketTypeNew,
-               Buckets:     nil,
-       }
-}
-
-func (ka *knownAddress) isOld() bool {
-       return ka.BucketType == bucketTypeOld
-}
-
-func (ka *knownAddress) isNew() bool {
-       return ka.BucketType == bucketTypeNew
-}
-
-func (ka *knownAddress) markAttempt() {
-       now := time.Now()
-       ka.LastAttempt = now
-       ka.Attempts += 1
-}
-
-func (ka *knownAddress) markGood() {
-       now := time.Now()
-       ka.LastAttempt = now
-       ka.Attempts = 0
-       ka.LastSuccess = now
-}
-
-func (ka *knownAddress) addBucketRef(bucketIdx int) int {
-       for _, bucket := range ka.Buckets {
-               if bucket == bucketIdx {
-                       // TODO refactor to return error?
-                       // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka))
-                       return -1
-               }
-       }
-       ka.Buckets = append(ka.Buckets, bucketIdx)
-       return len(ka.Buckets)
-}
-
-func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
-       buckets := []int{}
-       for _, bucket := range ka.Buckets {
-               if bucket != bucketIdx {
-                       buckets = append(buckets, bucket)
-               }
-       }
-       if len(buckets) != len(ka.Buckets)-1 {
-               // TODO refactor to return error?
-               // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka))
-               return -1
-       }
-       ka.Buckets = buckets
-       return len(ka.Buckets)
-}
-
-/*
-   An address is bad if the address in question has not been tried in the last
-   minute and meets one of the following criteria:
-
-   1) It claims to be from the future
-   2) It hasn't been seen in over a month
-   3) It has failed at least three times and never succeeded
-   4) It has failed ten times in the last week
-
-   All addresses that meet these criteria are assumed to be worthless and not
-   worth keeping hold of.
-*/
-func (ka *knownAddress) isBad() bool {
-       // Has been attempted in the last minute --> bad
-       if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
-               return true
-       }
-
-       // Over a month old?
-       if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
-               return true
-       }
-
-       // Never succeeded?
-       if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
-               return true
-       }
-
-       // Hasn't succeeded in too long?
-       if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
-               return true
-       }
-
-       return false
-}
index 0b966f0..7e16b15 100644 (file)
@@ -465,11 +465,7 @@ FOR_LOOP:
                        }
                        channel, ok := c.channelsIdx[pkt.ChannelID]
                        if !ok || channel == nil {
-                               if pkt.ChannelID == PexChannel {
-                                       continue
-                               } else {
-                                       cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
-                               }
+                               cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
                        }
                        msgBytes, err := channel.recvMsgPacket(pkt)
                        if err != nil {
index 4844a2c..a81abbe 100644 (file)
@@ -167,9 +167,6 @@ func (l *DefaultListener) listenRoutine() {
 
        // Cleanup
        close(l.connections)
-       for _ = range l.connections {
-               // Drain
-       }
 }
 
 //Connections a channel of inbound connections. It gets closed when the listener closes.
diff --git a/p2p/pex/addrbook.go b/p2p/pex/addrbook.go
new file mode 100644 (file)
index 0000000..1d3d9d6
--- /dev/null
@@ -0,0 +1,438 @@
+package pex
+
+import (
+       "encoding/binary"
+       "errors"
+       "math"
+       "math/rand"
+       "net"
+       "sync"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       tcrypto "github.com/tendermint/go-crypto"
+       cmn "github.com/tendermint/tmlibs/common"
+
+       "github.com/bytom/crypto"
+       "github.com/bytom/p2p"
+)
+
+// AddrBook - concurrency safe peer address manager.
+type AddrBook struct {
+       cmn.BaseService
+
+       // immutable after creation
+       filePath          string
+       routabilityStrict bool
+       key               string
+
+       mtx        sync.RWMutex
+       rand       *rand.Rand
+       ourAddrs   map[string]*p2p.NetAddress
+       addrLookup map[string]*knownAddress // new & old
+       bucketsNew []map[string]*knownAddress
+       bucketsOld []map[string]*knownAddress
+       nOld       int
+       nNew       int
+}
+
+// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
+func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
+       a := &AddrBook{
+               filePath:          filePath,
+               routabilityStrict: routabilityStrict,
+               key:               tcrypto.CRandHex(24),
+               rand:              rand.New(rand.NewSource(time.Now().UnixNano())),
+               ourAddrs:          make(map[string]*p2p.NetAddress),
+               addrLookup:        make(map[string]*knownAddress),
+               bucketsNew:        make([]map[string]*knownAddress, newBucketCount),
+               bucketsOld:        make([]map[string]*knownAddress, oldBucketCount),
+       }
+       for i := range a.bucketsNew {
+               a.bucketsNew[i] = make(map[string]*knownAddress)
+       }
+       for i := range a.bucketsOld {
+               a.bucketsOld[i] = make(map[string]*knownAddress)
+       }
+       a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
+       return a
+}
+
+// OnStart implements Service.
+func (a *AddrBook) OnStart() error {
+       if err := a.BaseService.OnStart(); err != nil {
+               return err
+       }
+
+       if err := a.loadFromFile(); err != nil {
+               return err
+       }
+
+       go a.saveRoutine()
+       return nil
+}
+
+// OnStop implements Service.
+func (a *AddrBook) OnStop() {
+       a.BaseService.OnStop()
+}
+
+// AddAddress add address to address book
+func (a *AddrBook) AddAddress(addr, src *p2p.NetAddress) error {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+       return a.addAddress(addr, src)
+}
+
+// AddOurAddress one of our addresses.
+func (a *AddrBook) AddOurAddress(addr *p2p.NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       a.ourAddrs[addr.String()] = addr
+}
+
+// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
+func (a *AddrBook) GetSelection() []*p2p.NetAddress {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       bookSize := a.size()
+       if bookSize == 0 {
+               return nil
+       }
+
+       numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
+       numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
+       allAddr := []*p2p.NetAddress{}
+       for _, ka := range a.addrLookup {
+               allAddr = append(allAddr, ka.Addr)
+       }
+
+       for i := 0; i < numAddresses; i++ {
+               j := rand.Intn(len(allAddr)-i) + i
+               allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
+       }
+       return allAddr[:numAddresses]
+}
+
+// MarkGood marks the peer as good and moves it into an "old" bucket.
+func (a *AddrBook) MarkGood(addr *p2p.NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       ka := a.addrLookup[addr.String()]
+       if ka == nil {
+               return
+       }
+
+       ka.markGood()
+       if ka.isNew() {
+               if err := a.moveToOld(ka); err != nil {
+                       log.WithField("err", err).Error("fail on move to old bucket")
+               }
+       }
+}
+
+// MarkAttempt marks that an attempt was made to connect to the address.
+func (a *AddrBook) MarkAttempt(addr *p2p.NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       if ka := a.addrLookup[addr.String()]; ka != nil {
+               ka.markAttempt()
+       }
+}
+
+// NeedMoreAddrs check does the address number meet the threshold
+func (a *AddrBook) NeedMoreAddrs() bool {
+       return a.Size() < needAddressThreshold
+}
+
+// PickAddress picks a random address from random bucket
+func (a *AddrBook) PickAddress(bias int) *p2p.NetAddress {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       if a.size() == 0 {
+               return nil
+       }
+
+       // make sure bias is in the range [0, 100]
+       if bias > 100 {
+               bias = 100
+       } else if bias < 0 {
+               bias = 0
+       }
+
+       oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
+       newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
+       pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
+       if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
+               return nil
+       }
+
+       var bucket map[string]*knownAddress
+       for len(bucket) == 0 {
+               if pickFromOldBucket {
+                       bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
+               } else {
+                       bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
+               }
+       }
+
+       randIndex := a.rand.Intn(len(bucket))
+       for _, ka := range bucket {
+               if randIndex == 0 {
+                       return ka.Addr
+               }
+               randIndex--
+       }
+       return nil
+}
+
+// RemoveAddress removes the address from the book.
+func (a *AddrBook) RemoveAddress(addr *p2p.NetAddress) {
+       a.mtx.Lock()
+       defer a.mtx.Unlock()
+
+       if ka := a.addrLookup[addr.String()]; ka != nil {
+               a.removeFromAllBuckets(ka)
+       }
+}
+
+// Size count the number of know address
+func (a *AddrBook) Size() int {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+       return a.size()
+}
+
+func (a *AddrBook) addAddress(addr, src *p2p.NetAddress) error {
+       if addr == nil || src == nil {
+               return errors.New("can't add nil to address book")
+       }
+       if _, ok := a.ourAddrs[addr.String()]; ok {
+               return errors.New("add ourselves to address book")
+       }
+       if a.routabilityStrict && !addr.Routable() {
+               return errors.New("cannot add non-routable address")
+       }
+
+       ka := a.addrLookup[addr.String()]
+       if ka != nil {
+               if ka.isOld() {
+                       return nil
+               }
+               if len(ka.Buckets) == maxNewBucketsPerAddress {
+                       return nil
+               }
+               if factor := int32(2 * len(ka.Buckets)); a.rand.Int31n(factor) != 0 {
+                       return nil
+               }
+       } else {
+               ka = newKnownAddress(addr, src)
+       }
+
+       bucket := a.calcNewBucket(addr, src)
+       return a.addToNewBucket(ka, bucket)
+}
+
+func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) error {
+       if ka.isOld() {
+               return errors.New("cant add old address to new bucket")
+       }
+
+       addrStr := ka.Addr.String()
+       bucket := a.getBucket(bucketTypeNew, bucketIdx)
+       if _, ok := bucket[addrStr]; ok {
+               return nil
+       }
+
+       if len(bucket) > newBucketSize {
+               a.expireNew(bucketIdx)
+       }
+
+       bucket[addrStr] = ka
+       a.addrLookup[addrStr] = ka
+       if ka.addBucketRef(bucketIdx) == 1 {
+               a.nNew++
+       }
+       return nil
+}
+
+func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) error {
+       if ka.isNew() {
+               return errors.New("cannot add old address to new bucket")
+       }
+       if len(ka.Buckets) != 0 {
+               return errors.New("cannot add already old address to another old bucket")
+       }
+
+       bucket := a.getBucket(bucketTypeOld, bucketIdx)
+       if len(bucket) > oldBucketSize {
+               return errors.New("old bucket is full")
+       }
+
+       addrStr := ka.Addr.String()
+       bucket[addrStr] = ka
+       a.addrLookup[addrStr] = ka
+       if ka.addBucketRef(bucketIdx) == 1 {
+               a.nOld++
+       }
+       return nil
+}
+
+func (a *AddrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
+       data1 := []byte{}
+       data1 = append(data1, []byte(a.key)...)
+       data1 = append(data1, []byte(a.groupKey(addr))...)
+       data1 = append(data1, []byte(a.groupKey(src))...)
+       hash1 := crypto.DoubleSha256(data1)
+       hash64 := binary.BigEndian.Uint64(hash1)
+       hash64 %= newBucketsPerGroup
+       var hashbuf [8]byte
+       binary.BigEndian.PutUint64(hashbuf[:], hash64)
+       data2 := []byte{}
+       data2 = append(data2, []byte(a.key)...)
+       data2 = append(data2, a.groupKey(src)...)
+       data2 = append(data2, hashbuf[:]...)
+
+       hash2 := crypto.DoubleSha256(data2)
+       return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
+}
+
+func (a *AddrBook) calcOldBucket(addr *p2p.NetAddress) int {
+       data1 := []byte{}
+       data1 = append(data1, []byte(a.key)...)
+       data1 = append(data1, []byte(addr.String())...)
+       hash1 := crypto.DoubleSha256(data1)
+       hash64 := binary.BigEndian.Uint64(hash1)
+       hash64 %= oldBucketsPerGroup
+       var hashbuf [8]byte
+       binary.BigEndian.PutUint64(hashbuf[:], hash64)
+       data2 := []byte{}
+       data2 = append(data2, []byte(a.key)...)
+       data2 = append(data2, a.groupKey(addr)...)
+       data2 = append(data2, hashbuf[:]...)
+
+       hash2 := crypto.DoubleSha256(data2)
+       return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
+}
+
+func (a *AddrBook) expireNew(bucketIdx int) {
+       for _, ka := range a.bucketsNew[bucketIdx] {
+               if ka.isBad() {
+                       a.removeFromBucket(ka, bucketIdx)
+                       return
+               }
+       }
+
+       oldest := a.pickOldest(bucketTypeNew, bucketIdx)
+       a.removeFromBucket(oldest, bucketIdx)
+}
+
+func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
+       switch bucketType {
+       case bucketTypeNew:
+               return a.bucketsNew[bucketIdx]
+       case bucketTypeOld:
+               return a.bucketsOld[bucketIdx]
+       default:
+               log.Error("try to access an unknow address book bucket type")
+               return nil
+       }
+}
+
+func (a *AddrBook) groupKey(na *p2p.NetAddress) string {
+       if a.routabilityStrict && na.Local() {
+               return "local"
+       }
+       if a.routabilityStrict && !na.Routable() {
+               return "unroutable"
+       }
+       if ipv4 := na.IP.To4(); ipv4 != nil {
+               return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
+       }
+       if na.RFC6145() || na.RFC6052() {
+               // last four bytes are the ip address
+               ip := net.IP(na.IP[12:16])
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+       }
+       if na.RFC3964() {
+               ip := net.IP(na.IP[2:7])
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+
+       }
+       if na.RFC4380() {
+               // teredo tunnels have the last 4 bytes as the v4 address XOR 0xff.
+               ip := net.IP(make([]byte, 4))
+               for i, byte := range na.IP[12:16] {
+                       ip[i] = byte ^ 0xff
+               }
+               return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+       }
+
+       bits := 32
+       heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)}
+       if heNet.Contains(na.IP) {
+               bits = 36
+       }
+       return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
+}
+
+func (a *AddrBook) moveToOld(ka *knownAddress) error {
+       if ka.isOld() {
+               return errors.New("cannot promote address that is already old")
+       }
+       if len(ka.Buckets) == 0 {
+               return errors.New("cannot promote address that isn't in any new buckets")
+       }
+
+       a.removeFromAllBuckets(ka)
+       ka.BucketType = bucketTypeOld
+       oldBucketIdx := a.calcOldBucket(ka.Addr)
+       return a.addToOldBucket(ka, oldBucketIdx)
+}
+
+func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
+       bucket := a.getBucket(bucketType, bucketIdx)
+       var oldest *knownAddress
+       for _, ka := range bucket {
+               if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
+                       oldest = ka
+               }
+       }
+       return oldest
+}
+
+func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
+       delete(a.addrLookup, ka.Addr.String())
+       for _, bucketIdx := range ka.Buckets {
+               bucket := a.getBucket(ka.BucketType, bucketIdx)
+               delete(bucket, ka.Addr.String())
+       }
+       ka.Buckets = nil
+       if ka.BucketType == bucketTypeNew {
+               a.nNew--
+       } else {
+               a.nOld--
+       }
+}
+
+func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketIdx int) {
+       bucket := a.getBucket(ka.BucketType, bucketIdx)
+       delete(bucket, ka.Addr.String())
+       if ka.removeBucketRef(bucketIdx) == 0 {
+               delete(a.addrLookup, ka.Addr.String())
+               if ka.BucketType == bucketTypeNew {
+                       a.nNew--
+               } else {
+                       a.nOld--
+               }
+       }
+}
+
+func (a *AddrBook) size() int {
+       return a.nNew + a.nOld
+}
similarity index 76%
rename from p2p/addrbook_test.go
rename to p2p/pex/addrbook_test.go
index a201a72..720d023 100644 (file)
@@ -1,6 +1,5 @@
 // +build !network
-
-package p2p
+package pex
 
 import (
        "fmt"
@@ -10,55 +9,12 @@ import (
 
        "github.com/stretchr/testify/assert"
        "github.com/tendermint/tmlibs/log"
-)
-
-func createTempFileName(prefix string) string {
-       f, err := ioutil.TempFile("", prefix)
-       if err != nil {
-               panic(err)
-       }
-       fname := f.Name()
-       err = f.Close()
-       if err != nil {
-               panic(err)
-       }
-       return fname
-}
-
-func TestAddrBookSaveLoad(t *testing.T) {
-       fname := createTempFileName("addrbook_test")
-
-       // 0 addresses
-       book := NewAddrBook(fname, true)
-       book.SetLogger(log.TestingLogger())
-       book.saveToFile(fname)
-
-       book = NewAddrBook(fname, true)
-       book.SetLogger(log.TestingLogger())
-       book.loadFromFile(fname)
 
-       assert.Zero(t, book.Size())
-
-       // 100 addresses
-       randAddrs := randNetAddressPairs(t, 100)
-
-       for _, addrSrc := range randAddrs {
-               book.AddAddress(addrSrc.addr, addrSrc.src)
-       }
-
-       assert.Equal(t, 100, book.Size())
-       book.saveToFile(fname)
-
-       book = NewAddrBook(fname, true)
-       book.SetLogger(log.TestingLogger())
-       book.loadFromFile(fname)
-
-       assert.Equal(t, 100, book.Size())
-}
+       "github.com/bytom/p2p"
+)
 
 func TestAddrBookLookup(t *testing.T) {
        fname := createTempFileName("addrbook_test")
-
        randAddrs := randNetAddressPairs(t, 100)
 
        book := NewAddrBook(fname, true)
@@ -81,7 +37,6 @@ func TestAddrBookPromoteToOld(t *testing.T) {
        fname := createTempFileName("addrbook_test")
 
        randAddrs := randNetAddressPairs(t, 100)
-
        book := NewAddrBook(fname, true)
        book.SetLogger(log.TestingLogger())
        for _, addrSrc := range randAddrs {
@@ -100,8 +55,6 @@ func TestAddrBookPromoteToOld(t *testing.T) {
                }
        }
 
-       // TODO: do more testing :)
-
        selection := book.GetSelection()
        t.Logf("selection: %v", selection)
 
@@ -117,7 +70,6 @@ func TestAddrBookHandlesDuplicates(t *testing.T) {
        book.SetLogger(log.TestingLogger())
 
        randAddrs := randNetAddressPairs(t, 100)
-
        differentSrc := randIPv4Address(t)
        for _, addrSrc := range randAddrs {
                book.AddAddress(addrSrc.addr, addrSrc.src)
@@ -128,9 +80,38 @@ func TestAddrBookHandlesDuplicates(t *testing.T) {
        assert.Equal(t, 100, book.Size())
 }
 
+func TestAddrBookRemoveAddress(t *testing.T) {
+       fname := createTempFileName("addrbook_test")
+       book := NewAddrBook(fname, true)
+       book.SetLogger(log.TestingLogger())
+
+       addr := randIPv4Address(t)
+       book.AddAddress(addr, addr)
+       assert.Equal(t, 1, book.Size())
+
+       book.RemoveAddress(addr)
+       assert.Equal(t, 0, book.Size())
+
+       nonExistingAddr := randIPv4Address(t)
+       book.RemoveAddress(nonExistingAddr)
+       assert.Equal(t, 0, book.Size())
+}
+
 type netAddressPair struct {
-       addr *NetAddress
-       src  *NetAddress
+       addr *p2p.NetAddress
+       src  *p2p.NetAddress
+}
+
+func createTempFileName(prefix string) string {
+       f, err := ioutil.TempFile("", prefix)
+       if err != nil {
+               panic(err)
+       }
+       fname := f.Name()
+       if err = f.Close(); err != nil {
+               panic(err)
+       }
+       return fname
 }
 
 func randNetAddressPairs(t *testing.T, n int) []netAddressPair {
@@ -141,7 +122,7 @@ func randNetAddressPairs(t *testing.T, n int) []netAddressPair {
        return randAddrs
 }
 
-func randIPv4Address(t *testing.T) *NetAddress {
+func randIPv4Address(t *testing.T) *p2p.NetAddress {
        for {
                ip := fmt.Sprintf("%v.%v.%v.%v",
                        rand.Intn(254)+1,
@@ -150,27 +131,10 @@ func randIPv4Address(t *testing.T) *NetAddress {
                        rand.Intn(255),
                )
                port := rand.Intn(65535-1) + 1
-               addr, err := NewNetAddressString(fmt.Sprintf("%v:%v", ip, port))
+               addr, err := p2p.NewNetAddressString(fmt.Sprintf("%v:%v", ip, port))
                assert.Nil(t, err, "error generating rand network address")
                if addr.Routable() {
                        return addr
                }
        }
 }
-
-func TestAddrBookRemoveAddress(t *testing.T) {
-       fname := createTempFileName("addrbook_test")
-       book := NewAddrBook(fname, true)
-       book.SetLogger(log.TestingLogger())
-
-       addr := randIPv4Address(t)
-       book.AddAddress(addr, addr)
-       assert.Equal(t, 1, book.Size())
-
-       book.RemoveAddress(addr)
-       assert.Equal(t, 0, book.Size())
-
-       nonExistingAddr := randIPv4Address(t)
-       book.RemoveAddress(nonExistingAddr)
-       assert.Equal(t, 0, book.Size())
-}
diff --git a/p2p/pex/file.go b/p2p/pex/file.go
new file mode 100644 (file)
index 0000000..1402acc
--- /dev/null
@@ -0,0 +1,79 @@
+package pex
+
+import (
+       "encoding/json"
+       "os"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       cmn "github.com/tendermint/tmlibs/common"
+)
+
+type addrBookJSON struct {
+       Key   string
+       Addrs []*knownAddress
+}
+
+// SaveToFile will save the address book to a json file in disk
+func (a *AddrBook) SaveToFile() error {
+       a.mtx.RLock()
+       defer a.mtx.RUnlock()
+
+       aJSON := &addrBookJSON{Key: a.key, Addrs: []*knownAddress{}}
+       for _, ka := range a.addrLookup {
+               aJSON.Addrs = append(aJSON.Addrs, ka)
+       }
+
+       rawDats, err := json.MarshalIndent(aJSON, "", "\t")
+       if err != nil {
+               return err
+       }
+       return cmn.WriteFileAtomic(a.filePath, rawDats, 0644)
+}
+
+func (a *AddrBook) loadFromFile() error {
+       if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
+               return nil
+       }
+
+       r, err := os.Open(a.filePath)
+       if err != nil {
+               return err
+       }
+
+       defer r.Close()
+       aJSON := &addrBookJSON{}
+       if err = json.NewDecoder(r).Decode(aJSON); err != nil {
+               return err
+       }
+
+       a.key = aJSON.Key
+       for _, ka := range aJSON.Addrs {
+               a.addrLookup[ka.Addr.String()] = ka
+               for _, bucketIndex := range ka.Buckets {
+                       bucket := a.getBucket(ka.BucketType, bucketIndex)
+                       bucket[ka.Addr.String()] = ka
+               }
+               if ka.BucketType == bucketTypeNew {
+                       a.nNew++
+               } else {
+                       a.nOld++
+               }
+       }
+       return nil
+}
+
+func (a *AddrBook) saveRoutine() {
+       ticker := time.NewTicker(2 * time.Minute)
+       for {
+               select {
+               case <-ticker.C:
+                       if err := a.SaveToFile(); err != nil {
+                               log.WithField("err", err).Error("failed to save AddrBook to file")
+                       }
+               case <-a.Quit:
+                       a.SaveToFile()
+                       return
+               }
+       }
+}
diff --git a/p2p/pex/file_test.go b/p2p/pex/file_test.go
new file mode 100644 (file)
index 0000000..24a3751
--- /dev/null
@@ -0,0 +1,69 @@
+package pex
+
+import (
+       "os"
+       "testing"
+)
+
+func TestFileStorage(t *testing.T) {
+       file := createTempFileName("TestFileStorage")
+       defer os.Remove(file)
+
+       a := NewAddrBook(file, true)
+       for i := 1; i < 256; i++ {
+               if err := a.addAddress(randIPv4Address(t), randIPv4Address(t)); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       i := 0
+       for _, ka := range a.addrLookup {
+               i++
+               if i%7 != 0 {
+                       continue
+               }
+               if err := a.moveToOld(ka); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       if err := a.SaveToFile(); err != nil {
+               t.Fatal(err)
+       }
+
+       // load address book b from file
+       b := NewAddrBook(file, true)
+       if err := b.loadFromFile(); err != nil {
+               t.Fatal(err)
+       }
+
+       for key, want := range a.addrLookup {
+               got, ok := b.addrLookup[key]
+               if !ok {
+                       t.Errorf("can't find %s in loaded address book", key)
+               }
+               if !want.Addr.Equals(got.Addr) || !want.Src.Equals(got.Src) {
+                       t.Errorf("addrLookup check want %v but get %v", want, got)
+               }
+       }
+
+       for i, aBucket := range a.bucketsNew {
+               bBucket := b.bucketsNew[i]
+               for j, want := range aBucket {
+                       got := bBucket[j]
+                       if !want.Addr.Equals(got.Addr) {
+                               t.Errorf("new bucket check want %v but get %v", want, got)
+                       }
+               }
+       }
+
+       for i, aBucket := range a.bucketsOld {
+               bBucket := b.bucketsOld[i]
+               for j, want := range aBucket {
+                       got := bBucket[j]
+                       if !want.Addr.Equals(got.Addr) {
+                               t.Errorf("old bucket check want %v but get %v", want, got)
+                       }
+               }
+       }
+}
diff --git a/p2p/pex/know_address.go b/p2p/pex/know_address.go
new file mode 100644 (file)
index 0000000..37c511e
--- /dev/null
@@ -0,0 +1,91 @@
+package pex
+
+import (
+       "time"
+
+       "github.com/bytom/p2p"
+)
+
+type knownAddress struct {
+       Addr        *p2p.NetAddress
+       Src         *p2p.NetAddress
+       Attempts    int32
+       LastAttempt time.Time
+       LastSuccess time.Time
+       BucketType  byte
+       Buckets     []int
+}
+
+func newKnownAddress(addr, src *p2p.NetAddress) *knownAddress {
+       return &knownAddress{
+               Addr:        addr,
+               Src:         src,
+               Attempts:    0,
+               LastAttempt: time.Now(),
+               BucketType:  bucketTypeNew,
+               Buckets:     nil,
+       }
+}
+
+func (ka *knownAddress) addBucketRef(bucketIdx int) int {
+       for _, bucket := range ka.Buckets {
+               if bucket == bucketIdx {
+                       return -1
+               }
+       }
+       ka.Buckets = append(ka.Buckets, bucketIdx)
+       return len(ka.Buckets)
+}
+
+func (ka *knownAddress) isBad() bool {
+       if ka.BucketType == bucketTypeOld {
+               return false
+       }
+       if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
+               return true
+       }
+       if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
+               return true
+       }
+       if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
+               return true
+       }
+       if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
+               return true
+       }
+       return false
+}
+
+func (ka *knownAddress) isOld() bool {
+       return ka.BucketType == bucketTypeOld
+}
+
+func (ka *knownAddress) isNew() bool {
+       return ka.BucketType == bucketTypeNew
+}
+
+func (ka *knownAddress) markAttempt() {
+       ka.LastAttempt = time.Now()
+       ka.Attempts++
+}
+
+func (ka *knownAddress) markGood() {
+       now := time.Now()
+       ka.LastAttempt = now
+       ka.LastSuccess = now
+       ka.Attempts = 0
+}
+
+func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
+       buckets := []int{}
+       for _, bucket := range ka.Buckets {
+               if bucket != bucketIdx {
+                       buckets = append(buckets, bucket)
+               }
+       }
+       if len(buckets) != len(ka.Buckets)-1 {
+               return -1
+       }
+       ka.Buckets = buckets
+       return len(ka.Buckets)
+}
diff --git a/p2p/pex/params.go b/p2p/pex/params.go
new file mode 100644 (file)
index 0000000..9a02065
--- /dev/null
@@ -0,0 +1,24 @@
+package pex
+
+const (
+       bucketTypeNew = 0x01
+       bucketTypeOld = 0x02
+
+       oldBucketSize      = 64
+       oldBucketCount     = 64
+       oldBucketsPerGroup = 4
+       newBucketSize      = 64
+       newBucketCount     = 256
+       newBucketsPerGroup = 32
+
+       getSelectionPercent = 23
+       minGetSelection     = 32
+       maxGetSelection     = 250
+
+       needAddressThreshold    = 1000 // addresses under which the address manager will claim to need more addresses.
+       maxNewBucketsPerAddress = 4    // buckets a frequently seen new address may end up in.
+       numMissingDays          = 30   // days before which we assume an address has vanished
+       numRetries              = 3    // tries without a single success before we assume an address is bad.
+       maxFailures             = 10   // max failures we will accept without a success before considering an address bad.
+       minBadDays              = 7    // days since the last success before we will consider evicting an address.
+)
diff --git a/p2p/pex/pex_message.go b/p2p/pex/pex_message.go
new file mode 100644 (file)
index 0000000..329c2b1
--- /dev/null
@@ -0,0 +1,44 @@
+package pex
+
+import (
+       "bytes"
+       "fmt"
+
+       wire "github.com/tendermint/go-wire"
+
+       "github.com/bytom/p2p"
+)
+
+const (
+       msgTypeRequest = byte(0x01)
+       msgTypeAddrs   = byte(0x02)
+)
+
+// PexMessage is a primary type for PEX messages. Underneath, it could contain
+// either pexRequestMessage, or pexAddrsMessage messages.
+type PexMessage interface{}
+
+var _ = wire.RegisterInterface(
+       struct{ PexMessage }{},
+       wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
+       wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
+)
+
+// DecodeMessage implements interface registered above.
+func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
+       msgType = bz[0]
+       n := new(int)
+       r := bytes.NewReader(bz)
+       msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
+       return
+}
+
+type pexRequestMessage struct{}
+
+func (m *pexRequestMessage) String() string { return "[pexRequest]" }
+
+type pexAddrsMessage struct {
+       Addrs []*p2p.NetAddress
+}
+
+func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
diff --git a/p2p/pex/pex_reactor.go b/p2p/pex/pex_reactor.go
new file mode 100644 (file)
index 0000000..89edc87
--- /dev/null
@@ -0,0 +1,304 @@
+package pex
+
+import (
+       "errors"
+       "math/rand"
+       "reflect"
+       "strings"
+       "sync"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       cmn "github.com/tendermint/tmlibs/common"
+
+       "github.com/bytom/p2p"
+)
+
+const (
+       // PexChannel is a channel for PEX messages
+       PexChannel = byte(0x00)
+
+       minNumOutboundPeers      = 5
+       maxPexMessageSize        = 1048576 // 1MB
+       defaultMaxMsgCountByPeer = uint16(1000)
+)
+
+// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
+type PEXReactor struct {
+       p2p.BaseReactor
+       book           *AddrBook
+       msgCountByPeer *cmn.CMap
+}
+
+// NewPEXReactor creates new PEX reactor.
+func NewPEXReactor(b *AddrBook) *PEXReactor {
+       r := &PEXReactor{
+               book:           b,
+               msgCountByPeer: cmn.NewCMap(),
+       }
+       r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
+       return r
+}
+
+// OnStart implements BaseService
+func (r *PEXReactor) OnStart() error {
+       r.BaseReactor.OnStart()
+       if _, err := r.book.Start(); err != nil {
+               return err
+       }
+
+       go r.ensurePeersRoutine()
+       go r.flushMsgCountByPeer()
+       return nil
+}
+
+// OnStop implements BaseService
+func (r *PEXReactor) OnStop() {
+       r.BaseReactor.OnStop()
+       r.book.Stop()
+}
+
+// GetChannels implements Reactor
+func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
+       return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
+               ID:                PexChannel,
+               Priority:          1,
+               SendQueueCapacity: 10,
+       }}
+}
+
+// AddPeer adding peer to the address book
+func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
+       if p.IsOutbound() {
+               if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
+                       return errors.New("Send pex message fail")
+               }
+               return nil
+       }
+
+       addr, err := p2p.NewNetAddressString(p.ListenAddr)
+       if err != nil {
+               return errors.New("addPeer: invalid peer address")
+       }
+
+       if err := r.book.AddAddress(addr, addr); err != nil {
+               return err
+       }
+
+       if r.Switch.Peers().Size() >= r.Switch.Config.MaxNumPeers {
+               if r.SendAddrs(p, r.book.GetSelection()) {
+                       <-time.After(1 * time.Second)
+                       r.Switch.StopPeerGracefully(p)
+               }
+               return errors.New("addPeer: reach the max peer, exchange then close")
+       }
+       return nil
+}
+
+// Receive implements Reactor by handling incoming PEX messages.
+func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
+       srcAddr := p.Connection().RemoteAddress
+       srcAddrStr := srcAddr.String()
+       r.incrementMsgCount(srcAddrStr)
+       if r.reachedMaxMsgLimit(srcAddrStr) {
+               log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+               r.Switch.StopPeerGracefully(p)
+               return
+       }
+
+       _, msg, err := DecodeMessage(rawMsg)
+       if err != nil {
+               log.WithField("error", err).Error("failed to decoding pex message")
+               r.Switch.StopPeerGracefully(p)
+               return
+       }
+
+       switch msg := msg.(type) {
+       case *pexRequestMessage:
+               if !r.SendAddrs(p, r.book.GetSelection()) {
+                       log.Error("failed to send pex address message")
+               }
+
+       case *pexAddrsMessage:
+               for _, addr := range msg.Addrs {
+                       if err := r.book.AddAddress(addr, srcAddr); err != nil {
+                               log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
+                               r.Switch.StopPeerGracefully(p)
+                               return
+                       }
+               }
+
+       default:
+               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
+       }
+}
+
+// RemovePeer implements Reactor.
+func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
+
+// RequestAddrs asks peer for more addresses.
+func (r *PEXReactor) RequestAddrs(p *p2p.Peer) bool {
+       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
+       if !ok {
+               r.Switch.StopPeerGracefully(p)
+       }
+       return ok
+}
+
+// SendAddrs sends addrs to the peer.
+func (r *PEXReactor) SendAddrs(p *p2p.Peer, addrs []*p2p.NetAddress) bool {
+       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
+       if !ok {
+               r.Switch.StopPeerGracefully(p)
+       }
+       return ok
+}
+
+func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
+       if err := r.Switch.DialPeerWithAddress(a); err != nil {
+               r.book.MarkAttempt(a)
+       } else {
+               r.book.MarkGood(a)
+       }
+       wg.Done()
+}
+
+func (r *PEXReactor) dialSeeds() {
+       if r.Switch.Config.Seeds == "" {
+               return
+       }
+
+       seeds := strings.Split(r.Switch.Config.Seeds, ",")
+       netAddrs, err := p2p.NewNetAddressStrings(seeds)
+       if err != nil {
+               log.WithField("err", err).Error("dialSeeds: fail to decode net address strings")
+       }
+
+       ourAddr, err := p2p.NewNetAddressString(r.Switch.NodeInfo().ListenAddr)
+       if err != nil {
+               log.WithField("err", err).Error("dialSeeds: fail to get our address")
+       }
+
+       for _, netAddr := range netAddrs {
+               if netAddr.Equals(ourAddr) {
+                       continue
+               }
+               if err := r.book.AddAddress(netAddr, ourAddr); err != nil {
+                       log.WithField("err", err).Warn("dialSeeds: fail to add address")
+               }
+       }
+
+       if err := r.book.SaveToFile(); err != nil {
+               log.WithField("err", err).Warn("dialSeeds: fail to save address book")
+       }
+
+       perm := rand.Perm(len(netAddrs))
+       for i := 0; i < len(perm); i += 2 {
+               if err := r.Switch.DialPeerWithAddress(netAddrs[perm[i]]); err != nil {
+                       log.WithField("err", err).Warn("dialSeeds: fail to dial seed")
+               }
+       }
+}
+
+func (r *PEXReactor) ensurePeers() {
+       numOutPeers, _, numDialing := r.Switch.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 3
+       log.WithFields(log.Fields{
+               "numOutPeers": numOutPeers,
+               "numDialing":  numDialing,
+               "numToDial":   numToDial,
+       }).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
+       }
+
+       newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
+       toDial := make(map[string]*p2p.NetAddress)
+       maxAttempts := numToDial * 3
+
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range r.Switch.Peers().List() {
+               connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+       }
+
+       for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
+               try := r.book.PickAddress(newBias)
+               if try == nil {
+                       continue
+               }
+               if _, selected := toDial[try.IP.String()]; selected {
+                       continue
+               }
+               if dialling := r.Switch.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               log.Debug("Will dial address addr:", try)
+               toDial[try.IP.String()] = try
+       }
+
+       var wg sync.WaitGroup
+       for _, item := range toDial {
+               wg.Add(1)
+               go r.dialPeerWorker(item, &wg)
+       }
+       wg.Wait()
+
+       if r.book.NeedMoreAddrs() {
+               if peers := r.Switch.Peers().List(); len(peers) > 0 {
+                       peer := peers[rand.Int()%len(peers)]
+                       r.RequestAddrs(peer)
+               }
+       }
+}
+
+func (r *PEXReactor) ensurePeersRoutine() {
+       r.ensurePeers()
+       if r.Switch.Peers().Size() < 3 {
+               r.dialSeeds()
+       }
+
+       ticker := time.NewTicker(120 * time.Second)
+       quickTicker := time.NewTicker(3 * time.Second)
+
+       for {
+               select {
+               case <-ticker.C:
+                       r.ensurePeers()
+               case <-quickTicker.C:
+                       if r.Switch.Peers().Size() < 3 {
+                               r.ensurePeers()
+                       }
+               case <-r.Quit:
+                       return
+               }
+       }
+}
+
+func (r *PEXReactor) flushMsgCountByPeer() {
+       ticker := time.NewTicker(1 * time.Hour)
+       for {
+               select {
+               case <-ticker.C:
+                       r.msgCountByPeer.Clear()
+               case <-r.Quit:
+                       return
+               }
+       }
+}
+
+func (r *PEXReactor) incrementMsgCount(addr string) {
+       var count uint16
+       if countI := r.msgCountByPeer.Get(addr); countI != nil {
+               count = countI.(uint16)
+       }
+       count++
+       r.msgCountByPeer.Set(addr, count)
+}
+
+func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
+       return r.msgCountByPeer.Get(addr).(uint16) >= defaultMaxMsgCountByPeer
+}
diff --git a/p2p/pex_reactor.go b/p2p/pex_reactor.go
deleted file mode 100644 (file)
index 395f199..0000000
+++ /dev/null
@@ -1,422 +0,0 @@
-package p2p
-
-import (
-       "bytes"
-       "fmt"
-       "math/rand"
-       "reflect"
-       "strings"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-       wire "github.com/tendermint/go-wire"
-       cmn "github.com/tendermint/tmlibs/common"
-
-       "github.com/bytom/errors"
-)
-
-const (
-       // PexChannel is a channel for PEX messages
-       PexChannel = byte(0x00)
-
-       // period to ensure peers connected
-       defaultEnsurePeersPeriod = 120 * time.Second
-       minNumOutboundPeers      = 5
-       maxPexMessageSize        = 1048576 // 1MB
-
-       // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
-       defaultMaxMsgCountByPeer    = 1000
-       msgCountByPeerFlushInterval = 1 * time.Hour
-)
-
-var ErrSendPexFail = errors.New("Send pex message fail")
-
-// PEXReactor handles PEX (peer exchange) and ensures that an
-// adequate number of peers are connected to the switch.
-//
-// It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
-//
-// ## Preventing abuse
-//
-// For now, it just limits the number of messages from one peer to
-// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
-// msg/hour).
-//
-// NOTE [2017-01-17]:
-//   Limiting is fine for now. Maybe down the road we want to keep track of the
-//   quality of peer messages so if peerA keeps telling us about peers we can't
-//   connect to then maybe we should care less about peerA. But I don't think
-//   that kind of complexity is priority right now.
-type PEXReactor struct {
-       BaseReactor
-
-       sw                *Switch
-       book              *AddrBook
-       ensurePeersPeriod time.Duration
-
-       // tracks message count by peer, so we can prevent abuse
-       msgCountByPeer    *cmn.CMap
-       maxMsgCountByPeer uint16
-}
-
-// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
-       r := &PEXReactor{
-               sw:                sw,
-               book:              b,
-               ensurePeersPeriod: defaultEnsurePeersPeriod,
-               msgCountByPeer:    cmn.NewCMap(),
-               maxMsgCountByPeer: defaultMaxMsgCountByPeer,
-       }
-       r.BaseReactor = *NewBaseReactor("PEXReactor", r)
-       return r
-}
-
-// OnStart implements BaseService
-func (r *PEXReactor) OnStart() error {
-       r.BaseReactor.OnStart()
-       r.book.Start()
-       go r.ensurePeersRoutine()
-       go r.flushMsgCountByPeer()
-       return nil
-}
-
-// OnStop implements BaseService
-func (r *PEXReactor) OnStop() {
-       r.BaseReactor.OnStop()
-       r.book.Stop()
-}
-
-// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
-       return []*ChannelDescriptor{
-               &ChannelDescriptor{
-                       ID:                PexChannel,
-                       Priority:          1,
-                       SendQueueCapacity: 10,
-               },
-       }
-}
-
-// AddPeer implements Reactor by adding peer to the address book (if inbound)
-// or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) error {
-       if p.IsOutbound() {
-               // For outbound peers, the address is already in the books.
-               // Either it was added in DialSeeds or when we
-               // received the peer's address in r.Receive
-               if r.book.NeedMoreAddrs() {
-                       if ok := r.RequestPEX(p); !ok {
-                               return ErrSendPexFail
-                       }
-               }
-               return nil
-       }
-
-       // For inbound connections, the peer is its own source
-       addr, err := NewNetAddressString(p.ListenAddr)
-       if err != nil {
-               // this should never happen
-               log.WithFields(log.Fields{
-                       "addr":  p.ListenAddr,
-                       "error": err,
-               }).Error("Error in AddPeer: Invalid peer address")
-               return errors.New("Error in AddPeer: Invalid peer address")
-       }
-       r.book.AddAddress(addr, addr)
-
-       // close the connect if connect is big than max limit
-       if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
-               if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
-                       <-time.After(1 * time.Second)
-                       r.sw.StopPeerGracefully(p)
-               }
-               return errors.New("Error in AddPeer: reach the max peer, exchange then close")
-       }
-
-       return nil
-}
-
-// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
-       // If we aren't keeping track of local temp data for each peer here, then we
-       // don't have to do anything.
-}
-
-// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
-       srcAddr := src.Connection().RemoteAddress
-       srcAddrStr := srcAddr.String()
-
-       r.IncrementMsgCountForPeer(srcAddrStr)
-       if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
-               log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
-               // TODO remove src from peers?
-               return
-       }
-
-       _, msg, err := DecodeMessage(msgBytes)
-       if err != nil {
-               log.WithField("error", err).Error("Error decoding message")
-               return
-       }
-       log.WithField("msg", msg).Info("Reveived message")
-
-       switch msg := msg.(type) {
-       case *pexRequestMessage:
-               // src requested some peers.
-               if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
-                       log.Info("Send address message failed. Stop peer.")
-               }
-       case *pexAddrsMessage:
-               // We received some peer addresses from src.
-               // (We don't want to get spammed with bad peers)
-               for _, addr := range msg.Addrs {
-                       if addr != nil {
-                               r.book.AddAddress(addr, srcAddr)
-                       }
-               }
-       default:
-               log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
-       }
-}
-
-// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestPEX(p *Peer) bool {
-       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
-       if !ok {
-               r.sw.StopPeerGracefully(p)
-       }
-       return ok
-}
-
-// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
-       ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
-       if !ok {
-               r.sw.StopPeerGracefully(p)
-       }
-       return ok
-}
-
-// SetEnsurePeersPeriod sets period to ensure peers connected.
-func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
-       r.ensurePeersPeriod = d
-}
-
-// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
-func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
-       r.maxMsgCountByPeer = v
-}
-
-// ReachedMaxMsgCountForPeer returns true if we received too many
-// messages from peer with address `addr`.
-// NOTE: assumes the value in the CMap is non-nil
-func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
-       return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
-}
-
-// Increment or initialize the msg count for the peer in the CMap
-func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
-       var count uint16
-       countI := r.msgCountByPeer.Get(addr)
-       if countI != nil {
-               count = countI.(uint16)
-       }
-       count++
-       r.msgCountByPeer.Set(addr, count)
-}
-
-// Ensures that sufficient peers are connected. (continuous)
-func (r *PEXReactor) ensurePeersRoutine() {
-       // Randomize when routine starts
-       ensurePeersPeriodMs := int64(10000)
-       time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
-
-       // fire once immediately.
-       r.ensurePeers()
-
-       // fire periodically
-       ticker := time.NewTicker(r.ensurePeersPeriod)
-       quickTicker := time.NewTicker(time.Second * 1)
-
-       for {
-               select {
-               case <-ticker.C:
-                       r.ensurePeers()
-               case <-quickTicker.C:
-                       if r.sw.peers.Size() < 3 {
-                               r.ensurePeers()
-                       }
-               case <-r.Quit:
-                       ticker.Stop()
-                       quickTicker.Stop()
-                       return
-               }
-       }
-}
-
-// ensurePeers ensures that sufficient peers are connected. (once)
-//
-// Old bucket / New bucket are arbitrary categories to denote whether an
-// address is vetted or not, and this needs to be determined over time via a
-// heuristic that we haven't perfected yet, or, perhaps is manually edited by
-// the node operator. It should not be used to compute what addresses are
-// already connected or not.
-//
-// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
-// What we're currently doing in terms of marking good/bad peers is just a
-// placeholder. It should not be the case that an address becomes old/vetted
-// upon a single successful connection.
-func (r *PEXReactor) ensurePeers() {
-       numOutPeers, _, numDialing := r.Switch.NumPeers()
-       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
-       log.WithFields(log.Fields{
-               "numOutPeers": numOutPeers,
-               "numDialing":  numDialing,
-               "numToDial":   numToDial,
-       }).Info("Ensure peers")
-       if numToDial <= 0 {
-               return
-       }
-
-       newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
-       toDial := make(map[string]*NetAddress)
-
-       // Try to pick numToDial addresses to dial.
-       for i := 0; i < numToDial; i++ {
-               // The purpose of newBias is to first prioritize old (more vetted) peers
-               // when we have few connections, but to allow for new (less vetted) peers
-               // if we already have many connections. This algorithm isn't perfect, but
-               // it somewhat ensures that we prioritize connecting to more-vetted
-               // peers.
-
-               var picked *NetAddress
-               // Try to fetch a new peer 3 times.
-               // This caps the maximum number of tries to 3 * numToDial.
-               for j := 0; j < 3; j++ {
-                       try := r.book.PickAddress(newBias)
-                       if try == nil {
-                               break
-                       }
-                       ka := r.book.addrLookup[try.String()]
-                       if ka != nil {
-                               if ka.isBad() {
-                                       continue
-                               }
-                       }
-                       _, alreadySelected := toDial[try.IP.String()]
-                       alreadyDialing := r.Switch.IsDialing(try)
-                       var alreadyConnected bool
-
-                       for _, v := range r.Switch.Peers().list {
-                               if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
-                                       alreadyConnected = true
-                                       break
-                               }
-                       }
-                       if alreadySelected || alreadyDialing || alreadyConnected {
-                               continue
-                       } else {
-                               log.Debug("Will dial address addr:", try)
-                               picked = try
-                               break
-                       }
-               }
-               if picked == nil {
-                       continue
-               }
-               toDial[picked.IP.String()] = picked
-       }
-
-       var wg sync.WaitGroup
-       for _, item := range toDial {
-               wg.Add(1)
-               go r.dialPeerWorker(item, &wg)
-       }
-       wg.Wait()
-
-       // If we need more addresses, pick a random peer and ask for more.
-       if r.book.NeedMoreAddrs() {
-               if peers := r.Switch.Peers().List(); len(peers) > 0 {
-                       i := rand.Int() % len(peers)
-                       peer := peers[i]
-                       log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
-                       if ok := r.RequestPEX(peer); !ok {
-                               log.Info("Send request address message failed. Stop peer.")
-                       }
-               }
-       }
-}
-
-func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
-       if err := r.Switch.DialPeerWithAddress(a); err != nil {
-               r.book.MarkAttempt(a)
-       } else {
-               r.book.MarkGood(a)
-       }
-       wg.Done()
-}
-
-func (r *PEXReactor) flushMsgCountByPeer() {
-       ticker := time.NewTicker(msgCountByPeerFlushInterval)
-
-       for {
-               select {
-               case <-ticker.C:
-                       r.msgCountByPeer.Clear()
-               case <-r.Quit:
-                       ticker.Stop()
-                       return
-               }
-       }
-}
-
-//-----------------------------------------------------------------------------
-// Messages
-
-const (
-       msgTypeRequest = byte(0x01)
-       msgTypeAddrs   = byte(0x02)
-)
-
-// PexMessage is a primary type for PEX messages. Underneath, it could contain
-// either pexRequestMessage, or pexAddrsMessage messages.
-type PexMessage interface{}
-
-var _ = wire.RegisterInterface(
-       struct{ PexMessage }{},
-       wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
-       wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
-)
-
-// DecodeMessage implements interface registered above.
-func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
-       msgType = bz[0]
-       n := new(int)
-       r := bytes.NewReader(bz)
-       msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
-       return
-}
-
-/*
-A pexRequestMessage requests additional peer addresses.
-*/
-type pexRequestMessage struct {
-}
-
-func (m *pexRequestMessage) String() string {
-       return "[pexRequest]"
-}
-
-/*
-A message with announced peer addresses.
-*/
-type pexAddrsMessage struct {
-       Addrs []*NetAddress
-}
-
-func (m *pexAddrsMessage) String() string {
-       return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
-}
diff --git a/p2p/pex_reactor_test.go b/p2p/pex_reactor_test.go
deleted file mode 100644 (file)
index a02caa8..0000000
+++ /dev/null
@@ -1,185 +0,0 @@
-// +build !network
-
-package p2p
-
-import (
-       "io/ioutil"
-       "math/rand"
-       "os"
-       "testing"
-       "time"
-
-       "github.com/stretchr/testify/assert"
-       "github.com/stretchr/testify/require"
-       wire "github.com/tendermint/go-wire"
-       cmn "github.com/tendermint/tmlibs/common"
-       "github.com/tendermint/tmlibs/log"
-)
-
-var (
-       sw *Switch
-)
-
-func TestPEXReactorBasic(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       dir, err := ioutil.TempDir("", "pex_reactor")
-       require.Nil(err)
-       defer os.RemoveAll(dir)
-       book := NewAddrBook(dir+"addrbook.json", true)
-       book.SetLogger(log.TestingLogger())
-
-       r := NewPEXReactor(book, sw)
-       r.SetLogger(log.TestingLogger())
-
-       assert.NotNil(r)
-       assert.NotEmpty(r.GetChannels())
-}
-
-func TestPEXReactorAddRemovePeer(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       dir, err := ioutil.TempDir("", "pex_reactor")
-       require.Nil(err)
-       defer os.RemoveAll(dir)
-       book := NewAddrBook(dir+"addrbook.json", true)
-       book.SetLogger(log.TestingLogger())
-
-       r := NewPEXReactor(book, sw)
-       r.SetLogger(log.TestingLogger())
-
-       size := book.Size()
-       peer := createRandomPeer(false)
-
-       r.AddPeer(peer)
-       assert.Equal(size+1, book.Size())
-
-       r.RemovePeer(peer, "peer not available")
-       assert.Equal(size+1, book.Size())
-
-       outboundPeer := createRandomPeer(true)
-
-       r.AddPeer(outboundPeer)
-       assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
-
-       r.RemovePeer(outboundPeer, "peer not available")
-       assert.Equal(size+1, book.Size())
-}
-
-func TestPEXReactorRunning(t *testing.T) {
-       require := require.New(t)
-
-       N := 3
-       switches := make([]*Switch, N)
-
-       dir, err := ioutil.TempDir("", "pex_reactor")
-       require.Nil(err)
-       defer os.RemoveAll(dir)
-       book := NewAddrBook(dir+"addrbook.json", false)
-       book.SetLogger(log.TestingLogger())
-
-       // create switches
-       for i := 0; i < N; i++ {
-               switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
-                       sw.SetLogger(log.TestingLogger().With("switch", i))
-
-                       r := NewPEXReactor(book, sw)
-                       r.SetLogger(log.TestingLogger())
-                       r.SetEnsurePeersPeriod(250 * time.Millisecond)
-                       sw.AddReactor("pex", r)
-                       return sw
-               })
-       }
-
-       // fill the address book and add listeners
-       for _, s := range switches {
-               addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr)
-               book.AddAddress(addr, addr)
-               l, _ := NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())
-               s.AddListener(l)
-       }
-
-       // start switches
-       for _, s := range switches {
-               _, err := s.Start() // start switch and reactors
-               require.Nil(err)
-       }
-
-       time.Sleep(1 * time.Second)
-
-       // check peers are connected after some time
-       for _, s := range switches {
-               outbound, inbound, _ := s.NumPeers()
-               if outbound+inbound == 0 {
-                       t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr)
-               }
-       }
-
-       // stop them
-       for _, s := range switches {
-               s.Stop()
-       }
-}
-
-func TestPEXReactorReceive(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       dir, err := ioutil.TempDir("", "pex_reactor")
-       require.Nil(err)
-       defer os.RemoveAll(dir)
-       book := NewAddrBook(dir+"addrbook.json", true)
-       book.SetLogger(log.TestingLogger())
-
-       r := NewPEXReactor(book, sw)
-       r.SetLogger(log.TestingLogger())
-
-       peer := createRandomPeer(false)
-
-       size := book.Size()
-       netAddr, _ := NewNetAddressString(peer.ListenAddr)
-       addrs := []*NetAddress{netAddr}
-       msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
-       r.Receive(PexChannel, peer, msg)
-       assert.Equal(size+1, book.Size())
-
-       msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
-       r.Receive(PexChannel, peer, msg)
-}
-
-func TestPEXReactorAbuseFromPeer(t *testing.T) {
-       assert, require := assert.New(t), require.New(t)
-
-       dir, err := ioutil.TempDir("", "pex_reactor")
-       require.Nil(err)
-       defer os.RemoveAll(dir)
-       book := NewAddrBook(dir+"addrbook.json", true)
-       book.SetLogger(log.TestingLogger())
-
-       r := NewPEXReactor(book, sw)
-       r.SetLogger(log.TestingLogger())
-       r.SetMaxMsgCountByPeer(5)
-
-       peer := createRandomPeer(false)
-
-       msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
-       for i := 0; i < 10; i++ {
-               r.Receive(PexChannel, peer, msg)
-       }
-
-       assert.True(r.ReachedMaxMsgCountForPeer(peer.ListenAddr))
-}
-
-func createRandomPeer(outbound bool) *Peer {
-       addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
-       netAddr, _ := NewNetAddressString(addr)
-       p := &Peer{
-               Key: cmn.RandStr(12),
-               NodeInfo: &NodeInfo{
-                       ListenAddr: addr,
-               },
-               outbound: outbound,
-               mconn:    &MConnection{RemoteAddress: netAddr},
-       }
-       p.SetLogger(log.TestingLogger().With("peer", addr))
-       return p
-}
index 8f0134a..ed8d017 100644 (file)
@@ -3,7 +3,6 @@ package p2p
 import (
        "encoding/json"
        "fmt"
-       "math/rand"
        "net"
        "sync"
        "time"
@@ -30,6 +29,15 @@ var (
        ErrConnectBannedPeer = errors.New("Connect banned peer")
 )
 
+// An AddrBook represents an address book from the pex package, which is used to store peer addresses.
+type AddrBook interface {
+       AddAddress(*NetAddress, *NetAddress) error
+       AddOurAddress(*NetAddress)
+       MarkGood(*NetAddress)
+       RemoveAddress(*NetAddress)
+       SaveToFile() error
+}
+
 //-----------------------------------------------------------------------------
 
 // Switch handles peer connections and exposes an API to receive incoming messages
@@ -39,7 +47,7 @@ var (
 type Switch struct {
        cmn.BaseService
 
-       config       *cfg.P2PConfig
+       Config       *cfg.P2PConfig
        peerConfig   *PeerConfig
        listeners    []Listener
        reactors     map[string]Reactor
@@ -49,16 +57,16 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
-       addrBook     *AddrBook
+       addrBook     AddrBook
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
 }
 
 // NewSwitch creates a new Switch with the given config.
-func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
+func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
        sw := &Switch{
-               config:       config,
+               Config:       config,
                peerConfig:   DefaultPeerConfig(config),
                reactors:     make(map[string]Reactor),
                chDescs:      make([]*ChannelDescriptor, 0),
@@ -66,17 +74,10 @@ func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
                peers:        NewPeerSet(),
                dialing:      cmn.NewCMap(),
                nodeInfo:     nil,
+               addrBook:     addrBook,
                db:           trustHistoryDB,
        }
        sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
-
-       // Optionally, start the pex reactor
-       if config.PexReactor {
-               sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
-               pexReactor := NewPEXReactor(sw.addrBook, sw)
-               sw.AddReactor("PEX", pexReactor)
-       }
-
        sw.bannedPeer = make(map[string]time.Time)
        if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
                if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
@@ -241,37 +242,6 @@ func (sw *Switch) startInitPeer(peer *Peer) error {
        return nil
 }
 
-// DialSeeds a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(seeds []string) error {
-       netAddrs, err := NewNetAddressStrings(seeds)
-       if err != nil {
-               return err
-       }
-
-       if sw.addrBook != nil {
-               // add seeds to `addrBook`
-               ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
-               for _, netAddr := range netAddrs {
-                       // do not add ourselves
-                       if netAddr.Equals(ourAddr) {
-                               continue
-                       }
-                       sw.addrBook.AddAddress(netAddr, ourAddr)
-               }
-
-               sw.addrBook.Save()
-       }
-
-       //permute the list, dial them in random order.
-       perm := rand.Perm(len(netAddrs))
-       for i := 0; i < len(perm); i += 2 {
-               j := perm[i]
-               sw.dialSeed(netAddrs[j])
-       }
-
-       return nil
-}
-
 func (sw *Switch) dialSeed(addr *NetAddress) {
        err := sw.DialPeerWithAddress(addr)
        if err != nil {
@@ -403,7 +373,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
                // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
                // be double of MaxNumPeers
-               if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
+               if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
                        inConn.Close()
                        log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
@@ -419,7 +389,7 @@ func (sw *Switch) listenerRoutine(l Listener) {
 }
 
 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+       peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
        if err != nil {
                conn.Close()
                return err
index aad9723..88c088d 100644 (file)
@@ -13,6 +13,35 @@ import (
 //PanicOnAddPeerErr add peer error
 var PanicOnAddPeerErr = false
 
+func CreateRandomPeer(outbound bool) *Peer {
+       _, netAddr := CreateRoutableAddr()
+       p := &Peer{
+               peerConn: &peerConn{
+                       outbound: outbound,
+               },
+               NodeInfo: &NodeInfo{
+                       ListenAddr: netAddr.DialString(),
+               },
+               mconn: &MConnection{},
+       }
+       return p
+}
+
+func CreateRoutableAddr() (addr string, netAddr *NetAddress) {
+       for {
+               var err error
+               addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256)
+               netAddr, err = NewNetAddressString(addr)
+               if err != nil {
+                       panic(err)
+               }
+               if netAddr.Routable() {
+                       break
+               }
+       }
+       return
+}
+
 // MakeConnectedSwitches switches connected via arbitrary net.Conn; useful for testing
 // Returns n switches, connected according to the connect func.
 // If connect==Connect2Switches, the switches will be fully connected.
@@ -21,7 +50,7 @@ var PanicOnAddPeerErr = false
 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
        switches := make([]*Switch, n)
        for i := 0; i < n; i++ {
-               switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
+               switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
        }
 
        if err := startSwitches(switches); err != nil {
@@ -73,11 +102,11 @@ func startSwitches(switches []*Switch) error {
        return nil
 }
 
-func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
+func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
        privKey := crypto.GenPrivKeyEd25519()
        // new switch, add reactors
        // TODO: let the config be passed in?
-       s := initSwitch(i, NewSwitch(cfg, nil))
+       s := initSwitch(i, NewSwitch(cfg, nil, nil))
        s.SetNodeInfo(&NodeInfo{
                PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
                Moniker:    cmn.Fmt("switch%d", i),
diff --git a/p2p/util.go b/p2p/util.go
deleted file mode 100644 (file)
index 2be3202..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package p2p
-
-import (
-       "crypto/sha256"
-)
-
-// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
-func doubleSha256(b []byte) []byte {
-       hasher := sha256.New()
-       hasher.Write(b)
-       sum := hasher.Sum(nil)
-       hasher.Reset()
-       hasher.Write(sum)
-       return hasher.Sum(nil)
-}
index e9d1af3..872ef4f 100644 (file)
@@ -32,7 +32,7 @@ func (c *Chain) GetBlockByHash(hash *bc.Hash) (*types.Block, error) {
 func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) {
        node := c.index.NodeByHeight(height)
        if node == nil {
-               return nil, errors.New("can't find block in given hight")
+               return nil, errors.New("can't find block in given height")
        }
        return c.store.GetBlock(&node.Hash)
 }
index 8f1e5e5..27c3bfe 100644 (file)
@@ -181,6 +181,7 @@ func (w *Wallet) buildAnnotatedTransaction(orig *types.Tx, b *types.Block, statu
                Inputs:                 make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
                Outputs:                make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
                StatusFail:             statusFail,
+               Size:                   orig.SerializedSize,
        }
        for i := range orig.Inputs {
                tx.Inputs = append(tx.Inputs, w.BuildAnnotatedInput(orig, uint32(i)))