OSDN Git Service

P2P: fixed node startup id (#1573)
[bytom/bytom.git] / p2p / switch.go
index b53924f..ec4b73d 100644 (file)
 package p2p
 
 import (
+       "encoding/hex"
        "encoding/json"
        "fmt"
-       "math/rand"
        "net"
-       "strings"
        "sync"
        "time"
 
        log "github.com/sirupsen/logrus"
-       crypto "github.com/tendermint/go-crypto"
+       "github.com/tendermint/go-crypto"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
 
        cfg "github.com/bytom/config"
+       "github.com/bytom/consensus"
        "github.com/bytom/errors"
+       "github.com/bytom/p2p/connection"
+       "github.com/bytom/p2p/discover"
        "github.com/bytom/p2p/trust"
+       "github.com/bytom/version"
 )
 
 const (
-       reconnectAttempts = 5
-       reconnectInterval = 10 * time.Second
-
-       bannedPeerKey      = "BannedPeer"
-       defaultBanDuration = time.Hour * 1
+       bannedPeerKey       = "BannedPeer"
+       defaultBanDuration  = time.Hour * 1
+       minNumOutboundPeers = 3
 )
 
-var ErrConnectBannedPeer = errors.New("Connect banned peer")
-
-type Reactor interface {
-       cmn.Service // Start, Stop
-
-       SetSwitch(*Switch)
-       GetChannels() []*ChannelDescriptor
-       AddPeer(peer *Peer) error
-       RemovePeer(peer *Peer, reason interface{})
-       Receive(chID byte, peer *Peer, msgBytes []byte)
-}
-
-//--------------------------------------
-
-type BaseReactor struct {
-       cmn.BaseService // Provides Start, Stop, .Quit
-       Switch          *Switch
-}
+//pre-define errors for connecting fail
+var (
+       ErrDuplicatePeer     = errors.New("Duplicate peer")
+       ErrConnectSelf       = errors.New("Connect self")
+       ErrConnectBannedPeer = errors.New("Connect banned peer")
+       ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
+)
 
-func NewBaseReactor(name string, impl Reactor) *BaseReactor {
-       return &BaseReactor{
-               BaseService: *cmn.NewBaseService(nil, name, impl),
-               Switch:      nil,
-       }
+type discv interface {
+       ReadRandomNodes(buf []*discover.Node) (n int)
 }
 
-func (br *BaseReactor) SetSwitch(sw *Switch) {
-       br.Switch = sw
-}
-func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
-func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
-func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
-func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
-
-//-----------------------------------------------------------------------------
-
-/*
-The `Switch` handles peer connections and exposes an API to receive incoming messages
-on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
-or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
-incoming messages are received on the reactor.
-*/
+// Switch handles peer connections and exposes an API to receive incoming messages
+// on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
+// or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
+// incoming messages are received on the reactor.
 type Switch struct {
        cmn.BaseService
 
-       config       *cfg.P2PConfig
+       Config       *cfg.Config
        peerConfig   *PeerConfig
        listeners    []Listener
        reactors     map[string]Reactor
-       chDescs      []*ChannelDescriptor
+       chDescs      []*connection.ChannelDescriptor
        reactorsByCh map[byte]Reactor
        peers        *PeerSet
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       discv        discv
        bannedPeer   map[string]time.Time
        db           dbm.DB
        mtx          sync.Mutex
-
-       filterConnByAddr   func(net.Addr) error
-       filterConnByPubKey func(crypto.PubKeyEd25519) error
 }
 
-var (
-       ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
-       ErrConnectSelf         = errors.New("Connect self")
-       ErrPeerConnected       = errors.New("Peer is connected")
-)
+// NewSwitch create a new Switch and set discover.
+func NewSwitch(config *cfg.Config) (*Switch, error) {
+       var err error
+       var l Listener
+       var listenAddr string
+       var discv *discover.Network
 
-func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
-       sw := &Switch{
-               config:       config,
-               peerConfig:   DefaultPeerConfig(config),
-               reactors:     make(map[string]Reactor),
-               chDescs:      make([]*ChannelDescriptor, 0),
-               reactorsByCh: make(map[byte]Reactor),
-               peers:        NewPeerSet(),
-               dialing:      cmn.NewCMap(),
-               nodeInfo:     nil,
-               db:           trustHistoryDB,
+       blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
+       config.P2P.PrivateKey, err = config.NodeKey()
+       if err != nil {
+               return nil, err
        }
-       sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
 
-       sw.bannedPeer = make(map[string]time.Time)
-       if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
-               if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
-                       return nil
-               }
+       bytes, err := hex.DecodeString(config.P2P.PrivateKey)
+       if err != nil {
+               return nil, err
        }
-       trust.Init()
-       return sw
-}
 
-// Not goroutine safe.
-func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
-       // Validate the reactor.
-       // No two reactors can share the same channel.
-       reactorChannels := reactor.GetChannels()
-       for _, chDesc := range reactorChannels {
-               chID := chDesc.ID
-               if sw.reactorsByCh[chID] != nil {
-                       cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
+       var newKey [64]byte
+       copy(newKey[:], bytes)
+       privKey := crypto.PrivKeyEd25519(newKey)
+       if !config.VaultMode {
+               // Create listener
+               l, listenAddr = GetListener(config.P2P)
+               discv, err = discover.NewDiscover(config, &privKey, l.ExternalAddress().Port)
+               if err != nil {
+                       return nil, err
                }
-               sw.chDescs = append(sw.chDescs, chDesc)
-               sw.reactorsByCh[chID] = reactor
        }
-       sw.reactors[name] = reactor
-       reactor.SetSwitch(sw)
-       return reactor
-}
-
-// Not goroutine safe.
-func (sw *Switch) Reactors() map[string]Reactor {
-       return sw.reactors
-}
-
-// Not goroutine safe.
-func (sw *Switch) Reactor(name string) Reactor {
-       return sw.reactors[name]
-}
-
-// Not goroutine safe.
-func (sw *Switch) AddListener(l Listener) {
-       sw.listeners = append(sw.listeners, l)
-}
-
-// Not goroutine safe.
-func (sw *Switch) Listeners() []Listener {
-       return sw.listeners
-}
-
-// Not goroutine safe.
-func (sw *Switch) IsListening() bool {
-       return len(sw.listeners) > 0
-}
 
-// Not goroutine safe.
-func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
-       sw.nodeInfo = nodeInfo
+       return newSwitch(config, discv, blacklistDB, l, privKey, listenAddr)
 }
 
-// Not goroutine safe.
-func (sw *Switch) NodeInfo() *NodeInfo {
-       return sw.nodeInfo
-}
-
-// Not goroutine safe.
-// NOTE: Overwrites sw.nodeInfo.PubKey
-func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
-       sw.nodePrivKey = nodePrivKey
-       if sw.nodeInfo != nil {
-               sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
+// newSwitch creates a new Switch with the given config.
+func newSwitch(config *cfg.Config, discv discv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
+       sw := &Switch{
+               Config:       config,
+               peerConfig:   DefaultPeerConfig(config.P2P),
+               reactors:     make(map[string]Reactor),
+               chDescs:      make([]*connection.ChannelDescriptor, 0),
+               reactorsByCh: make(map[byte]Reactor),
+               peers:        NewPeerSet(),
+               dialing:      cmn.NewCMap(),
+               nodePrivKey:  priv,
+               discv:        discv,
+               db:           blacklistDB,
+               nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
+               bannedPeer:   make(map[string]time.Time),
+       }
+       if err := sw.loadBannedPeers(); err != nil {
+               return nil, err
        }
+
+       sw.AddListener(l)
+       sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
+       trust.Init()
+       return sw, nil
 }
 
-// Switch.Start() starts all the reactors, peers, and listeners.
+// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
 func (sw *Switch) OnStart() error {
-       sw.BaseService.OnStart()
-       // Start reactors
        for _, reactor := range sw.reactors {
-               _, err := reactor.Start()
-               if err != nil {
+               if _, err := reactor.Start(); err != nil {
                        return err
                }
        }
-       // Start peers
-       for _, peer := range sw.peers.List() {
-               sw.startInitPeer(peer)
-       }
-       // Start listeners
        for _, listener := range sw.listeners {
                go sw.listenerRoutine(listener)
        }
+       go sw.ensureOutboundPeersRoutine()
        return nil
 }
 
+// OnStop implements BaseService. It stops all listeners, peers, and reactors.
 func (sw *Switch) OnStop() {
-       sw.BaseService.OnStop()
-       // Stop listeners
        for _, listener := range sw.listeners {
                listener.Stop()
        }
        sw.listeners = nil
-       // Stop peers
+
        for _, peer := range sw.peers.List() {
                peer.Stop()
                sw.peers.Remove(peer)
        }
-       // Stop reactors
+
        for _, reactor := range sw.reactors {
                reactor.Stop()
        }
 }
 
-// NOTE: This performs a blocking handshake before the peer is added.
-// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
-func (sw *Switch) AddPeer(peer *Peer) error {
-       if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
-               return err
-       }
+//AddBannedPeer add peer to blacklist
+func (sw *Switch) AddBannedPeer(ip string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
 
-       if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
+       sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
+       dataJSON, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
                return err
        }
 
-       if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
+       sw.db.Set([]byte(bannedPeerKey), dataJSON)
+       return nil
+}
+
+// AddPeer performs the P2P handshake with a peer
+// that already has a SecretConnection. If all goes well,
+// it starts the peer and adds it to the switch.
+// NOTE: This performs a blocking handshake before the peer is added.
+// CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
+func (sw *Switch) AddPeer(pc *peerConn) error {
+       peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
+       if err != nil {
                return err
        }
 
-       if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
+       if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
                return err
        }
-
-       // Avoid self
-       if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
-               return errors.New("Ignoring connection from self")
+       if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
+               return err
        }
 
-       // Check version, chain id
-       if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
+       peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
+       if err := sw.filterConnByPeer(peer); err != nil {
                return err
        }
 
-       // Check for duplicate peer
-       if sw.peers.Has(peer.Key) {
-               return ErrSwitchDuplicatePeer
-
+       if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
+               return ErrConnectSpvPeer
        }
 
        // Start peer
@@ -263,158 +202,86 @@ func (sw *Switch) AddPeer(peer *Peer) error {
                }
        }
 
-       // Add the peer to .peers.
-       // We start it first so that a peer in the list is safe to Stop.
-       // It should not err since we already checked peers.Has()
-       if err := sw.peers.Add(peer); err != nil {
-               return err
-       }
-
-       log.WithField("peer", peer).Info("Added peer")
-       return nil
+       return sw.peers.Add(peer)
 }
 
