orphanNum := uint64(0)
reqNum := uint64(0)
isOrphan := false
+ peer := bk.peers.Peer(peerID)
for num <= maxPeerHeight && num > 0 {
if isOrphan {
reqNum = orphanNum
block, err := bk.BlockRequest(peerID, reqNum)
if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
- peer := bk.peers.Peer(peerID)
if peer == nil {
log.Info("peer is not registered")
break
}
log.Info("Peer communication abnormality")
- bk.sw.StopPeerGracefully(peer.Peer)
+ if ban := peer.addBanScore(0, 50, "block request error"); ban {
+ bk.sw.AddBannedPeer(peer.getPeer())
+ }
+ bk.sw.StopPeerGracefully(peer.getPeer())
break
}
isOrphan, err = bk.chain.ProcessBlock(block)
if err != nil {
- bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
+ if ban := peer.addBanScore(50, 0, "block process error"); ban {
+ bk.sw.AddBannedPeer(peer.getPeer())
+ bk.sw.StopPeerGracefully(peer.getPeer())
+ }
log.WithField("hash:", block.Hash()).Errorf("blockKeeper fail process block %v ", err)
break
}
func (bk *blockKeeper) txsProcessWorker() {
for txsResponse := range bk.txsProcessCh {
tx := txsResponse.tx
+ peer:=bk.peers.Peer(txsResponse.peerID)
log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
- bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
+ if ban := peer.addBanScore(50, 0, "tx error"); ban {
+ bk.sw.AddBannedPeer(peer.getPeer())
+ bk.sw.StopPeerGracefully(peer.getPeer())
+ }
}
}
}
// insert spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
-func (f *Fetcher) insert(peer string, block *types.Block) {
+func (f *Fetcher) insert(peerID string, block *types.Block) {
// Run the import on a new thread
- log.Info("Importing propagated block", " from peer: ", peer, " height: ", block.Height)
+ log.Info("Importing propagated block", " from peer: ", peerID, " height: ", block.Height)
// Run the actual import and log any issues
if _, err := f.chain.ProcessBlock(block); err != nil {
- log.Info("Propagated block import failed", " from peer: ", peer, " height: ", block.Height, "err: ", err)
+ log.Info("Propagated block import failed", " from peer: ", peerID, " height: ", block.Height, "err: ", err)
+ peer := f.peers.Peer(peerID)
+ if ban := peer.addBanScore(50, 0, "block process error"); ban {
+ f.sw.AddBannedPeer(peer.getPeer())
+ f.sw.StopPeerGracefully(peer.getPeer())
+ }
return
}
// If import succeeded, broadcast the block
"github.com/bytom/errors"
"github.com/bytom/p2p"
+ "github.com/bytom/p2p/trust"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
)
errNotRegistered = errors.New("peer is not registered")
)
-const defaultVersion = 1
+const (
+ defaultVersion = 1
+ defaultBanThreshold = uint64(100)
+)
type peer struct {
- mtx sync.RWMutex
- version int // Protocol version negotiated
- id string
- height uint64
- hash *bc.Hash
- *p2p.Peer
+ mtx sync.RWMutex
+ version int // Protocol version negotiated
+ id string
+ height uint64
+ hash *bc.Hash
+ banScore trust.DynamicBanScore
+
+ swPeer *p2p.Peer
knownTxs *set.Set // Set of transaction hashes known to be known by this peer
knownBlocks *set.Set // Set of block hashes known to be known by this peer
version: defaultVersion,
height: height,
hash: hash,
- Peer: Peer,
+ swPeer: Peer,
knownTxs: set.New(),
knownBlocks: set.New(),
}
func (p *peer) requestBlockByHash(hash *bc.Hash) error {
msg := &BlockRequestMessage{RawHash: hash.Byte32()}
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
return nil
}
func (p *peer) requestBlockByHeight(height uint64) error {
msg := &BlockRequestMessage{Height: height}
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
return nil
}
}
hash := &tx.ID
p.knownTxs.Add(hash.String())
- p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ p.swPeer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
}
p.mtx.RLock()
defer p.mtx.RUnlock()
- return p.Peer
+ return p.swPeer
}
// MarkTransaction marks a transaction as known for the peer, ensuring that it
p.knownBlocks.Add(hash.String())
}
+// addBanScore increases the persistent and decaying ban score fields by the
+// values passed as parameters. If the resulting score exceeds half of the ban
+// threshold, a warning is logged including the reason provided. Further, if
+// the score is above the ban threshold, the peer will be banned and
+// disconnected.
+func (p *peer) addBanScore(persistent, transient uint64, reason string) bool {
+ warnThreshold := defaultBanThreshold >> 1
+ if transient == 0 && persistent == 0 {
+ // The score is not being increased, but a warning message is still
+ // logged if the score is above the warn threshold.
+ score := p.banScore.Int()
+ if score > warnThreshold {
+ log.Info("Misbehaving peer %s: %s -- ban score is %d, "+"it was not increased this time", p, reason, score)
+ }
+ return false
+ }
+ score := p.banScore.Increase(persistent, transient)
+ if score > warnThreshold {
+ log.Infof("Misbehaving peer %s: %s -- ban score increased to %d", p, reason, score)
+ if score > defaultBanThreshold {
+ log.Errorf("Misbehaving peer %s -- banning and disconnecting", p)
+ return true
+ }
+ }
+ return false
+}
+
type peerSet struct {
peers map[string]*peer
lock sync.RWMutex
for _, p := range ps.peers {
if bestPeer == nil || p.height > bestHeight {
- bestPeer, bestHeight = p.Peer, p.height
+ bestPeer, bestHeight = p.swPeer, p.height
}
}
defer ps.lock.Unlock()
for _, p := range ps.peers {
- p.CloseConn()
+ p.swPeer.CloseConn()
}
ps.closed = true
}
hash := block.Hash()
peers := ps.PeersWithoutBlock(&hash)
for _, peer := range peers {
- ps.MarkBlock(peer.Key, &hash)
- peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ ps.MarkBlock(peer.swPeer.Key, &hash)
+ peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
}
}
peers := ps.PeersWithoutTx(&tx.ID)
for _, peer := range peers {
- ps.peers[peer.Key].MarkTransaction(&tx.ID)
- peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+ ps.peers[peer.swPeer.Key].MarkTransaction(&tx.ID)
+ peer.swPeer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
}
return nil
}
"github.com/bytom/errors"
"github.com/bytom/p2p"
- "github.com/bytom/p2p/trust"
"github.com/bytom/protocol"
"github.com/bytom/protocol/bc"
"github.com/bytom/protocol/bc/types"
// Receive implements Reactor by handling 4 types of messages (look below).
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
- var tm *trust.TrustMetric
- key := src.Connection().RemoteAddress.IP.String()
- if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
- log.Errorf("Can't get peer trust metric")
- return
- }
-
_, msg, err := DecodeMessage(msgBytes)
if err != nil {
log.Errorf("Error decoding messagek %v", err)
// Remove the transactions that will be sent.
s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
if len(s.txs) == 0 {
- delete(pending, s.p.Key)
+ delete(pending, s.p.swPeer.Key)
}
// Send the pack in the background.
log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
for {
select {
case s := <-sm.txSyncCh:
- pending[s.p.Key] = s
+ pending[s.p.swPeer.Key] = s
if !sending {
send(s)
}
// Stop tracking peers that cause send failures.
if err != nil {
log.Info("Transaction send failed", "err", err)
- delete(pending, pack.p.Key)
+ delete(pending, pack.p.swPeer.Key)
}
// Schedule the next send.
if s := pick(); s != nil {
bannedPeerKey = "BannedPeer"
defaultBanDuration = time.Hour * 24
- peerBannedTM = 20
)
var ErrConnectBannedPeer = errors.New("Connect banned peer")
nodePrivKey crypto.PrivKeyEd25519 // our node privkey
bannedPeer map[string]time.Time
db dbm.DB
- TrustMetricStore *trust.TrustMetricStore
ScamPeerCh chan *Peer
mtx sync.Mutex
ScamPeerCh: make(chan *Peer),
}
sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
- sw.TrustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig())
- sw.TrustMetricStore.Start()
sw.bannedPeer = make(map[string]time.Time)
if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
return nil
}
}
- go sw.scamPeerHandler()
+ trust.Init()
return sw
}
return err
}
- tm := trust.NewMetric()
-
- tm.Start()
- sw.TrustMetricStore.AddPeerTrustMetric(peer.mconn.RemoteAddress.IP.String(), tm)
-
log.WithField("peer", peer).Info("Added peer")
return nil
}
return nil
}
-func (sw *Switch) scamPeerHandler() {
- for src := range sw.ScamPeerCh {
- var tm *trust.TrustMetric
- key := src.Connection().RemoteAddress.IP.String()
- if tm = sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
- log.Errorf("Can't get peer trust metric")
- continue
- }
- sw.delTrustMetric(tm, src)
- }
-}
-
-func (sw *Switch) AddScamPeer(src *Peer) {
- sw.ScamPeerCh <- src
-}
-
-func (sw *Switch) delTrustMetric(tm *trust.TrustMetric, src *Peer) {
- key := src.Connection().RemoteAddress.IP.String()
- tm.BadEvents(1)
- if tm.TrustScore() < peerBannedTM {
- sw.AddBannedPeer(src)
- sw.TrustMetricStore.PeerDisconnected(key)
- sw.StopPeerGracefully(src)
- }
-}
-
func (sw *Switch) checkBannedPeer(peer string) error {
if banEnd, ok := sw.bannedPeer[peer]; ok {
if time.Now().Before(banEnd) {
--- /dev/null
+// Copyright (c) 2016 The btcsuite developers
+// Use of this source code is governed by an ISC
+// license that can be found in the LICENSE file.
+
+package trust
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+const (
+ // Halflife defines the time (in seconds) by which the transient part
+ // of the ban score decays to one half of it's original value.
+ Halflife = 60
+
+ // lambda is the decaying constant.
+ lambda = math.Ln2 / Halflife
+
+ // Lifetime defines the maximum age of the transient part of the ban
+ // score to be considered a non-zero score (in seconds).
+ Lifetime = 1800
+
+ // precomputedLen defines the amount of decay factors (one per second) that
+ // should be precomputed at initialization.
+ precomputedLen = 64
+)
+
+// precomputedFactor stores precomputed exponential decay factors for the first
+// 'precomputedLen' seconds starting from t == 0.
+var precomputedFactor [precomputedLen]float64
+
+// init precomputes decay factors.
+func Init() {
+ for i := range precomputedFactor {
+ precomputedFactor[i] = math.Exp(-1.0 * float64(i) * lambda)
+ }
+}
+
+// decayFactor returns the decay factor at t seconds, using precalculated values
+// if available, or calculating the factor if needed.
+func decayFactor(t int64) float64 {
+ if t < precomputedLen {
+ return precomputedFactor[t]
+ }
+ return math.Exp(-1.0 * float64(t) * lambda)
+}
+
+// DynamicBanScore provides dynamic ban scores consisting of a persistent and a
+// decaying component. The persistent score could be utilized to create simple
+// additive banning policies similar to those found in other bitcoin node
+// implementations.
+//
+// The decaying score enables the creation of evasive logic which handles
+// misbehaving peers (especially application layer DoS attacks) gracefully
+// by disconnecting and banning peers attempting various kinds of flooding.
+// DynamicBanScore allows these two approaches to be used in tandem.
+//
+// Zero value: Values of type DynamicBanScore are immediately ready for use upon
+// declaration.
+type DynamicBanScore struct {
+ lastUnix int64
+ transient float64
+ persistent uint64
+ mtx sync.Mutex
+}
+
+// String returns the ban score as a human-readable string.
+func (s *DynamicBanScore) String() string {
+ s.mtx.Lock()
+ r := fmt.Sprintf("persistent %v + transient %v at %v = %v as of now",
+ s.persistent, s.transient, s.lastUnix, s.Int())
+ s.mtx.Unlock()
+ return r
+}
+
+// Int returns the current ban score, the sum of the persistent and decaying
+// scores.
+//
+// This function is safe for concurrent access.
+func (s *DynamicBanScore) Int() uint64 {
+ s.mtx.Lock()
+ r := s.int(time.Now())
+ s.mtx.Unlock()
+ return r
+}
+
+// Increase increases both the persistent and decaying scores by the values
+// passed as parameters. The resulting score is returned.
+//
+// This function is safe for concurrent access.
+func (s *DynamicBanScore) Increase(persistent, transient uint64) uint64 {
+ s.mtx.Lock()
+ r := s.increase(persistent, transient, time.Now())
+ s.mtx.Unlock()
+ return r
+}
+
+// Reset set both persistent and decaying scores to zero.
+//
+// This function is safe for concurrent access.
+func (s *DynamicBanScore) Reset() {
+ s.mtx.Lock()
+ s.persistent = 0
+ s.transient = 0
+ s.lastUnix = 0
+ s.mtx.Unlock()
+}
+
+// int returns the ban score, the sum of the persistent and decaying scores at a
+// given point in time.
+//
+// This function is not safe for concurrent access. It is intended to be used
+// internally and during testing.
+func (s *DynamicBanScore) int(t time.Time) uint64 {
+ dt := t.Unix() - s.lastUnix
+ if s.transient < 1 || dt < 0 || Lifetime < dt {
+ return s.persistent
+ }
+ return s.persistent + uint64(s.transient*decayFactor(dt))
+}
+
+// increase increases the persistent, the decaying or both scores by the values
+// passed as parameters. The resulting score is calculated as if the action was
+// carried out at the point time represented by the third parameter. The
+// resulting score is returned.
+//
+// This function is not safe for concurrent access.
+func (s *DynamicBanScore) increase(persistent, transient uint64, t time.Time) uint64 {
+ s.persistent += persistent
+ tu := t.Unix()
+ dt := tu - s.lastUnix
+
+ if transient > 0 {
+ if Lifetime < dt {
+ s.transient = 0
+ } else if s.transient > 1 && dt > 0 {
+ s.transient *= decayFactor(dt)
+ }
+ s.transient += float64(transient)
+ s.lastUnix = tu
+ }
+ return s.persistent + uint64(s.transient)
+}
+++ /dev/null
-package trust
-
-import "time"
-
-// TrustMetricConfig - Configures the weight functions and time intervals for the metric
-type TrustMetricConfig struct {
- // Determines the percentage given to current behavior
- ProportionalWeight float64
-
- // Determines the percentage given to prior behavior
- IntegralWeight float64
-
- // The window of time that the trust metric will track events across.
- // This can be set to cover many days without issue
- TrackingWindow time.Duration
-
- // Each interval should be short for adapability.
- // Less than 30 seconds is too sensitive,
- // and greater than 5 minutes will make the metric numb
- IntervalLength time.Duration
-}
-
-// DefaultConfig returns a config with values that have been tested and produce desirable results
-func DefaultConfig() TrustMetricConfig {
- return TrustMetricConfig{
- ProportionalWeight: 0.4,
- IntegralWeight: 0.6,
- TrackingWindow: (time.Minute * 60 * 24) * 14, // 14 days.
- IntervalLength: 1 * time.Minute,
- }
-}
-
-// Ensures that all configuration elements have valid values
-func customConfig(tmc TrustMetricConfig) TrustMetricConfig {
- config := DefaultConfig()
-
- // Check the config for set values, and setup appropriately
- if tmc.ProportionalWeight > 0 {
- config.ProportionalWeight = tmc.ProportionalWeight
- }
-
- if tmc.IntegralWeight > 0 {
- config.IntegralWeight = tmc.IntegralWeight
- }
-
- if tmc.IntervalLength > time.Duration(0) {
- config.IntervalLength = tmc.IntervalLength
- }
-
- if tmc.TrackingWindow > time.Duration(0) &&
- tmc.TrackingWindow >= config.IntervalLength {
- config.TrackingWindow = tmc.TrackingWindow
- }
- return config
-}
+++ /dev/null
-// Copyright 2017 Tendermint. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package trust
-
-import (
- "math"
- "sync"
- "time"
-
- cmn "github.com/tendermint/tmlibs/common"
-)
-
-//---------------------------------------------------------------------------------------
-
-const (
- // The weight applied to the derivative when current behavior is >= previous behavior
- defaultDerivativeGamma1 = 0
-
- // The weight applied to the derivative when current behavior is less than previous behavior
- defaultDerivativeGamma2 = 1.0
-
- // The weight applied to history data values when calculating the history value
- defaultHistoryDataWeight = 0.8
-)
-
-// MetricHistoryJSON - history data necessary to save the trust metric
-type MetricHistoryJSON struct {
- NumIntervals int `json:"intervals"`
- History []float64 `json:"history"`
-}
-
-// TrustMetric - keeps track of peer reliability
-// See tendermint/docs/architecture/adr-006-trust-metric.md for details
-type TrustMetric struct {
- cmn.BaseService
-
- // Mutex that protects the metric from concurrent access
- mtx sync.Mutex
-
- // Determines the percentage given to current behavior
- proportionalWeight float64
-
- // Determines the percentage given to prior behavior
- integralWeight float64
-
- // Count of how many time intervals this metric has been tracking
- numIntervals int
-
- // Size of the time interval window for this trust metric
- maxIntervals int
-
- // The time duration for a single time interval
- intervalLen time.Duration
-
- // Stores the trust history data for this metric
- history []float64
-
- // Weights applied to the history data when calculating the history value
- historyWeights []float64
-
- // The sum of the history weights used when calculating the history value
- historyWeightSum float64
-
- // The current number of history data elements
- historySize int
-
- // The maximum number of history data elements
- historyMaxSize int
-
- // The calculated history value for the current time interval
- historyValue float64
-
- // The number of recorded good and bad events for the current time interval
- bad, good float64
-
- // While true, history data is not modified
- paused bool
-
- // Used during testing in order to control the passing of time intervals
- testTicker MetricTicker
-}
-
-// NewMetric returns a trust metric with the default configuration.
-// Use Start to begin tracking the quality of peer behavior over time
-func NewMetric() *TrustMetric {
- return NewMetricWithConfig(DefaultConfig())
-}
-
-// NewMetricWithConfig returns a trust metric with a custom configuration.
-// Use Start to begin tracking the quality of peer behavior over time
-func NewMetricWithConfig(tmc TrustMetricConfig) *TrustMetric {
- tm := new(TrustMetric)
- config := customConfig(tmc)
-
- // Setup using the configuration values
- tm.proportionalWeight = config.ProportionalWeight
- tm.integralWeight = config.IntegralWeight
- tm.intervalLen = config.IntervalLength
- // The maximum number of time intervals is the tracking window / interval length
- tm.maxIntervals = int(config.TrackingWindow / tm.intervalLen)
- // The history size will be determined by the maximum number of time intervals
- tm.historyMaxSize = intervalToHistoryOffset(tm.maxIntervals) + 1
- // This metric has a perfect history so far
- tm.historyValue = 1.0
-
- tm.BaseService = *cmn.NewBaseService(nil, "TrustMetric", tm)
- return tm
-}
-
-// OnStart implements Service
-func (tm *TrustMetric) OnStart() error {
- if err := tm.BaseService.OnStart(); err != nil {
- return err
- }
- go tm.processRequests()
- return nil
-}
-
-// OnStop implements Service
-// Nothing to do since the goroutine shuts down by itself via BaseService.Quit
-func (tm *TrustMetric) OnStop() {}
-
-// Returns a snapshot of the trust metric history data
-func (tm *TrustMetric) HistoryJSON() MetricHistoryJSON {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- return MetricHistoryJSON{
- NumIntervals: tm.numIntervals,
- History: tm.history,
- }
-}
-
-// Instantiates a trust metric by loading the history data for a single peer.
-// This is called only once and only right after creation, which is why the
-// lock is not held while accessing the trust metric struct members
-func (tm *TrustMetric) Init(hist MetricHistoryJSON) {
- // Restore the number of time intervals we have previously tracked
- if hist.NumIntervals > tm.maxIntervals {
- hist.NumIntervals = tm.maxIntervals
- }
- tm.numIntervals = hist.NumIntervals
- // Restore the history and its current size
- if len(hist.History) > tm.historyMaxSize {
- // Keep the history no larger than historyMaxSize
- last := len(hist.History) - tm.historyMaxSize
- hist.History = hist.History[last:]
- }
- tm.history = hist.History
- tm.historySize = len(tm.history)
- // Create the history weight values and weight sum
- for i := 1; i <= tm.numIntervals; i++ {
- x := math.Pow(defaultHistoryDataWeight, float64(i)) // Optimistic weight
- tm.historyWeights = append(tm.historyWeights, x)
- }
-
- for _, v := range tm.historyWeights {
- tm.historyWeightSum += v
- }
- // Calculate the history value based on the loaded history data
- tm.historyValue = tm.calcHistoryValue()
-}
-
-// Pause tells the metric to pause recording data over time intervals.
-// All method calls that indicate events will unpause the metric
-func (tm *TrustMetric) Pause() {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- // Pause the metric for now
- tm.paused = true
-}
-
-// BadEvents indicates that an undesirable event(s) took place
-func (tm *TrustMetric) BadEvents(num int) {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- tm.unpause()
- tm.bad += float64(num)
-}
-
-// GoodEvents indicates that a desirable event(s) took place
-func (tm *TrustMetric) GoodEvents(num int) {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- tm.unpause()
- tm.good += float64(num)
-}
-
-// TrustValue gets the dependable trust value; always between 0 and 1
-func (tm *TrustMetric) TrustValue() float64 {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- return tm.calcTrustValue()
-}
-
-// TrustScore gets a score based on the trust value always between 0 and 100
-func (tm *TrustMetric) TrustScore() int {
- score := tm.TrustValue() * 100
-
- return int(math.Floor(score))
-}
-
-// NextTimeInterval saves current time interval data and prepares for the following interval
-func (tm *TrustMetric) NextTimeInterval() {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- if tm.paused {
- // Do not prepare for the next time interval while paused
- return
- }
-
- // Add the current trust value to the history data
- newHist := tm.calcTrustValue()
- tm.history = append(tm.history, newHist)
-
- // Update history and interval counters
- if tm.historySize < tm.historyMaxSize {
- tm.historySize++
- } else {
- // Keep the history no larger than historyMaxSize
- last := len(tm.history) - tm.historyMaxSize
- tm.history = tm.history[last:]
- }
-
- if tm.numIntervals < tm.maxIntervals {
- tm.numIntervals++
- // Add the optimistic weight for the new time interval
- wk := math.Pow(defaultHistoryDataWeight, float64(tm.numIntervals))
- tm.historyWeights = append(tm.historyWeights, wk)
- tm.historyWeightSum += wk
- }
-
- // Update the history data using Faded Memories
- tm.updateFadedMemory()
- // Calculate the history value for the upcoming time interval
- tm.historyValue = tm.calcHistoryValue()
- tm.good = 0
- tm.bad = 0
-}
-
-// SetTicker allows a TestTicker to be provided that will manually control
-// the passing of time from the perspective of the TrustMetric.
-// The ticker must be set before Start is called on the metric
-func (tm *TrustMetric) SetTicker(ticker MetricTicker) {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
-
- tm.testTicker = ticker
-}
-
-// Copy returns a new trust metric with members containing the same values
-func (tm *TrustMetric) Copy() *TrustMetric {
- tm.mtx.Lock()
- defer tm.mtx.Unlock()
- if tm == nil {
- return nil
- }
-
- return &TrustMetric{
- proportionalWeight: tm.proportionalWeight,
- integralWeight: tm.integralWeight,
- numIntervals: tm.numIntervals,
- maxIntervals: tm.maxIntervals,
- intervalLen: tm.intervalLen,
- history: tm.history,
- historyWeights: tm.historyWeights,
- historyWeightSum: tm.historyWeightSum,
- historySize: tm.historySize,
- historyMaxSize: tm.historyMaxSize,
- historyValue: tm.historyValue,
- good: tm.good,
- bad: tm.bad,
- paused: tm.paused,
- }
-
-}
-
-/* Private methods */
-
-// This method is for a goroutine that handles all requests on the metric
-func (tm *TrustMetric) processRequests() {
- t := tm.testTicker
- if t == nil {
- // No test ticker was provided, so we create a normal ticker
- t = NewTicker(tm.intervalLen)
- }
- defer t.Stop()
- // Obtain the raw channel
- tick := t.GetChannel()
-loop:
- for {
- select {
- case <-tick:
- tm.NextTimeInterval()
- case <-tm.Quit:
- // Stop all further tracking for this metric
- break loop
- }
- }
-}
-
-// Wakes the trust metric up if it is currently paused
-// This method needs to be called with the mutex locked
-func (tm *TrustMetric) unpause() {
- // Check if this is the first experience with
- // what we are tracking since being paused
- if tm.paused {
- tm.good = 0
- tm.bad = 0
- // New events cause us to unpause the metric
- tm.paused = false
- }
-}
-
-// Calculates the trust value for the request processing
-func (tm *TrustMetric) calcTrustValue() float64 {
- weightedP := tm.proportionalWeight * tm.proportionalValue()
- weightedI := tm.integralWeight * tm.historyValue
- weightedD := tm.weightedDerivative()
-
- tv := weightedP + weightedI + weightedD
-
- // Do not return a negative value.
- if tv < 0 {
- tv = 0
- }
- return tv
-}
-
-// Calculates the current score for good/bad experiences
-func (tm *TrustMetric) proportionalValue() float64 {
- value := 1.0
-
- total := tm.good + tm.bad
- if total > 0 {
- value = tm.good / total
- }
- return value
-}
-
-// Strengthens the derivative component when the change is negative
-func (tm *TrustMetric) weightedDerivative() float64 {
- var weight float64 = defaultDerivativeGamma1
-
- d := tm.derivativeValue()
- if d < 0 {
- weight = defaultDerivativeGamma2
- }
- return weight * d
-}
-
-// Calculates the derivative component
-func (tm *TrustMetric) derivativeValue() float64 {
- return tm.proportionalValue() - tm.historyValue
-}
-
-// Calculates the integral (history) component of the trust value
-func (tm *TrustMetric) calcHistoryValue() float64 {
- var hv float64
-
- for i := 0; i < tm.numIntervals; i++ {
- hv += tm.fadedMemoryValue(i) * tm.historyWeights[i]
- }
-
- return hv / tm.historyWeightSum
-}
-
-// Retrieves the actual history data value that represents the requested time interval
-func (tm *TrustMetric) fadedMemoryValue(interval int) float64 {
- first := tm.historySize - 1
-
- if interval == 0 {
- // Base case
- return tm.history[first]
- }
-
- offset := intervalToHistoryOffset(interval)
- return tm.history[first-offset]
-}
-
-// Performs the update for our Faded Memories process, which allows the
-// trust metric tracking window to be large while maintaining a small
-// number of history data values
-func (tm *TrustMetric) updateFadedMemory() {
- if tm.historySize < 2 {
- return
- }
-
- end := tm.historySize - 1
- // Keep the most recent history element
- for count := 1; count < tm.historySize; count++ {
- i := end - count
- // The older the data is, the more we spread it out
- x := math.Pow(2, float64(count))
- // Two history data values are merged into a single value
- tm.history[i] = ((tm.history[i] * (x - 1)) + tm.history[i+1]) / x
- }
-}
-
-// Map the interval value down to an offset from the beginning of history
-func intervalToHistoryOffset(interval int) int {
- // The system maintains 2^m interval values in the form of m history
- // data values. Therefore, we access the ith interval by obtaining
- // the history data index = the floor of log2(i)
- return int(math.Floor(math.Log2(float64(interval))))
-}
+++ /dev/null
-package trust
-
-import (
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-)
-
-func TestTrustMetricScores(t *testing.T) {
- tm := NewMetric()
- tm.Start()
-
- // Perfect score
- tm.GoodEvents(1)
- score := tm.TrustScore()
- assert.Equal(t, 100, score)
-
- // Less than perfect score
- tm.BadEvents(10)
- score = tm.TrustScore()
- assert.NotEqual(t, 100, score)
- tm.Stop()
-}
-
-func TestTrustMetricConfig(t *testing.T) {
- // 7 days
- window := time.Minute * 60 * 24 * 7
- config := TrustMetricConfig{
- TrackingWindow: window,
- IntervalLength: 2 * time.Minute,
- }
-
- tm := NewMetricWithConfig(config)
- tm.Start()
-
- // The max time intervals should be the TrackingWindow / IntervalLen
- assert.Equal(t, int(config.TrackingWindow/config.IntervalLength), tm.maxIntervals)
-
- dc := DefaultConfig()
- // These weights should still be the default values
- assert.Equal(t, dc.ProportionalWeight, tm.proportionalWeight)
- assert.Equal(t, dc.IntegralWeight, tm.integralWeight)
- tm.Stop()
- tm.Wait()
-
- config.ProportionalWeight = 0.3
- config.IntegralWeight = 0.7
- tm = NewMetricWithConfig(config)
- tm.Start()
-
- // These weights should be equal to our custom values
- assert.Equal(t, config.ProportionalWeight, tm.proportionalWeight)
- assert.Equal(t, config.IntegralWeight, tm.integralWeight)
- tm.Stop()
- tm.Wait()
-}
-
-// XXX: This test fails non-deterministically
-func _TestTrustMetricStopPause(t *testing.T) {
- // The TestTicker will provide manual control over
- // the passing of time within the metric
- tt := NewTestTicker()
- tm := NewMetric()
- tm.SetTicker(tt)
- tm.Start()
- // Allow some time intervals to pass and pause
- tt.NextTick()
- tt.NextTick()
- tm.Pause()
-
- // could be 1 or 2 because Pause and NextTick race
- first := tm.Copy().numIntervals
-
- // Allow more time to pass and check the intervals are unchanged
- tt.NextTick()
- tt.NextTick()
- assert.Equal(t, first, tm.Copy().numIntervals)
-
- // Get the trust metric activated again
- tm.GoodEvents(5)
- // Allow some time intervals to pass and stop
- tt.NextTick()
- tt.NextTick()
- tm.Stop()
- tm.Wait()
-
- second := tm.Copy().numIntervals
- // Allow more intervals to pass while the metric is stopped
- // and check that the number of intervals match
- tm.NextTimeInterval()
- tm.NextTimeInterval()
- // XXX: fails non-deterministically:
- // expected 5, got 6
- assert.Equal(t, second+2, tm.Copy().numIntervals)
-
- if first > second {
- t.Fatalf("numIntervals should always increase or stay the same over time")
- }
-}
+++ /dev/null
-// Copyright 2017 Tendermint. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package trust
-
-import (
- "encoding/json"
- "sync"
- "time"
-
- cmn "github.com/tendermint/tmlibs/common"
- dbm "github.com/tendermint/tmlibs/db"
-)
-
-const defaultStorePeriodicSaveInterval = 1 * time.Minute
-
-var trustMetricKey = []byte("trustMetricStore")
-
-// TrustMetricStore - Manages all trust metrics for peers
-type TrustMetricStore struct {
- cmn.BaseService
-
- // Maps a Peer.Key to that peer's TrustMetric
- peerMetrics map[string]*TrustMetric
-
- // Mutex that protects the map and history data file
- mtx sync.Mutex
-
- // The db where peer trust metric history data will be stored
- db dbm.DB
-
- // This configuration will be used when creating new TrustMetrics
- config TrustMetricConfig
-}
-
-// NewTrustMetricStore returns a store that saves data to the DB
-// and uses the config when creating new trust metrics.
-// Use Start to to initialize the trust metric store
-func NewTrustMetricStore(db dbm.DB, tmc TrustMetricConfig) *TrustMetricStore {
- tms := &TrustMetricStore{
- peerMetrics: make(map[string]*TrustMetric),
- db: db,
- config: tmc,
- }
-
- tms.BaseService = *cmn.NewBaseService(nil, "TrustMetricStore", tms)
- return tms
-}
-
-// OnStart implements Service
-func (tms *TrustMetricStore) OnStart() error {
- if err := tms.BaseService.OnStart(); err != nil {
- return err
- }
-
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- tms.loadFromDB()
- go tms.saveRoutine()
- return nil
-}
-
-// OnStop implements Service
-func (tms *TrustMetricStore) OnStop() {
- tms.BaseService.OnStop()
-
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- // Stop all trust metric go-routines
- for _, tm := range tms.peerMetrics {
- tm.Stop()
- }
-
- // Make the final trust history data save
- tms.saveToDB()
-}
-
-// Size returns the number of entries in the trust metric store
-func (tms *TrustMetricStore) Size() int {
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- return tms.size()
-}
-
-// AddPeerTrustMetric takes an existing trust metric and associates it with a peer key.
-// The caller is expected to call Start on the TrustMetric being added
-func (tms *TrustMetricStore) AddPeerTrustMetric(key string, tm *TrustMetric) {
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- if key == "" || tm == nil {
- return
- }
- tms.peerMetrics[key] = tm
-}
-
-// GetPeerTrustMetric returns a trust metric by peer key
-func (tms *TrustMetricStore) GetPeerTrustMetric(key string) *TrustMetric {
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- tm, ok := tms.peerMetrics[key]
- if !ok {
- // If the metric is not available, we will create it
- tm = NewMetricWithConfig(tms.config)
- tm.Start()
- // The metric needs to be in the map
- tms.peerMetrics[key] = tm
- }
- return tm
-}
-
-// PeerDisconnected pauses the trust metric associated with the peer identified by the key
-func (tms *TrustMetricStore) PeerDisconnected(key string) {
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- // If the Peer that disconnected has a metric, pause it
- if tm, ok := tms.peerMetrics[key]; ok {
- tm.Pause()
- }
-}
-
-// Saves the history data for all peers to the store DB.
-// This public method acquires the trust metric store lock
-func (tms *TrustMetricStore) SaveToDB() {
- tms.mtx.Lock()
- defer tms.mtx.Unlock()
-
- tms.saveToDB()
-}
-
-/* Private methods */
-
-// size returns the number of entries in the store without acquiring the mutex
-func (tms *TrustMetricStore) size() int {
- return len(tms.peerMetrics)
-}
-
-/* Loading & Saving */
-/* Both loadFromDB and savetoDB assume the mutex has been acquired */
-
-// Loads the history data for all peers from the store DB
-// cmn.Panics if file is corrupt
-func (tms *TrustMetricStore) loadFromDB() bool {
- // Obtain the history data we have so far
- bytes := tms.db.Get(trustMetricKey)
- if bytes == nil {
- return false
- }
-
- peers := make(map[string]MetricHistoryJSON)
- err := json.Unmarshal(bytes, &peers)
- if err != nil {
- cmn.PanicCrisis(cmn.Fmt("Could not unmarshal Trust Metric Store DB data: %v", err))
- }
-
- // If history data exists in the file,
- // load it into trust metric
- for key, p := range peers {
- tm := NewMetricWithConfig(tms.config)
-
- tm.Start()
- tm.Init(p)
- // Load the peer trust metric into the store
- tms.peerMetrics[key] = tm
- }
- return true
-}
-
-// Saves the history data for all peers to the store DB
-func (tms *TrustMetricStore) saveToDB() {
- tms.Logger.Debug("Saving TrustHistory to DB", "size", tms.size())
-
- peers := make(map[string]MetricHistoryJSON)
-
- for key, tm := range tms.peerMetrics {
- // Add an entry for the peer identified by key
- peers[key] = tm.HistoryJSON()
- }
-
- // Write all the data back to the DB
- bytes, err := json.Marshal(peers)
- if err != nil {
- tms.Logger.Error("Failed to encode the TrustHistory", "err", err)
- return
- }
- tms.db.SetSync(trustMetricKey, bytes)
-}
-
-// Periodically saves the trust history data to the DB
-func (tms *TrustMetricStore) saveRoutine() {
- t := time.NewTicker(defaultStorePeriodicSaveInterval)
- defer t.Stop()
-loop:
- for {
- select {
- case <-t.C:
- tms.SaveToDB()
- case <-tms.Quit:
- break loop
- }
- }
-}
+++ /dev/null
-// Copyright 2017 Tendermint. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package trust
-
-import (
- "fmt"
- "io/ioutil"
- "os"
- "testing"
-
- "github.com/stretchr/testify/assert"
- dbm "github.com/tendermint/tmlibs/db"
- "github.com/tendermint/tmlibs/log"
-)
-
-func TestTrustMetricStoreSaveLoad(t *testing.T) {
- dir, err := ioutil.TempDir("", "trust_test")
- if err != nil {
- panic(err)
- }
- defer os.Remove(dir)
-
- historyDB := dbm.NewDB("trusthistory", "goleveldb", dir)
-
- // 0 peers saved
- store := NewTrustMetricStore(historyDB, DefaultConfig())
- store.SetLogger(log.TestingLogger())
- store.saveToDB()
- // Load the data from the file
- store = NewTrustMetricStore(historyDB, DefaultConfig())
- store.SetLogger(log.TestingLogger())
- store.Start()
- // Make sure we still have 0 entries
- assert.Zero(t, store.Size())
-
- // 100 TestTickers
- var tt []*TestTicker
- for i := 0; i < 100; i++ {
- // The TestTicker will provide manual control over
- // the passing of time within the metric
- tt = append(tt, NewTestTicker())
- }
- // 100 peers
- for i := 0; i < 100; i++ {
- key := fmt.Sprintf("peer_%d", i)
- tm := NewMetric()
-
- tm.SetTicker(tt[i])
- tm.Start()
- store.AddPeerTrustMetric(key, tm)
-
- tm.BadEvents(10)
- tm.GoodEvents(1)
- }
- // Check that we have 100 entries and save
- assert.Equal(t, 100, store.Size())
- // Give the 100 metrics time to process the history data
- for i := 0; i < 100; i++ {
- tt[i].NextTick()
- tt[i].NextTick()
- }
- // Stop all the trust metrics and save
- store.Stop()
-
- // Load the data from the DB
- store = NewTrustMetricStore(historyDB, DefaultConfig())
- store.SetLogger(log.TestingLogger())
- store.Start()
-
- // Check that we still have 100 peers with imperfect trust values
- assert.Equal(t, 100, store.Size())
- for _, tm := range store.peerMetrics {
- assert.NotEqual(t, 1.0, tm.TrustValue())
- }
-
- store.Stop()
-}
-
-func TestTrustMetricStoreConfig(t *testing.T) {
- historyDB := dbm.NewDB("", "memdb", "")
-
- config := TrustMetricConfig{
- ProportionalWeight: 0.5,
- IntegralWeight: 0.5,
- }
-
- // Create a store with custom config
- store := NewTrustMetricStore(historyDB, config)
- store.SetLogger(log.TestingLogger())
- store.Start()
-
- // Have the store make us a metric with the config
- tm := store.GetPeerTrustMetric("TestKey")
-
- // Check that the options made it to the metric
- assert.Equal(t, 0.5, tm.proportionalWeight)
- assert.Equal(t, 0.5, tm.integralWeight)
- store.Stop()
-}
-
-func TestTrustMetricStoreLookup(t *testing.T) {
- historyDB := dbm.NewDB("", "memdb", "")
-
- store := NewTrustMetricStore(historyDB, DefaultConfig())
- store.SetLogger(log.TestingLogger())
- store.Start()
-
- // Create 100 peers in the trust metric store
- for i := 0; i < 100; i++ {
- key := fmt.Sprintf("peer_%d", i)
- store.GetPeerTrustMetric(key)
-
- // Check that the trust metric was successfully entered
- ktm := store.peerMetrics[key]
- assert.NotNil(t, ktm, "Expected to find TrustMetric %s but wasn't there.", key)
- }
-
- store.Stop()
-}
-
-func TestTrustMetricStorePeerScore(t *testing.T) {
- historyDB := dbm.NewDB("", "memdb", "")
-
- store := NewTrustMetricStore(historyDB, DefaultConfig())
- store.SetLogger(log.TestingLogger())
- store.Start()
-
- key := "TestKey"
- tm := store.GetPeerTrustMetric(key)
-
- // This peer is innocent so far
- first := tm.TrustScore()
- assert.Equal(t, 100, first)
-
- // Add some undesirable events and disconnect
- tm.BadEvents(1)
- first = tm.TrustScore()
- assert.NotEqual(t, 100, first)
- tm.BadEvents(10)
- second := tm.TrustScore()
-
- if second > first {
- t.Errorf("A greater number of bad events should lower the trust score")
- }
- store.PeerDisconnected(key)
-
- // We will remember our experiences with this peer
- tm = store.GetPeerTrustMetric(key)
- assert.NotEqual(t, 100, tm.TrustScore())
- store.Stop()
-}
+++ /dev/null
-// Copyright 2017 Tendermint. All rights reserved.
-// Use of this source code is governed by Apache 2 LICENSE that can be found in the LICENSE file.
-
-package trust
-
-import (
- "time"
-)
-
-// MetricTicker provides a single ticker interface for the trust metric
-type MetricTicker interface {
- // GetChannel returns the receive only channel that fires at each time interval
- GetChannel() <-chan time.Time
-
- // Stop will halt further activity on the ticker channel
- Stop()
-}
-
-// The ticker used during testing that provides manual control over time intervals
-type TestTicker struct {
- C chan time.Time
- stopped bool
-}
-
-// NewTestTicker returns our ticker used within test routines
-func NewTestTicker() *TestTicker {
- c := make(chan time.Time)
- return &TestTicker{
- C: c,
- }
-}
-
-func (t *TestTicker) GetChannel() <-chan time.Time {
- return t.C
-}
-
-func (t *TestTicker) Stop() {
- t.stopped = true
-}
-
-// NextInterval manually sends Time on the ticker channel
-func (t *TestTicker) NextTick() {
- if t.stopped {
- return
- }
- t.C <- time.Now()
-}
-
-// Ticker is just a wrap around time.Ticker that allows it
-// to meet the requirements of our interface
-type Ticker struct {
- *time.Ticker
-}
-
-// NewTicker returns a normal time.Ticker wrapped to meet our interface
-func NewTicker(d time.Duration) *Ticker {
- return &Ticker{time.NewTicker(d)}
-}
-
-func (t *Ticker) GetChannel() <-chan time.Time {
- return t.C
-}