OSDN Git Service

Optimize log printing (#1590)
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/hex"
5         "encoding/json"
6         "fmt"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14         dbm "github.com/tendermint/tmlibs/db"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/consensus"
18         "github.com/bytom/crypto/ed25519"
19         "github.com/bytom/errors"
20         "github.com/bytom/p2p/connection"
21         "github.com/bytom/p2p/discover"
22         "github.com/bytom/p2p/trust"
23         "github.com/bytom/version"
24 )
25
26 const (
27         bannedPeerKey       = "BannedPeer"
28         defaultBanDuration  = time.Hour * 1
29         minNumOutboundPeers = 3
30         logModule           = "p2p"
31 )
32
33 //pre-define errors for connecting fail
34 var (
35         ErrDuplicatePeer     = errors.New("Duplicate peer")
36         ErrConnectSelf       = errors.New("Connect self")
37         ErrConnectBannedPeer = errors.New("Connect banned peer")
38         ErrConnectSpvPeer    = errors.New("Outbound connect spv peer")
39 )
40
41 type discv interface {
42         ReadRandomNodes(buf []*discover.Node) (n int)
43 }
44
45 // Switch handles peer connections and exposes an API to receive incoming messages
46 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
47 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
48 // incoming messages are received on the reactor.
49 type Switch struct {
50         cmn.BaseService
51
52         Config       *cfg.Config
53         peerConfig   *PeerConfig
54         listeners    []Listener
55         reactors     map[string]Reactor
56         chDescs      []*connection.ChannelDescriptor
57         reactorsByCh map[byte]Reactor
58         peers        *PeerSet
59         dialing      *cmn.CMap
60         nodeInfo     *NodeInfo             // our node info
61         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
62         discv        discv
63         bannedPeer   map[string]time.Time
64         db           dbm.DB
65         mtx          sync.Mutex
66 }
67
68 // NewSwitch create a new Switch and set discover.
69 func NewSwitch(config *cfg.Config) (*Switch, error) {
70         var err error
71         var l Listener
72         var listenAddr string
73         var discv *discover.Network
74
75         blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
76         config.P2P.PrivateKey, err = config.NodeKey()
77         if err != nil {
78                 return nil, err
79         }
80
81         bytes, err := hex.DecodeString(config.P2P.PrivateKey)
82         if err != nil {
83                 return nil, err
84         }
85
86         var newKey [64]byte
87         copy(newKey[:], bytes)
88         privKey := crypto.PrivKeyEd25519(newKey)
89         if !config.VaultMode {
90                 // Create listener
91                 l, listenAddr = GetListener(config.P2P)
92                 discv, err = discover.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
93                 if err != nil {
94                         return nil, err
95                 }
96         }
97
98         return newSwitch(config, discv, blacklistDB, l, privKey, listenAddr)
99 }
100
101 // newSwitch creates a new Switch with the given config.
102 func newSwitch(config *cfg.Config, discv discv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
103         sw := &Switch{
104                 Config:       config,
105                 peerConfig:   DefaultPeerConfig(config.P2P),
106                 reactors:     make(map[string]Reactor),
107                 chDescs:      make([]*connection.ChannelDescriptor, 0),
108                 reactorsByCh: make(map[byte]Reactor),
109                 peers:        NewPeerSet(),
110                 dialing:      cmn.NewCMap(),
111                 nodePrivKey:  priv,
112                 discv:        discv,
113                 db:           blacklistDB,
114                 nodeInfo:     NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
115                 bannedPeer:   make(map[string]time.Time),
116         }
117         if err := sw.loadBannedPeers(); err != nil {
118                 return nil, err
119         }
120
121         sw.AddListener(l)
122         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
123         trust.Init()
124         return sw, nil
125 }
126
127 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
128 func (sw *Switch) OnStart() error {
129         for _, reactor := range sw.reactors {
130                 if _, err := reactor.Start(); err != nil {
131                         return err
132                 }
133         }
134         for _, listener := range sw.listeners {
135                 go sw.listenerRoutine(listener)
136         }
137         go sw.ensureOutboundPeersRoutine()
138         return nil
139 }
140
141 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
142 func (sw *Switch) OnStop() {
143         for _, listener := range sw.listeners {
144                 listener.Stop()
145         }
146         sw.listeners = nil
147
148         for _, peer := range sw.peers.List() {
149                 peer.Stop()
150                 sw.peers.Remove(peer)
151         }
152
153         for _, reactor := range sw.reactors {
154                 reactor.Stop()
155         }
156 }
157
158 //AddBannedPeer add peer to blacklist
159 func (sw *Switch) AddBannedPeer(ip string) error {
160         sw.mtx.Lock()
161         defer sw.mtx.Unlock()
162
163         sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
164         dataJSON, err := json.Marshal(sw.bannedPeer)
165         if err != nil {
166                 return err
167         }
168
169         sw.db.Set([]byte(bannedPeerKey), dataJSON)
170         return nil
171 }
172
173 // AddPeer performs the P2P handshake with a peer
174 // that already has a SecretConnection. If all goes well,
175 // it starts the peer and adds it to the switch.
176 // NOTE: This performs a blocking handshake before the peer is added.
177 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
178 func (sw *Switch) AddPeer(pc *peerConn) error {
179         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
180         if err != nil {
181                 return err
182         }
183
184         if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
185                 return err
186         }
187         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
188                 return err
189         }
190
191         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
192         if err := sw.filterConnByPeer(peer); err != nil {
193                 return err
194         }
195
196         if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
197                 return ErrConnectSpvPeer
198         }
199
200         // Start peer
201         if sw.IsRunning() {
202                 if err := sw.startInitPeer(peer); err != nil {
203                         return err
204                 }
205         }
206
207         return sw.peers.Add(peer)
208 }
209
210 // AddReactor adds the given reactor to the switch.
211 // NOTE: Not goroutine safe.
212 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
213         // Validate the reactor.
214         // No two reactors can share the same channel.
215         for _, chDesc := range reactor.GetChannels() {
216                 chID := chDesc.ID
217                 if sw.reactorsByCh[chID] != nil {
218                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
219                 }
220                 sw.chDescs = append(sw.chDescs, chDesc)
221                 sw.reactorsByCh[chID] = reactor
222         }
223         sw.reactors[name] = reactor
224         reactor.SetSwitch(sw)
225         return reactor
226 }
227
228 // AddListener adds the given listener to the switch for listening to incoming peer connections.
229 // NOTE: Not goroutine safe.
230 func (sw *Switch) AddListener(l Listener) {
231         sw.listeners = append(sw.listeners, l)
232 }
233
234 //DialPeerWithAddress dial node from net address
235 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
236         log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
237         sw.dialing.Set(addr.IP.String(), addr)
238         defer sw.dialing.Delete(addr.IP.String())
239         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
240                 return err
241         }
242
243         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
244         if err != nil {
245                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
246                 return err
247         }
248
249         if err = sw.AddPeer(pc); err != nil {
250                 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
251                 pc.CloseConn()
252                 return err
253         }
254         log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
255         return nil
256 }
257
258 //IsDialing prevent duplicate dialing
259 func (sw *Switch) IsDialing(addr *NetAddress) bool {
260         return sw.dialing.Has(addr.IP.String())
261 }
262
263 // IsListening returns true if the switch has at least one listener.
264 // NOTE: Not goroutine safe.
265 func (sw *Switch) IsListening() bool {
266         return len(sw.listeners) > 0
267 }
268
269 // loadBannedPeers load banned peers from db
270 func (sw *Switch) loadBannedPeers() error {
271         if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
272                 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
273                         return err
274                 }
275         }
276
277         return nil
278 }
279
280 // Listeners returns the list of listeners the switch listens on.
281 // NOTE: Not goroutine safe.
282 func (sw *Switch) Listeners() []Listener {
283         return sw.listeners
284 }
285
286 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
287 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
288         peers := sw.peers.List()
289         for _, peer := range peers {
290                 if peer.outbound {
291                         outbound++
292                 } else {
293                         inbound++
294                 }
295         }
296         dialing = sw.dialing.Size()
297         return
298 }
299
300 // NodeInfo returns the switch's NodeInfo.
301 // NOTE: Not goroutine safe.
302 func (sw *Switch) NodeInfo() *NodeInfo {
303         return sw.nodeInfo
304 }
305
306 //Peers return switch peerset
307 func (sw *Switch) Peers() *PeerSet {
308         return sw.peers
309 }
310
311 // StopPeerForError disconnects from a peer due to external error.
312 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
313         log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
314         sw.stopAndRemovePeer(peer, reason)
315 }
316
317 // StopPeerGracefully disconnect from a peer gracefully.
318 func (sw *Switch) StopPeerGracefully(peerID string) {
319         if peer := sw.peers.Get(peerID); peer != nil {
320                 sw.stopAndRemovePeer(peer, nil)
321         }
322 }
323
324 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
325         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
326         if err != nil {
327                 if err := conn.Close(); err != nil {
328                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
329                 }
330                 return err
331         }
332
333         if err = sw.AddPeer(peerConn); err != nil {
334                 if err := conn.Close(); err != nil {
335                         log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
336                 }
337                 return err
338         }
339
340         log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
341         return nil
342 }
343
344 func (sw *Switch) checkBannedPeer(peer string) error {
345         sw.mtx.Lock()
346         defer sw.mtx.Unlock()
347
348         if banEnd, ok := sw.bannedPeer[peer]; ok {
349                 if time.Now().Before(banEnd) {
350                         return ErrConnectBannedPeer
351                 }
352
353                 if err := sw.delBannedPeer(peer); err != nil {
354                         return err
355                 }
356         }
357         return nil
358 }
359
360 func (sw *Switch) delBannedPeer(addr string) error {
361         sw.mtx.Lock()
362         defer sw.mtx.Unlock()
363
364         delete(sw.bannedPeer, addr)
365         datajson, err := json.Marshal(sw.bannedPeer)
366         if err != nil {
367                 return err
368         }
369
370         sw.db.Set([]byte(bannedPeerKey), datajson)
371         return nil
372 }
373
374 func (sw *Switch) filterConnByIP(ip string) error {
375         if ip == sw.nodeInfo.listenHost() {
376                 return ErrConnectSelf
377         }
378         return sw.checkBannedPeer(ip)
379 }
380
381 func (sw *Switch) filterConnByPeer(peer *Peer) error {
382         if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
383                 return err
384         }
385
386         if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) {
387                 return ErrConnectSelf
388         }
389
390         if sw.peers.Has(peer.Key) {
391                 return ErrDuplicatePeer
392         }
393         return nil
394 }
395
396 func (sw *Switch) listenerRoutine(l Listener) {
397         for {
398                 inConn, ok := <-l.Connections()
399                 if !ok {
400                         break
401                 }
402
403                 // disconnect if we alrady have MaxNumPeers
404                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
405                         if err := inConn.Close(); err != nil {
406                                 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
407                         }
408                         log.Info("Ignoring inbound connection: already have enough peers.")
409                         continue
410                 }
411
412                 // New inbound connection!
413                 if err := sw.addPeerWithConnection(inConn); err != nil {
414                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
415                         continue
416                 }
417         }
418 }
419
420 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
421         if err := sw.DialPeerWithAddress(a); err != nil {
422                 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
423         }
424         wg.Done()
425 }
426
427 func (sw *Switch) ensureOutboundPeers() {
428         numOutPeers, _, numDialing := sw.NumPeers()
429         numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
430         log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
431         if numToDial <= 0 {
432                 return
433         }
434
435         connectedPeers := make(map[string]struct{})
436         for _, peer := range sw.Peers().List() {
437                 connectedPeers[peer.remoteAddrHost()] = struct{}{}
438         }
439
440         var wg sync.WaitGroup
441         nodes := make([]*discover.Node, numToDial)
442         n := sw.discv.ReadRandomNodes(nodes)
443         for i := 0; i < n; i++ {
444                 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
445                 if sw.NodeInfo().ListenAddr == try.String() {
446                         continue
447                 }
448                 if dialling := sw.IsDialing(try); dialling {
449                         continue
450                 }
451                 if _, ok := connectedPeers[try.IP.String()]; ok {
452                         continue
453                 }
454
455                 wg.Add(1)
456                 go sw.dialPeerWorker(try, &wg)
457         }
458         wg.Wait()
459 }
460
461 func (sw *Switch) ensureOutboundPeersRoutine() {
462         sw.ensureOutboundPeers()
463
464         ticker := time.NewTicker(10 * time.Second)
465         defer ticker.Stop()
466
467         for {
468                 select {
469                 case <-ticker.C:
470                         sw.ensureOutboundPeers()
471                 case <-sw.Quit:
472                         return
473                 }
474         }
475 }
476
477 func (sw *Switch) startInitPeer(peer *Peer) error {
478         // spawn send/recv routines
479         if _, err := peer.Start(); err != nil {
480                 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
481         }
482
483         for _, reactor := range sw.reactors {
484                 if err := reactor.AddPeer(peer); err != nil {
485                         return err
486                 }
487         }
488         return nil
489 }
490
491 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
492         sw.peers.Remove(peer)
493         for _, reactor := range sw.reactors {
494                 reactor.RemovePeer(peer, reason)
495         }
496         peer.Stop()
497
498         sentStatus, receivedStatus := peer.TrafficStatus()
499         log.WithFields(log.Fields{
500                 "module":                logModule,
501                 "address":               peer.Addr().String(),
502                 "reason":                reason,
503                 "duration":              sentStatus.Duration.String(),
504                 "total_sent":            sentStatus.Bytes,
505                 "total_received":        receivedStatus.Bytes,
506                 "average_sent_rate":     sentStatus.AvgRate,
507                 "average_received_rate": receivedStatus.AvgRate,
508                 "peer num":              sw.peers.Size(),
509         }).Info("disconnect with peer")
510 }