Inputs []*AnnotatedInput `json:"inputs"`
Outputs []*AnnotatedOutput `json:"outputs"`
StatusFail bool `json:"status_fail"`
+ Size uint64 `json:"size"`
}
//AnnotatedInput means an annotated transaction input.
//AnnotatedAccount means an annotated account.
type AnnotatedAccount struct {
- ID string `json:"id"`
- Alias string `json:"alias,omitempty"`
- XPubs []chainkd.XPub `json:"xpubs"`
- Quorum int `json:"quorum"`
- KeyIndex uint64 `json:"key_index"`
+ ID string `json:"id"`
+ Alias string `json:"alias,omitempty"`
+ XPubs []chainkd.XPub `json:"xpubs"`
+ Quorum int `json:"quorum"`
+ KeyIndex uint64 `json:"key_index"`
}
//AnnotatedAsset means an annotated asset.
AssetDerivationPath []chainjson.HexBytes `json:"asset_derivation_path"`
}
+//AnnotatedUTXO means an annotated UTXO.
type AnnotatedUTXO struct {
Alias string `json:"account_alias"`
OutputID string `json:"id"`
"golang.org/x/crypto/sha3"
)
+// DoubleSha256 returns the SHA3-256 hash of the SHA3-256 hash of b.
+func DoubleSha256(b []byte) []byte {
+ hasher := sha3.New256()
+ hasher.Write(b)
+ sum := hasher.Sum(nil)
+ hasher.Reset()
+ hasher.Write(sum)
+ return hasher.Sum(nil)
+}
+
func Sha256(data ...[]byte) []byte {
d := sha3.New256()
for _, b := range data {
numWorkers uint64
started bool
discreteMining bool
- wg sync.WaitGroup
workerWg sync.WaitGroup
updateNumWorkers chan struct{}
- queryHashesPerSec chan float64
- updateHashes chan uint64
- speedMonitorQuit chan struct{}
quit chan struct{}
newBlockCh chan *bc.Hash
}
}
}
- // Wait until all workers shut down to stop the speed monitor since
- // they rely on being able to send updates to it.
m.workerWg.Wait()
- close(m.speedMonitorQuit)
- m.wg.Done()
}
// Start begins the CPU mining process as well as the speed monitor used to
m.Lock()
defer m.Unlock()
- // Nothing to do if the miner is already running or if running in
- // discrete mode (using GenerateNBlocks).
- if m.started || m.discreteMining {
+ // Nothing to do if the miner is already running
+ if m.started {
return
}
m.quit = make(chan struct{})
- m.speedMonitorQuit = make(chan struct{})
- m.wg.Add(1)
go m.miningWorkerController()
m.started = true
m.Lock()
defer m.Unlock()
- // Nothing to do if the miner is not currently running or if running in
- // discrete mode (using GenerateNBlocks).
- if !m.started || m.discreteMining {
+ // Nothing to do if the miner is not currently running
+ if !m.started {
return
}
close(m.quit)
- m.wg.Wait()
m.started = false
log.Info("CPU miner stopped")
}
txPool: txPool,
numWorkers: defaultNumWorkers,
updateNumWorkers: make(chan struct{}),
- queryHashesPerSec: make(chan float64),
- updateHashes: make(chan uint64),
newBlockCh: newBlockCh,
}
}
newMinedBlock: make(chan *blockPending),
quit: make(chan struct{}),
queue: prque.New(),
- queues: make(map[string]int),
queued: make(map[bc.Hash]*blockPending),
}
}
// state.
func (f *Fetcher) forgetBlock(hash bc.Hash) {
if insert := f.queued[hash]; insert != nil {
- f.queues[insert.peerID]--
- if f.queues[insert.peerID] == 0 {
- delete(f.queues, insert.peerID)
- }
delete(f.queued, hash)
}
}
cfg "github.com/bytom/config"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/pex"
core "github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/version"
}
trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
- manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+ addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
+ manager.sw = p2p.NewSwitch(config.P2P, addrBook, trustHistoryDB)
+
+ pexReactor := pex.NewPEXReactor(addrBook)
+ manager.sw.AddReactor("PEX", pexReactor)
manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
-
protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
manager.sw.AddReactor("PROTOCOL", protocolReactor)
}
func (sm *SyncManager) netStart() error {
- // Start the switch
_, err := sm.sw.Start()
- if err != nil {
- return err
- }
-
- // If seeds exist, add them to the address book and dial out
- if sm.config.P2P.Seeds != "" {
- // dial out
- seeds := strings.Split(sm.config.P2P.Seeds, ",")
- if err := sm.DialSeeds(seeds); err != nil {
- return err
- }
- }
- return nil
+ return err
}
//Start start sync manager service
return sm.peers
}
-//DialSeeds dial seed peers
-func (sm *SyncManager) DialSeeds(seeds []string) error {
- return sm.sw.DialSeeds(seeds)
-}
-
//Switch get sync manager switch
func (sm *SyncManager) Switch() *p2p.Switch {
return sm.sw
+++ /dev/null
-// Modified for Tendermint
-// Originally Copyright (c) 2013-2014 Conformal Systems LLC.
-// https://github.com/conformal/btcd/blob/master/LICENSE
-
-package p2p
-
-import (
- "encoding/binary"
- "encoding/json"
- "math"
- "math/rand"
- "net"
- "os"
- "sync"
- "time"
-
- log "github.com/sirupsen/logrus"
- "github.com/tendermint/go-crypto"
- cmn "github.com/tendermint/tmlibs/common"
-)
-
-const (
- // addresses under which the address manager will claim to need more addresses.
- needAddressThreshold = 1000
-
- // interval used to dump the address cache to disk for future use.
- dumpAddressInterval = time.Minute * 2
-
- // max addresses in each old address bucket.
- oldBucketSize = 64
-
- // buckets we split old addresses over.
- oldBucketCount = 64
-
- // max addresses in each new address bucket.
- newBucketSize = 64
-
- // buckets that we spread new addresses over.
- newBucketCount = 256
-
- // old buckets over which an address group will be spread.
- oldBucketsPerGroup = 4
-
- // new buckets over which an source address group will be spread.
- newBucketsPerGroup = 32
-
- // buckets a frequently seen new address may end up in.
- maxNewBucketsPerAddress = 4
-
- // days before which we assume an address has vanished
- // if we have not seen it announced in that long.
- numMissingDays = 30
-
- // tries without a single success before we assume an address is bad.
- numRetries = 3
-
- // max failures we will accept without a success before considering an address bad.
- maxFailures = 10
-
- // days since the last success before we will consider evicting an address.
- minBadDays = 7
-
- // % of total addresses known returned by GetSelection.
- getSelectionPercent = 23
-
- // min addresses that must be returned by GetSelection. Useful for bootstrapping.
- minGetSelection = 32
-
- // max addresses returned by GetSelection
- // NOTE: this must match "maxPexMessageSize"
- maxGetSelection = 250
-)
-
-const (
- bucketTypeNew = 0x01
- bucketTypeOld = 0x02
-)
-
-// AddrBook - concurrency safe peer address manager.
-type AddrBook struct {
- cmn.BaseService
-
- // immutable after creation
- filePath string
- routabilityStrict bool
- key string
-
- mtx sync.Mutex
- rand *rand.Rand
- ourAddrs map[string]*NetAddress
- addrLookup map[string]*knownAddress // new & old
- addrNew []map[string]*knownAddress
- addrOld []map[string]*knownAddress
- nOld int
- nNew int
-
- wg sync.WaitGroup
-}
-
-// NewAddrBook creates a new address book.
-// Use Start to begin processing asynchronous address updates.
-func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
- am := &AddrBook{
- rand: rand.New(rand.NewSource(time.Now().UnixNano())),
- ourAddrs: make(map[string]*NetAddress),
- addrLookup: make(map[string]*knownAddress),
- filePath: filePath,
- routabilityStrict: routabilityStrict,
- }
- am.init()
- am.BaseService = *cmn.NewBaseService(nil, "AddrBook", am)
- return am
-}
-
-// When modifying this, don't forget to update loadFromFile()
-func (a *AddrBook) init() {
- a.key = crypto.CRandHex(24) // 24/2 * 8 = 96 bits
- // New addr buckets
- a.addrNew = make([]map[string]*knownAddress, newBucketCount)
- for i := range a.addrNew {
- a.addrNew[i] = make(map[string]*knownAddress)
- }
- // Old addr buckets
- a.addrOld = make([]map[string]*knownAddress, oldBucketCount)
- for i := range a.addrOld {
- a.addrOld[i] = make(map[string]*knownAddress)
- }
-}
-
-// OnStart implements Service.
-func (a *AddrBook) OnStart() error {
- a.BaseService.OnStart()
- a.loadFromFile(a.filePath)
- a.wg.Add(1)
- go a.saveRoutine()
- return nil
-}
-
-// OnStop implements Service.
-func (a *AddrBook) OnStop() {
- a.BaseService.OnStop()
-}
-
-func (a *AddrBook) Wait() {
- a.wg.Wait()
-}
-
-func (a *AddrBook) AddOurAddress(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- log.WithField("addr", addr).Info("Add our address to book")
-
- a.ourAddrs[addr.String()] = addr
-}
-
-func (a *AddrBook) OurAddresses() []*NetAddress {
- addrs := []*NetAddress{}
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- for _, addr := range a.ourAddrs {
- addrs = append(addrs, addr)
- }
- return addrs
-}
-
-// NOTE: addr must not be nil
-func (a *AddrBook) AddAddress(addr *NetAddress, src *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- log.WithFields(log.Fields{
- "addr": addr,
- "src": src,
- }).Debug("Add address to book")
- a.addAddress(addr, src)
-}
-
-func (a *AddrBook) NeedMoreAddrs() bool {
- return a.Size() < needAddressThreshold
-}
-
-func (a *AddrBook) Size() int {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- return a.size()
-}
-
-func (a *AddrBook) size() int {
- return a.nNew + a.nOld
-}
-
-// Pick an address to connect to with new/old bias.
-func (a *AddrBook) PickAddress(newBias int) *NetAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- if a.size() == 0 {
- return nil
- }
- if newBias > 100 {
- newBias = 100
- }
- if newBias < 0 {
- newBias = 0
- }
-
- // Bias between new and old addresses.
- oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(newBias))
- newCorrelation := math.Sqrt(float64(a.nNew)) * float64(newBias)
-
- if (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation {
- // pick random Old bucket.
- var bucket map[string]*knownAddress = nil
- num := 0
- for len(bucket) == 0 && num < oldBucketCount {
- bucket = a.addrOld[a.rand.Intn(len(a.addrOld))]
- num++
- }
- if num == oldBucketCount {
- return nil
- }
- // pick a random ka from bucket.
- randIndex := a.rand.Intn(len(bucket))
- for _, ka := range bucket {
- if randIndex == 0 {
- return ka.Addr
- }
- randIndex--
- }
- cmn.PanicSanity("Should not happen")
- } else {
- // pick random New bucket.
- var bucket map[string]*knownAddress = nil
- num := 0
- for len(bucket) == 0 && num < newBucketCount {
- bucket = a.addrNew[a.rand.Intn(len(a.addrNew))]
- num++
- }
- if num == newBucketCount {
- return nil
- }
- // pick a random ka from bucket.
- randIndex := a.rand.Intn(len(bucket))
- for _, ka := range bucket {
- if randIndex == 0 {
- return ka.Addr
- }
- randIndex--
- }
- cmn.PanicSanity("Should not happen")
- }
- return nil
-}
-
-func (a *AddrBook) MarkGood(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- ka := a.addrLookup[addr.String()]
- if ka == nil {
- return
- }
- ka.markGood()
- if ka.isNew() {
- a.moveToOld(ka)
- }
-}
-
-func (a *AddrBook) MarkAttempt(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- ka := a.addrLookup[addr.String()]
- if ka == nil {
- return
- }
- ka.markAttempt()
-}
-
-// MarkBad currently just ejects the address. In the future, consider
-// blacklisting.
-func (a *AddrBook) MarkBad(addr *NetAddress) {
- a.RemoveAddress(addr)
-}
-
-// RemoveAddress removes the address from the book.
-func (a *AddrBook) RemoveAddress(addr *NetAddress) {
- a.mtx.Lock()
- defer a.mtx.Unlock()
- ka := a.addrLookup[addr.String()]
- if ka == nil {
- return
- }
- log.WithField("addr", addr).Info("Remove address from book")
- a.removeFromAllBuckets(ka)
-}
-
-/* Peer exchange */
-
-// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
-func (a *AddrBook) GetSelection() []*NetAddress {
- a.mtx.Lock()
- defer a.mtx.Unlock()
-
- if a.size() == 0 {
- return nil
- }
-
- allAddr := make([]*NetAddress, a.size())
- i := 0
- for _, v := range a.addrLookup {
- allAddr[i] = v.Addr
- i++
- }
-
- numAddresses := cmn.MaxInt(
- cmn.MinInt(minGetSelection, len(allAddr)),
- len(allAddr)*getSelectionPercent/100)
- numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
-
- // Fisher-Yates shuffle the array. We only need to do the first
- // `numAddresses' since we are throwing the rest.
- for i := 0; i < numAddresses; i++ {
- // pick a number between current index and the end
- j := rand.Intn(len(allAddr)-i) + i
- allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
- }
-
- // slice off the limit we are willing to share.
- return allAddr[:numAddresses]
-}
-
-/* Loading & Saving */
-
-type addrBookJSON struct {
- Key string
- Addrs []*knownAddress
-}
-
-func (a *AddrBook) saveToFile(filePath string) {
- log.WithField("size", a.Size()).Info("Saving AddrBook to file")
-
- a.mtx.Lock()
- defer a.mtx.Unlock()
- // Compile Addrs
- addrs := []*knownAddress{}
- for _, ka := range a.addrLookup {
- addrs = append(addrs, ka)
- }
-
- aJSON := &addrBookJSON{
- Key: a.key,
- Addrs: addrs,
- }
-
- jsonBytes, err := json.MarshalIndent(aJSON, "", "\t")
- if err != nil {
- log.WithField("err", err).Error("Failed to save AddrBook to file")
- return
- }
- err = cmn.WriteFileAtomic(filePath, jsonBytes, 0644)
- if err != nil {
- log.WithFields(log.Fields{
- "file": filePath,
- "err": err,
- }).Error("Failed to save AddrBook to file")
- }
-}
-
-// Returns false if file does not exist.
-// cmn.Panics if file is corrupt.
-func (a *AddrBook) loadFromFile(filePath string) bool {
- // If doesn't exist, do nothing.
- _, err := os.Stat(filePath)
- if os.IsNotExist(err) {
- return false
- }
-
- // Load addrBookJSON{}
- r, err := os.Open(filePath)
- if err != nil {
- cmn.PanicCrisis(cmn.Fmt("Error opening file %s: %v", filePath, err))
- }
- defer r.Close()
- aJSON := &addrBookJSON{}
- dec := json.NewDecoder(r)
- err = dec.Decode(aJSON)
- if err != nil {
- cmn.PanicCrisis(cmn.Fmt("Error reading file %s: %v", filePath, err))
- }
-
- // Restore all the fields...
- // Restore the key
- a.key = aJSON.Key
- // Restore .addrNew & .addrOld
- for _, ka := range aJSON.Addrs {
- for _, bucketIndex := range ka.Buckets {
- bucket := a.getBucket(ka.BucketType, bucketIndex)
- bucket[ka.Addr.String()] = ka
- }
- a.addrLookup[ka.Addr.String()] = ka
- if ka.BucketType == bucketTypeNew {
- a.nNew++
- } else {
- a.nOld++
- }
- }
- return true
-}
-
-// Save saves the book.
-func (a *AddrBook) Save() {
- log.WithField("size", a.Size()).Info("Saving AddrBook to file")
- a.saveToFile(a.filePath)
-}
-
-/* Private methods */
-
-func (a *AddrBook) saveRoutine() {
- dumpAddressTicker := time.NewTicker(dumpAddressInterval)
-out:
- for {
- select {
- case <-dumpAddressTicker.C:
- a.saveToFile(a.filePath)
- case <-a.Quit:
- break out
- }
- }
- dumpAddressTicker.Stop()
- a.saveToFile(a.filePath)
- a.wg.Done()
- log.Info("Address handler done")
-}
-
-func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
- switch bucketType {
- case bucketTypeNew:
- return a.addrNew[bucketIdx]
- case bucketTypeOld:
- return a.addrOld[bucketIdx]
- default:
- cmn.PanicSanity("Should not happen")
- return nil
- }
-}
-
-// Adds ka to new bucket. Returns false if it couldn't do it cuz buckets full.
-// NOTE: currently it always returns true.
-func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool {
- // Sanity check
- if ka.isOld() {
- log.Error(cmn.Fmt("Cannot add address already in old bucket to a new bucket: %v", ka))
- return false
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeNew, bucketIdx)
-
- // Already exists?
- if _, ok := bucket[addrStr]; ok {
- return true
- }
-
- // Enforce max addresses.
- if len(bucket) > newBucketSize {
- log.Info("new bucket is full, expiring old ")
- a.expireNew(bucketIdx)
- }
-
- // Add to bucket.
- bucket[addrStr] = ka
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nNew++
- }
-
- // Ensure in addrLookup
- a.addrLookup[addrStr] = ka
-
- return true
-}
-
-// Adds ka to old bucket. Returns false if it couldn't do it cuz buckets full.
-func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool {
- // Sanity check
- if ka.isNew() {
- log.Error(cmn.Fmt("Cannot add new address to old bucket: %v", ka))
- return false
- }
- if len(ka.Buckets) != 0 {
- log.Error(cmn.Fmt("Cannot add already old address to another old bucket: %v", ka))
- return false
- }
-
- addrStr := ka.Addr.String()
- bucket := a.getBucket(bucketTypeOld, bucketIdx)
-
- // Already exists?
- if _, ok := bucket[addrStr]; ok {
- return true
- }
-
- // Enforce max addresses.
- if len(bucket) > oldBucketSize {
- return false
- }
-
- // Add to bucket.
- bucket[addrStr] = ka
- if ka.addBucketRef(bucketIdx) == 1 {
- a.nOld++
- }
-
- // Ensure in addrLookup
- a.addrLookup[addrStr] = ka
-
- return true
-}
-
-func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) {
- if ka.BucketType != bucketType {
- log.Error(cmn.Fmt("Bucket type mismatch: %v", ka))
- return
- }
- bucket := a.getBucket(bucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- if ka.removeBucketRef(bucketIdx) == 0 {
- if bucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.Addr.String())
- }
-}
-
-func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
- for _, bucketIdx := range ka.Buckets {
- bucket := a.getBucket(ka.BucketType, bucketIdx)
- delete(bucket, ka.Addr.String())
- }
- ka.Buckets = nil
- if ka.BucketType == bucketTypeNew {
- a.nNew--
- } else {
- a.nOld--
- }
- delete(a.addrLookup, ka.Addr.String())
-}
-
-func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
- bucket := a.getBucket(bucketType, bucketIdx)
- var oldest *knownAddress
- for _, ka := range bucket {
- if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
- oldest = ka
- }
- }
- return oldest
-}
-
-func (a *AddrBook) addAddress(addr, src *NetAddress) {
- if a.routabilityStrict && !addr.Routable() {
- log.Error(cmn.Fmt("Cannot add non-routable address %v", addr))
- return
- }
- if _, ok := a.ourAddrs[addr.String()]; ok {
- // Ignore our own listener address.
- return
- }
-
- ka := a.addrLookup[addr.String()]
-
- if ka != nil {
- // Already old.
- if ka.isOld() {
- return
- }
- // Already in max new buckets.
- if len(ka.Buckets) == maxNewBucketsPerAddress {
- return
- }
- // The more entries we have, the less likely we are to add more.
- factor := int32(2 * len(ka.Buckets))
- if a.rand.Int31n(factor) != 0 {
- return
- }
- } else {
- ka = newKnownAddress(addr, src)
- }
-
- bucket := a.calcNewBucket(addr, src)
- a.addToNewBucket(ka, bucket)
-
- log.Debug("Added new address ", "address:", addr, " total:", a.size())
-}
-
-// Make space in the new buckets by expiring the really bad entries.
-// If no bad entries are available we remove the oldest.
-func (a *AddrBook) expireNew(bucketIdx int) {
- for addrStr, ka := range a.addrNew[bucketIdx] {
- // If an entry is bad, throw it away
- if ka.isBad() {
- log.Info(cmn.Fmt("expiring bad address %v", addrStr))
- a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
- return
- }
- }
-
- // If we haven't thrown out a bad entry, throw out the oldest entry
- oldest := a.pickOldest(bucketTypeNew, bucketIdx)
- a.removeFromBucket(oldest, bucketTypeNew, bucketIdx)
-}
-
-// Promotes an address from new to old.
-// TODO: Move to old probabilistically.
-// The better a node is, the less likely it should be evicted from an old bucket.
-func (a *AddrBook) moveToOld(ka *knownAddress) {
- // Sanity check
- if ka.isOld() {
- log.Error(cmn.Fmt("Cannot promote address that is already old %v", ka))
- return
- }
- if len(ka.Buckets) == 0 {
- log.Error(cmn.Fmt("Cannot promote address that isn't in any new buckets %v", ka))
- return
- }
-
- // Remember one of the buckets in which ka is in.
- freedBucket := ka.Buckets[0]
- // Remove from all (new) buckets.
- a.removeFromAllBuckets(ka)
- // It's officially old now.
- ka.BucketType = bucketTypeOld
-
- // Try to add it to its oldBucket destination.
- oldBucketIdx := a.calcOldBucket(ka.Addr)
- added := a.addToOldBucket(ka, oldBucketIdx)
- if !added {
- // No room, must evict something
- oldest := a.pickOldest(bucketTypeOld, oldBucketIdx)
- a.removeFromBucket(oldest, bucketTypeOld, oldBucketIdx)
- // Find new bucket to put oldest in
- newBucketIdx := a.calcNewBucket(oldest.Addr, oldest.Src)
- added := a.addToNewBucket(oldest, newBucketIdx)
- // No space in newBucket either, just put it in freedBucket from above.
- if !added {
- added := a.addToNewBucket(oldest, freedBucket)
- if !added {
- log.Error(cmn.Fmt("Could not migrate oldest %v to freedBucket %v", oldest, freedBucket))
- }
- }
- // Finally, add to bucket again.
- added = a.addToOldBucket(ka, oldBucketIdx)
- if !added {
- log.Error(cmn.Fmt("Could not re-add ka %v to oldBucketIdx %v", ka, oldBucketIdx))
- }
- }
-}
-
-// doublesha256( key + sourcegroup +
-// int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets
-func (a *AddrBook) calcNewBucket(addr, src *NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(a.groupKey(addr))...)
- data1 = append(data1, []byte(a.groupKey(src))...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= newBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(src)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
-}
-
-// doublesha256( key + group +
-// int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets
-func (a *AddrBook) calcOldBucket(addr *NetAddress) int {
- data1 := []byte{}
- data1 = append(data1, []byte(a.key)...)
- data1 = append(data1, []byte(addr.String())...)
- hash1 := doubleSha256(data1)
- hash64 := binary.BigEndian.Uint64(hash1)
- hash64 %= oldBucketsPerGroup
- var hashbuf [8]byte
- binary.BigEndian.PutUint64(hashbuf[:], hash64)
- data2 := []byte{}
- data2 = append(data2, []byte(a.key)...)
- data2 = append(data2, a.groupKey(addr)...)
- data2 = append(data2, hashbuf[:]...)
-
- hash2 := doubleSha256(data2)
- return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
-}
-
-// Return a string representing the network group of this address.
-// This is the /16 for IPv6, the /32 (/36 for he.net) for IPv6, the string
-// "local" for a local address and the string "unroutable for an unroutable
-// address.
-func (a *AddrBook) groupKey(na *NetAddress) string {
- if a.routabilityStrict && na.Local() {
- return "local"
- }
- if a.routabilityStrict && !na.Routable() {
- return "unroutable"
- }
-
- if ipv4 := na.IP.To4(); ipv4 != nil {
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
- }
- if na.RFC6145() || na.RFC6052() {
- // last four bytes are the ip address
- ip := net.IP(na.IP[12:16])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- if na.RFC3964() {
- ip := net.IP(na.IP[2:7])
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
-
- }
- if na.RFC4380() {
- // teredo tunnels have the last 4 bytes as the v4 address XOR
- // 0xff.
- ip := net.IP(make([]byte, 4))
- for i, byte := range na.IP[12:16] {
- ip[i] = byte ^ 0xff
- }
- return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
- }
-
- // OK, so now we know ourselves to be a IPv6 address.
- // bitcoind uses /32 for everything, except for Hurricane Electric's
- // (he.net) IP range, which it uses /36 for.
- bits := 32
- heNet := &net.IPNet{IP: net.ParseIP("2001:470::"),
- Mask: net.CIDRMask(32, 128)}
- if heNet.Contains(na.IP) {
- bits = 36
- }
-
- return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
-}
-
-//-----------------------------------------------------------------------------
-
-/*
- knownAddress
-
- tracks information about a known network address that is used
- to determine how viable an address is.
-*/
-type knownAddress struct {
- Addr *NetAddress
- Src *NetAddress
- Attempts int32
- LastAttempt time.Time
- LastSuccess time.Time
- BucketType byte
- Buckets []int
-}
-
-func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress {
- return &knownAddress{
- Addr: addr,
- Src: src,
- Attempts: 0,
- LastAttempt: time.Now(),
- BucketType: bucketTypeNew,
- Buckets: nil,
- }
-}
-
-func (ka *knownAddress) isOld() bool {
- return ka.BucketType == bucketTypeOld
-}
-
-func (ka *knownAddress) isNew() bool {
- return ka.BucketType == bucketTypeNew
-}
-
-func (ka *knownAddress) markAttempt() {
- now := time.Now()
- ka.LastAttempt = now
- ka.Attempts += 1
-}
-
-func (ka *knownAddress) markGood() {
- now := time.Now()
- ka.LastAttempt = now
- ka.Attempts = 0
- ka.LastSuccess = now
-}
-
-func (ka *knownAddress) addBucketRef(bucketIdx int) int {
- for _, bucket := range ka.Buckets {
- if bucket == bucketIdx {
- // TODO refactor to return error?
- // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka))
- return -1
- }
- }
- ka.Buckets = append(ka.Buckets, bucketIdx)
- return len(ka.Buckets)
-}
-
-func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
- buckets := []int{}
- for _, bucket := range ka.Buckets {
- if bucket != bucketIdx {
- buckets = append(buckets, bucket)
- }
- }
- if len(buckets) != len(ka.Buckets)-1 {
- // TODO refactor to return error?
- // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka))
- return -1
- }
- ka.Buckets = buckets
- return len(ka.Buckets)
-}
-
-/*
- An address is bad if the address in question has not been tried in the last
- minute and meets one of the following criteria:
-
- 1) It claims to be from the future
- 2) It hasn't been seen in over a month
- 3) It has failed at least three times and never succeeded
- 4) It has failed ten times in the last week
-
- All addresses that meet these criteria are assumed to be worthless and not
- worth keeping hold of.
-*/
-func (ka *knownAddress) isBad() bool {
- // Has been attempted in the last minute --> bad
- if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
- return true
- }
-
- // Over a month old?
- if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
- return true
- }
-
- // Never succeeded?
- if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
- return true
- }
-
- // Hasn't succeeded in too long?
- if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
- return true
- }
-
- return false
-}
}
channel, ok := c.channelsIdx[pkt.ChannelID]
if !ok || channel == nil {
- if pkt.ChannelID == PexChannel {
- continue
- } else {
- cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
- }
+ cmn.PanicQ(cmn.Fmt("Unknown channel %X", pkt.ChannelID))
}
msgBytes, err := channel.recvMsgPacket(pkt)
if err != nil {
// Cleanup
close(l.connections)
- for _ = range l.connections {
- // Drain
- }
}
//Connections a channel of inbound connections. It gets closed when the listener closes.
--- /dev/null
+package pex
+
+import (
+ "encoding/binary"
+ "errors"
+ "math"
+ "math/rand"
+ "net"
+ "sync"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ tcrypto "github.com/tendermint/go-crypto"
+ cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/crypto"
+ "github.com/bytom/p2p"
+)
+
+// AddrBook - concurrency safe peer address manager.
+//
+// Addresses live in two kinds of hash buckets: "new" buckets for addresses
+// we have merely heard about, and "old" buckets for addresses we have
+// successfully connected to (see MarkGood / moveToOld).
+type AddrBook struct {
+ cmn.BaseService
+
+ // immutable after creation
+ filePath string
+ routabilityStrict bool
+ key string
+
+ // mtx guards all mutable state below
+ mtx sync.RWMutex
+ rand *rand.Rand
+ ourAddrs map[string]*p2p.NetAddress
+ addrLookup map[string]*knownAddress // new & old
+ bucketsNew []map[string]*knownAddress
+ bucketsOld []map[string]*knownAddress
+ // counts of addresses per bucket kind
+ nOld int
+ nNew int
+}
+
+// NewAddrBook creates a new address book. Use Start to begin processing asynchronous address updates.
+func NewAddrBook(filePath string, routabilityStrict bool) *AddrBook {
+ a := &AddrBook{
+ filePath: filePath,
+ routabilityStrict: routabilityStrict,
+ // random key salts the bucket hashes (calcNewBucket/calcOldBucket)
+ // so bucket placement cannot be predicted by peers
+ key: tcrypto.CRandHex(24),
+ rand: rand.New(rand.NewSource(time.Now().UnixNano())),
+ ourAddrs: make(map[string]*p2p.NetAddress),
+ addrLookup: make(map[string]*knownAddress),
+ bucketsNew: make([]map[string]*knownAddress, newBucketCount),
+ bucketsOld: make([]map[string]*knownAddress, oldBucketCount),
+ }
+ for i := range a.bucketsNew {
+ a.bucketsNew[i] = make(map[string]*knownAddress)
+ }
+ for i := range a.bucketsOld {
+ a.bucketsOld[i] = make(map[string]*knownAddress)
+ }
+ a.BaseService = *cmn.NewBaseService(nil, "AddrBook", a)
+ return a
+}
+
+// OnStart implements Service. It restores any previously persisted book
+// from disk (a no-op when the file does not exist) and starts the
+// periodic save-to-disk routine.
+func (a *AddrBook) OnStart() error {
+ if err := a.BaseService.OnStart(); err != nil {
+ return err
+ }
+
+ if err := a.loadFromFile(); err != nil {
+ return err
+ }
+
+ go a.saveRoutine()
+ return nil
+}
+
+// OnStop implements Service.
+func (a *AddrBook) OnStop() {
+ a.BaseService.OnStop()
+}
+
+// AddAddress add address to address book
+func (a *AddrBook) AddAddress(addr, src *p2p.NetAddress) error {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+ return a.addAddress(addr, src)
+}
+
+// AddOurAddress one of our addresses.
+// Addresses recorded here are refused by addAddress, so we never add
+// ourselves to the book.
+func (a *AddrBook) AddOurAddress(addr *p2p.NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ a.ourAddrs[addr.String()] = addr
+}
+
+// GetSelection randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
+// It returns getSelectionPercent of the book, clamped to
+// [minGetSelection, maxGetSelection], or nil when the book is empty.
+func (a *AddrBook) GetSelection() []*p2p.NetAddress {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ bookSize := a.size()
+ if bookSize == 0 {
+ return nil
+ }
+
+ numAddresses := cmn.MaxInt(cmn.MinInt(minGetSelection, bookSize), bookSize*getSelectionPercent/100)
+ numAddresses = cmn.MinInt(maxGetSelection, numAddresses)
+ allAddr := []*p2p.NetAddress{}
+ for _, ka := range a.addrLookup {
+ allAddr = append(allAddr, ka.Addr)
+ }
+
+ // partial Fisher-Yates shuffle of the first numAddresses slots; use the
+ // book's own seeded RNG (a.rand) rather than the global math/rand
+ // source, consistent with every other RNG use in this file
+ for i := 0; i < numAddresses; i++ {
+ j := a.rand.Intn(len(allAddr)-i) + i
+ allAddr[i], allAddr[j] = allAddr[j], allAddr[i]
+ }
+ return allAddr[:numAddresses]
+}
+
+// MarkGood marks the peer as good and moves it into an "old" bucket.
+func (a *AddrBook) MarkGood(addr *p2p.NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ ka := a.addrLookup[addr.String()]
+ if ka == nil {
+ // unknown address: nothing to mark
+ return
+ }
+
+ ka.markGood()
+ if ka.isNew() {
+ if err := a.moveToOld(ka); err != nil {
+ log.WithField("err", err).Error("fail on move to old bucket")
+ }
+ }
+}
+
+// MarkAttempt marks that an attempt was made to connect to the address.
+func (a *AddrBook) MarkAttempt(addr *p2p.NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ if ka := a.addrLookup[addr.String()]; ka != nil {
+ ka.markAttempt()
+ }
+}
+
+// NeedMoreAddrs check does the address number meet the threshold.
+// Size takes the read lock itself, so no lock is held here.
+func (a *AddrBook) NeedMoreAddrs() bool {
+ return a.Size() < needAddressThreshold
+}
+
+// PickAddress picks a random address from random bucket.
+// bias (clamped to [0, 100]) is the percentage preference for picking
+// from the new buckets over the old ones.
+func (a *AddrBook) PickAddress(bias int) *p2p.NetAddress {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ if a.size() == 0 {
+ return nil
+ }
+
+ // make sure bias is in the range [0, 100]
+ if bias > 100 {
+ bias = 100
+ } else if bias < 0 {
+ bias = 0
+ }
+
+ // each side is weighted by sqrt(count) times its share of the bias
+ oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - float64(bias))
+ newCorrelation := math.Sqrt(float64(a.nNew)) * float64(bias)
+ pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
+ if (pickFromOldBucket && a.nOld == 0) || (!pickFromOldBucket && a.nNew == 0) {
+ return nil
+ }
+
+ // redraw until a non-empty bucket comes up; the chosen side holds at
+ // least one address (its count is > 0), so some bucket is non-empty
+ var bucket map[string]*knownAddress
+ for len(bucket) == 0 {
+ if pickFromOldBucket {
+ bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
+ } else {
+ bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
+ }
+ }
+
+ // uniform pick within the bucket via a counted walk of the map
+ randIndex := a.rand.Intn(len(bucket))
+ for _, ka := range bucket {
+ if randIndex == 0 {
+ return ka.Addr
+ }
+ randIndex--
+ }
+ return nil
+}
+
+// RemoveAddress removes the address from the book.
+// Removing an unknown address is a harmless no-op.
+func (a *AddrBook) RemoveAddress(addr *p2p.NetAddress) {
+ a.mtx.Lock()
+ defer a.mtx.Unlock()
+
+ if ka := a.addrLookup[addr.String()]; ka != nil {
+ a.removeFromAllBuckets(ka)
+ }
+}
+
+// Size count the number of know address
+func (a *AddrBook) Size() int {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+ return a.size()
+}
+
+// addAddress validates addr and (probabilistically) records it in a new
+// bucket. src is the peer we learned the address from. Caller must hold mtx.
+func (a *AddrBook) addAddress(addr, src *p2p.NetAddress) error {
+ if addr == nil || src == nil {
+ return errors.New("can't add nil to address book")
+ }
+ if _, ok := a.ourAddrs[addr.String()]; ok {
+ return errors.New("add ourselves to address book")
+ }
+ if a.routabilityStrict && !addr.Routable() {
+ return errors.New("cannot add non-routable address")
+ }
+
+ ka := a.addrLookup[addr.String()]
+ if ka != nil {
+ // already promoted: never demote back into a new bucket
+ if ka.isOld() {
+ return nil
+ }
+ if len(ka.Buckets) == maxNewBucketsPerAddress {
+ return nil
+ }
+ // the more buckets the address already occupies, the less likely we
+ // add another. Guard factor > 0: rand.Int31n panics on a non-positive
+ // bound, and a loaded entry may carry zero bucket refs.
+ if factor := int32(2 * len(ka.Buckets)); factor > 0 && a.rand.Int31n(factor) != 0 {
+ return nil
+ }
+ } else {
+ ka = newKnownAddress(addr, src)
+ }
+
+ bucket := a.calcNewBucket(addr, src)
+ return a.addToNewBucket(ka, bucket)
+}
+
+// addToNewBucket places ka into new bucket bucketIdx. Caller must hold mtx.
+func (a *AddrBook) addToNewBucket(ka *knownAddress, bucketIdx int) error {
+ if ka.isOld() {
+ return errors.New("cant add old address to new bucket")
+ }
+
+ addrStr := ka.Addr.String()
+ bucket := a.getBucket(bucketTypeNew, bucketIdx)
+ if _, ok := bucket[addrStr]; ok {
+ // already in this bucket
+ return nil
+ }
+
+ // make room by evicting a bad (or the oldest) entry when over capacity
+ if len(bucket) > newBucketSize {
+ a.expireNew(bucketIdx)
+ }
+
+ bucket[addrStr] = ka
+ a.addrLookup[addrStr] = ka
+ // first bucket reference means this address is new to the book
+ if ka.addBucketRef(bucketIdx) == 1 {
+ a.nNew++
+ }
+ return nil
+}
+
+// addToOldBucket places an already-retagged (old) ka into old bucket
+// bucketIdx. Caller must hold mtx.
+func (a *AddrBook) addToOldBucket(ka *knownAddress, bucketIdx int) error {
+ if ka.isNew() {
+ // fixed message: the invalid operation here is putting a still-new
+ // address into an old bucket, not the other way around
+ return errors.New("cannot add new address to old bucket")
+ }
+ if len(ka.Buckets) != 0 {
+ return errors.New("cannot add already old address to another old bucket")
+ }
+
+ bucket := a.getBucket(bucketTypeOld, bucketIdx)
+ if len(bucket) > oldBucketSize {
+ return errors.New("old bucket is full")
+ }
+
+ addrStr := ka.Addr.String()
+ bucket[addrStr] = ka
+ a.addrLookup[addrStr] = ka
+ // first bucket reference after promotion: count as an old address
+ if ka.addBucketRef(bucketIdx) == 1 {
+ a.nOld++
+ }
+ return nil
+}
+
+// calcNewBucket maps (addr, src) to a new-bucket index via a keyed
+// double hash: the inner hash is reduced mod newBucketsPerGroup so a
+// single source group can only reach a limited set of buckets.
+func (a *AddrBook) calcNewBucket(addr, src *p2p.NetAddress) int {
+ data1 := []byte{}
+ data1 = append(data1, []byte(a.key)...)
+ data1 = append(data1, []byte(a.groupKey(addr))...)
+ data1 = append(data1, []byte(a.groupKey(src))...)
+ hash1 := crypto.DoubleSha256(data1)
+ hash64 := binary.BigEndian.Uint64(hash1)
+ hash64 %= newBucketsPerGroup
+ var hashbuf [8]byte
+ binary.BigEndian.PutUint64(hashbuf[:], hash64)
+ data2 := []byte{}
+ data2 = append(data2, []byte(a.key)...)
+ data2 = append(data2, a.groupKey(src)...)
+ data2 = append(data2, hashbuf[:]...)
+
+ hash2 := crypto.DoubleSha256(data2)
+ return int(binary.BigEndian.Uint64(hash2) % newBucketCount)
+}
+
+// calcOldBucket maps addr to an old-bucket index with the same keyed
+// double-hash scheme, reduced mod oldBucketsPerGroup per address group.
+func (a *AddrBook) calcOldBucket(addr *p2p.NetAddress) int {
+ data1 := []byte{}
+ data1 = append(data1, []byte(a.key)...)
+ data1 = append(data1, []byte(addr.String())...)
+ hash1 := crypto.DoubleSha256(data1)
+ hash64 := binary.BigEndian.Uint64(hash1)
+ hash64 %= oldBucketsPerGroup
+ var hashbuf [8]byte
+ binary.BigEndian.PutUint64(hashbuf[:], hash64)
+ data2 := []byte{}
+ data2 = append(data2, []byte(a.key)...)
+ data2 = append(data2, a.groupKey(addr)...)
+ data2 = append(data2, hashbuf[:]...)
+
+ hash2 := crypto.DoubleSha256(data2)
+ return int(binary.BigEndian.Uint64(hash2) % oldBucketCount)
+}
+
+// expireNew frees one slot in new bucket bucketIdx, preferring to drop a
+// bad entry and falling back to the least-recently-attempted one.
+func (a *AddrBook) expireNew(bucketIdx int) {
+ for _, ka := range a.bucketsNew[bucketIdx] {
+ if ka.isBad() {
+ a.removeFromBucket(ka, bucketIdx)
+ return
+ }
+ }
+
+ oldest := a.pickOldest(bucketTypeNew, bucketIdx)
+ a.removeFromBucket(oldest, bucketIdx)
+}
+
+// getBucket returns the bucketIdx-th bucket of the requested type, or
+// nil for an unknown bucket type.
+func (a *AddrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress {
+ switch bucketType {
+ case bucketTypeNew:
+ return a.bucketsNew[bucketIdx]
+ case bucketTypeOld:
+ return a.bucketsOld[bucketIdx]
+ default:
+ log.Error("try to access an unknow address book bucket type")
+ return nil
+ }
+}
+
+// groupKey returns a string identifying the network group na belongs to:
+// /16 for IPv4 and IPv4-derived (tunneled) addresses, /32 for IPv6
+// (/36 inside Hurricane Electric's 2001:470::/32 allocation).
+func (a *AddrBook) groupKey(na *p2p.NetAddress) string {
+ if a.routabilityStrict && na.Local() {
+ return "local"
+ }
+ if a.routabilityStrict && !na.Routable() {
+ return "unroutable"
+ }
+ if ipv4 := na.IP.To4(); ipv4 != nil {
+ return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(16, 32)}).String()
+ }
+ if na.RFC6145() || na.RFC6052() {
+ // last four bytes are the ip address
+ ip := net.IP(na.IP[12:16])
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+ }
+ if na.RFC3964() {
+ // NOTE(review): [2:7] yields 5 bytes where an IPv4 address is 4;
+ // presumably [2:6] was intended — confirm against the 6to4 layout.
+ ip := net.IP(na.IP[2:7])
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+
+ }
+ if na.RFC4380() {
+ // teredo tunnels have the last 4 bytes as the v4 address XOR 0xff.
+ ip := net.IP(make([]byte, 4))
+ for i, byte := range na.IP[12:16] {
+ ip[i] = byte ^ 0xff
+ }
+ return (&net.IPNet{IP: ip, Mask: net.CIDRMask(16, 32)}).String()
+ }
+
+ bits := 32
+ heNet := &net.IPNet{IP: net.ParseIP("2001:470::"), Mask: net.CIDRMask(32, 128)}
+ if heNet.Contains(na.IP) {
+ bits = 36
+ }
+ return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String()
+}
+
+// moveToOld promotes a new address to an old bucket after a successful
+// connection (see MarkGood). Caller must hold mtx.
+func (a *AddrBook) moveToOld(ka *knownAddress) error {
+ if ka.isOld() {
+ return errors.New("cannot promote address that is already old")
+ }
+ if len(ka.Buckets) == 0 {
+ return errors.New("cannot promote address that isn't in any new buckets")
+ }
+
+ // drop all new-bucket refs, retag, then insert into one old bucket
+ a.removeFromAllBuckets(ka)
+ ka.BucketType = bucketTypeOld
+ oldBucketIdx := a.calcOldBucket(ka.Addr)
+ return a.addToOldBucket(ka, oldBucketIdx)
+}
+
+// pickOldest returns the entry with the earliest LastAttempt in the
+// given bucket, or nil when the bucket is empty.
+func (a *AddrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
+ bucket := a.getBucket(bucketType, bucketIdx)
+ var oldest *knownAddress
+ for _, ka := range bucket {
+ if oldest == nil || ka.LastAttempt.Before(oldest.LastAttempt) {
+ oldest = ka
+ }
+ }
+ return oldest
+}
+
+// removeFromAllBuckets deletes ka from the lookup table and from every
+// bucket referencing it, and decrements the counter for its bucket type.
+func (a *AddrBook) removeFromAllBuckets(ka *knownAddress) {
+ delete(a.addrLookup, ka.Addr.String())
+ for _, bucketIdx := range ka.Buckets {
+ bucket := a.getBucket(ka.BucketType, bucketIdx)
+ delete(bucket, ka.Addr.String())
+ }
+ ka.Buckets = nil
+ if ka.BucketType == bucketTypeNew {
+ a.nNew--
+ } else {
+ a.nOld--
+ }
+}
+
+// removeFromBucket removes ka from a single bucket; when that was its
+// last bucket reference the address leaves the book entirely.
+func (a *AddrBook) removeFromBucket(ka *knownAddress, bucketIdx int) {
+ bucket := a.getBucket(ka.BucketType, bucketIdx)
+ delete(bucket, ka.Addr.String())
+ if ka.removeBucketRef(bucketIdx) == 0 {
+ delete(a.addrLookup, ka.Addr.String())
+ if ka.BucketType == bucketTypeNew {
+ a.nNew--
+ } else {
+ a.nOld--
+ }
+ }
+}
+
+// size returns the total address count; caller must hold at least the
+// read lock.
+func (a *AddrBook) size() int {
+ return a.nNew + a.nOld
+}
// +build !network
-
-package p2p
+package pex
import (
"fmt"
"github.com/stretchr/testify/assert"
"github.com/tendermint/tmlibs/log"
-)
-
-func createTempFileName(prefix string) string {
- f, err := ioutil.TempFile("", prefix)
- if err != nil {
- panic(err)
- }
- fname := f.Name()
- err = f.Close()
- if err != nil {
- panic(err)
- }
- return fname
-}
-
-func TestAddrBookSaveLoad(t *testing.T) {
- fname := createTempFileName("addrbook_test")
-
- // 0 addresses
- book := NewAddrBook(fname, true)
- book.SetLogger(log.TestingLogger())
- book.saveToFile(fname)
-
- book = NewAddrBook(fname, true)
- book.SetLogger(log.TestingLogger())
- book.loadFromFile(fname)
- assert.Zero(t, book.Size())
-
- // 100 addresses
- randAddrs := randNetAddressPairs(t, 100)
-
- for _, addrSrc := range randAddrs {
- book.AddAddress(addrSrc.addr, addrSrc.src)
- }
-
- assert.Equal(t, 100, book.Size())
- book.saveToFile(fname)
-
- book = NewAddrBook(fname, true)
- book.SetLogger(log.TestingLogger())
- book.loadFromFile(fname)
-
- assert.Equal(t, 100, book.Size())
-}
+ "github.com/bytom/p2p"
+)
func TestAddrBookLookup(t *testing.T) {
fname := createTempFileName("addrbook_test")
-
randAddrs := randNetAddressPairs(t, 100)
book := NewAddrBook(fname, true)
fname := createTempFileName("addrbook_test")
randAddrs := randNetAddressPairs(t, 100)
-
book := NewAddrBook(fname, true)
book.SetLogger(log.TestingLogger())
for _, addrSrc := range randAddrs {
}
}
- // TODO: do more testing :)
-
selection := book.GetSelection()
t.Logf("selection: %v", selection)
book.SetLogger(log.TestingLogger())
randAddrs := randNetAddressPairs(t, 100)
-
differentSrc := randIPv4Address(t)
for _, addrSrc := range randAddrs {
book.AddAddress(addrSrc.addr, addrSrc.src)
assert.Equal(t, 100, book.Size())
}
+// TestAddrBookRemoveAddress checks that removal shrinks the book and
+// that removing an unknown address is a harmless no-op.
+func TestAddrBookRemoveAddress(t *testing.T) {
+ fname := createTempFileName("addrbook_test")
+ book := NewAddrBook(fname, true)
+ book.SetLogger(log.TestingLogger())
+
+ addr := randIPv4Address(t)
+ book.AddAddress(addr, addr)
+ assert.Equal(t, 1, book.Size())
+
+ book.RemoveAddress(addr)
+ assert.Equal(t, 0, book.Size())
+
+ // removing a never-added address must not change the size
+ nonExistingAddr := randIPv4Address(t)
+ book.RemoveAddress(nonExistingAddr)
+ assert.Equal(t, 0, book.Size())
+}
+
+// netAddressPair couples an address with the peer it was learned from.
type netAddressPair struct {
- addr *NetAddress
- src *NetAddress
+ addr *p2p.NetAddress
+ src *p2p.NetAddress
+}
+
+// createTempFileName returns the name of a fresh temp file for test use;
+// it panics on any I/O failure since tests cannot proceed without it.
+func createTempFileName(prefix string) string {
+ f, err := ioutil.TempFile("", prefix)
+ if err != nil {
+ panic(err)
+ }
+ fname := f.Name()
+ if err = f.Close(); err != nil {
+ panic(err)
+ }
+ return fname
}
func randNetAddressPairs(t *testing.T, n int) []netAddressPair {
return randAddrs
}
-func randIPv4Address(t *testing.T) *NetAddress {
+func randIPv4Address(t *testing.T) *p2p.NetAddress {
for {
ip := fmt.Sprintf("%v.%v.%v.%v",
rand.Intn(254)+1,
rand.Intn(255),
)
port := rand.Intn(65535-1) + 1
- addr, err := NewNetAddressString(fmt.Sprintf("%v:%v", ip, port))
+ addr, err := p2p.NewNetAddressString(fmt.Sprintf("%v:%v", ip, port))
assert.Nil(t, err, "error generating rand network address")
if addr.Routable() {
return addr
}
}
}
-
-func TestAddrBookRemoveAddress(t *testing.T) {
- fname := createTempFileName("addrbook_test")
- book := NewAddrBook(fname, true)
- book.SetLogger(log.TestingLogger())
-
- addr := randIPv4Address(t)
- book.AddAddress(addr, addr)
- assert.Equal(t, 1, book.Size())
-
- book.RemoveAddress(addr)
- assert.Equal(t, 0, book.Size())
-
- nonExistingAddr := randIPv4Address(t)
- book.RemoveAddress(nonExistingAddr)
- assert.Equal(t, 0, book.Size())
-}
--- /dev/null
+package pex
+
+import (
+ "encoding/json"
+ "os"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
+)
+
+// addrBookJSON is the on-disk JSON representation of an AddrBook.
+type addrBookJSON struct {
+ Key string
+ Addrs []*knownAddress
+}
+
+// SaveToFile will save the address book to a json file in disk
+func (a *AddrBook) SaveToFile() error {
+ a.mtx.RLock()
+ defer a.mtx.RUnlock()
+
+ // snapshot every known address under the read lock
+ book := &addrBookJSON{Key: a.key, Addrs: make([]*knownAddress, 0, len(a.addrLookup))}
+ for _, ka := range a.addrLookup {
+ book.Addrs = append(book.Addrs, ka)
+ }
+
+ raw, err := json.MarshalIndent(book, "", "\t")
+ if err != nil {
+ return err
+ }
+ return cmn.WriteFileAtomic(a.filePath, raw, 0644)
+}
+
+// loadFromFile restores the book from filePath. A missing file is not an
+// error (fresh book). Lookup table, buckets and counters are all rebuilt
+// from the decoded entries.
+func (a *AddrBook) loadFromFile() error {
+ if _, err := os.Stat(a.filePath); os.IsNotExist(err) {
+ return nil
+ }
+
+ r, err := os.Open(a.filePath)
+ if err != nil {
+ return err
+ }
+
+ defer r.Close()
+ aJSON := &addrBookJSON{}
+ if err = json.NewDecoder(r).Decode(aJSON); err != nil {
+ return err
+ }
+
+ a.key = aJSON.Key
+ for _, ka := range aJSON.Addrs {
+ a.addrLookup[ka.Addr.String()] = ka
+ // NOTE(review): bucket indices from disk are trusted as-is; an
+ // out-of-range index would panic in getBucket — consider validating.
+ for _, bucketIndex := range ka.Buckets {
+ bucket := a.getBucket(ka.BucketType, bucketIndex)
+ bucket[ka.Addr.String()] = ka
+ }
+ if ka.BucketType == bucketTypeNew {
+ a.nNew++
+ } else {
+ a.nOld++
+ }
+ }
+ return nil
+}
+
+// saveRoutine flushes the book to disk every 2 minutes and once more on
+// shutdown, exiting when the service quits.
+func (a *AddrBook) saveRoutine() {
+ ticker := time.NewTicker(2 * time.Minute)
+ // release the ticker's resources when the routine exits
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if err := a.SaveToFile(); err != nil {
+ log.WithField("err", err).Error("failed to save AddrBook to file")
+ }
+ case <-a.Quit:
+ // final flush; previously the error here was silently dropped
+ if err := a.SaveToFile(); err != nil {
+ log.WithField("err", err).Error("failed to save AddrBook to file")
+ }
+ return
+ }
+ }
+}
--- /dev/null
+package pex
+
+import (
+ "os"
+ "testing"
+)
+
+// TestFileStorage round-trips an address book through SaveToFile /
+// loadFromFile and verifies the lookup table and both bucket sets match.
+func TestFileStorage(t *testing.T) {
+ file := createTempFileName("TestFileStorage")
+ defer os.Remove(file)
+
+ a := NewAddrBook(file, true)
+ for i := 1; i < 256; i++ {
+ if err := a.addAddress(randIPv4Address(t), randIPv4Address(t)); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ // promote every 7th entry so both new and old buckets are exercised
+ i := 0
+ for _, ka := range a.addrLookup {
+ i++
+ if i%7 != 0 {
+ continue
+ }
+ if err := a.moveToOld(ka); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ if err := a.SaveToFile(); err != nil {
+ t.Fatal(err)
+ }
+
+ // load address book b from file
+ b := NewAddrBook(file, true)
+ if err := b.loadFromFile(); err != nil {
+ t.Fatal(err)
+ }
+
+ for key, want := range a.addrLookup {
+ got, ok := b.addrLookup[key]
+ if !ok {
+ t.Errorf("can't find %s in loaded address book", key)
+ }
+ if !want.Addr.Equals(got.Addr) || !want.Src.Equals(got.Src) {
+ t.Errorf("addrLookup check want %v but get %v", want, got)
+ }
+ }
+
+ for i, aBucket := range a.bucketsNew {
+ bBucket := b.bucketsNew[i]
+ for j, want := range aBucket {
+ got := bBucket[j]
+ if !want.Addr.Equals(got.Addr) {
+ t.Errorf("new bucket check want %v but get %v", want, got)
+ }
+ }
+ }
+
+ for i, aBucket := range a.bucketsOld {
+ bBucket := b.bucketsOld[i]
+ for j, want := range aBucket {
+ got := bBucket[j]
+ if !want.Addr.Equals(got.Addr) {
+ t.Errorf("old bucket check want %v but get %v", want, got)
+ }
+ }
+ }
+}
--- /dev/null
+package pex
+
+import (
+ "time"
+
+ "github.com/bytom/p2p"
+)
+
+// knownAddress tracks an address plus the connection history used to
+// decide which bucket(s) it belongs in.
+type knownAddress struct {
+ Addr *p2p.NetAddress
+ Src *p2p.NetAddress // peer we first learned Addr from
+ Attempts int32
+ LastAttempt time.Time
+ LastSuccess time.Time
+ BucketType byte // bucketTypeNew or bucketTypeOld
+ Buckets []int // indexes of the buckets currently holding this address
+}
+
+// newKnownAddress returns a fresh entry tagged as "new" with no bucket
+// references yet.
+func newKnownAddress(addr, src *p2p.NetAddress) *knownAddress {
+ return &knownAddress{
+ Addr: addr,
+ Src: src,
+ Attempts: 0,
+ LastAttempt: time.Now(),
+ BucketType: bucketTypeNew,
+ Buckets: nil,
+ }
+}
+
+// addBucketRef records that ka now lives in bucketIdx. It returns the new
+// reference count, or -1 if the bucket was already recorded.
+func (ka *knownAddress) addBucketRef(bucketIdx int) int {
+ for _, bucket := range ka.Buckets {
+ if bucket == bucketIdx {
+ return -1
+ }
+ }
+ ka.Buckets = append(ka.Buckets, bucketIdx)
+ return len(ka.Buckets)
+}
+
+// isBad reports whether ka looks worthless and may be evicted. Old
+// addresses are never bad; a new address is bad when it was attempted in
+// the last minute without success, has not been attempted for over
+// numMissingDays, has never succeeded after numRetries attempts, or has
+// maxFailures attempts since its last success more than minBadDays ago.
+func (ka *knownAddress) isBad() bool {
+ if ka.BucketType == bucketTypeOld {
+ return false
+ }
+ if ka.LastAttempt.After(time.Now().Add(-1*time.Minute)) && ka.Attempts != 0 {
+ return true
+ }
+ if ka.LastAttempt.Before(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) {
+ return true
+ }
+ if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries {
+ return true
+ }
+ if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && ka.Attempts >= maxFailures {
+ return true
+ }
+ return false
+}
+
+// isOld reports whether ka lives in the old buckets.
+func (ka *knownAddress) isOld() bool {
+ return ka.BucketType == bucketTypeOld
+}
+
+// isNew reports whether ka lives in the new buckets.
+func (ka *knownAddress) isNew() bool {
+ return ka.BucketType == bucketTypeNew
+}
+
+// markAttempt records a (so far unsuccessful) connection attempt.
+func (ka *knownAddress) markAttempt() {
+ ka.LastAttempt = time.Now()
+ ka.Attempts++
+}
+
+// markGood records a successful connection and resets the failure count.
+func (ka *knownAddress) markGood() {
+ now := time.Now()
+ ka.LastAttempt = now
+ ka.LastSuccess = now
+ ka.Attempts = 0
+}
+
+// removeBucketRef forgets that ka lives in bucketIdx. It returns the
+// remaining reference count, or -1 if the bucket was not recorded.
+func (ka *knownAddress) removeBucketRef(bucketIdx int) int {
+ buckets := []int{}
+ for _, bucket := range ka.Buckets {
+ if bucket != bucketIdx {
+ buckets = append(buckets, bucket)
+ }
+ }
+ if len(buckets) != len(ka.Buckets)-1 {
+ return -1
+ }
+ ka.Buckets = buckets
+ return len(ka.Buckets)
+}
--- /dev/null
+package pex
+
+const (
+ // tags stored in knownAddress.BucketType
+ bucketTypeNew = 0x01
+ bucketTypeOld = 0x02
+
+ // bucket geometry for the two bucket sets
+ oldBucketSize = 64
+ oldBucketCount = 64
+ oldBucketsPerGroup = 4
+ newBucketSize = 64
+ newBucketCount = 256
+ newBucketsPerGroup = 32
+
+ // GetSelection sizing: percentage of the book, clamped to [min, max]
+ getSelectionPercent = 23
+ minGetSelection = 32
+ maxGetSelection = 250
+
+ needAddressThreshold = 1000 // addresses under which the address manager will claim to need more addresses.
+ maxNewBucketsPerAddress = 4 // buckets a frequently seen new address may end up in.
+ numMissingDays = 30 // days before which we assume an address has vanished
+ numRetries = 3 // tries without a single success before we assume an address is bad.
+ maxFailures = 10 // max failures we will accept without a success before considering an address bad.
+ minBadDays = 7 // days since the last success before we will consider evicting an address.
+)
--- /dev/null
+package pex
+
+import (
+ "bytes"
+ "fmt"
+
+ wire "github.com/tendermint/go-wire"
+
+ "github.com/bytom/p2p"
+)
+
+const (
+ // leading type bytes used by go-wire to tag the concrete message type
+ msgTypeRequest = byte(0x01)
+ msgTypeAddrs = byte(0x02)
+)
+
+// PexMessage is a primary type for PEX messages. Underneath, it could contain
+// either pexRequestMessage, or pexAddrsMessage messages.
+type PexMessage interface{}
+
+// register the concrete types with go-wire so DecodeMessage can dispatch
+// on the leading type byte
+var _ = wire.RegisterInterface(
+ struct{ PexMessage }{},
+ wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
+ wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
+)
+
+// DecodeMessage implements interface registered above. bz comes from an
+// untrusted peer, so an empty payload is rejected up front instead of
+// panicking on the bz[0] index.
+func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
+ if len(bz) == 0 {
+ err = fmt.Errorf("empty pex message")
+ return
+ }
+ msgType = bz[0]
+ n := new(int)
+ r := bytes.NewReader(bz)
+ msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
+ return
+}
+
+// pexRequestMessage asks the peer for a selection of its known addresses.
+type pexRequestMessage struct{}
+
+func (m *pexRequestMessage) String() string { return "[pexRequest]" }
+
+// pexAddrsMessage carries a selection of addresses in reply to a request.
+type pexAddrsMessage struct {
+ Addrs []*p2p.NetAddress
+}
+
+func (m *pexAddrsMessage) String() string { return fmt.Sprintf("[pexAddrs %v]", m.Addrs) }
--- /dev/null
+package pex
+
+import (
+ "errors"
+ "math/rand"
+ "reflect"
+ "strings"
+ "sync"
+ "time"
+
+ log "github.com/sirupsen/logrus"
+ cmn "github.com/tendermint/tmlibs/common"
+
+ "github.com/bytom/p2p"
+)
+
+const (
+ // PexChannel is a channel for PEX messages
+ PexChannel = byte(0x00)
+
+ // target number of outbound peers ensurePeers tries to maintain
+ minNumOutboundPeers = 5
+ maxPexMessageSize = 1048576 // 1MB
+ // max messages accepted per peer between counter flushes (hourly)
+ defaultMaxMsgCountByPeer = uint16(1000)
+)
+
+// PEXReactor handles peer exchange and ensures that an adequate number of peers are connected to the switch.
+type PEXReactor struct {
+ p2p.BaseReactor
+ book *AddrBook
+ msgCountByPeer *cmn.CMap // per-peer message counter used to rate-limit abuse
+}
+
+// NewPEXReactor creates new PEX reactor backed by the given address book.
+func NewPEXReactor(b *AddrBook) *PEXReactor {
+ r := &PEXReactor{
+ book: b,
+ msgCountByPeer: cmn.NewCMap(),
+ }
+ r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r)
+ return r
+}
+
+// OnStart implements BaseService. It starts the address book and the
+// background routines for peer maintenance and message-count flushing.
+func (r *PEXReactor) OnStart() error {
+ // previously the base reactor's startup error was silently dropped
+ if err := r.BaseReactor.OnStart(); err != nil {
+ return err
+ }
+ if _, err := r.book.Start(); err != nil {
+ return err
+ }
+
+ go r.ensurePeersRoutine()
+ go r.flushMsgCountByPeer()
+ return nil
+}
+
+// OnStop implements BaseService, stopping the address book as well.
+func (r *PEXReactor) OnStop() {
+ r.BaseReactor.OnStop()
+ r.book.Stop()
+}
+
+// GetChannels implements Reactor
+func (r *PEXReactor) GetChannels() []*p2p.ChannelDescriptor {
+ return []*p2p.ChannelDescriptor{&p2p.ChannelDescriptor{
+ ID: PexChannel,
+ Priority: 1,
+ SendQueueCapacity: 10,
+ }}
+}
+
+// AddPeer adding peer to the address book.
+// Outbound peers may be asked for more addresses; inbound peers are
+// recorded under their self-reported listen address.
+func (r *PEXReactor) AddPeer(p *p2p.Peer) error {
+ if p.IsOutbound() {
+ if r.book.NeedMoreAddrs() && !r.RequestAddrs(p) {
+ return errors.New("Send pex message fail")
+ }
+ return nil
+ }
+
+ addr, err := p2p.NewNetAddressString(p.ListenAddr)
+ if err != nil {
+ return errors.New("addPeer: invalid peer address")
+ }
+
+ if err := r.book.AddAddress(addr, addr); err != nil {
+ return err
+ }
+
+ // over the connection limit: share our selection, give the send a
+ // second to flush, then disconnect the peer
+ if r.Switch.Peers().Size() >= r.Switch.Config.MaxNumPeers {
+ if r.SendAddrs(p, r.book.GetSelection()) {
+ <-time.After(1 * time.Second)
+ r.Switch.StopPeerGracefully(p)
+ }
+ return errors.New("addPeer: reach the max peer, exchange then close")
+ }
+ return nil
+}
+
+// Receive implements Reactor by handling incoming PEX messages.
+// Each message counts against the per-peer rate limit; hitting the limit,
+// failing to decode, or sending a bad address all disconnect the peer.
+func (r *PEXReactor) Receive(chID byte, p *p2p.Peer, rawMsg []byte) {
+ srcAddr := p.Connection().RemoteAddress
+ srcAddrStr := srcAddr.String()
+ r.incrementMsgCount(srcAddrStr)
+ if r.reachedMaxMsgLimit(srcAddrStr) {
+ log.WithField("peer", srcAddrStr).Error("reached the max pex messages limit")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+
+ _, msg, err := DecodeMessage(rawMsg)
+ if err != nil {
+ log.WithField("error", err).Error("failed to decoding pex message")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+
+ switch msg := msg.(type) {
+ case *pexRequestMessage:
+ // peer asked for addresses: reply with our selection
+ if !r.SendAddrs(p, r.book.GetSelection()) {
+ log.Error("failed to send pex address message")
+ }
+
+ case *pexAddrsMessage:
+ // peer offered addresses: record them with the peer as source
+ for _, addr := range msg.Addrs {
+ if err := r.book.AddAddress(addr, srcAddr); err != nil {
+ log.WithField("error", err).Error("pex fail on process pexAddrsMessage")
+ r.Switch.StopPeerGracefully(p)
+ return
+ }
+ }
+
+ default:
+ log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
+ }
+}
+
+// RemovePeer implements Reactor. No per-peer state is kept, so nothing to do.
+func (r *PEXReactor) RemovePeer(p *p2p.Peer, reason interface{}) {}
+
+// RequestAddrs asks peer for more addresses.
+// A failed send disconnects the peer; the result reports success.
+func (r *PEXReactor) RequestAddrs(p *p2p.Peer) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
+ if !ok {
+ r.Switch.StopPeerGracefully(p)
+ }
+ return ok
+}
+
+// SendAddrs sends addrs to the peer.
+// A failed send disconnects the peer; the result reports success.
+func (r *PEXReactor) SendAddrs(p *p2p.Peer, addrs []*p2p.NetAddress) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
+ if !ok {
+ r.Switch.StopPeerGracefully(p)
+ }
+ return ok
+}
+
+// dialPeerWorker dials a and records the outcome in the address book
+// (failed attempt vs. good); it signals wg when done.
+func (r *PEXReactor) dialPeerWorker(a *p2p.NetAddress, wg *sync.WaitGroup) {
+ if err := r.Switch.DialPeerWithAddress(a); err != nil {
+ r.book.MarkAttempt(a)
+ } else {
+ r.book.MarkGood(a)
+ }
+ wg.Done()
+}
+
+// dialSeeds adds the configured seed nodes to the address book, persists
+// it, then dials a random subset of the seeds.
+func (r *PEXReactor) dialSeeds() {
+ if r.Switch.Config.Seeds == "" {
+ return
+ }
+
+ seeds := strings.Split(r.Switch.Config.Seeds, ",")
+ netAddrs, err := p2p.NewNetAddressStrings(seeds)
+ if err != nil {
+ // NOTE(review): execution continues with whatever netAddrs holds
+ // after a decode error — confirm this best-effort behavior is intended
+ log.WithField("err", err).Error("dialSeeds: fail to decode net address strings")
+ }
+
+ ourAddr, err := p2p.NewNetAddressString(r.Switch.NodeInfo().ListenAddr)
+ if err != nil {
+ log.WithField("err", err).Error("dialSeeds: fail to get our address")
+ }
+
+ for _, netAddr := range netAddrs {
+ if netAddr.Equals(ourAddr) {
+ continue
+ }
+ if err := r.book.AddAddress(netAddr, ourAddr); err != nil {
+ log.WithField("err", err).Warn("dialSeeds: fail to add address")
+ }
+ }
+
+ if err := r.book.SaveToFile(); err != nil {
+ log.WithField("err", err).Warn("dialSeeds: fail to save address book")
+ }
+
+ // NOTE(review): the stride of 2 dials only every other seed in the
+ // random permutation — presumably deliberate throttling; verify
+ perm := rand.Perm(len(netAddrs))
+ for i := 0; i < len(perm); i += 2 {
+ if err := r.Switch.DialPeerWithAddress(netAddrs[perm[i]]); err != nil {
+ log.WithField("err", err).Warn("dialSeeds: fail to dial seed")
+ }
+ }
+}
+
+// ensurePeers tops up outbound connections toward minNumOutboundPeers by
+// picking candidates from the address book and dialing them in parallel.
+func (r *PEXReactor) ensurePeers() {
+ numOutPeers, _, numDialing := r.Switch.NumPeers()
+ // over-provision: attempt three candidates per missing outbound slot
+ numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 3
+ log.WithFields(log.Fields{
+ "numOutPeers": numOutPeers,
+ "numDialing": numDialing,
+ "numToDial": numToDial,
+ }).Debug("ensure peers")
+ if numToDial <= 0 {
+ return
+ }
+
+ // bias toward new addresses grows with how connected we already are
+ newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
+ toDial := make(map[string]*p2p.NetAddress)
+ maxAttempts := numToDial * 3
+
+ connectedPeers := make(map[string]struct{})
+ for _, peer := range r.Switch.Peers().List() {
+ connectedPeers[peer.RemoteAddrHost()] = struct{}{}
+ }
+
+ // skip candidates already selected, being dialed, or connected
+ for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
+ try := r.book.PickAddress(newBias)
+ if try == nil {
+ continue
+ }
+ if _, selected := toDial[try.IP.String()]; selected {
+ continue
+ }
+ if dialling := r.Switch.IsDialing(try); dialling {
+ continue
+ }
+ if _, ok := connectedPeers[try.IP.String()]; ok {
+ continue
+ }
+
+ log.Debug("Will dial address addr:", try)
+ toDial[try.IP.String()] = try
+ }
+
+ var wg sync.WaitGroup
+ for _, item := range toDial {
+ wg.Add(1)
+ go r.dialPeerWorker(item, &wg)
+ }
+ wg.Wait()
+
+ // still short on addresses: ask a random connected peer for more
+ if r.book.NeedMoreAddrs() {
+ if peers := r.Switch.Peers().List(); len(peers) > 0 {
+ peer := peers[rand.Int()%len(peers)]
+ r.RequestAddrs(peer)
+ }
+ }
+}
+
+// ensurePeersRoutine periodically tops up peer connections: every 120s
+// normally, and every 3s while fewer than 3 peers are connected. It also
+// dials the seed nodes once at startup when under-connected.
+func (r *PEXReactor) ensurePeersRoutine() {
+ r.ensurePeers()
+ if r.Switch.Peers().Size() < 3 {
+ r.dialSeeds()
+ }
+
+ ticker := time.NewTicker(120 * time.Second)
+ quickTicker := time.NewTicker(3 * time.Second)
+ // release both tickers' resources when the routine exits
+ defer ticker.Stop()
+ defer quickTicker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ r.ensurePeers()
+ case <-quickTicker.C:
+ if r.Switch.Peers().Size() < 3 {
+ r.ensurePeers()
+ }
+ case <-r.Quit:
+ return
+ }
+ }
+}
+
+// flushMsgCountByPeer clears the per-peer message counters every hour so
+// the limit enforced in Receive is per-interval rather than lifetime.
+func (r *PEXReactor) flushMsgCountByPeer() {
+ ticker := time.NewTicker(1 * time.Hour)
+ // release the ticker's resources when the routine exits
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ r.msgCountByPeer.Clear()
+ case <-r.Quit:
+ return
+ }
+ }
+}
+
+// incrementMsgCount bumps the message counter for addr (starting from 0
+// when no entry exists yet).
+func (r *PEXReactor) incrementMsgCount(addr string) {
+ var count uint16
+ if countI := r.msgCountByPeer.Get(addr); countI != nil {
+ count = countI.(uint16)
+ }
+ count++
+ r.msgCountByPeer.Set(addr, count)
+}
+
+// reachedMaxMsgLimit reports whether addr has exceeded its message quota.
+// The comma-ok assertion guards against a missing entry: the previous bare
+// type assertion would panic when Get returned nil (e.g. if called before
+// incrementMsgCount, or right after an hourly counter flush).
+func (r *PEXReactor) reachedMaxMsgLimit(addr string) bool {
+ count, ok := r.msgCountByPeer.Get(addr).(uint16)
+ return ok && count >= defaultMaxMsgCountByPeer
+}
+++ /dev/null
-package p2p
-
-import (
- "bytes"
- "fmt"
- "math/rand"
- "reflect"
- "strings"
- "sync"
- "time"
-
- log "github.com/sirupsen/logrus"
- wire "github.com/tendermint/go-wire"
- cmn "github.com/tendermint/tmlibs/common"
-
- "github.com/bytom/errors"
-)
-
-const (
- // PexChannel is a channel for PEX messages
- PexChannel = byte(0x00)
-
- // period to ensure peers connected
- defaultEnsurePeersPeriod = 120 * time.Second
- minNumOutboundPeers = 5
- maxPexMessageSize = 1048576 // 1MB
-
- // maximum messages one peer can send to us during `msgCountByPeerFlushInterval`
- defaultMaxMsgCountByPeer = 1000
- msgCountByPeerFlushInterval = 1 * time.Hour
-)
-
-var ErrSendPexFail = errors.New("Send pex message fail")
-
-// PEXReactor handles PEX (peer exchange) and ensures that an
-// adequate number of peers are connected to the switch.
-//
-// It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
-//
-// ## Preventing abuse
-//
-// For now, it just limits the number of messages from one peer to
-// `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
-// msg/hour).
-//
-// NOTE [2017-01-17]:
-// Limiting is fine for now. Maybe down the road we want to keep track of the
-// quality of peer messages so if peerA keeps telling us about peers we can't
-// connect to then maybe we should care less about peerA. But I don't think
-// that kind of complexity is priority right now.
-type PEXReactor struct {
- BaseReactor
-
- sw *Switch
- book *AddrBook
- ensurePeersPeriod time.Duration
-
- // tracks message count by peer, so we can prevent abuse
- msgCountByPeer *cmn.CMap
- maxMsgCountByPeer uint16
-}
-
-// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
- r := &PEXReactor{
- sw: sw,
- book: b,
- ensurePeersPeriod: defaultEnsurePeersPeriod,
- msgCountByPeer: cmn.NewCMap(),
- maxMsgCountByPeer: defaultMaxMsgCountByPeer,
- }
- r.BaseReactor = *NewBaseReactor("PEXReactor", r)
- return r
-}
-
-// OnStart implements BaseService
-func (r *PEXReactor) OnStart() error {
- r.BaseReactor.OnStart()
- r.book.Start()
- go r.ensurePeersRoutine()
- go r.flushMsgCountByPeer()
- return nil
-}
-
-// OnStop implements BaseService
-func (r *PEXReactor) OnStop() {
- r.BaseReactor.OnStop()
- r.book.Stop()
-}
-
-// GetChannels implements Reactor
-func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
- return []*ChannelDescriptor{
- &ChannelDescriptor{
- ID: PexChannel,
- Priority: 1,
- SendQueueCapacity: 10,
- },
- }
-}
-
-// AddPeer implements Reactor by adding peer to the address book (if inbound)
-// or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) error {
- if p.IsOutbound() {
- // For outbound peers, the address is already in the books.
- // Either it was added in DialSeeds or when we
- // received the peer's address in r.Receive
- if r.book.NeedMoreAddrs() {
- if ok := r.RequestPEX(p); !ok {
- return ErrSendPexFail
- }
- }
- return nil
- }
-
- // For inbound connections, the peer is its own source
- addr, err := NewNetAddressString(p.ListenAddr)
- if err != nil {
- // this should never happen
- log.WithFields(log.Fields{
- "addr": p.ListenAddr,
- "error": err,
- }).Error("Error in AddPeer: Invalid peer address")
- return errors.New("Error in AddPeer: Invalid peer address")
- }
- r.book.AddAddress(addr, addr)
-
- // close the connect if connect is big than max limit
- if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
- if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
- <-time.After(1 * time.Second)
- r.sw.StopPeerGracefully(p)
- }
- return errors.New("Error in AddPeer: reach the max peer, exchange then close")
- }
-
- return nil
-}
-
-// RemovePeer implements Reactor.
-func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
- // If we aren't keeping track of local temp data for each peer here, then we
- // don't have to do anything.
-}
-
-// Receive implements Reactor by handling incoming PEX messages.
-func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
- srcAddr := src.Connection().RemoteAddress
- srcAddrStr := srcAddr.String()
-
- r.IncrementMsgCountForPeer(srcAddrStr)
- if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
- log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
- // TODO remove src from peers?
- return
- }
-
- _, msg, err := DecodeMessage(msgBytes)
- if err != nil {
- log.WithField("error", err).Error("Error decoding message")
- return
- }
- log.WithField("msg", msg).Info("Reveived message")
-
- switch msg := msg.(type) {
- case *pexRequestMessage:
- // src requested some peers.
- if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
- log.Info("Send address message failed. Stop peer.")
- }
- case *pexAddrsMessage:
- // We received some peer addresses from src.
- // (We don't want to get spammed with bad peers)
- for _, addr := range msg.Addrs {
- if addr != nil {
- r.book.AddAddress(addr, srcAddr)
- }
- }
- default:
- log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
- }
-}
-
-// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestPEX(p *Peer) bool {
- ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
- if !ok {
- r.sw.StopPeerGracefully(p)
- }
- return ok
-}
-
-// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
- ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
- if !ok {
- r.sw.StopPeerGracefully(p)
- }
- return ok
-}
-
-// SetEnsurePeersPeriod sets period to ensure peers connected.
-func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
- r.ensurePeersPeriod = d
-}
-
-// SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
-func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
- r.maxMsgCountByPeer = v
-}
-
-// ReachedMaxMsgCountForPeer returns true if we received too many
-// messages from peer with address `addr`.
-// NOTE: assumes the value in the CMap is non-nil
-func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
- return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
-}
-
-// Increment or initialize the msg count for the peer in the CMap
-func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
- var count uint16
- countI := r.msgCountByPeer.Get(addr)
- if countI != nil {
- count = countI.(uint16)
- }
- count++
- r.msgCountByPeer.Set(addr, count)
-}
-
-// Ensures that sufficient peers are connected. (continuous)
-func (r *PEXReactor) ensurePeersRoutine() {
- // Randomize when routine starts
- ensurePeersPeriodMs := int64(10000)
- time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
-
- // fire once immediately.
- r.ensurePeers()
-
- // fire periodically
- ticker := time.NewTicker(r.ensurePeersPeriod)
- quickTicker := time.NewTicker(time.Second * 1)
-
- for {
- select {
- case <-ticker.C:
- r.ensurePeers()
- case <-quickTicker.C:
- if r.sw.peers.Size() < 3 {
- r.ensurePeers()
- }
- case <-r.Quit:
- ticker.Stop()
- quickTicker.Stop()
- return
- }
- }
-}
-
-// ensurePeers ensures that sufficient peers are connected. (once)
-//
-// Old bucket / New bucket are arbitrary categories to denote whether an
-// address is vetted or not, and this needs to be determined over time via a
-// heuristic that we haven't perfected yet, or, perhaps is manually edited by
-// the node operator. It should not be used to compute what addresses are
-// already connected or not.
-//
-// TODO Basically, we need to work harder on our good-peer/bad-peer marking.
-// What we're currently doing in terms of marking good/bad peers is just a
-// placeholder. It should not be the case that an address becomes old/vetted
-// upon a single successful connection.
-func (r *PEXReactor) ensurePeers() {
- numOutPeers, _, numDialing := r.Switch.NumPeers()
- numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) * 5
- log.WithFields(log.Fields{
- "numOutPeers": numOutPeers,
- "numDialing": numDialing,
- "numToDial": numToDial,
- }).Info("Ensure peers")
- if numToDial <= 0 {
- return
- }
-
- newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
- toDial := make(map[string]*NetAddress)
-
- // Try to pick numToDial addresses to dial.
- for i := 0; i < numToDial; i++ {
- // The purpose of newBias is to first prioritize old (more vetted) peers
- // when we have few connections, but to allow for new (less vetted) peers
- // if we already have many connections. This algorithm isn't perfect, but
- // it somewhat ensures that we prioritize connecting to more-vetted
- // peers.
-
- var picked *NetAddress
- // Try to fetch a new peer 3 times.
- // This caps the maximum number of tries to 3 * numToDial.
- for j := 0; j < 3; j++ {
- try := r.book.PickAddress(newBias)
- if try == nil {
- break
- }
- ka := r.book.addrLookup[try.String()]
- if ka != nil {
- if ka.isBad() {
- continue
- }
- }
- _, alreadySelected := toDial[try.IP.String()]
- alreadyDialing := r.Switch.IsDialing(try)
- var alreadyConnected bool
-
- for _, v := range r.Switch.Peers().list {
- if strings.Compare(v.mconn.RemoteAddress.IP.String(), try.IP.String()) == 0 {
- alreadyConnected = true
- break
- }
- }
- if alreadySelected || alreadyDialing || alreadyConnected {
- continue
- } else {
- log.Debug("Will dial address addr:", try)
- picked = try
- break
- }
- }
- if picked == nil {
- continue
- }
- toDial[picked.IP.String()] = picked
- }
-
- var wg sync.WaitGroup
- for _, item := range toDial {
- wg.Add(1)
- go r.dialPeerWorker(item, &wg)
- }
- wg.Wait()
-
- // If we need more addresses, pick a random peer and ask for more.
- if r.book.NeedMoreAddrs() {
- if peers := r.Switch.Peers().List(); len(peers) > 0 {
- i := rand.Int() % len(peers)
- peer := peers[i]
- log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
- if ok := r.RequestPEX(peer); !ok {
- log.Info("Send request address message failed. Stop peer.")
- }
- }
- }
-}
-
-func (r *PEXReactor) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
- if err := r.Switch.DialPeerWithAddress(a); err != nil {
- r.book.MarkAttempt(a)
- } else {
- r.book.MarkGood(a)
- }
- wg.Done()
-}
-
-func (r *PEXReactor) flushMsgCountByPeer() {
- ticker := time.NewTicker(msgCountByPeerFlushInterval)
-
- for {
- select {
- case <-ticker.C:
- r.msgCountByPeer.Clear()
- case <-r.Quit:
- ticker.Stop()
- return
- }
- }
-}
-
-//-----------------------------------------------------------------------------
-// Messages
-
-const (
- msgTypeRequest = byte(0x01)
- msgTypeAddrs = byte(0x02)
-)
-
-// PexMessage is a primary type for PEX messages. Underneath, it could contain
-// either pexRequestMessage, or pexAddrsMessage messages.
-type PexMessage interface{}
-
-var _ = wire.RegisterInterface(
- struct{ PexMessage }{},
- wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
- wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
-)
-
-// DecodeMessage implements interface registered above.
-func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
- msgType = bz[0]
- n := new(int)
- r := bytes.NewReader(bz)
- msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
- return
-}
-
-/*
-A pexRequestMessage requests additional peer addresses.
-*/
-type pexRequestMessage struct {
-}
-
-func (m *pexRequestMessage) String() string {
- return "[pexRequest]"
-}
-
-/*
-A message with announced peer addresses.
-*/
-type pexAddrsMessage struct {
- Addrs []*NetAddress
-}
-
-func (m *pexAddrsMessage) String() string {
- return fmt.Sprintf("[pexAddrs %v]", m.Addrs)
-}
+++ /dev/null
-// +build !network
-
-package p2p
-
-import (
- "io/ioutil"
- "math/rand"
- "os"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
- "github.com/stretchr/testify/require"
- wire "github.com/tendermint/go-wire"
- cmn "github.com/tendermint/tmlibs/common"
- "github.com/tendermint/tmlibs/log"
-)
-
-var (
- sw *Switch
-)
-
-func TestPEXReactorBasic(t *testing.T) {
- assert, require := assert.New(t), require.New(t)
-
- dir, err := ioutil.TempDir("", "pex_reactor")
- require.Nil(err)
- defer os.RemoveAll(dir)
- book := NewAddrBook(dir+"addrbook.json", true)
- book.SetLogger(log.TestingLogger())
-
- r := NewPEXReactor(book, sw)
- r.SetLogger(log.TestingLogger())
-
- assert.NotNil(r)
- assert.NotEmpty(r.GetChannels())
-}
-
-func TestPEXReactorAddRemovePeer(t *testing.T) {
- assert, require := assert.New(t), require.New(t)
-
- dir, err := ioutil.TempDir("", "pex_reactor")
- require.Nil(err)
- defer os.RemoveAll(dir)
- book := NewAddrBook(dir+"addrbook.json", true)
- book.SetLogger(log.TestingLogger())
-
- r := NewPEXReactor(book, sw)
- r.SetLogger(log.TestingLogger())
-
- size := book.Size()
- peer := createRandomPeer(false)
-
- r.AddPeer(peer)
- assert.Equal(size+1, book.Size())
-
- r.RemovePeer(peer, "peer not available")
- assert.Equal(size+1, book.Size())
-
- outboundPeer := createRandomPeer(true)
-
- r.AddPeer(outboundPeer)
- assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book")
-
- r.RemovePeer(outboundPeer, "peer not available")
- assert.Equal(size+1, book.Size())
-}
-
-func TestPEXReactorRunning(t *testing.T) {
- require := require.New(t)
-
- N := 3
- switches := make([]*Switch, N)
-
- dir, err := ioutil.TempDir("", "pex_reactor")
- require.Nil(err)
- defer os.RemoveAll(dir)
- book := NewAddrBook(dir+"addrbook.json", false)
- book.SetLogger(log.TestingLogger())
-
- // create switches
- for i := 0; i < N; i++ {
- switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch {
- sw.SetLogger(log.TestingLogger().With("switch", i))
-
- r := NewPEXReactor(book, sw)
- r.SetLogger(log.TestingLogger())
- r.SetEnsurePeersPeriod(250 * time.Millisecond)
- sw.AddReactor("pex", r)
- return sw
- })
- }
-
- // fill the address book and add listeners
- for _, s := range switches {
- addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr)
- book.AddAddress(addr, addr)
- l, _ := NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())
- s.AddListener(l)
- }
-
- // start switches
- for _, s := range switches {
- _, err := s.Start() // start switch and reactors
- require.Nil(err)
- }
-
- time.Sleep(1 * time.Second)
-
- // check peers are connected after some time
- for _, s := range switches {
- outbound, inbound, _ := s.NumPeers()
- if outbound+inbound == 0 {
- t.Errorf("%v expected to be connected to at least one peer", s.NodeInfo().ListenAddr)
- }
- }
-
- // stop them
- for _, s := range switches {
- s.Stop()
- }
-}
-
-func TestPEXReactorReceive(t *testing.T) {
- assert, require := assert.New(t), require.New(t)
-
- dir, err := ioutil.TempDir("", "pex_reactor")
- require.Nil(err)
- defer os.RemoveAll(dir)
- book := NewAddrBook(dir+"addrbook.json", true)
- book.SetLogger(log.TestingLogger())
-
- r := NewPEXReactor(book, sw)
- r.SetLogger(log.TestingLogger())
-
- peer := createRandomPeer(false)
-
- size := book.Size()
- netAddr, _ := NewNetAddressString(peer.ListenAddr)
- addrs := []*NetAddress{netAddr}
- msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
- r.Receive(PexChannel, peer, msg)
- assert.Equal(size+1, book.Size())
-
- msg = wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
- r.Receive(PexChannel, peer, msg)
-}
-
-func TestPEXReactorAbuseFromPeer(t *testing.T) {
- assert, require := assert.New(t), require.New(t)
-
- dir, err := ioutil.TempDir("", "pex_reactor")
- require.Nil(err)
- defer os.RemoveAll(dir)
- book := NewAddrBook(dir+"addrbook.json", true)
- book.SetLogger(log.TestingLogger())
-
- r := NewPEXReactor(book, sw)
- r.SetLogger(log.TestingLogger())
- r.SetMaxMsgCountByPeer(5)
-
- peer := createRandomPeer(false)
-
- msg := wire.BinaryBytes(struct{ PexMessage }{&pexRequestMessage{}})
- for i := 0; i < 10; i++ {
- r.Receive(PexChannel, peer, msg)
- }
-
- assert.True(r.ReachedMaxMsgCountForPeer(peer.ListenAddr))
-}
-
-func createRandomPeer(outbound bool) *Peer {
- addr := cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256)
- netAddr, _ := NewNetAddressString(addr)
- p := &Peer{
- Key: cmn.RandStr(12),
- NodeInfo: &NodeInfo{
- ListenAddr: addr,
- },
- outbound: outbound,
- mconn: &MConnection{RemoteAddress: netAddr},
- }
- p.SetLogger(log.TestingLogger().With("peer", addr))
- return p
-}
import (
"encoding/json"
"fmt"
- "math/rand"
"net"
"sync"
"time"
ErrConnectBannedPeer = errors.New("Connect banned peer")
)
+// An AddrBook represents an address book from the pex package, which is used to store peer addresses.
+type AddrBook interface {
+ AddAddress(*NetAddress, *NetAddress) error
+ AddOurAddress(*NetAddress)
+ MarkGood(*NetAddress)
+ RemoveAddress(*NetAddress)
+ SaveToFile() error
+}
+
//-----------------------------------------------------------------------------
// Switch handles peer connections and exposes an API to receive incoming messages
type Switch struct {
cmn.BaseService
- config *cfg.P2PConfig
+ Config *cfg.P2PConfig
peerConfig *PeerConfig
listeners []Listener
reactors map[string]Reactor
dialing *cmn.CMap
nodeInfo *NodeInfo // our node info
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
- addrBook *AddrBook
+ addrBook AddrBook
bannedPeer map[string]time.Time
db dbm.DB
mtx sync.Mutex
}
// NewSwitch creates a new Switch with the given config.
-func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
+func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
sw := &Switch{
- config: config,
+ Config: config,
peerConfig: DefaultPeerConfig(config),
reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
nodeInfo: nil,
+ addrBook: addrBook,
db: trustHistoryDB,
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
-
- // Optionally, start the pex reactor
- if config.PexReactor {
- sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
- pexReactor := NewPEXReactor(sw.addrBook, sw)
- sw.AddReactor("PEX", pexReactor)
- }
-
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
return nil
}
-// DialSeeds a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(seeds []string) error {
- netAddrs, err := NewNetAddressStrings(seeds)
- if err != nil {
- return err
- }
-
- if sw.addrBook != nil {
- // add seeds to `addrBook`
- ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
- for _, netAddr := range netAddrs {
- // do not add ourselves
- if netAddr.Equals(ourAddr) {
- continue
- }
- sw.addrBook.AddAddress(netAddr, ourAddr)
- }
-
- sw.addrBook.Save()
- }
-
- //permute the list, dial them in random order.
- perm := rand.Perm(len(netAddrs))
- for i := 0; i < len(perm); i += 2 {
- j := perm[i]
- sw.dialSeed(netAddrs[j])
- }
-
- return nil
-}
-
func (sw *Switch) dialSeed(addr *NetAddress) {
err := sw.DialPeerWithAddress(addr)
if err != nil {
// disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
// the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
// be double of MaxNumPeers
- if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
+ if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
inConn.Close()
log.Info("Ignoring inbound connection: already have enough peers.")
continue
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
- peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
+ peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
if err != nil {
conn.Close()
return err
//PanicOnAddPeerErr add peer error
var PanicOnAddPeerErr = false
+func CreateRandomPeer(outbound bool) *Peer {
+ _, netAddr := CreateRoutableAddr()
+ p := &Peer{
+ peerConn: &peerConn{
+ outbound: outbound,
+ },
+ NodeInfo: &NodeInfo{
+ ListenAddr: netAddr.DialString(),
+ },
+ mconn: &MConnection{},
+ }
+ return p
+}
+
+func CreateRoutableAddr() (addr string, netAddr *NetAddress) {
+ for {
+ var err error
+ addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256, cmn.RandInt()%256)
+ netAddr, err = NewNetAddressString(addr)
+ if err != nil {
+ panic(err)
+ }
+ if netAddr.Routable() {
+ break
+ }
+ }
+ return
+}
+
// MakeConnectedSwitches switches connected via arbitrary net.Conn; useful for testing
// Returns n switches, connected according to the connect func.
// If connect==Connect2Switches, the switches will be fully connected.
func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
switches := make([]*Switch, n)
for i := 0; i < n; i++ {
- switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
+ switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
}
if err := startSwitches(switches); err != nil {
return nil
}
-func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
+func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
privKey := crypto.GenPrivKeyEd25519()
// new switch, add reactors
// TODO: let the config be passed in?
- s := initSwitch(i, NewSwitch(cfg, nil))
+ s := initSwitch(i, NewSwitch(cfg, nil, nil))
s.SetNodeInfo(&NodeInfo{
PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
Moniker: cmn.Fmt("switch%d", i),
+++ /dev/null
-package p2p
-
-import (
- "crypto/sha256"
-)
-
-// doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes.
-func doubleSha256(b []byte) []byte {
- hasher := sha256.New()
- hasher.Write(b)
- sum := hasher.Sum(nil)
- hasher.Reset()
- hasher.Write(sum)
- return hasher.Sum(nil)
-}
func (c *Chain) GetBlockByHeight(height uint64) (*types.Block, error) {
node := c.index.NodeByHeight(height)
if node == nil {
- return nil, errors.New("can't find block in given hight")
+ return nil, errors.New("can't find block in given height")
}
return c.store.GetBlock(&node.Hash)
}
Inputs: make([]*query.AnnotatedInput, 0, len(orig.Inputs)),
Outputs: make([]*query.AnnotatedOutput, 0, len(orig.Outputs)),
StatusFail: statusFail,
+ Size: orig.SerializedSize,
}
for i := range orig.Inputs {
tx.Inputs = append(tx.Inputs, w.BuildAnnotatedInput(orig, uint32(i)))