OSDN Git Service

Add peer start dial seeds function
[bytom/bytom-spv.git] / p2p / switch.go
index e896b67..d2f8f0f 100644 (file)
@@ -1,28 +1,40 @@
 package p2p
 
 import (
-       "errors"
+       "encoding/json"
        "fmt"
        "math/rand"
        "net"
+       "strings"
+       "sync"
        "time"
 
-       cfg "github.com/bytom/config"
+       log "github.com/sirupsen/logrus"
        crypto "github.com/tendermint/go-crypto"
        cmn "github.com/tendermint/tmlibs/common"
+       dbm "github.com/tendermint/tmlibs/db"
+
+       cfg "github.com/bytom/config"
+       "github.com/bytom/errors"
+       "github.com/bytom/p2p/trust"
 )
 
 const (
-       reconnectAttempts = 30
-       reconnectInterval = 3 * time.Second
+       reconnectAttempts = 10
+       reconnectInterval = 10 * time.Second
+
+       bannedPeerKey      = "BannedPeer"
+       defaultBanDuration = time.Hour * 24
 )
 
+var ErrConnectBannedPeer = errors.New("Connect banned peer")
+
 type Reactor interface {
        cmn.Service // Start, Stop
 
        SetSwitch(*Switch)
        GetChannels() []*ChannelDescriptor
-       AddPeer(peer *Peer)
+       AddPeer(peer *Peer) error
        RemovePeer(peer *Peer, reason interface{})
        Receive(chID byte, peer *Peer, msgBytes []byte)
 }
@@ -70,6 +82,9 @@ type Switch struct {
        dialing      *cmn.CMap
        nodeInfo     *NodeInfo             // our node info
        nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
+       bannedPeer   map[string]time.Time
+       db           dbm.DB
+       mtx          sync.Mutex
 
        filterConnByAddr   func(net.Addr) error
        filterConnByPubKey func(crypto.PubKeyEd25519) error
@@ -77,20 +92,31 @@ type Switch struct {
 
 var (
        ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
+       ErrConnectSelf         = errors.New("Connect self")
+       ErrPeerConnected       = errors.New("Peer is connected")
 )
 
-func NewSwitch(config *cfg.P2PConfig) *Switch {
+func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
        sw := &Switch{
                config:       config,
-               peerConfig:   DefaultPeerConfig(),
+               peerConfig:   DefaultPeerConfig(config),
                reactors:     make(map[string]Reactor),
                chDescs:      make([]*ChannelDescriptor, 0),
                reactorsByCh: make(map[byte]Reactor),
                peers:        NewPeerSet(),
                dialing:      cmn.NewCMap(),
                nodeInfo:     nil,
+               db:           trustHistoryDB,
        }
        sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
+
+       sw.bannedPeer = make(map[string]time.Time)
+       if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
+               if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
+                       return nil
+               }
+       }
+       trust.Init()
        return sw
 }
 
@@ -210,6 +236,10 @@ func (sw *Switch) AddPeer(peer *Peer) error {
                return err
        }
 
+       if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
+               return err
+       }
+
        // Avoid self
        if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
                return errors.New("Ignoring connection from self")
@@ -228,7 +258,9 @@ func (sw *Switch) AddPeer(peer *Peer) error {
 
        // Start peer
        if sw.IsRunning() {
-               sw.startInitPeer(peer)
+               if err := sw.startInitPeer(peer); err != nil {
+                       return err
+               }
        }
 
        // Add the peer to .peers.
@@ -238,7 +270,7 @@ func (sw *Switch) AddPeer(peer *Peer) error {
                return err
        }
 
-       sw.Logger.Info("Added peer", "peer", peer)
+       log.WithField("peer", peer).Info("Added peer")
        return nil
 }
 
@@ -265,11 +297,14 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
        sw.filterConnByPubKey = f
 }
 
-func (sw *Switch) startInitPeer(peer *Peer) {
+func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start() // spawn send/recv routines
        for _, reactor := range sw.reactors {
-               reactor.AddPeer(peer)
+               if err := reactor.AddPeer(peer); err != nil {
+                       return err
+               }
        }
+       return nil
 }
 
 // Dial a list of seeds asynchronously in random order
@@ -291,38 +326,50 @@ func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
                        }
                        addrBook.AddAddress(netAddr, ourAddr)
                }
+
                addrBook.Save()
        }
-
-       // permute the list, dial them in random order.
+       //permute the list, dial them in random order.
        perm := rand.Perm(len(netAddrs))
