OSDN Git Service

Optimize code logic
authorYahtoo Ma <yahtoo.ma@gmail.com>
Wed, 9 May 2018 08:06:31 +0000 (16:06 +0800)
committerYahtoo Ma <yahtoo.ma@gmail.com>
Wed, 9 May 2018 08:06:31 +0000 (16:06 +0800)
cmd/bytomd/commands/run_node.go
netsync/handle.go
p2p/listener.go
p2p/pex_reactor.go

index f57ef97..41f6cd3 100644 (file)
@@ -45,7 +45,7 @@ func runNode(cmd *cobra.Command, args []string) error {
        if _, err := n.Start(); err != nil {
                return fmt.Errorf("Failed to start node: %v", err)
        } else {
-               log.Info("Started node")
+               log.Info("Start node ", n.SyncManager().NodeInfo())
        }
 
        // Trap signal, run forever.
index d551f57..d7d6e98 100644 (file)
@@ -9,9 +9,6 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
 
-       "net"
-       "time"
-
        cfg "github.com/bytom/config"
        "github.com/bytom/p2p"
        core "github.com/bytom/protocol"
@@ -31,8 +28,6 @@ type SyncManager struct {
        fetcher     *Fetcher
        blockKeeper *blockKeeper
        peers       *peerSet
-       mapResult   bool
-       extIP       bool
 
        newBlockCh    chan *bc.Hash
        newPeerCh     chan struct{}
@@ -69,17 +64,15 @@ func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool,
        manager.sw.AddReactor("PROTOCOL", protocolReactor)
 
        // Create & add listener
-       var mapResult, extIP bool
+       var listenerStatus bool
        var l p2p.Listener
        if !config.VaultMode {
                p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
-               l, mapResult, extIP = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
+               l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
                manager.sw.AddListener(l)
        }
-       manager.sw.SetNodeInfo(manager.makeNodeInfo(mapResult))
+       manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
        manager.sw.SetNodePrivKey(manager.privKey)
-       manager.mapResult = mapResult
-       manager.extIP = extIP
        // Optionally, start the pex reactor
        //var addrBook *p2p.AddrBook
        if config.P2P.PexReactor {
@@ -101,7 +94,7 @@ func protocolAndAddress(listenAddr string) (string, string) {
        return p, address
 }
 
-func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
+func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
        nodeInfo := &p2p.NodeInfo{
                PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
                Moniker: sm.config.Moniker,
@@ -122,7 +115,7 @@ func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
        // We assume that the rpcListener has the same ExternalAddress.
        // This is probably true because both P2P and RPC listeners use UPnP,
        // except of course if the rpc is only bound to localhost
-       if listenOpen {
+       if listenerStatus {
                nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
        } else {
                nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
@@ -131,23 +124,6 @@ func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
 }
 
 func (sm *SyncManager) netStart() error {
-       if !sm.mapResult && sm.extIP {
-               p2pListener := sm.sw.Listeners()[0]
-               ListenAddr := cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
-               conn, err := net.DialTimeout("tcp", ListenAddr, 3*time.Second)
-
-               if err != nil && conn == nil {
-                       log.Error("Could not open listen port")
-               }
-
-               if err == nil && conn != nil {
-                       log.Info("Success open listen port")
-                       conn.Close()
-                       sm.sw.SetNodeInfo(sm.makeNodeInfo(true))
-               }
-       }
-       log.WithField("nodeInfo", sm.sw.NodeInfo()).Info("net start")
-
        // Start the switch
        _, err := sm.sw.Start()
        if err != nil {
index 7a49d7e..5a99e86 100644 (file)
@@ -49,15 +49,15 @@ func splitHostPort(addr string) (host string, port int) {
 }
 
 // skipUPNP: If true, does not try getUPNPExternalAddress()
-func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool, bool) {
+func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool) {
        // Local listen IP & port
        lAddrIP, lAddrPort := splitHostPort(lAddr)
 
        // Create listener
        var listener net.Listener
        var err error
-       var mapResult = false
-       var extIP = false
+       var getExtIP = false
+       var listenerStatus = false
 
        for i := 0; i < tryListenSeconds; i++ {
                listener, err = net.Listen(protocol, lAddr)
@@ -91,18 +91,15 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlo
                if lAddrIP == "" || lAddrIP == "0.0.0.0" {
                        extAddr = getUPNPExternalAddress(lAddrPort, listenerPort)
                        if extAddr != nil {
-                               mapResult = true
-                       } else {
-                               extIP = true
+                               getExtIP = true
+                               listenerStatus = true
                        }
                }
        }
        if extAddr == nil {
                if address := GetIP([]string{}, time.Duration(0)); address.Success == true {
                        extAddr = NewNetAddressIPPort(net.ParseIP(address.Ip), uint16(lAddrPort))
-                       extIP = true
-               } else {
-                       log.Info("Error get external ip:", address.Error)
+                       getExtIP = true
                }
        }
        // Otherwise just use the local address...
@@ -121,7 +118,21 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlo
        }
        dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
        dl.Start() // Started upon construction
-       return dl, mapResult, extIP
+
+       if !listenerStatus && getExtIP {
+               conn, err := net.DialTimeout("tcp", extAddr.String(), 3*time.Second)
+
+               if err != nil && conn == nil {
+                       log.Error("Could not open listen port")
+               }
+
+               if err == nil && conn != nil {
+                       log.Info("Success open listen port")
+                       conn.Close()
+               }
+       }
+
+       return dl, listenerStatus
 }
 
 func (l *DefaultListener) OnStart() error {
index be6f75d..0fdd20d 100644 (file)
@@ -6,7 +6,9 @@ import (
        "math/rand"
        "reflect"
        "strings"
+       "sync/atomic"
        "time"
+       "sync"
 
        log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
@@ -57,6 +59,8 @@ type PEXReactor struct {
        // tracks message count by peer, so we can prevent abuse
        msgCountByPeer    *cmn.CMap
        maxMsgCountByPeer uint16
+       dialing           int32
+       wg                sync.WaitGroup
 }
 
 // NewPEXReactor creates new PEX reactor.
@@ -128,10 +132,8 @@ func (r *PEXReactor) AddPeer(p *Peer) error {
        // close the connect if connect is big than max limit
        if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
                if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
-                       select {
-                       case <-time.After(1 * time.Second):
-                               r.sw.StopPeerGracefully(p)
-                       }
+                       <-time.After(1 * time.Second)
+                       r.sw.StopPeerGracefully(p)
                }
                return errors.New("Error in AddPeer: reach the max peer, exchange then close")
        }
@@ -236,23 +238,23 @@ func (r *PEXReactor) ensurePeersRoutine() {
        time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
 
        // fire once immediately.
-       r.ensurePeers(r.sw.peers.Size())
+       r.ensurePeers()
 
        // fire periodically
        ticker := time.NewTicker(r.ensurePeersPeriod)
-       quickTicker := time.NewTicker(time.Second * 5)
+       quickTicker := time.NewTicker(time.Second * 1)
+
        for {
                select {
                case <-ticker.C:
-                       if r.sw.peers.Size() >= 3 {
-                               r.ensurePeers(r.sw.peers.Size())
-                       }
+                       r.ensurePeers()
                case <-quickTicker.C:
                        if r.sw.peers.Size() < 3 {
-                               r.ensurePeers(r.sw.peers.Size())
+                               r.ensurePeers()
                        }
                case <-r.Quit:
                        ticker.Stop()
+                       quickTicker.Stop()
                        return
                }
        }
@@ -270,9 +272,15 @@ func (r *PEXReactor) ensurePeersRoutine() {
 // What we're currently doing in terms of marking good/bad peers is just a
 // placeholder. It should not be the case that an address becomes old/vetted
 // upon a single successful connection.
-func (r *PEXReactor) ensurePeers(num int) {
+func (r *PEXReactor) ensurePeers() {
+       if !atomic.CompareAndSwapInt32(&r.dialing, 0, 1) {
+               log.Info("Ensure peers ...")
+               return
+       }
+       defer atomic.StoreInt32(&r.dialing, 0)
+
        numOutPeers, _, numDialing := r.Switch.NumPeers()
-       numToDial := minNumOutboundPeers*(minNumOutboundPeers-num) - (numOutPeers + numDialing)
+       numToDial := minNumOutboundPeers*(minNumOutboundPeers-numOutPeers) - (numOutPeers + numDialing)
        log.WithFields(log.Fields{
                "numOutPeers": numOutPeers,
                "numDialing":  numDialing,
@@ -332,14 +340,17 @@ func (r *PEXReactor) ensurePeers(num int) {
        }
        // Dial picked addresses
        for _, item := range toDial {
+               r.wg.Add(1)
                go func(picked *NetAddress) {
                        if _, err := r.Switch.DialPeerWithAddress(picked, false); err != nil {
                                r.book.MarkAttempt(picked)
                        } else {
                                r.book.MarkGood(picked)
                        }
+                       r.wg.Done()
                }(item)
        }
+       r.wg.Wait()
 
        // If we need more addresses, pick a random peer and ask for more.
        if r.book.NeedMoreAddrs() {