10 log "github.com/sirupsen/logrus"
11 wire "github.com/tendermint/go-wire"
12 cmn "github.com/tendermint/tmlibs/common"
16 // PexChannel is the channel ID used for all PEX messages (see RequestPEX and SendAddrs).
17 PexChannel = byte(0x00)
19 // defaultEnsurePeersPeriod is the default interval between ensurePeers runs.
20 defaultEnsurePeersPeriod = 30 * time.Second
// minNumOutboundPeers is the target ensurePeers compares against the number
// of outbound plus currently-dialing peers to decide how many to dial.
21 minNumOutboundPeers = 10
// maxPexMessageSize is the size limit handed to wire.ReadBinary in DecodeMessage.
22 maxPexMessageSize = 1048576 // 1MB (2^20 bytes)
24 // defaultMaxMsgCountByPeer is the default maximum number of messages one peer can send to us during `msgCountByPeerFlushInterval`.
25 defaultMaxMsgCountByPeer = 1000
// msgCountByPeerFlushInterval is the period of the ticker created in
// flushMsgCountByPeer, which clears the per-peer message counts.
26 msgCountByPeerFlushInterval = 1 * time.Hour
29 // PEXReactor handles PEX (peer exchange) and ensures that an
30 // adequate number of peers are connected to the switch.
32 // It uses `AddrBook` (address book) to store `NetAddress`es of the peers.
34 // ## Preventing abuse
36 // For now, it just limits the number of messages from one peer to
37 // `defaultMaxMsgCountByPeer` messages per `msgCountByPeerFlushInterval` (1000
41 // Limiting is fine for now. Maybe down the road we want to keep track of the
42 // quality of peer messages so if peerA keeps telling us about peers we can't
43 // connect to then maybe we should care less about peerA. But I don't think
44 // that kind of complexity is a priority right now.
45 type PEXReactor struct {
// ensurePeersPeriod is the interval between ensurePeers runs; it defaults to
// defaultEnsurePeersPeriod and can be changed with SetEnsurePeersPeriod.
50 ensurePeersPeriod time.Duration
52 // msgCountByPeer tracks the message count per peer, keyed by the peer's remote address string, so we can prevent abuse
53 msgCountByPeer *cmn.CMap
// maxMsgCountByPeer is the per-peer limit checked in ReachedMaxMsgCountForPeer;
// it defaults to defaultMaxMsgCountByPeer and can be changed with
// SetMaxMsgCountByPeer.
54 maxMsgCountByPeer uint16
57 // NewPEXReactor creates a new PEX reactor with the default ensure-peers
// period, an empty per-peer message counter map and the default per-peer
// message limit.
// NOTE(review): b is presumably stored as the reactor's address book (r.book);
// that assignment is not visible here.
58 func NewPEXReactor(b *AddrBook) *PEXReactor {
61 ensurePeersPeriod: defaultEnsurePeersPeriod,
62 msgCountByPeer: cmn.NewCMap(),
63 maxMsgCountByPeer: defaultMaxMsgCountByPeer,
// The base reactor is given a reference back to r together with the name "PEXReactor".
65 r.BaseReactor = *NewBaseReactor("PEXReactor", r)
69 // OnStart implements BaseService. It starts the base reactor and then
// launches two background goroutines: ensurePeersRoutine and
// flushMsgCountByPeer.
70 func (r *PEXReactor) OnStart() error {
// NOTE(review): any value returned by BaseReactor.OnStart is discarded
// here — confirm that ignoring it is intentional.
71 r.BaseReactor.OnStart()
73 go r.ensurePeersRoutine()
74 go r.flushMsgCountByPeer()
78 // OnStop implements BaseService by stopping the base reactor.
// NOTE(review): nothing visible here signals the goroutines started in
// OnStart to exit — presumably they watch the service's quit signal; confirm.
79 func (r *PEXReactor) OnStop() {
80 r.BaseReactor.OnStop()
84 // GetChannels implements Reactor. It returns the channel descriptors used
// by this reactor. Only the SendQueueCapacity field is visible here; the
// descriptor presumably also identifies PexChannel — confirm.
85 func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
86 return []*ChannelDescriptor{
90 SendQueueCapacity: 10,
95 // AddPeer implements Reactor by adding peer to the address book (if inbound)
96 // or by requesting more addresses (if outbound).
97 func (r *PEXReactor) AddPeer(p *Peer) {
99 // For outbound peers, the address is already in the books.
100 // Either it was added in DialSeeds or when we
101 // received the peer's address in r.Receive
// So the only thing left to do is to ask for more addresses when the book
// reports that it needs them.
102 if r.book.NeedMoreAddrs() {
105 } else { // For inbound connections, the peer is its own source
// Parse the address the peer says it listens on.
106 addr, err := NewNetAddressString(p.ListenAddr)
108 // this should never happen
109 log.WithFields(log.Fields{
110 "addr": p.ListenAddr,
112 }).Error("Error in AddPeer: Invalid peer address")
// addr is passed as both the address to add and its source.
115 r.book.AddAddress(addr, addr)
119 // RemovePeer implements Reactor.
120 func (r *PEXReactor) RemovePeer(p *Peer, reason interface{}) {
121 // If we aren't keeping track of local temp data for each peer here, then we
122 // don't have to do anything.
125 // Receive implements Reactor by handling incoming PEX messages.
//
// Each call first bumps the per-peer message counter, keyed by the sender's
// remote address string, and logs an error once that peer has reached
// maxMsgCountByPeer. The payload is then decoded with DecodeMessage and
// dispatched on its concrete type:
//   - *pexRequestMessage: reply with a selection of addresses from the book.
//   - *pexAddrsMessage: add every received address to the book, recording
//     src's remote address as the source.
//   - anything else: log an "Unknown message type" error.
126 func (r *PEXReactor) Receive(chID byte, src *Peer, msgBytes []byte) {
127 srcAddr := src.Connection().RemoteAddress
128 srcAddrStr := srcAddr.String()
// Count first, then check, so the message that reaches the limit is
// itself treated as over the limit.
130 r.IncrementMsgCountForPeer(srcAddrStr)
131 if r.ReachedMaxMsgCountForPeer(srcAddrStr) {
132 log.WithField("peer", srcAddrStr).Error("Maximum number of messages reached for peer")
133 // TODO remove src from peers?
// The message-type byte returned by DecodeMessage is not needed here; the
// type switch below works on the decoded value.
137 _, msg, err := DecodeMessage(msgBytes)
139 log.WithField("error", err).Error("Error decoding message")
142 log.WithField("msg", msg).Info("Received message")
144 switch msg := msg.(type) {
145 case *pexRequestMessage:
146 // src requested some peers.
147 r.SendAddrs(src, r.book.GetSelection())
148 case *pexAddrsMessage:
149 // We received some peer addresses from src.
150 // (We don't want to get spammed with bad peers)
151 for _, addr := range msg.Addrs {
153 r.book.AddAddress(addr, srcAddr)
157 log.WithField("type", reflect.TypeOf(msg)).Error("Unknown message type")
161 // RequestPEX asks peer for more addresses by sending it an empty
// pexRequestMessage on PexChannel. The message is wrapped in
// struct{ PexMessage }, the same type registered with wire.RegisterInterface.
162 func (r *PEXReactor) RequestPEX(p *Peer) {
163 p.Send(PexChannel, struct{ PexMessage }{&pexRequestMessage{}})
166 // SendAddrs sends addrs to the peer as a single pexAddrsMessage on
// PexChannel, wrapped in struct{ PexMessage } like every other PEX message.
167 func (r *PEXReactor) SendAddrs(p *Peer, addrs []*NetAddress) {
168 p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}})
171 // SetEnsurePeersPeriod sets period to ensure peers connected.
// NOTE(review): this is a plain field write and ensurePeersRoutine reads the
// same field, so it presumably has to be called before the reactor is
// started — confirm.
172 func (r *PEXReactor) SetEnsurePeersPeriod(d time.Duration) {
173 r.ensurePeersPeriod = d
176 // SetMaxMsgCountByPeer sets maximum messages one peer can send to us during 'msgCountByPeerFlushInterval'.
// The new limit is what ReachedMaxMsgCountForPeer compares counts against.
177 func (r *PEXReactor) SetMaxMsgCountByPeer(v uint16) {
178 r.maxMsgCountByPeer = v
181 // ReachedMaxMsgCountForPeer returns true if we received too many
182 // messages from peer with address `addr`, i.e. the stored count is greater
// than or equal to maxMsgCountByPeer.
183 // NOTE: assumes the value in the CMap is non-nil. The type assertion below
// has no comma-ok form, so it panics if no uint16 count is stored for addr;
// call IncrementMsgCountForPeer for the same addr first, as Receive does.
184 func (r *PEXReactor) ReachedMaxMsgCountForPeer(addr string) bool {
185 return r.msgCountByPeer.Get(addr).(uint16) >= r.maxMsgCountByPeer
188 // IncrementMsgCountForPeer increments or initializes the msg count for the peer in the CMap.
// NOTE(review): Get and Set are two separate calls on the map, so concurrent
// calls for the same addr — if they can happen — may lose an update; confirm.
// NOTE(review): the count is a uint16; if the increment (not visible here) is
// unconditional, the count wraps to 0 after 65535 — confirm.
189 func (r *PEXReactor) IncrementMsgCountForPeer(addr string) {
191 countI := r.msgCountByPeer.Get(addr)
193 count = countI.(uint16)
196 r.msgCountByPeer.Set(addr, count)
199 // ensurePeersRoutine ensures that sufficient peers are connected. (continuous)
// It sleeps for a random fraction of ensurePeersPeriod and then creates a
// ticker with that period.
200 func (r *PEXReactor) ensurePeersRoutine() {
201 // Randomize when routine starts
// The period is converted to whole milliseconds and a random duration in
// [0, period) milliseconds is slept.
// NOTE(review): assuming rand is math/rand, Int63n panics when its
// argument is <= 0, which happens if ensurePeersPeriod is below 1ms.
202 ensurePeersPeriodMs := r.ensurePeersPeriod.Nanoseconds() / 1e6
203 time.Sleep(time.Duration(rand.Int63n(ensurePeersPeriodMs)) * time.Millisecond)
205 // fire once immediately.
209 ticker := time.NewTicker(r.ensurePeersPeriod)
222 // ensurePeers ensures that sufficient peers are connected. (once)
//
// It works out how many more outbound peers are needed to reach
// minNumOutboundPeers, picks candidate addresses from the address book
// (at most one per IP), dials each candidate in its own goroutine, and
// finally asks a random connected peer for more addresses if the book
// reports that it needs more.
224 // Old bucket / New bucket are arbitrary categories to denote whether an
225 // address is vetted or not, and this needs to be determined over time via a
226 // heuristic that we haven't perfected yet, or, perhaps is manually edited by
227 // the node operator. It should not be used to compute what addresses are
228 // already connected or not.
230 // TODO Basically, we need to work harder on our good-peer/bad-peer marking.
231 // What we're currently doing in terms of marking good/bad peers is just a
232 // placeholder. It should not be the case that an address becomes old/vetted
233 // upon a single successful connection.
234 func (r *PEXReactor) ensurePeers() {
// The second NumPeers result is ignored; only outbound and currently
// dialing peers count toward the target.
235 numOutPeers, _, numDialing := r.Switch.NumPeers()
236 numToDial := minNumOutboundPeers - (numOutPeers + numDialing)
237 log.WithFields(log.Fields{
238 "numOutPeers": numOutPeers,
239 "numDialing": numDialing,
240 "numToDial": numToDial,
241 }).Info("Ensure peers")
// Candidates are keyed by IP string, so at most one address per IP is
// selected in a single pass.
246 toDial := make(map[string]*NetAddress)
248 // Try to pick numToDial addresses to dial.
249 for i := 0; i < numToDial; i++ {
250 // The purpose of newBias is to first prioritize old (more vetted) peers
251 // when we have few connections, but to allow for new (less vetted) peers
252 // if we already have many connections. This algorithm isn't perfect, but
253 // it somewhat ensures that we prioritize connecting to more-vetted
// newBias grows by 10 per outbound peer, from 10 up to a cap of 90.
255 newBias := cmn.MinInt(numOutPeers, 8)*10 + 10
256 var picked *NetAddress
257 // Try to fetch a new peer 3 times.
258 // This caps the maximum number of tries to 3 * numToDial.
259 for j := 0; j < 3; j++ {
260 try := r.book.PickAddress(newBias)
// A candidate is rejected if its IP was already chosen in this pass,
// is being dialed, or belongs to a connected peer.
264 _, alreadySelected := toDial[try.IP.String()]
265 alreadyDialing := r.Switch.IsDialing(try)
266 alreadyConnected := r.Switch.Peers().Has(try.IP.String())
267 if alreadySelected || alreadyDialing || alreadyConnected {
270 log.WithField("addr", try).Info("Will dial address")
278 toDial[picked.IP.String()] = picked
281 // Dial picked addresses
282 for _, item := range toDial {
// The address is passed as an argument so each goroutine works on its
// own copy of the pointer rather than the shared loop variable.
283 go func(picked *NetAddress) {
284 _, err := r.Switch.DialPeerWithAddress(picked, false)
// NOTE(review): presumably only reached when err is non-nil; the
// guarding condition is not visible here.
286 r.book.MarkAttempt(picked)
291 // If we need more addresses, pick a random peer and ask for more.
292 if r.book.NeedMoreAddrs() {
293 if peers := r.Switch.Peers().List(); len(peers) > 0 {
294 i := rand.Int() % len(peers)
296 log.WithField("peer", peer).Info("No addresses to dial. Sending pexRequest to random peer")
// flushMsgCountByPeer creates a ticker with period msgCountByPeerFlushInterval
// and clears the per-peer message counters, which resets every peer's
// message budget. It is started as a goroutine from OnStart.
// NOTE(review): the loop around Clear and the ticker's Stop are not visible
// here — presumably Clear runs on every tick until the reactor quits; confirm.
302 func (r *PEXReactor) flushMsgCountByPeer() {
303 ticker := time.NewTicker(msgCountByPeerFlushInterval)
308 r.msgCountByPeer.Clear()
316 //-----------------------------------------------------------------------------
// Type bytes paired with the concrete message types in the
// wire.RegisterInterface call: msgTypeRequest goes with pexRequestMessage and
// msgTypeAddrs with pexAddrsMessage.
320 msgTypeRequest = byte(0x01)
321 msgTypeAddrs = byte(0x02)
324 // PexMessage is a primary type for PEX messages. Underneath, it could contain
325 // either pexRequestMessage, or pexAddrsMessage messages.
326 type PexMessage interface{}
// Register the struct{ PexMessage } wrapper with go-wire, pairing each
// concrete message type with its type byte.
328 var _ = wire.RegisterInterface(
329 struct{ PexMessage }{},
330 wire.ConcreteType{&pexRequestMessage{}, msgTypeRequest},
331 wire.ConcreteType{&pexAddrsMessage{}, msgTypeAddrs},
334 // DecodeMessage decodes bz into a PexMessage using the interface registered
// above, passing maxPexMessageSize to wire.ReadBinary as the size limit.
// It returns the message type byte, the decoded message and any decode error.
// NOTE(review): how msgType and n are set is not visible here.
335 func DecodeMessage(bz []byte) (msgType byte, msg PexMessage, err error) {
338 r := bytes.NewReader(bz)
// NOTE(review): the type assertion has no comma-ok form, so it panics if
// ReadBinary returns a value whose dynamic type is not struct{ PexMessage }
// (for example nil on a decode error) — confirm go-wire always returns
// the prototype type.
339 msg = wire.ReadBinary(struct{ PexMessage }{}, r, maxPexMessageSize, n, &err).(struct{ PexMessage }).PexMessage
344 A pexRequestMessage requests additional peer addresses.
346 type pexRequestMessage struct {
// String returns the fixed tag "[pexRequest]".
349 func (m *pexRequestMessage) String() string {
350 return "[pexRequest]"
354 A message with announced peer addresses.
356 type pexAddrsMessage struct {
// String formats the message as "[pexAddrs <addrs>]", printing m.Addrs with %v.
360 func (m *pexAddrsMessage) String() string {
361 return fmt.Sprintf("[pexAddrs %v]", m.Addrs)