-       for i := 0; i < len(perm); i++ {
-               go func(i int) {
-                       time.Sleep(time.Duration(rand.Int63n(3000)) * time.Millisecond)
-                       j := perm[i]
-                       sw.dialSeed(netAddrs[j])
-               }(i)
+       for i := 0; i < len(perm)/2; i++ {
+               j := perm[i]
+               sw.dialSeed(netAddrs[j])
        }
+
        return nil
 }
 
 func (sw *Switch) dialSeed(addr *NetAddress) {
        peer, err := sw.DialPeerWithAddress(addr, true)
        if err != nil {
-               sw.Logger.Error("Error dialing seed", "error", err)
+               log.WithField("error", err).Error("Error dialing seed")
        } else {
-               sw.Logger.Info("Connected to seed", "peer", peer)
+               log.WithField("peer", peer).Info("Connected to seed")
        }
 }
 
 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
+       if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
+               return nil, err
+       }
+       if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
+               return nil, ErrConnectSelf
+       }
+       for _, v := range sw.Peers().list {
+               if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
+                       return nil, ErrPeerConnected
+               }
+       }
        sw.dialing.Set(addr.IP.String(), addr)
        defer sw.dialing.Delete(addr.IP.String())
 
-       sw.Logger.Info("Dialing peer", "address", addr)
+       log.WithField("address", addr).Info("Dialing peer")
        peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
        if err != nil {
-               sw.Logger.Error("Failed to dial peer", "address", addr, "error", err)
+               log.WithFields(log.Fields{
+                       "address": addr,
+                       "error":   err,
+               }).Info("Failed to dial peer")
                return nil, err
        }
        peer.SetLogger(sw.Logger.With("peer", addr))
@@ -331,11 +378,16 @@ func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer,
        }
        err = sw.AddPeer(peer)
        if err != nil {
-               sw.Logger.Error("Failed to add peer", "address", addr, "error", err)
+               log.WithFields(log.Fields{
+                       "address": addr,
+                       "error":   err,
+               }).Info("Failed to add peer")
                peer.CloseConn()
                return nil, err
        }
-       sw.Logger.Info("Dialed and added peer", "address", addr, "peer", peer)
+       log.WithFields(log.Fields{
+               "address": addr,
+       }).Info("Dialed and added peer")
        return peer, nil
 }
 
