OSDN Git Service

Merge pull request #680 from Bytom/fix-print-json-invalid-type-assertion
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "math/rand"
7         "net"
8         "sync"
9         "time"
10         "strings"
11
12         log "github.com/sirupsen/logrus"
13         crypto "github.com/tendermint/go-crypto"
14         cmn "github.com/tendermint/tmlibs/common"
15         dbm "github.com/tendermint/tmlibs/db"
16
17         cfg "github.com/bytom/config"
18         "github.com/bytom/errors"
19         "github.com/bytom/p2p/trust"
20 )
21
22 const (
23         reconnectAttempts = 10
24         reconnectInterval = 10 * time.Second
25
26         bannedPeerKey      = "BannedPeer"
27         defaultBanDuration = time.Hour * 24
28 )
29
30 var ErrConnectBannedPeer = errors.New("Connect banned peer")
31
32 type Reactor interface {
33         cmn.Service // Start, Stop
34
35         SetSwitch(*Switch)
36         GetChannels() []*ChannelDescriptor
37         AddPeer(peer *Peer) error
38         RemovePeer(peer *Peer, reason interface{})
39         Receive(chID byte, peer *Peer, msgBytes []byte)
40 }
41
42 //--------------------------------------
43
44 type BaseReactor struct {
45         cmn.BaseService // Provides Start, Stop, .Quit
46         Switch          *Switch
47 }
48
49 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
50         return &BaseReactor{
51                 BaseService: *cmn.NewBaseService(nil, name, impl),
52                 Switch:      nil,
53         }
54 }
55
56 func (br *BaseReactor) SetSwitch(sw *Switch) {
57         br.Switch = sw
58 }
59 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor              { return nil }
60 func (_ *BaseReactor) AddPeer(peer *Peer)                             {}
61 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{})      {}
62 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
63
64 //-----------------------------------------------------------------------------
65
66 /*
67 The `Switch` handles peer connections and exposes an API to receive incoming messages
68 on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
69 or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
70 incoming messages are received on the reactor.
71 */
72 type Switch struct {
73         cmn.BaseService
74
75         config           *cfg.P2PConfig
76         peerConfig       *PeerConfig
77         listeners        []Listener
78         reactors         map[string]Reactor
79         chDescs          []*ChannelDescriptor
80         reactorsByCh     map[byte]Reactor
81         peers            *PeerSet
82         dialing          *cmn.CMap
83         nodeInfo         *NodeInfo             // our node info
84         nodePrivKey      crypto.PrivKeyEd25519 // our node privkey
85         bannedPeer       map[string]time.Time
86         db               dbm.DB
87         ScamPeerCh       chan *Peer
88         mtx              sync.Mutex
89
90         filterConnByAddr   func(net.Addr) error
91         filterConnByPubKey func(crypto.PubKeyEd25519) error
92 }
93
94 var (
95         ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
96         ErrConnectSelf         = errors.New("Connect self")
97         ErrPeerConnected       = errors.New("Peer is connected")
98 )
99
100 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
101         sw := &Switch{
102                 config:       config,
103                 peerConfig:   DefaultPeerConfig(config),
104                 reactors:     make(map[string]Reactor),
105                 chDescs:      make([]*ChannelDescriptor, 0),
106                 reactorsByCh: make(map[byte]Reactor),
107                 peers:        NewPeerSet(),
108                 dialing:      cmn.NewCMap(),
109                 nodeInfo:     nil,
110                 db:           trustHistoryDB,
111                 ScamPeerCh:   make(chan *Peer),
112         }
113         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
114
115         sw.bannedPeer = make(map[string]time.Time)
116         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
117                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
118                         return nil
119                 }
120         }
121         trust.Init()
122         return sw
123 }
124
125 // Not goroutine safe.
126 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
127         // Validate the reactor.
128         // No two reactors can share the same channel.
129         reactorChannels := reactor.GetChannels()
130         for _, chDesc := range reactorChannels {
131                 chID := chDesc.ID
132                 if sw.reactorsByCh[chID] != nil {
133                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
134                 }
135                 sw.chDescs = append(sw.chDescs, chDesc)
136                 sw.reactorsByCh[chID] = reactor
137         }
138         sw.reactors[name] = reactor
139         reactor.SetSwitch(sw)
140         return reactor
141 }
142
143 // Not goroutine safe.
144 func (sw *Switch) Reactors() map[string]Reactor {
145         return sw.reactors
146 }
147
148 // Not goroutine safe.
149 func (sw *Switch) Reactor(name string) Reactor {
150         return sw.reactors[name]
151 }
152
153 // Not goroutine safe.
154 func (sw *Switch) AddListener(l Listener) {
155         sw.listeners = append(sw.listeners, l)
156 }
157
158 // Not goroutine safe.
159 func (sw *Switch) Listeners() []Listener {
160         return sw.listeners
161 }
162
163 // Not goroutine safe.
164 func (sw *Switch) IsListening() bool {
165         return len(sw.listeners) > 0
166 }
167
168 // Not goroutine safe.
169 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
170         sw.nodeInfo = nodeInfo
171 }
172
173 // Not goroutine safe.
174 func (sw *Switch) NodeInfo() *NodeInfo {
175         return sw.nodeInfo
176 }
177
178 // Not goroutine safe.
179 // NOTE: Overwrites sw.nodeInfo.PubKey
180 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
181         sw.nodePrivKey = nodePrivKey
182         if sw.nodeInfo != nil {
183                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
184         }
185 }
186
187 // Switch.Start() starts all the reactors, peers, and listeners.
188 func (sw *Switch) OnStart() error {
189         sw.BaseService.OnStart()
190         // Start reactors
191         for _, reactor := range sw.reactors {
192                 _, err := reactor.Start()
193                 if err != nil {
194                         return err
195                 }
196         }
197         // Start peers
198         for _, peer := range sw.peers.List() {
199                 sw.startInitPeer(peer)
200         }
201         // Start listeners
202         for _, listener := range sw.listeners {
203                 go sw.listenerRoutine(listener)
204         }
205         return nil
206 }
207
208 func (sw *Switch) OnStop() {
209         sw.BaseService.OnStop()
210         // Stop listeners
211         for _, listener := range sw.listeners {
212                 listener.Stop()
213         }
214         sw.listeners = nil
215         // Stop peers
216         for _, peer := range sw.peers.List() {
217                 peer.Stop()
218                 sw.peers.Remove(peer)
219         }
220         // Stop reactors
221         for _, reactor := range sw.reactors {
222                 reactor.Stop()
223         }
224 }
225
226 // NOTE: This performs a blocking handshake before the peer is added.
227 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
228 func (sw *Switch) AddPeer(peer *Peer) error {
229         if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
230                 return err
231         }
232
233         if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
234                 return err
235         }
236
237         if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
238                 return err
239         }
240
241         if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
242                 return err
243         }
244
245         // Avoid self
246         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
247                 return errors.New("Ignoring connection from self")
248         }
249
250         // Check version, chain id
251         if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
252                 return err
253         }
254
255         // Check for duplicate peer
256         if sw.peers.Has(peer.Key) {
257                 return ErrSwitchDuplicatePeer
258
259         }
260
261         // Start peer
262         if sw.IsRunning() {
263                 if err := sw.startInitPeer(peer); err != nil {
264                         return err
265                 }
266         }
267
268         // Add the peer to .peers.
269         // We start it first so that a peer in the list is safe to Stop.
270         // It should not err since we already checked peers.Has()
271         if err := sw.peers.Add(peer); err != nil {
272                 return err
273         }
274
275         log.WithField("peer", peer).Info("Added peer")
276         return nil
277 }
278
279 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
280         if sw.filterConnByAddr != nil {
281                 return sw.filterConnByAddr(addr)
282         }
283         return nil
284 }
285
286 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
287         if sw.filterConnByPubKey != nil {
288                 return sw.filterConnByPubKey(pubkey)
289         }
290         return nil
291
292 }
293
294 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
295         sw.filterConnByAddr = f
296 }
297
298 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
299         sw.filterConnByPubKey = f
300 }
301
302 func (sw *Switch) startInitPeer(peer *Peer) error {
303         peer.Start() // spawn send/recv routines
304         for _, reactor := range sw.reactors {
305                 if err := reactor.AddPeer(peer); err != nil {
306                         return err
307                 }
308         }
309         return nil
310 }
311
312 // Dial a list of seeds asynchronously in random order
313 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
314
315         netAddrs, err := NewNetAddressStrings(seeds)
316         if err != nil {
317                 return err
318         }
319
320         if addrBook != nil {
321                 // add seeds to `addrBook`
322                 ourAddrS := sw.nodeInfo.ListenAddr
323                 ourAddr, _ := NewNetAddressString(ourAddrS)
324                 for _, netAddr := range netAddrs {
325                         // do not add ourselves
326                         if netAddr.Equals(ourAddr) {
327                                 continue
328                         }
329                         addrBook.AddAddress(netAddr, ourAddr)
330                 }
331                 addrBook.Save()
332         }
333
334         // permute the list, dial them in random order.
335         perm := rand.Perm(len(netAddrs))
336         for i := 0; i < len(perm); i++ {
337                 j := perm[i]
338                 sw.dialSeed(netAddrs[j])
339         }
340         return nil
341 }
342
343 func (sw *Switch) dialSeed(addr *NetAddress) {
344         peer, err := sw.DialPeerWithAddress(addr, true)
345         if err != nil {
346                 log.WithField("error", err).Error("Error dialing seed")
347         } else {
348                 log.WithField("peer", peer).Info("Connected to seed")
349         }
350 }
351
352 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
353         if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
354                 return nil, err
355         }
356         if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
357                 return nil, ErrConnectSelf
358         }
359         for _, v := range sw.Peers().list {
360                 if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
361                         return nil, ErrPeerConnected
362                 }
363         }
364         sw.dialing.Set(addr.IP.String(), addr)
365         defer sw.dialing.Delete(addr.IP.String())
366
367         log.WithField("address", addr).Info("Dialing peer")
368         peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
369         if err != nil {
370                 log.WithFields(log.Fields{
371                         "address": addr,
372                         "error":   err,
373                 }).Info("Failed to dial peer")
374                 return nil, err
375         }
376         peer.SetLogger(sw.Logger.With("peer", addr))
377         if persistent {
378                 peer.makePersistent()
379         }
380         err = sw.AddPeer(peer)
381         if err != nil {
382                 log.WithFields(log.Fields{
383                         "address": addr,
384                         "error":   err,
385                 }).Info("Failed to add peer")
386                 peer.CloseConn()
387                 return nil, err
388         }
389         log.WithFields(log.Fields{
390                 "address": addr,
391         }).Info("Dialed and added peer")
392         return peer, nil
393 }
394
395 func (sw *Switch) IsDialing(addr *NetAddress) bool {
396         return sw.dialing.Has(addr.IP.String())
397 }
398
399 // Broadcast runs a go routine for each attempted send, which will block
400 // trying to send for defaultSendTimeoutSeconds. Returns a channel
401 // which receives success values for each attempted send (false if times out)
402 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
403 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
404         successChan := make(chan bool, len(sw.peers.List()))
405         log.WithFields(log.Fields{
406                 "chID": chID,
407                 "msg":  msg,
408         }).Debug("Broadcast")
409         for _, peer := range sw.peers.List() {
410                 go func(peer *Peer) {
411                         success := peer.Send(chID, msg)
412                         successChan <- success
413                 }(peer)
414         }
415         return successChan
416 }
417
418 // Returns the count of outbound/inbound and outbound-dialing peers.
419 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
420         peers := sw.peers.List()
421         for _, peer := range peers {
422                 if peer.outbound {
423                         outbound++
424                 } else {
425                         inbound++
426                 }
427         }
428         dialing = sw.dialing.Size()
429         return
430 }
431
432 func (sw *Switch) Peers() *PeerSet {
433         return sw.peers
434 }
435
436 // Disconnect from a peer due to external error, retry if it is a persistent peer.
437 // TODO: make record depending on reason.
438 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
439         addr := NewNetAddress(peer.Addr())
440         log.WithFields(log.Fields{
441                 "peer":  peer,
442                 "error": reason,
443         }).Info("Stopping peer due to error")
444         sw.stopAndRemovePeer(peer, reason)
445
446         if peer.IsPersistent() {
447                 log.WithField("peer", peer).Info("Reconnecting to peer")
448                 for i := 1; i < reconnectAttempts; i++ {
449                         if !sw.IsRunning() {
450                                 return
451                         }
452
453                         peer, err := sw.DialPeerWithAddress(addr, true)
454                         if err != nil {
455                                 if i == reconnectAttempts {
456                                         log.WithFields(log.Fields{
457                                                 "retries": i,
458                                                 "error":   err,
459                                         }).Info("Error reconnecting to peer. Giving up")
460                                         return
461                                 }
462
463                                 if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
464                                         log.WithField("error", err).Info("Error reconnecting to peer. ")
465                                         return
466                                 }
467
468                                 log.WithFields(log.Fields{
469                                         "retries": i,
470                                         "error":   err,
471                                 }).Info("Error reconnecting to peer. Trying again")
472                                 time.Sleep(reconnectInterval)
473                                 continue
474                         }
475
476                         log.WithField("peer", peer).Info("Reconnected to peer")
477                         return
478                 }
479         }
480 }
481
482 // Disconnect from a peer gracefully.
483 // TODO: handle graceful disconnects.
484 func (sw *Switch) StopPeerGracefully(peer *Peer) {
485         log.Info("Stopping peer gracefully")
486         sw.stopAndRemovePeer(peer, nil)
487 }
488
489 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
490         sw.peers.Remove(peer)
491         peer.Stop()
492         for _, reactor := range sw.reactors {
493                 reactor.RemovePeer(peer, reason)
494         }
495 }
496
497 func (sw *Switch) listenerRoutine(l Listener) {
498         for {
499                 inConn, ok := <-l.Connections()
500                 if !ok {
501                         break
502                 }
503
504                 // ignore connection if we already have enough
505                 maxPeers := sw.config.MaxNumPeers
506                 if maxPeers <= sw.peers.Size() {
507                         // close inConn
508                         inConn.Close()
509                         log.WithFields(log.Fields{
510                                 "address":  inConn.RemoteAddr().String(),
511                                 "numPeers": sw.peers.Size(),
512                                 "max":      maxPeers,
513                         }).Info("Ignoring inbound connection: already have enough peers")
514                         continue
515                 }
516
517                 // New inbound connection!
518                 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
519                 if err != nil {
520                         // conn close for returing err
521                         inConn.Close()
522                         log.WithFields(log.Fields{
523                                 "address": inConn.RemoteAddr().String(),
524                                 "error":   err,
525                         }).Info("Ignoring inbound connection: error while adding peer")
526                         continue
527                 }
528
529                 // NOTE: We don't yet have the listening port of the
530                 // remote (if they have a listener at all).
531                 // The peerHandshake will handle that
532         }
533
534         // cleanup
535 }
536
537 //-----------------------------------------------------------------------------
538
539 type SwitchEventNewPeer struct {
540         Peer *Peer
541 }
542
543 type SwitchEventDonePeer struct {
544         Peer  *Peer
545         Error interface{}
546 }
547
548 //------------------------------------------------------------------
549 // Switches connected via arbitrary net.Conn; useful for testing
550
551 // Returns n switches, connected according to the connect func.
552 // If connect==Connect2Switches, the switches will be fully connected.
553 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
554 // NOTE: panics if any switch fails to start.
555 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
556         switches := make([]*Switch, n)
557         for i := 0; i < n; i++ {
558                 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
559         }
560
561         if err := StartSwitches(switches); err != nil {
562                 panic(err)
563         }
564
565         for i := 0; i < n; i++ {
566                 for j := i; j < n; j++ {
567                         connect(switches, i, j)
568                 }
569         }
570
571         return switches
572 }
573
574 var PanicOnAddPeerErr = false
575
576 // Will connect switches i and j via net.Pipe()
577 // Blocks until a conection is established.
578 // NOTE: caller ensures i and j are within bounds
579 func Connect2Switches(switches []*Switch, i, j int) {
580         switchI := switches[i]
581         switchJ := switches[j]
582         c1, c2 := net.Pipe()
583         doneCh := make(chan struct{})
584         go func() {
585                 err := switchI.addPeerWithConnection(c1)
586                 if PanicOnAddPeerErr && err != nil {
587                         panic(err)
588                 }
589                 doneCh <- struct{}{}
590         }()
591         go func() {
592                 err := switchJ.addPeerWithConnection(c2)
593                 if PanicOnAddPeerErr && err != nil {
594                         panic(err)
595                 }
596                 doneCh <- struct{}{}
597         }()
598         <-doneCh
599         <-doneCh
600 }
601
602 func StartSwitches(switches []*Switch) error {
603         for _, s := range switches {
604                 _, err := s.Start() // start switch and reactors
605                 if err != nil {
606                         return err
607                 }
608         }
609         return nil
610 }
611
612 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
613         privKey := crypto.GenPrivKeyEd25519()
614         // new switch, add reactors
615         // TODO: let the config be passed in?
616         s := initSwitch(i, NewSwitch(cfg, nil))
617         s.SetNodeInfo(&NodeInfo{
618                 PubKey:     privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
619                 Moniker:    cmn.Fmt("switch%d", i),
620                 Network:    network,
621                 Version:    version,
622                 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
623                 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
624         })
625         s.SetNodePrivKey(privKey)
626         return s
627 }
628
629 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
630         peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
631         if err != nil {
632                 conn.Close()
633                 return err
634         }
635         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
636         if err = sw.AddPeer(peer); err != nil {
637                 conn.Close()
638                 return err
639         }
640
641         return nil
642 }
643
644 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
645         fullAddr := conn.RemoteAddr().String()
646         host, _, err := net.SplitHostPort(fullAddr)
647         if err != nil {
648                 return err
649         }
650
651         if err = sw.checkBannedPeer(host); err != nil {
652                 return err
653         }
654
655         peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
656         if err != nil {
657                 return err
658         }
659         peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
660         if err = sw.AddPeer(peer); err != nil {
661                 return err
662         }
663
664         return nil
665 }
666
667 func (sw *Switch) AddBannedPeer(peer *Peer) error {
668         sw.mtx.Lock()
669         defer sw.mtx.Unlock()
670
671         key := peer.mconn.RemoteAddress.IP.String()
672         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
673         datajson, err := json.Marshal(sw.bannedPeer)
674         if err != nil {
675                 return err
676         }
677         sw.db.Set([]byte(bannedPeerKey), datajson)
678         return nil
679 }
680
681 func (sw *Switch) DelBannedPeer(addr string) error {
682         sw.mtx.Lock()
683         defer sw.mtx.Unlock()
684
685         delete(sw.bannedPeer, addr)
686         datajson, err := json.Marshal(sw.bannedPeer)
687         if err != nil {
688                 return err
689         }
690         sw.db.Set([]byte(bannedPeerKey), datajson)
691         return nil
692 }
693
694 func (sw *Switch) checkBannedPeer(peer string) error {
695         if banEnd, ok := sw.bannedPeer[peer]; ok {
696                 if time.Now().Before(banEnd) {
697                         return ErrConnectBannedPeer
698                 }
699                 sw.DelBannedPeer(peer)
700         }
701         return nil
702 }