-func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
-       if sw.filterConnByAddr != nil {
-               return sw.filterConnByAddr(addr)
-       }
-       return nil
-}
-
-func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
-       if sw.filterConnByPubKey != nil {
-               return sw.filterConnByPubKey(pubkey)
+// AddReactor adds the given reactor to the switch.
+// NOTE: Not goroutine safe.
+func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
+       // Validate the reactor.
+       // No two reactors can share the same channel.
+       for _, chDesc := range reactor.GetChannels() {
+               chID := chDesc.ID
+               if sw.reactorsByCh[chID] != nil {
+                       cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
+               }
+               sw.chDescs = append(sw.chDescs, chDesc)
+               sw.reactorsByCh[chID] = reactor
        }
-       return nil
-
-}
-
-func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
-       sw.filterConnByAddr = f
+       sw.reactors[name] = reactor
+       reactor.SetSwitch(sw)
+       return reactor
 }
 
-func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
-       sw.filterConnByPubKey = f
+// AddListener adds the given listener to the switch for listening to incoming peer connections.
+// NOTE: Not goroutine safe.
+func (sw *Switch) AddListener(l Listener) {
+       sw.listeners = append(sw.listeners, l)
 }
 
-func (sw *Switch) startInitPeer(peer *Peer) error {
-       peer.Start() // spawn send/recv routines
-       for _, reactor := range sw.reactors {
-               if err := reactor.AddPeer(peer); err != nil {
-                       return err
-               }
+//DialPeerWithAddress dial node from net address
+func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
+       log.Debug("Dialing peer address:", addr)
+       sw.dialing.Set(addr.IP.String(), addr)
+       defer sw.dialing.Delete(addr.IP.String())
+       if err := sw.filterConnByIP(addr.IP.String()); err != nil {
+               return err
        }
-       return nil
-}
-
-// Dial a list of seeds asynchronously in random order
-func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
 
-       netAddrs, err := NewNetAddressStrings(seeds)
+       pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
+               log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
                return err
        }
 
