OSDN Git Service

refactor: split ensureOutboundPeers (#1662)
authorHAOYUatHZ <37070449+HAOYUatHZ@users.noreply.github.com>
Fri, 29 Mar 2019 08:40:17 +0000 (16:40 +0800)
committerPaladz <yzhu101@uottawa.ca>
Fri, 29 Mar 2019 08:40:17 +0000 (16:40 +0800)
* feat(model): add cmd option for p2p keep_connect & white_list

* feat: init keep_connect & inbound_white_list params for p2p config

* doc: improve cmdline help message

* fix: remove port in whitelist

* feat: allow whitelist inbound connection

* feat: impl try_connect

* fix: fix numToDial check

* refactor: remove white_list

* refactor: rename keep_connect

* refactor: decomposire

* fix: fix potential nil pointer in ensureOutboundPeers()

* doc: change func name to StrsToNodes

* refactor: split ensureOutboundPeers

* refactor: use NewNetAddressStrings()

* refactor: make logic cleaner

* refactor: remove peers stats in ensureKeepConnectPeers()

* fix: fix read cmd args err

* doc: add log for ensureKeepConnectPeers()

* doc: remove hardcoded logmodule

* doc: add logmodule in netutil package

cmd/bytomd/commands/run_node.go
config/config.go
p2p/discover/udp.go
p2p/netaddress.go
p2p/netutil/net.go
p2p/switch.go

index 54fc02f..da24e8b 100644 (file)
@@ -46,6 +46,7 @@ func init() {
        runNodeCmd.Flags().String("p2p.proxy_address", config.P2P.ProxyAddress, "Connect via SOCKS5 proxy (eg. 127.0.0.1:1086)")
        runNodeCmd.Flags().String("p2p.proxy_username", config.P2P.ProxyUsername, "Username for proxy server")
        runNodeCmd.Flags().String("p2p.proxy_password", config.P2P.ProxyPassword, "Password for proxy server")
+       runNodeCmd.Flags().String("p2p.keep_dial", config.P2P.KeepDial, "Peers addresses try keeping connecting to, separated by ',' (for example \"1.1.1.1:46657;2.2.2.2:46658\")")
 
        // log flags
        runNodeCmd.Flags().String("log_file", config.LogFile, "Log output file")
index 8308025..4e66838 100644 (file)
@@ -151,6 +151,7 @@ type P2PConfig struct {
        ProxyAddress     string `mapstructure:"proxy_address"`
        ProxyUsername    string `mapstructure:"proxy_username"`
        ProxyPassword    string `mapstructure:"proxy_password"`
+       KeepDial         string `mapstructure:"keep_dial"`
 }
 
 // Default configurable p2p parameters.
index 5caf6a9..000e66f 100644 (file)
@@ -9,7 +9,6 @@ import (
        "net"
        "path"
        "strconv"
-       "strings"
        "time"
 
        log "github.com/sirupsen/logrus"
@@ -38,8 +37,6 @@ var (
        errTimeout          = errors.New("RPC timeout")
        errClockWarp        = errors.New("reply deadline too far in the future")
        errClosed           = errors.New("socket closed")
-       errInvalidSeedIP    = errors.New("seed ip is invalid")
-       errInvalidSeedPort  = errors.New("seed port is invalid")
 )
 
 // Timeouts
@@ -285,26 +282,8 @@ func NewDiscover(config *cfg.Config, priv ed25519.PrivateKey, port uint16) (*Net
                log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on query dns seeds")
        }
 
-       if config.P2P.Seeds != "" {
-               codedSeeds := strings.Split(config.P2P.Seeds, ",")
-               for _, codedSeed := range codedSeeds {
-                       ip, port, err := net.SplitHostPort(codedSeed)
-                       if err != nil {
-                               return nil, err
-                       }
-
-                       if validIP := net.ParseIP(ip); validIP == nil {
-                               return nil, errInvalidSeedIP
-                       }
-
-                       if _, err := strconv.ParseUint(port, 10, 16); err != nil {
-                               return nil, errInvalidSeedPort
-                       }
-
-                       seeds = append(seeds, codedSeed)
-               }
-       }
-
+       codedSeeds := netutil.CheckAndSplitAddresses(config.P2P.Seeds)
+       seeds = append(seeds, codedSeeds...)
        if len(seeds) == 0 {
                return ntab, nil
        }
index 87a9535..97a5d0e 100644 (file)
@@ -11,8 +11,8 @@ import (
        "strconv"
        "time"
 
-       cmn "github.com/tendermint/tmlibs/common"
        "github.com/btcsuite/go-socks/socks"
+       cmn "github.com/tendermint/tmlibs/common"
 )
 
 // NetAddress defines information about a peer on the network
index 6d1ced7..17b18cd 100644 (file)
@@ -7,11 +7,21 @@ import (
        "fmt"
        "net"
        "sort"
+       "strconv"
        "strings"
+
+       log "github.com/sirupsen/logrus"
 )
 
 var lan4, lan6, special4, special6 Netlist
 
+var (
+       logModule = "netutil"
+
+       errInvalidIP   = errors.New("ip is invalid")
+       errInvalidPort = errors.New("port is invalid")
+)
+
 func init() {
        // Lists from RFC 5735, RFC 5156,
        // https://www.iana.org/assignments/iana-ipv4-special-registry/
@@ -304,3 +314,32 @@ func (s DistinctNetSet) String() string {
        buf.WriteString("}")
        return buf.String()
 }
+
+func CheckAndSplitAddresses(addressesStr string) []string {
+       if addressesStr == "" {
+               return nil
+       }
+
+       var addresses []string
+       splits := strings.Split(addressesStr, ",")
+       for _, address := range splits {
+               ip, port, err := net.SplitHostPort(address)
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err, "address": address}).Warn("net.SplitHostPort")
+                       continue
+               }
+
+               if validIP := net.ParseIP(ip); validIP == nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": errInvalidIP, "ip": ip}).Warn("net.ParseIP")
+                       continue
+               }
+
+               if _, err := strconv.ParseUint(port, 10, 16); err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": errInvalidPort, "port": port}).Warn("strconv parse port")
+                       continue
+               }
+
+               addresses = append(addresses, address)
+       }
+       return addresses
+}
index d85391b..4cc1d3f 100644 (file)
@@ -19,6 +19,7 @@ import (
        "github.com/bytom/errors"
        "github.com/bytom/p2p/connection"
        "github.com/bytom/p2p/discover"
+       "github.com/bytom/p2p/netutil"
        "github.com/bytom/p2p/trust"
        "github.com/bytom/version"
 )
