msgCountByPeerFlushInterval = 1 * time.Hour
)
+var ErrSendPexFail = errors.New("Send pex message fail")
+
// PEXReactor handles PEX (peer exchange) and ensures that an
// adequate number of peers are connected to the switch.
//
}
// NewPEXReactor creates new PEX reactor.
-func NewPEXReactor(b *AddrBook) *PEXReactor {
+func NewPEXReactor(b *AddrBook, sw *Switch) *PEXReactor {
r := &PEXReactor{
+ sw: sw,
book: b,
ensurePeersPeriod: defaultEnsurePeersPeriod,
msgCountByPeer: cmn.NewCMap(),
// Either it was added in DialSeeds or when we
// received the peer's address in r.Receive
if r.book.NeedMoreAddrs() {
- r.RequestPEX(p)
+ if ok := r.RequestPEX(p); !ok {
+ return ErrSendPexFail
+ }
}
} else { // For inbound connections, the peer is its own source
addr, err := NewNetAddressString(p.ListenAddr)
switch msg := msg.(type) {
case *pexRequestMessage:
// src requested some peers.
- r.SendAddrs(src, r.book.GetSelection())
+ if ok := r.SendAddrs(src, r.book.GetSelection()); !ok {
+ log.Info("Send address message failed. Stop peer.")
+ }
case *pexAddrsMessage:
// We received some peer addresses from src.
// (We don't want to get spammed with bad peers)
}
// RequestPEX asks peer for more addresses.
-func (r *PEXReactor) RequestPEX(p *Peer) {
- if ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}}); !ok {
+func (r *PEXReactor) RequestPEX(p *Peer) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
+ if !ok {
r.sw.StopPeerGracefully(p)
}
+ return ok
}
// SendAddrs sends addrs to the peer.
-func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
- if ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}); !ok {
+func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) bool {
+ ok := p.TrySend(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
+ if !ok {
r.sw.StopPeerGracefully(p)
}
+ return ok
}
// SetEnsurePeersPeriod sets period to ensure peers connected.
i := rand.Int() % len(peers)
peer := peers[i]
log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
- r.RequestPEX(peer)
+ if ok := r.RequestPEX(peer); !ok {
+ log.Info("Send request address message failed. Stop peer.")
+ }
}
}
}