-       if addrBook != nil {
-               // add seeds to `addrBook`
-               ourAddrS := sw.nodeInfo.ListenAddr
-               ourAddr, _ := NewNetAddressString(ourAddrS)
-               for _, netAddr := range netAddrs {
-                       // do not add ourselves
-                       if netAddr.Equals(ourAddr) {
-                               continue
-                       }
-                       addrBook.AddAddress(netAddr, ourAddr)
-               }
-
-               addrBook.Save()
-       }
-       //permute the list, dial them in random order.
-       perm := rand.Perm(len(netAddrs))
-       for i := 0; i < len(perm)/2; i++ {
-               j := perm[i]
-               sw.dialSeed(netAddrs[j])
+       if err = sw.AddPeer(pc); err != nil {
+               log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
+               pc.CloseConn()
+               return err
        }
-
+       log.Debug("DialPeer added peer:", addr)
        return nil
 }
 
-func (sw *Switch) dialSeed(addr *NetAddress) {
-       peer, err := sw.DialPeerWithAddress(addr, false)
-       if err != nil {
-               log.WithField("error", err).Error("Error dialing seed")
-       } else {
-               log.WithField("peer", peer).Info("Connected to seed")
-       }
+//IsDialing prevent duplicate dialing
+func (sw *Switch) IsDialing(addr *NetAddress) bool {
+       return sw.dialing.Has(addr.IP.String())
 }
 
-func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
-       if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
-               return nil, err
-       }
-       if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
-               return nil, ErrConnectSelf
-       }
-       for _, v := range sw.Peers().list {
-               if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
-                       return nil, ErrPeerConnected
-               }
-       }
-       sw.dialing.Set(addr.IP.String(), addr)
-       defer sw.dialing.Delete(addr.IP.String())
+// IsListening returns true if the switch has at least one listener.
+// NOTE: Not goroutine safe.
+func (sw *Switch) IsListening() bool {
+       return len(sw.listeners) > 0
+}
 