@@ -425,6 +426,37 @@ func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
        wg.Done()
 }
 
+func (sw *Switch) ensureKeepConnectPeers() {
+       keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
+       connectedPeers := make(map[string]struct{})
+       for _, peer := range sw.Peers().List() {
+               connectedPeers[peer.remoteAddrHost()] = struct{}{}
+       }
+
+       var wg sync.WaitGroup
+       for _, keepDial := range keepDials {
+               try, err := NewNetAddressString(keepDial)
+               if err != nil {
+                       log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
+                       continue
+               }
+
+               if sw.NodeInfo().ListenAddr == try.String() {
+                       continue
+               }
+               if dialling := sw.IsDialing(try); dialling {
+                       continue
+               }
+               if _, ok := connectedPeers[try.IP.String()]; ok {
+                       continue
+               }
+
+               wg.Add(1)
+               go sw.dialPeerWorker(try, &wg)
+       }
+       wg.Wait()
+}
+
 func (sw *Switch) ensureOutboundPeers() {
        numOutPeers, _, numDialing := sw.NumPeers()
        numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
@@ -460,6 +492,7 @@ func (sw *Switch) ensureOutboundPeers() {
 }
 
 func (sw *Switch) ensureOutboundPeersRoutine() {
+       sw.ensureKeepConnectPeers()
        sw.ensureOutboundPeers()
 
        ticker := time.NewTicker(10 * time.Second)
@@ -468,6 +501,7 @@ func (sw *Switch) ensureOutboundPeersRoutine() {
        for {
                select {
                case <-ticker.C:
+                       sw.ensureKeepConnectPeers()
                        sw.ensureOutboundPeers()
                case <-sw.Quit:
                        return