if _, err := n.Start(); err != nil {
return fmt.Errorf("Failed to start node: %v", err)
} else {
- log.Info("Started node")
+ log.Info("Start node ", n.SyncManager().NodeInfo())
}
// Trap signal, run forever.
cmn "github.com/tendermint/tmlibs/common"
dbm "github.com/tendermint/tmlibs/db"
- "net"
- "time"
-
cfg "github.com/bytom/config"
"github.com/bytom/p2p"
core "github.com/bytom/protocol"
fetcher *Fetcher
blockKeeper *blockKeeper
peers *peerSet
- mapResult bool
- extIP bool
newBlockCh chan *bc.Hash
newPeerCh chan struct{}
manager.sw.AddReactor("PROTOCOL", protocolReactor)
// Create & add listener
- var mapResult, extIP bool
+ var listenerStatus bool
var l p2p.Listener
if !config.VaultMode {
p, address := protocolAndAddress(manager.config.P2P.ListenAddress)
- l, mapResult, extIP = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
+ l, listenerStatus = p2p.NewDefaultListener(p, address, manager.config.P2P.SkipUPNP, nil)
manager.sw.AddListener(l)
}
- manager.sw.SetNodeInfo(manager.makeNodeInfo(mapResult))
+ manager.sw.SetNodeInfo(manager.makeNodeInfo(listenerStatus))
manager.sw.SetNodePrivKey(manager.privKey)
- manager.mapResult = mapResult
- manager.extIP = extIP
// Optionally, start the pex reactor
//var addrBook *p2p.AddrBook
if config.P2P.PexReactor {
return p, address
}
-func (sm *SyncManager) makeNodeInfo(listenOpen bool) *p2p.NodeInfo {
+func (sm *SyncManager) makeNodeInfo(listenerStatus bool) *p2p.NodeInfo {
nodeInfo := &p2p.NodeInfo{
PubKey: sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
Moniker: sm.config.Moniker,
// We assume that the rpcListener has the same ExternalAddress.
// This is probably true because both P2P and RPC listeners use UPnP,
// except of course if the rpc is only bound to localhost
- if listenOpen {
+ if listenerStatus {
nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
} else {
nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pListener.InternalAddress().IP.String(), p2pListener.InternalAddress().Port)
}
func (sm *SyncManager) netStart() error {
- if !sm.mapResult && sm.extIP {
- p2pListener := sm.sw.Listeners()[0]
- ListenAddr := cmn.Fmt("%v:%v", p2pListener.ExternalAddress().IP.String(), p2pListener.ExternalAddress().Port)
- conn, err := net.DialTimeout("tcp", ListenAddr, 3*time.Second)
-
- if err != nil && conn == nil {
- log.Error("Could not open listen port")
- }
-
- if err == nil && conn != nil {
- log.Info("Success open listen port")
- conn.Close()
- sm.sw.SetNodeInfo(sm.makeNodeInfo(true))
- }
- }
- log.WithField("nodeInfo", sm.sw.NodeInfo()).Info("net start")
-
// Start the switch
_, err := sm.sw.Start()
if err != nil {
}
// skipUPNP: If true, does not try getUPNPExternalAddress()
-func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool, bool) {
+func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger tlog.Logger) (Listener, bool) {
// Local listen IP & port
lAddrIP, lAddrPort := splitHostPort(lAddr)
// Create listener
var listener net.Listener
var err error
- var mapResult = false
- var extIP = false
+ var getExtIP = false
+ var listenerStatus = false
for i := 0; i < tryListenSeconds; i++ {
listener, err = net.Listen(protocol, lAddr)
if lAddrIP == "" || lAddrIP == "0.0.0.0" {
extAddr = getUPNPExternalAddress(lAddrPort, listenerPort)
if extAddr != nil {
- mapResult = true
- } else {
- extIP = true
+ getExtIP = true
+ listenerStatus = true
}
}
}
if extAddr == nil {
if address := GetIP([]string{}, time.Duration(0)); address.Success == true {
extAddr = NewNetAddressIPPort(net.ParseIP(address.Ip), uint16(lAddrPort))
- extIP = true
- } else {
- log.Info("Error get external ip:", address.Error)
+ getExtIP = true
}
}
// Otherwise just use the local address...
}
dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
dl.Start() // Started upon construction
- return dl, mapResult, extIP
+
+ if !listenerStatus && getExtIP {
+ conn, err := net.DialTimeout("tcp", extAddr.String(), 3*time.Second)
+
+ if err != nil && conn == nil {
+ log.Error("Could not open listen port")
+ }
+
+ if err == nil && conn != nil {
+ log.Info("Success open listen port")
+ conn.Close()
+ }
+ }
+
+ return dl, listenerStatus
}
func (l *DefaultListener) OnStart() error {
"math/rand"
"reflect"
"strings"
+ "sync/atomic"
"time"
+ "sync"
log "github.com/sirupsen/logrus"
wire "github.com/tendermint/go-wire"
// tracks message count by peer, so we can prevent abuse
msgCountByPeer *cmn.CMap
maxMsgCountByPeer uint16
+ dialing int32
+ wg sync.WaitGroup
}
// NewPEXReactor creates new PEX reactor.
// close the connect if connect is big than max limit
if r.sw.peers.Size() >= r.sw.config.MaxNumPeers {
if ok := r.SendAddrs(p, r.book.GetSelection()); ok {
- select {
- case <-time.After(1 * time.Second):
- r.sw.StopPeerGracefully(p)
- }
+ <-time.After(1 * time.Second)
+ r.sw.StopPeerGracefully(p)
}
return errors.New("Error in AddPeer: reach the max peer, exchange then close")
}
time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
// fire once immediately.
- r.ensurePeers(r.sw.peers.Size())
+ r.ensurePeers()
// fire periodically
ticker := time.NewTicker(r.ensurePeersPeriod)
- quickTicker := time.NewTicker(time.Second * 5)
+ quickTicker := time.NewTicker(time.Second * 1)
+
for {
select {
case <-ticker.C:
- if r.sw.peers.Size() >= 3 {
- r.ensurePeers(r.sw.peers.Size())
- }
+ r.ensurePeers()
case <-quickTicker.C:
if r.sw.peers.Size() < 3 {
- r.ensurePeers(r.sw.peers.Size())
+ r.ensurePeers()
}
case <-r.Quit:
ticker.Stop()
+ quickTicker.Stop()
return
}
}
// What we're currently doing in terms of marking good/bad peers is just a
// placeholder. It should not be the case that an address becomes old/vetted
// upon a single successful connection.
-func (r *PEXReactor) ensurePeers(num int) {
+func (r *PEXReactor) ensurePeers() {
+ if !atomic.CompareAndSwapInt32(&r.dialing, 0, 1) {
+ log.Info("Ensure peers ...")
+ return
+ }
+ defer atomic.StoreInt32(&r.dialing, 0)
+
numOutPeers, _, numDialing := r.Switch.NumPeers()
- numToDial := minNumOutboundPeers*(minNumOutboundPeers-num) - (numOutPeers + numDialing)
+ numToDial := minNumOutboundPeers*(minNumOutboundPeers-numOutPeers) - (numOutPeers + numDialing)
log.WithFields(log.Fields{
"numOutPeers": numOutPeers,
"numDialing": numDialing,
}
// Dial picked addresses
for _, item := range toDial {
+ r.wg.Add(1)
go func(picked *NetAddress) {
if _, err := r.Switch.DialPeerWithAddress(picked, false); err != nil {
r.book.MarkAttempt(picked)
} else {
r.book.MarkGood(picked)
}
+ r.wg.Done()
}(item)
}
+ r.wg.Wait()
// If we need more addresses, pick a random peer and ask for more.
if r.book.NeedMoreAddrs() {