-       log.WithField("address", addr).Info("Dialing peer")
-       peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
-       if err != nil {
-               log.WithFields(log.Fields{
-                       "address": addr,
-                       "error":   err,
-               }).Info("Failed to dial peer")
-               return nil, err
-       }
-       peer.SetLogger(sw.Logger.With("peer", addr))
-       if persistent {
-               peer.makePersistent()
-       }
-       err = sw.AddPeer(peer)
-       if err != nil {
-               log.WithFields(log.Fields{
-                       "address": addr,
-                       "error":   err,
-               }).Info("Failed to add peer")
-               peer.CloseConn()
-               return nil, err
+// loadBannedPeers load banned peers from db
+func (sw *Switch) loadBannedPeers() error {
+       if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
+               if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
+                       return err
+               }
        }
-       log.WithFields(log.Fields{
-               "address": addr,
-       }).Info("Dialed and added peer")
-       return peer, nil
-}
 
-func (sw *Switch) IsDialing(addr *NetAddress) bool {
-       return sw.dialing.Has(addr.IP.String())
+       return nil
 }
 
-// Broadcast runs a go routine for each attempted send, which will block
-// trying to send for defaultSendTimeoutSeconds. Returns a channel
-// which receives success values for each attempted send (false if times out)
-// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
-func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
-       successChan := make(chan bool, len(sw.peers.List()))
-       log.WithFields(log.Fields{
-               "chID": chID,
-               "msg":  msg,
-       }).Debug("Broadcast")
-       for _, peer := range sw.peers.List() {
-               go func(peer *Peer) {
-                       success := peer.Send(chID, msg)
-                       successChan <- success
-               }(peer)
-       }
-       return successChan
+// Listeners returns the list of listeners the switch listens on.
+// NOTE: Not goroutine safe.
+func (sw *Switch) Listeners() []Listener {
+       return sw.listeners
 }
 