@@ -349,7 +401,10 @@ func (sw *Switch) IsDialing(addr *NetAddress) bool {
 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
        successChan := make(chan bool, len(sw.peers.List()))
-       sw.Logger.Debug("Broadcast", "channel", chID, "msg", msg)
+       log.WithFields(log.Fields{
+               "chID": chID,
+               "msg":  msg,
+       }).Debug("Broadcast")
        for _, peer := range sw.peers.List() {
                go func(peer *Peer) {
                        success := peer.Send(chID, msg)
@@ -373,7 +428,7 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        return
 }
 
-func (sw *Switch) Peers() IPeerSet {
+func (sw *Switch) Peers() *PeerSet {
        return sw.peers
 }
 
@@ -381,48 +436,61 @@ func (sw *Switch) Peers() IPeerSet {
 // TODO: make record depending on reason.
 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
        addr := NewNetAddress(peer.Addr())
-       sw.Logger.Info("Stopping peer for error", "peer", peer, "error", reason)
+       log.WithFields(log.Fields{
+               "peer":  peer,
+               "error": reason,
+       }).Info("Stopping peer due to error")
        sw.stopAndRemovePeer(peer, reason)
 
        if peer.IsPersistent() {
-               go func() {
-                       sw.Logger.Info("Reconnecting to peer", "peer", peer)
-                       for i := 1; i < reconnectAttempts; i++ {
-                               if !sw.IsRunning() {
+               log.WithField("peer", peer).Info("Reconnecting to peer")
+               for i := 1; i < reconnectAttempts; i++ {
+                       if !sw.IsRunning() {
+                               return
+                       }
+
+                       peer, err := sw.DialPeerWithAddress(addr, true)
+                       if err != nil {
+                               if i == reconnectAttempts {
+                                       log.WithFields(log.Fields{
+                                               "retries": i,
+                                               "error":   err,
+                                       }).Info("Error reconnecting to peer. Giving up")
                                        return
                                }
 
-                               peer, err := sw.DialPeerWithAddress(addr, true)
-                               if err != nil {
-                                       if i == reconnectAttempts {
-                                               sw.Logger.Info("Error reconnecting to peer. Giving up", "tries", i, "error", err)
-                                               return
-                                       }
-                                       sw.Logger.Info("Error reconnecting to peer. Trying again", "tries", i, "error", err)
-                                       time.Sleep(reconnectInterval)
-                                       continue
+                               if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
+                                       log.WithField("error", err).Info("Error reconnecting to peer. ")
+                                       return
                                }
 
-                               sw.Logger.Info("Reconnected to peer", "peer", peer)
-                               return
+                               log.WithFields(log.Fields{
+                                       "retries": i,
+                                       "error":   err,
+                               }).Info("Error reconnecting to peer. Trying again")
+                               time.Sleep(reconnectInterval)
+                               continue
                        }
-               }()
+
+                       log.WithField("peer", peer).Info("Reconnected to peer")
+                       return
+               }
        }
 }
 
 // Disconnect from a peer gracefully.
 // TODO: handle graceful disconnects.
 func (sw *Switch) StopPeerGracefully(peer *Peer) {
-       sw.Logger.Info("Stopping peer gracefully")
+       log.Info("Stopping peer gracefully")
        sw.stopAndRemovePeer(peer, nil)
 }
 
 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
-       sw.peers.Remove(peer)
-       peer.Stop()
        for _, reactor := range sw.reactors {
                reactor.RemovePeer(peer, reason)
        }
+       sw.peers.Remove(peer)
+       peer.Stop()
 }
 
 func (sw *Switch) listenerRoutine(l Listener) {
@@ -435,14 +503,25 @@ func (sw *Switch) listenerRoutine(l Listener) {
                // ignore connection if we already have enough
                maxPeers := sw.config.MaxNumPeers
                if maxPeers <= sw.peers.Size() {
-                       sw.Logger.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxPeers)
+                       // close inConn
+                       inConn.Close()
+                       log.WithFields(log.Fields{
+                               "address":  inConn.RemoteAddr().String(),
+                               "numPeers": sw.peers.Size(),
+                               "max":      maxPeers,
+                       }).Info("Ignoring inbound connection: already have enough peers")
                        continue
                }
 
                // New inbound connection!
                err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
                if err != nil {
-                       sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "error", err)
+                       // conn close for returing err
+                       inConn.Close()
+                       log.WithFields(log.Fields{
+                               "address": inConn.RemoteAddr().String(),
+                               "error":   err,
+                       }).Info("Ignoring inbound connection: error while adding peer")
                        continue
                }
 
@@ -533,7 +612,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
        privKey := crypto.GenPrivKeyEd25519()
        // new switch, add reactors
        // TODO: let the config be passed in?
-       s := initSwitch(i, NewSwitch(cfg))
+       s := initSwitch(i, NewSwitch(cfg, nil))
        s.SetNodeInfo(&NodeInfo{
                PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
                Moniker:    cmn.Fmt("switch%d", i),
@@ -547,7 +626,7 @@ func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch f
 }
 
 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
-       peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey)
+       peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
        if err != nil {
                conn.Close()
                return err
@@ -562,16 +641,61 @@ func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
 }
 
 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
+       fullAddr := conn.RemoteAddr().String()
+       host, _, err := net.SplitHostPort(fullAddr)
+       if err != nil {
+               return err
+       }
+
+       if err = sw.checkBannedPeer(host); err != nil {
+               return err
+       }
+
        peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
        if err != nil {
-               conn.Close()
                return err
        }
        peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
        if err = sw.AddPeer(peer); err != nil {
-               conn.Close()
                return err
        }
 
        return nil
 }
+
+func (sw *Switch) AddBannedPeer(peer *Peer) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       key := peer.mconn.RemoteAddress.IP.String()
+       sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
+       datajson, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
+               return err
+       }
+       sw.db.Set([]byte(bannedPeerKey), datajson)
+       return nil
+}
+
+func (sw *Switch) DelBannedPeer(addr string) error {
+       sw.mtx.Lock()
+       defer sw.mtx.Unlock()
+
+       delete(sw.bannedPeer, addr)
+       datajson, err := json.Marshal(sw.bannedPeer)
+       if err != nil {
+               return err
+       }
+       sw.db.Set([]byte(bannedPeerKey), datajson)
+       return nil
+}
+
+func (sw *Switch) checkBannedPeer(peer string) error {
+       if banEnd, ok := sw.bannedPeer[peer]; ok {
+               if time.Now().Before(banEnd) {
+                       return ErrConnectBannedPeer
+               }
+               sw.DelBannedPeer(peer)
+       }
+       return nil
+}