X-Git-Url: http://git.sourceforge.jp/view?a=blobdiff_plain;f=p2p%2Fswitch.go;h=ec4b73d588afdc2411a877133b72e278d2506cdb;hb=6a330d7cc312143dc9b1a48721c07b9671102bcc;hp=9f9882036cc5b9406266afb4f5bd0b9aa87eb17f;hpb=1e39e295cbe81d0fcd5e89179d216e924a90eb60;p=bytom%2Fbytom.git diff --git a/p2p/switch.go b/p2p/switch.go index 9f988203..ec4b73d5 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -1,265 +1,198 @@ package p2p import ( + "encoding/hex" "encoding/json" "fmt" - "math/rand" "net" "sync" "time" - "strings" log "github.com/sirupsen/logrus" - crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" dbm "github.com/tendermint/tmlibs/db" cfg "github.com/bytom/config" + "github.com/bytom/consensus" "github.com/bytom/errors" + "github.com/bytom/p2p/connection" + "github.com/bytom/p2p/discover" "github.com/bytom/p2p/trust" + "github.com/bytom/version" ) const ( - reconnectAttempts = 10 - reconnectInterval = 10 * time.Second - - bannedPeerKey = "BannedPeer" - defaultBanDuration = time.Hour * 24 - peerBannedTM = 20 + bannedPeerKey = "BannedPeer" + defaultBanDuration = time.Hour * 1 + minNumOutboundPeers = 3 ) -var ErrConnectBannedPeer = errors.New("Connect banned peer") - -type Reactor interface { - cmn.Service // Start, Stop +//pre-define errors for connecting fail +var ( + ErrDuplicatePeer = errors.New("Duplicate peer") + ErrConnectSelf = errors.New("Connect self") + ErrConnectBannedPeer = errors.New("Connect banned peer") + ErrConnectSpvPeer = errors.New("Outbound connect spv peer") +) - SetSwitch(*Switch) - GetChannels() []*ChannelDescriptor - AddPeer(peer *Peer) error - RemovePeer(peer *Peer, reason interface{}) - Receive(chID byte, peer *Peer, msgBytes []byte) +type discv interface { + ReadRandomNodes(buf []*discover.Node) (n int) } -//-------------------------------------- +// Switch handles peer connections and exposes an API to receive incoming messages +// on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one +// or more `Channels`. So while sending outgoing messages is typically performed on the peer, +// incoming messages are received on the reactor. +type Switch struct { + cmn.BaseService -type BaseReactor struct { - cmn.BaseService // Provides Start, Stop, .Quit - Switch *Switch -} + Config *cfg.Config + peerConfig *PeerConfig + listeners []Listener + reactors map[string]Reactor + chDescs []*connection.ChannelDescriptor + reactorsByCh map[byte]Reactor + peers *PeerSet + dialing *cmn.CMap + nodeInfo *NodeInfo // our node info + nodePrivKey crypto.PrivKeyEd25519 // our node privkey + discv discv + bannedPeer map[string]time.Time + db dbm.DB + mtx sync.Mutex +} + +// NewSwitch create a new Switch and set discover. +func NewSwitch(config *cfg.Config) (*Switch, error) { + var err error + var l Listener + var listenAddr string + var discv *discover.Network + + blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()) + config.P2P.PrivateKey, err = config.NodeKey() + if err != nil { + return nil, err + } -func NewBaseReactor(name string, impl Reactor) *BaseReactor { - return &BaseReactor{ - BaseService: *cmn.NewBaseService(nil, name, impl), - Switch: nil, + bytes, err := hex.DecodeString(config.P2P.PrivateKey) + if err != nil { + return nil, err } -} -func (br *BaseReactor) SetSwitch(sw *Switch) { - br.Switch = sw -} -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } -func (_ *BaseReactor) AddPeer(peer *Peer) {} -func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {} -func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {} - -//----------------------------------------------------------------------------- - -/* -The `Switch` handles peer connections and exposes an API to receive incoming messages -on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one -or more `Channels`. So while sending outgoing messages is typically performed on the peer, -incoming messages are received on the reactor. -*/ -type Switch struct { - cmn.BaseService + var newKey [64]byte + copy(newKey[:], bytes) + privKey := crypto.PrivKeyEd25519(newKey) + if !config.VaultMode { + // Create listener + l, listenAddr = GetListener(config.P2P) + discv, err = discover.NewDiscover(config, &privKey, l.ExternalAddress().Port) + if err != nil { + return nil, err + } + } - config *cfg.P2PConfig - peerConfig *PeerConfig - listeners []Listener - reactors map[string]Reactor - chDescs []*ChannelDescriptor - reactorsByCh map[byte]Reactor - peers *PeerSet - dialing *cmn.CMap - nodeInfo *NodeInfo // our node info - nodePrivKey crypto.PrivKeyEd25519 // our node privkey - bannedPeer map[string]time.Time - db dbm.DB - TrustMetricStore *trust.TrustMetricStore - ScamPeerCh chan *Peer - mtx sync.Mutex - - filterConnByAddr func(net.Addr) error - filterConnByPubKey func(crypto.PubKeyEd25519) error + return newSwitch(config, discv, blacklistDB, l, privKey, listenAddr) } -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrConnectSelf = errors.New("Connect self") - ErrPeerConnected = errors.New("Peer is connected") -) - -func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch { +// newSwitch creates a new Switch with the given config. +func newSwitch(config *cfg.Config, discv discv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) { sw := &Switch{ - config: config, - peerConfig: DefaultPeerConfig(config), + Config: config, + peerConfig: DefaultPeerConfig(config.P2P), reactors: make(map[string]Reactor), - chDescs: make([]*ChannelDescriptor, 0), + chDescs: make([]*connection.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), - nodeInfo: nil, - db: trustHistoryDB, - ScamPeerCh: make(chan *Peer), - } - sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) - sw.TrustMetricStore = trust.NewTrustMetricStore(trustHistoryDB, trust.DefaultConfig()) - sw.TrustMetricStore.Start() - - sw.bannedPeer = make(map[string]time.Time) - if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil { - if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil { - return nil - } + nodePrivKey: priv, + discv: discv, + db: blacklistDB, + nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr), + bannedPeer: make(map[string]time.Time), } - go sw.scamPeerHandler() - return sw -} - -// Not goroutine safe. -func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { - // Validate the reactor. - // No two reactors can share the same channel. - reactorChannels := reactor.GetChannels() - for _, chDesc := range reactorChannels { - chID := chDesc.ID - if sw.reactorsByCh[chID] != nil { - cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor)) - } - sw.chDescs = append(sw.chDescs, chDesc) - sw.reactorsByCh[chID] = reactor + if err := sw.loadBannedPeers(); err != nil { + return nil, err } - sw.reactors[name] = reactor - reactor.SetSwitch(sw) - return reactor -} - -// Not goroutine safe. -func (sw *Switch) Reactors() map[string]Reactor { - return sw.reactors -} - -// Not goroutine safe. -func (sw *Switch) Reactor(name string) Reactor { - return sw.reactors[name] -} - -// Not goroutine safe. -func (sw *Switch) AddListener(l Listener) { - sw.listeners = append(sw.listeners, l) -} - -// Not goroutine safe. -func (sw *Switch) Listeners() []Listener { - return sw.listeners -} -// Not goroutine safe. -func (sw *Switch) IsListening() bool { - return len(sw.listeners) > 0 -} - -// Not goroutine safe. -func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) { - sw.nodeInfo = nodeInfo -} - -// Not goroutine safe. -func (sw *Switch) NodeInfo() *NodeInfo { - return sw.nodeInfo -} - -// Not goroutine safe. -// NOTE: Overwrites sw.nodeInfo.PubKey -func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) { - sw.nodePrivKey = nodePrivKey - if sw.nodeInfo != nil { - sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519) - } + sw.AddListener(l) + sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) + trust.Init() + return sw, nil } -// Switch.Start() starts all the reactors, peers, and listeners. +// OnStart implements BaseService. It starts all the reactors, peers, and listeners. func (sw *Switch) OnStart() error { - sw.BaseService.OnStart() - // Start reactors for _, reactor := range sw.reactors { - _, err := reactor.Start() - if err != nil { + if _, err := reactor.Start(); err != nil { return err } } - // Start peers - for _, peer := range sw.peers.List() { - sw.startInitPeer(peer) - } - // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } + go sw.ensureOutboundPeersRoutine() return nil } +// OnStop implements BaseService. It stops all listeners, peers, and reactors. func (sw *Switch) OnStop() { - sw.BaseService.OnStop() - // Stop listeners for _, listener := range sw.listeners { listener.Stop() } sw.listeners = nil - // Stop peers + for _, peer := range sw.peers.List() { peer.Stop() sw.peers.Remove(peer) } - // Stop reactors + for _, reactor := range sw.reactors { reactor.Stop() } } -// NOTE: This performs a blocking handshake before the peer is added. -// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. -func (sw *Switch) AddPeer(peer *Peer) error { - if err := sw.FilterConnByAddr(peer.Addr()); err != nil { - return err - } +//AddBannedPeer add peer to blacklist +func (sw *Switch) AddBannedPeer(ip string) error { + sw.mtx.Lock() + defer sw.mtx.Unlock() - if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil { + sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration) + dataJSON, err := json.Marshal(sw.bannedPeer) + if err != nil { return err } - if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil { + sw.db.Set([]byte(bannedPeerKey), dataJSON) + return nil +} + +// AddPeer performs the P2P handshake with a peer +// that already has a SecretConnection. If all goes well, +// it starts the peer and adds it to the switch. +// NOTE: This performs a blocking handshake before the peer is added. +// CONTRACT: If error is returned, peer is nil, and conn is immediately closed. +func (sw *Switch) AddPeer(pc *peerConn) error { + peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout) + if err != nil { return err } - if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil { + if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil { return err } - - // Avoid self - if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) { - return errors.New("Ignoring connection from self") + if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil { + return err } - // Check version, chain id - if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil { + peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError) + if err := sw.filterConnByPeer(peer); err != nil { return err } - // Check for duplicate peer - if sw.peers.Has(peer.Key) { - return ErrSwitchDuplicatePeer - + if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) { + return ErrConnectSpvPeer } // Start peer @@ -269,162 +202,86 @@ func (sw *Switch) AddPeer(peer *Peer) error { } } - // Add the peer to .peers. - // We start it first so that a peer in the list is safe to Stop. - // It should not err since we already checked peers.Has() - if err := sw.peers.Add(peer); err != nil { - return err - } - - tm := trust.NewMetric() - - tm.Start() - sw.TrustMetricStore.AddPeerTrustMetric(peer.mconn.RemoteAddress.IP.String(), tm) - - log.WithField("peer", peer).Info("Added peer") - return nil -} - -func (sw *Switch) FilterConnByAddr(addr net.Addr) error { - if sw.filterConnByAddr != nil { - return sw.filterConnByAddr(addr) - } - return nil + return sw.peers.Add(peer) } -func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error { - if sw.filterConnByPubKey != nil { - return sw.filterConnByPubKey(pubkey) +// AddReactor adds the given reactor to the switch. +// NOTE: Not goroutine safe. +func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor { + // Validate the reactor. + // No two reactors can share the same channel. + for _, chDesc := range reactor.GetChannels() { + chID := chDesc.ID + if sw.reactorsByCh[chID] != nil { + cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor)) + } + sw.chDescs = append(sw.chDescs, chDesc) + sw.reactorsByCh[chID] = reactor } - return nil - -} - -func (sw *Switch) SetAddrFilter(f func(net.Addr) error) { - sw.filterConnByAddr = f + sw.reactors[name] = reactor + reactor.SetSwitch(sw) + return reactor } -func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) { - sw.filterConnByPubKey = f +// AddListener adds the given listener to the switch for listening to incoming peer connections. +// NOTE: Not goroutine safe. +func (sw *Switch) AddListener(l Listener) { + sw.listeners = append(sw.listeners, l) } -func (sw *Switch) startInitPeer(peer *Peer) error { - peer.Start() // spawn send/recv routines - for _, reactor := range sw.reactors { - if err := reactor.AddPeer(peer); err != nil { - return err - } +//DialPeerWithAddress dial node from net address +func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error { + log.Debug("Dialing peer address:", addr) + sw.dialing.Set(addr.IP.String(), addr) + defer sw.dialing.Delete(addr.IP.String()) + if err := sw.filterConnByIP(addr.IP.String()); err != nil { + return err } - return nil -} - -// Dial a list of seeds asynchronously in random order -func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { - netAddrs, err := NewNetAddressStrings(seeds) + pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig) if err != nil { + log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn") return err } - if addrBook != nil { - // add seeds to `addrBook` - ourAddrS := sw.nodeInfo.ListenAddr - ourAddr, _ := NewNetAddressString(ourAddrS) - for _, netAddr := range netAddrs { - // do not add ourselves - if netAddr.Equals(ourAddr) { - continue - } - addrBook.AddAddress(netAddr, ourAddr) - } - addrBook.Save() - } - - // permute the list, dial them in random order. - perm := rand.Perm(len(netAddrs)) - for i := 0; i < len(perm); i++ { - j := perm[i] - sw.dialSeed(netAddrs[j]) + if err = sw.AddPeer(pc); err != nil { + log.WithFields(log.Fields{"address": addr, " err": err}).Error("DialPeer fail on switch AddPeer") + pc.CloseConn() + return err } + log.Debug("DialPeer added peer:", addr) return nil } -func (sw *Switch) dialSeed(addr *NetAddress) { - peer, err := sw.DialPeerWithAddress(addr, true) - if err != nil { - log.WithField("error", err).Error("Error dialing seed") - } else { - log.WithField("peer", peer).Info("Connected to seed") - } +//IsDialing prevent duplicate dialing +func (sw *Switch) IsDialing(addr *NetAddress) bool { + return sw.dialing.Has(addr.IP.String()) } -func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { - if err := sw.checkBannedPeer(addr.IP.String()); err != nil { - return nil, err - } - if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 { - return nil, ErrConnectSelf - } - for _, v := range sw.Peers().list { - if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 { - return nil, ErrPeerConnected +// IsListening returns true if the switch has at least one listener. +// NOTE: Not goroutine safe. +func (sw *Switch) IsListening() bool { + return len(sw.listeners) > 0 +} + +// loadBannedPeers load banned peers from db +func (sw *Switch) loadBannedPeers() error { + if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil { + if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil { + return err } } - sw.dialing.Set(addr.IP.String(), addr) - defer sw.dialing.Delete(addr.IP.String()) - log.WithField("address", addr).Info("Dialing peer") - peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) - if err != nil { - log.WithFields(log.Fields{ - "address": addr, - "error": err, - }).Info("Failed to dial peer") - return nil, err - } - peer.SetLogger(sw.Logger.With("peer", addr)) - if persistent { - peer.makePersistent() - } - err = sw.AddPeer(peer) - if err != nil { - log.WithFields(log.Fields{ - "address": addr, - "error": err, - }).Info("Failed to add peer") - peer.CloseConn() - return nil, err - } - log.WithFields(log.Fields{ - "address": addr, - }).Info("Dialed and added peer") - return peer, nil -} - -func (sw *Switch) IsDialing(addr *NetAddress) bool { - return sw.dialing.Has(addr.IP.String()) + return nil } -// Broadcast runs a go routine for each attempted send, which will block -// trying to send for defaultSendTimeoutSeconds. Returns a channel -// which receives success values for each attempted send (false if times out) -// NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved. -func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool { - successChan := make(chan bool, len(sw.peers.List())) - log.WithFields(log.Fields{ - "chID": chID, - "msg": msg, - }).Debug("Broadcast") - for _, peer := range sw.peers.List() { - go func(peer *Peer) { - success := peer.Send(chID, msg) - successChan <- success - }(peer) - } - return successChan +// Listeners returns the list of listeners the switch listens on. +// NOTE: Not goroutine safe. +func (sw *Switch) Listeners() []Listener { + return sw.listeners } -// Returns the count of outbound/inbound and outbound-dialing peers. +// NumPeers Returns the count of outbound/inbound and outbound-dialing peers. func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { peers := sw.peers.List() for _, peer := range peers { @@ -438,300 +295,211 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) { return } +// NodeInfo returns the switch's NodeInfo. +// NOTE: Not goroutine safe. +func (sw *Switch) NodeInfo() *NodeInfo { + return sw.nodeInfo +} + +//Peers return switch peerset func (sw *Switch) Peers() *PeerSet { return sw.peers } -// Disconnect from a peer due to external error, retry if it is a persistent peer. -// TODO: make record depending on reason. +// StopPeerForError disconnects from a peer due to external error. func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) { - addr := NewNetAddress(peer.Addr()) - log.WithFields(log.Fields{ - "peer": peer, - "error": reason, - }).Info("Stopping peer due to error") + log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error") sw.stopAndRemovePeer(peer, reason) - - if peer.IsPersistent() { - log.WithField("peer", peer).Info("Reconnecting to peer") - for i := 1; i < reconnectAttempts; i++ { - if !sw.IsRunning() { - return - } - - peer, err := sw.DialPeerWithAddress(addr, true) - if err != nil { - if i == reconnectAttempts { - log.WithFields(log.Fields{ - "retries": i, - "error": err, - }).Info("Error reconnecting to peer. Giving up") - return - } - - if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf { - log.WithField("error", err).Info("Error reconnecting to peer. ") - return - } - - log.WithFields(log.Fields{ - "retries": i, - "error": err, - }).Info("Error reconnecting to peer. Trying again") - time.Sleep(reconnectInterval) - continue - } - - log.WithField("peer", peer).Info("Reconnected to peer") - return - } - } } -// Disconnect from a peer gracefully. -// TODO: handle graceful disconnects. -func (sw *Switch) StopPeerGracefully(peer *Peer) { - log.Info("Stopping peer gracefully") - sw.stopAndRemovePeer(peer, nil) -} - -func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { - sw.peers.Remove(peer) - peer.Stop() - for _, reactor := range sw.reactors { - reactor.RemovePeer(peer, reason) +// StopPeerGracefully disconnect from a peer gracefully. +func (sw *Switch) StopPeerGracefully(peerID string) { + if peer := sw.peers.Get(peerID); peer != nil { + sw.stopAndRemovePeer(peer, nil) } } -func (sw *Switch) listenerRoutine(l Listener) { - for { - inConn, ok := <-l.Connections() - if !ok { - break - } - - // ignore connection if we already have enough - maxPeers := sw.config.MaxNumPeers - if maxPeers <= sw.peers.Size() { - // close inConn - inConn.Close() - log.WithFields(log.Fields{ - "address": inConn.RemoteAddr().String(), - "numPeers": sw.peers.Size(), - "max": maxPeers, - }).Info("Ignoring inbound connection: already have enough peers") - continue - } - - // New inbound connection! - err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) - if err != nil { - // conn close for returing err - inConn.Close() - log.WithFields(log.Fields{ - "address": inConn.RemoteAddr().String(), - "error": err, - }).Info("Ignoring inbound connection: error while adding peer") - continue +func (sw *Switch) addPeerWithConnection(conn net.Conn) error { + peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P) + if err != nil { + if err := conn.Close(); err != nil { + log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err") } - - // NOTE: We don't yet have the listening port of the - // remote (if they have a listener at all). - // The peerHandshake will handle that - } - - // cleanup -} - -//----------------------------------------------------------------------------- - -type SwitchEventNewPeer struct { - Peer *Peer -} - -type SwitchEventDonePeer struct { - Peer *Peer - Error interface{} -} - -//------------------------------------------------------------------ -// Switches connected via arbitrary net.Conn; useful for testing - -// Returns n switches, connected according to the connect func. -// If connect==Connect2Switches, the switches will be fully connected. -// initSwitch defines how the ith switch should be initialized (ie. with what reactors). -// NOTE: panics if any switch fails to start. -func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { - switches := make([]*Switch, n) - for i := 0; i < n; i++ { - switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) - } - - if err := StartSwitches(switches); err != nil { - panic(err) + return err } - for i := 0; i < n; i++ { - for j := i; j < n; j++ { - connect(switches, i, j) + if err = sw.AddPeer(peerConn); err != nil { + if err := conn.Close(); err != nil { + log.WithFields(log.Fields{"remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err") } + return err } - return switches + return nil } -var PanicOnAddPeerErr = false - -// Will connect switches i and j via net.Pipe() -// Blocks until a conection is established. -// NOTE: caller ensures i and j are within bounds -func Connect2Switches(switches []*Switch, i, j int) { - switchI := switches[i] - switchJ := switches[j] - c1, c2 := net.Pipe() - doneCh := make(chan struct{}) - go func() { - err := switchI.addPeerWithConnection(c1) - if PanicOnAddPeerErr && err != nil { - panic(err) - } - doneCh <- struct{}{} - }() - go func() { - err := switchJ.addPeerWithConnection(c2) - if PanicOnAddPeerErr && err != nil { - panic(err) +func (sw *Switch) checkBannedPeer(peer string) error { + sw.mtx.Lock() + defer sw.mtx.Unlock() + + if banEnd, ok := sw.bannedPeer[peer]; ok { + if time.Now().Before(banEnd) { + return ErrConnectBannedPeer } - doneCh <- struct{}{} - }() - <-doneCh - <-doneCh -} -func StartSwitches(switches []*Switch) error { - for _, s := range switches { - _, err := s.Start() // start switch and reactors - if err != nil { + if err := sw.delBannedPeer(peer); err != nil { return err } } return nil } -func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { - privKey := crypto.GenPrivKeyEd25519() - // new switch, add reactors - // TODO: let the config be passed in? - s := initSwitch(i, NewSwitch(cfg, nil)) - s.SetNodeInfo(&NodeInfo{ - PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519), - Moniker: cmn.Fmt("switch%d", i), - Network: network, - Version: version, - RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), - ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023), - }) - s.SetNodePrivKey(privKey) - return s -} +func (sw *Switch) delBannedPeer(addr string) error { + sw.mtx.Lock() + defer sw.mtx.Unlock() -func (sw *Switch) addPeerWithConnection(conn net.Conn) error { - peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config) + delete(sw.bannedPeer, addr) + datajson, err := json.Marshal(sw.bannedPeer) if err != nil { - conn.Close() - return err - } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.AddPeer(peer); err != nil { - conn.Close() return err } + sw.db.Set([]byte(bannedPeerKey), datajson) return nil } -func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { - fullAddr := conn.RemoteAddr().String() - host, _, err := net.SplitHostPort(fullAddr) - if err != nil { - return err +func (sw *Switch) filterConnByIP(ip string) error { + if ip == sw.nodeInfo.listenHost() { + return ErrConnectSelf } + return sw.checkBannedPeer(ip) +} - if err = sw.checkBannedPeer(host); err != nil { +func (sw *Switch) filterConnByPeer(peer *Peer) error { + if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil { return err } - peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) - if err != nil { - return err - } - peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr())) - if err = sw.AddPeer(peer); err != nil { - return err + if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) { + return ErrConnectSelf } + if sw.peers.Has(peer.Key) { + return ErrDuplicatePeer + } return nil } -func (sw *Switch) AddBannedPeer(peer *Peer) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() +func (sw *Switch) listenerRoutine(l Listener) { + for { + inConn, ok := <-l.Connections() + if !ok { + break + } - key := peer.mconn.RemoteAddress.IP.String() - sw.bannedPeer[key] = time.Now().Add(defaultBanDuration) - datajson, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err + // disconnect if we alrady have MaxNumPeers + if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers { + if err := inConn.Close(); err != nil { + log.WithFields(log.Fields{"remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err") + } + log.Info("Ignoring inbound connection: already have enough peers.") + continue + } + + // New inbound connection! + if err := sw.addPeerWithConnection(inConn); err != nil { + log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err) + continue + } } - sw.db.Set([]byte(bannedPeerKey), datajson) - return nil } -func (sw *Switch) DelBannedPeer(addr string) error { - sw.mtx.Lock() - defer sw.mtx.Unlock() - - delete(sw.bannedPeer, addr) - datajson, err := json.Marshal(sw.bannedPeer) - if err != nil { - return err +func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) { + if err := sw.DialPeerWithAddress(a); err != nil { + log.WithFields(log.Fields{"addr": a, "err": err}).Error("dialPeerWorker fail on dial peer") } - sw.db.Set([]byte(bannedPeerKey), datajson) - return nil + wg.Done() } -func (sw *Switch) scamPeerHandler() { - for src := range sw.ScamPeerCh { - var tm *trust.TrustMetric - key := src.Connection().RemoteAddress.IP.String() - if tm = sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil { - log.Errorf("Can't get peer trust metric") +func (sw *Switch) ensureOutboundPeers() { + numOutPeers, _, numDialing := sw.NumPeers() + numToDial := (minNumOutboundPeers - (numOutPeers + numDialing)) + log.WithFields(log.Fields{"numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers") + if numToDial <= 0 { + return + } + + connectedPeers := make(map[string]struct{}) + for _, peer := range sw.Peers().List() { + connectedPeers[peer.remoteAddrHost()] = struct{}{} + } + + var wg sync.WaitGroup + nodes := make([]*discover.Node, numToDial) + n := sw.discv.ReadRandomNodes(nodes) + for i := 0; i < n; i++ { + try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP) + if sw.NodeInfo().ListenAddr == try.String() { + continue + } + if dialling := sw.IsDialing(try); dialling { continue } - sw.delTrustMetric(tm, src) + if _, ok := connectedPeers[try.IP.String()]; ok { + continue + } + + wg.Add(1) + go sw.dialPeerWorker(try, &wg) } + wg.Wait() } -func (sw *Switch) AddScamPeer(src *Peer) { - sw.ScamPeerCh <- src -} +func (sw *Switch) ensureOutboundPeersRoutine() { + sw.ensureOutboundPeers() + + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() -func (sw *Switch) delTrustMetric(tm *trust.TrustMetric, src *Peer) { - key := src.Connection().RemoteAddress.IP.String() - tm.BadEvents(1) - if tm.TrustScore() < peerBannedTM { - sw.AddBannedPeer(src) - sw.TrustMetricStore.PeerDisconnected(key) - sw.StopPeerGracefully(src) + for { + select { + case <-ticker.C: + sw.ensureOutboundPeers() + case <-sw.Quit: + return + } } } -func (sw *Switch) checkBannedPeer(peer string) error { - if banEnd, ok := sw.bannedPeer[peer]; ok { - if time.Now().Before(banEnd) { - return ErrConnectBannedPeer +func (sw *Switch) startInitPeer(peer *Peer) error { + // spawn send/recv routines + if _, err := peer.Start(); err != nil { + log.WithFields(log.Fields{"remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err") + } + + for _, reactor := range sw.reactors { + if err := reactor.AddPeer(peer); err != nil { + return err } - sw.DelBannedPeer(peer) } return nil } + +func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) { + sw.peers.Remove(peer) + for _, reactor := range sw.reactors { + reactor.RemovePeer(peer, reason) + } + peer.Stop() + + sentStatus, receivedStatus := peer.TrafficStatus() + log.WithFields(log.Fields{ + "address": peer.Addr().String(), + "reason": reason, + "duration": sentStatus.Duration.String(), + "total_sent": sentStatus.Bytes, + "total_received": receivedStatus.Bytes, + "average_sent_rate": sentStatus.AvgRate, + "average_received_rate": receivedStatus.AvgRate, + }).Info("disconnect with peer") +}