-// Returns the count of outbound/inbound and outbound-dialing peers.
+// NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        peers := sw.peers.List()
        for _, peer := range peers {
@@ -428,69 +295,99 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        return
 }
 
+// NodeInfo returns the switch's NodeInfo.
+// NOTE: Not goroutine safe.
+func (sw *Switch) NodeInfo() *NodeInfo {
+       return sw.nodeInfo
+}
+
+//Peers return switch peerset
 func (sw *Switch) Peers() *PeerSet {
        return sw.peers
 }
 
-// Disconnect from a peer due to external error, retry if it is a persistent peer.
-// TODO: make record depending on reason.
+// StopPeerForError disconnects from a peer due to external error.
 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
-       addr := NewNetAddress(peer.Addr())
-       log.WithFields(log.Fields{
-               "peer":  peer,
-               "error": reason,
-       }).Info("Stopping peer due to error")
+       log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
        sw.stopAndRemovePeer(peer, reason)
+}
 
-       if peer.IsPersistent() {
-               log.WithField("peer", peer).Info("Reconnecting to peer")
-               for i := 1; i < reconnectAttempts; i++ {
-                       if !sw.IsRunning() {
-                               return
-                       }
+// StopPeerGracefully disconnect from a peer gracefully.
+func (sw *Switch) StopPeerGracefully(peerID string) {
+       if peer := sw.peers.Get(peerID); peer != nil {
+               sw.stopAndRemovePeer(peer, nil)
+       }
+}
 
-                       peer, err := sw.DialPeerWithAddress(addr, false)
-                       if err != nil {
-                               if i == reconnectAttempts {
-                                       log.WithFields(log.Fields{
-                                               "retries": i,
-                                               "error":   err,
-                                       }).Info("Error reconnecting to peer. Giving up")
-                                       return
-                               }
-
-                               if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
-                                       log.WithField("error", err).Info("Error reconnecting to peer. ")
-                                       return
-                               }
-
-                               log.WithFields(log.Fields{
-                                       "retries": i,
-                                       "error":   err,
-                               }).Info("Error reconnecting to peer. Trying again")
-                               time.Sleep(reconnectInterval)
-                               continue
-                       }
+func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
+       peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
+       if err != nil {
+               if err := conn.Close(); err != nil {
+                       log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
+               }
+               return err
+       }
 
-                       log.WithField("peer", peer).Info("Reconnected to peer")
-                       return
+       if err = sw.AddPeer(peerConn); err != nil {
+               if err := conn.Close(); err != nil {
+                       log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
+               }
+               return err
+       }
+
+       return nil
+}
+
+func (sw *Switch) checkBannedPeer(peer string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       if banEnd, ok := sw.bannedPeer[peer]; ok {
+               if time.Now().Before(banEnd) {
+                       return ErrConnectBannedPeer
+               }
+
+               if err := sw.delBannedPeer(peer); err != nil {
+                       return err
                }
        }
+       return nil
 }
 
-// Disconnect from a peer gracefully.
-// TODO: handle graceful disconnects.
-func (sw *Switch) StopPeerGracefully(peer *Peer) {
-       log.Info("Stopping peer gracefully")
-       sw.stopAndRemovePeer(peer, nil)
+func (sw *Switch) delBannedPeer(addr string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       delete(sw.bannedPeer, addr)
+       datajson, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
+               return err
+       }
+
+       sw.db.Set([]byte(bannedPeerKey), datajson)
+       return nil
 }
 
-func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
-       for _, reactor := range sw.reactors {
-               reactor.RemovePeer(peer, reason)
+func (sw *Switch) filterConnByIP(ip string) error {
+       if ip == sw.nodeInfo.listenHost() {
+               return ErrConnectSelf
        }
-       sw.peers.Remove(peer)
-       peer.Stop()
+       return sw.checkBannedPeer(ip)
+}
+
+func (sw *Switch) filterConnByPeer(peer *Peer) error {
+       if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
+               return err
+       }
+
+       if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) {
+               return ErrConnectSelf
+       }
+
+       if sw.peers.Has(peer.Key) {
+               return ErrDuplicatePeer
+       }
+       return nil
 }
 
 func (sw *Switch) listenerRoutine(l Listener) {
@@ -500,204 +397,109 @@ func (sw *Switch) listenerRoutine(l Listener) {
                        break
                }
 
-               // ignore connection if we already have enough
-               maxPeers := sw.config.MaxNumPeers
-               if maxPeers <= sw.peers.Size() {
-                       // close inConn
-                       inConn.Close()
-                       log.WithFields(log.Fields{
-                               "address":  inConn.RemoteAddr().String(),
-                               "numPeers": sw.peers.Size(),
-                               "max":      maxPeers,
-                       }).Info("Ignoring inbound connection: already have enough peers")
+               // disconnect if we alrady have MaxNumPeers
+               if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
+                       if err := inConn.Close(); err != nil {
+                               log.WithFields(log.Fields{"remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
+                       }
+                       log.Info("Ignoring inbound connection: already have enough peers.")
                        continue
                }
 
                // New inbound connection!
-               err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
-               if err != nil {
-                       // conn close for returing err
-                       inConn.Close()
-                       log.WithFields(log.Fields{
-                               "address": inConn.RemoteAddr().String(),
-                               "error":   err,
-                       }).Info("Ignoring inbound connection: error while adding peer")
+               if err := sw.addPeerWithConnection(inConn); err != nil {
+                       log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
                        continue
                }
-
-               // NOTE: We don't yet have the listening port of the
-               // remote (if they have a listener at all).
-               // The peerHandshake will handle that
        }
-
-       // cleanup
-}
-
-//-----------------------------------------------------------------------------
-
-type SwitchEventNewPeer struct {
-       Peer *Peer
 }
 
-type SwitchEventDonePeer struct {
-       Peer  *Peer
-       Error interface{}
+func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
+       if err := sw.DialPeerWithAddress(a); err != nil {
+               log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
+       }
+       wg.Done()
 }
 
-//------------------------------------------------------------------
-// Switches connected via arbitrary net.Conn; useful for testing
-
-// Returns n switches, connected according to the connect func.
-// If connect==Connect2Switches, the switches will be fully connected.
-// initSwitch defines how the ith switch should be initialized (ie. with what reactors).
-// NOTE: panics if any switch fails to start.
-func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
-       switches := make([]*Switch, n)
-       for i := 0; i < n; i++ {
-               switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
+func (sw *Switch) ensureOutboundPeers() {
+       numOutPeers, _, numDialing := sw.NumPeers()
+       numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
+       log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
+       if numToDial <= 0 {
+               return
        }
 
-       if err := StartSwitches(switches); err != nil {
-               panic(err)
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.remoteAddrHost()] = struct{}{}
        }
 
+       var wg sync.WaitGroup
+       nodes := make([]*discover.Node, numToDial)
+       n := sw.discv.ReadRandomNodes(nodes)
        for i := 0; i < n; i++ {
-               for j := i; j < n; j++ {
-                       connect(switches, i, j)
+               try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
                }
-       }
-
-       return switches
-}
-
-var PanicOnAddPeerErr = false
-
-// Will connect switches i and j via net.Pipe()
-// Blocks until a conection is established.
-// NOTE: caller ensures i and j are within bounds
-func Connect2Switches(switches []*Switch, i, j int) {
-       switchI := switches[i]
-       switchJ := switches[j]
-       c1, c2 := net.Pipe()
-       doneCh := make(chan struct{})
-       go func() {
-               err := switchI.addPeerWithConnection(c1)
-               if PanicOnAddPeerErr && err != nil {
-                       panic(err)
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
                }
-               doneCh <- struct{}{}
-       }()
-       go func() {
-               err := switchJ.addPeerWithConnection(c2)
-               if PanicOnAddPeerErr && err != nil {
-                       panic(err)
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
                }
-               doneCh <- struct{}{}
-       }()
-       <-doneCh
-       <-doneCh
-}
 
-func StartSwitches(switches []*Switch) error {
-       for _, s := range switches {
-               _, err := s.Start() // start switch and reactors
-               if err != nil {
-                       return err
-               }
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
        }
-       return nil
+       wg.Wait()
 }
 
-func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
-       privKey := crypto.GenPrivKeyEd25519()
-       // new switch, add reactors
-       // TODO: let the config be passed in?
-       s := initSwitch(i, NewSwitch(cfg, nil))
-       s.SetNodeInfo(&NodeInfo{
-               PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker:    cmn.Fmt("switch%d", i),
-               Network:    network,
-               Version:    version,
-               RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
-               ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
-       })
-       s.SetNodePrivKey(privKey)
-       return s
-}
+func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureOutboundPeers()
 
-func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
-       if err != nil {
-               conn.Close()
-               return err
-       }
-       peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
-       if err = sw.AddPeer(peer); err != nil {
-               conn.Close()
-               return err
-       }
-
-       return nil
-}
-
-func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
-       fullAddr := conn.RemoteAddr().String()
-       host, _, err := net.SplitHostPort(fullAddr)
-       if err != nil {
-               return err
-       }
+       ticker := time.NewTicker(10 * time.Second)
+       defer ticker.Stop()
 
-       if err = sw.checkBannedPeer(host); err != nil {
-               return err
-       }
-
-       peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
-       if err != nil {
-               return err
-       }
-       peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
-       if err = sw.AddPeer(peer); err != nil {
-               return err
+       for {
+               select {
+               case <-ticker.C:
+                       sw.ensureOutboundPeers()
+               case <-sw.Quit:
+                       return
+               }
        }
-
-       return nil
 }
 
-func (sw *Switch) AddBannedPeer(peer *Peer) error {
-       sw.mtx.Lock()
-       defer sw.mtx.Unlock()
-       if peer == nil {
-               return nil
-       }
-       key := peer.mconn.RemoteAddress.IP.String()
-       sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
-       datajson, err := json.Marshal(sw.bannedPeer)
-       if err != nil {
-               return err
+func (sw *Switch) startInitPeer(peer *Peer) error {
+       // spawn send/recv routines
+       if _, err := peer.Start(); err != nil {
+               log.WithFields(log.Fields{"remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
        }
-       sw.db.Set([]byte(bannedPeerKey), datajson)
-       return nil
-}
 
-func (sw *Switch) delBannedPeer(addr string) error {
-       delete(sw.bannedPeer, addr)
-       datajson, err := json.Marshal(sw.bannedPeer)
-       if err != nil {
-               return err
+       for _, reactor := range sw.reactors {
+               if err := reactor.AddPeer(peer); err != nil {
+                       return err
+               }
        }
-       sw.db.Set([]byte(bannedPeerKey), datajson)
        return nil
 }
 
-func (sw *Switch) checkBannedPeer(peer string) error {
-       sw.mtx.Lock()
-       defer sw.mtx.Unlock()
-
-       if banEnd, ok := sw.bannedPeer[peer]; ok {
-               if time.Now().Before(banEnd) {
-                       return ErrConnectBannedPeer
-               }
-               sw.delBannedPeer(peer)
+func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
+       sw.peers.Remove(peer)
+       for _, reactor := range sw.reactors {
+               reactor.RemovePeer(peer, reason)
        }
-       return nil
+       peer.Stop()
+
+       sentStatus, receivedStatus := peer.TrafficStatus()
+       log.WithFields(log.Fields{
+               "address":               peer.Addr().String(),
+               "reason":                reason,
+               "duration":              sentStatus.Duration.String(),
+               "total_sent":            sentStatus.Bytes,
+               "total_received":        receivedStatus.Bytes,
+               "average_sent_rate":     sentStatus.AvgRate,
+               "average_received_rate": receivedStatus.AvgRate,
+       }).Info("disconnect with peer")
 }