12 log "github.com/sirupsen/logrus"
13 crypto "github.com/tendermint/go-crypto"
14 cmn "github.com/tendermint/tmlibs/common"
15 dbm "github.com/tendermint/tmlibs/db"
17 cfg "github.com/bytom/config"
18 "github.com/bytom/errors"
19 "github.com/bytom/p2p/trust"
24 reconnectInterval = 10 * time.Second
26 bannedPeerKey = "BannedPeer"
27 defaultBanDuration = time.Hour * 1
// ErrConnectBannedPeer is returned by checkBannedPeer while the ban recorded
// for an address has not expired yet.
30 var ErrConnectBannedPeer = errors.New("Connect banned peer")
// Reactor is a service plugged into the Switch: the Switch tells it about
// peers coming and going, and it handles the messages of the channels it claims.
32 type Reactor interface {
33 cmn.Service // Start, Stop
// GetChannels reports the channels this reactor claims. AddReactor panics
// when a channel is already claimed by another reactor.
36 GetChannels() []*ChannelDescriptor
// AddPeer is invoked by startInitPeer for every registered reactor after the
// peer has been started.
37 AddPeer(peer *Peer) error
// RemovePeer is invoked by stopAndRemovePeer; reason is nil for a graceful stop.
38 RemovePeer(peer *Peer, reason interface{})
// Receive presumably delivers one message read from channel chID of peer.
// The dispatch is not visible in this part of the file — confirm in the peer code.
39 Receive(chID byte, peer *Peer, msgBytes []byte)
42 //--------------------------------------
44 type BaseReactor struct {
45 cmn.BaseService // Provides Start, Stop, .Quit
// NewBaseReactor builds a BaseReactor whose embedded BaseService is created
// with the given name and wraps impl.
49 func NewBaseReactor(name string, impl Reactor) *BaseReactor {
51 BaseService: *cmn.NewBaseService(nil, name, impl),
56 func (br *BaseReactor) SetSwitch(sw *Switch) {
59 func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil }
60 func (_ *BaseReactor) AddPeer(peer *Peer) {}
61 func (_ *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {}
62 func (_ *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {}
64 //-----------------------------------------------------------------------------
67 The `Switch` handles peer connections and exposes an API to receive incoming messages
68 on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
69 or more `Channels`. So while sending outgoing messages is typically performed on the peer,
70 incoming messages are received on the reactor.
76 peerConfig *PeerConfig
78 reactors map[string]Reactor
79 chDescs []*ChannelDescriptor
80 reactorsByCh map[byte]Reactor
83 nodeInfo *NodeInfo // our node info
84 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
85 bannedPeer map[string]time.Time
89 filterConnByAddr func(net.Addr) error
90 filterConnByPubKey func(crypto.PubKeyEd25519) error
94 ErrSwitchDuplicatePeer = errors.New("Duplicate peer")
95 ErrConnectSelf = errors.New("Connect self")
96 ErrPeerConnected = errors.New("Peer is connected")
// NewSwitch builds a Switch from the P2P config and restores the persisted
// ban list from the database.
99 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
102 peerConfig: DefaultPeerConfig(config),
103 reactors: make(map[string]Reactor),
104 chDescs: make([]*ChannelDescriptor, 0),
105 reactorsByCh: make(map[byte]Reactor),
107 dialing: cmn.NewCMap(),
111 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
// The ban list is stored as a single JSON document under bannedPeerKey; start
// from an empty map and overwrite it with whatever the database holds.
113 sw.bannedPeer = make(map[string]time.Time)
114 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
115 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
// AddReactor registers reactor under name, records the channels it claims and
// hands the reactor a reference to this switch.
123 // Not goroutine safe.
124 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
125 // Validate the reactor.
126 // No two reactors can share the same channel.
127 reactorChannels := reactor.GetChannels()
128 for _, chDesc := range reactorChannels {
// A channel that already has a reactor is a programming error: panic.
130 if sw.reactorsByCh[chID] != nil {
131 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
133 sw.chDescs = append(sw.chDescs, chDesc)
134 sw.reactorsByCh[chID] = reactor
136 sw.reactors[name] = reactor
137 reactor.SetSwitch(sw)
141 // Not goroutine safe.
142 func (sw *Switch) Reactors() map[string]Reactor {
146 // Not goroutine safe.
147 func (sw *Switch) Reactor(name string) Reactor {
148 return sw.reactors[name]
151 // Not goroutine safe.
152 func (sw *Switch) AddListener(l Listener) {
153 sw.listeners = append(sw.listeners, l)
156 // Not goroutine safe.
157 func (sw *Switch) Listeners() []Listener {
161 // Not goroutine safe.
162 func (sw *Switch) IsListening() bool {
163 return len(sw.listeners) > 0
166 // Not goroutine safe.
167 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
168 sw.nodeInfo = nodeInfo
171 // Not goroutine safe.
172 func (sw *Switch) NodeInfo() *NodeInfo {
176 // Not goroutine safe.
177 // NOTE: Overwrites sw.nodeInfo.PubKey
178 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
179 sw.nodePrivKey = nodePrivKey
180 if sw.nodeInfo != nil {
181 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
185 // Switch.Start() starts all the reactors, peers, and listeners.
186 func (sw *Switch) OnStart() error {
// NOTE(review): the result of sw.BaseService.OnStart() is discarded — confirm
// it cannot fail.
187 sw.BaseService.OnStart()
189 for _, reactor := range sw.reactors {
190 _, err := reactor.Start()
// Every listener gets its own accept goroutine.
196 for _, listener := range sw.listeners {
197 go sw.listenerRoutine(listener)
// OnStop shuts the switch down in this order: the base service, then the
// listeners, then the peers (each one is removed from the peer set), then the
// reactors.
202 func (sw *Switch) OnStop() {
203 sw.BaseService.OnStop()
205 for _, listener := range sw.listeners {
210 for _, peer := range sw.peers.List() {
212 sw.peers.Remove(peer)
215 for _, reactor := range sw.reactors {
// AddPeer runs the admission checks for peer and, when all of them pass,
// starts the peer and records it in the peer set. The checks run in this
// order: address filter, public-key filter, handshake, ban list,
// self-connection, version/chain compatibility, duplicate.
220 // NOTE: This performs a blocking handshake before the peer is added.
221 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
222 func (sw *Switch) AddPeer(peer *Peer) error {
223 if err := sw.FilterConnByAddr(peer.Addr()); err != nil {
227 if err := sw.FilterConnByPubKey(peer.PubKey()); err != nil {
// HandshakeTimeout is multiplied by time.Second here, so the configured value
// is a count of seconds rather than an absolute duration.
231 if err := peer.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second)); err != nil {
// The ban list is consulted with the listen host taken from the peer's
// NodeInfo (presumably filled in by the handshake above — confirm).
235 if err := sw.checkBannedPeer(peer.NodeInfo.ListenHost()); err != nil {
240 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
// NOTE(review): this is a freshly created error, not ErrConnectSelf, so code
// that compares against ErrConnectSelf will not recognise it — confirm intent.
241 return errors.New("Ignoring connection from self")
244 // Check version, chain id
245 if err := sw.nodeInfo.CompatibleWith(peer.NodeInfo); err != nil {
249 // Check for duplicate peer
250 if sw.peers.Has(peer.Key) {
251 return ErrSwitchDuplicatePeer
257 if err := sw.startInitPeer(peer); err != nil {
262 // Add the peer to .peers.
263 // We start it first so that a peer in the list is safe to Stop.
264 // It should not err since we already checked peers.Has()
265 if err := sw.peers.Add(peer); err != nil {
269 log.WithField("peer", peer).Info("Added peer")
273 func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
274 if sw.filterConnByAddr != nil {
275 return sw.filterConnByAddr(addr)
280 func (sw *Switch) FilterConnByPubKey(pubkey crypto.PubKeyEd25519) error {
281 if sw.filterConnByPubKey != nil {
282 return sw.filterConnByPubKey(pubkey)
288 func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
289 sw.filterConnByAddr = f
292 func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
293 sw.filterConnByPubKey = f
// startInitPeer starts peer and then announces it to every registered reactor.
296 func (sw *Switch) startInitPeer(peer *Peer) error {
// NOTE(review): whatever peer.Start() returns is discarded here.
297 peer.Start() // spawn send/recv routines
298 for _, reactor := range sw.reactors {
299 if err := reactor.AddPeer(peer); err != nil {
306 // Dial a list of seeds asynchronously in random order
// NOTE(review): the visible code calls dialSeed directly, without `go`, so the
// dialing looks synchronous — confirm and correct the line above if so.
307 func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error {
309 netAddrs, err := NewNetAddressStrings(seeds)
315 // add seeds to `addrBook`
316 ourAddrS := sw.nodeInfo.ListenAddr
// NOTE(review): the parse error is discarded; confirm that Equals and
// AddAddress below tolerate whatever ourAddr is when ListenAddr is malformed.
317 ourAddr, _ := NewNetAddressString(ourAddrS)
318 for _, netAddr := range netAddrs {
319 // do not add ourselves
320 if netAddr.Equals(ourAddr) {
323 addrBook.AddAddress(netAddr, ourAddr)
328 //permute the list, dial them in random order.
329 perm := rand.Perm(len(netAddrs))
// NOTE(review): the step of 2 visits only every other entry of the
// permutation, so roughly half of the seeds are dialed — confirm intent.
330 for i := 0; i < len(perm); i += 2 {
332 sw.dialSeed(netAddrs[j])
// dialSeed dials addr as a non-persistent peer and logs the outcome.
338 func (sw *Switch) dialSeed(addr *NetAddress) {
339 peer, err := sw.DialPeerWithAddress(addr, false)
341 log.WithField("error", err).Error("Error dialing seed")
343 log.WithField("peer", peer).Info("Connected to seed")
// DialPeerWithAddress dials addr and adds the resulting outbound peer to the
// switch. It refuses banned addresses (see checkBannedPeer), our own listen
// host (ErrConnectSelf) and hosts we already have a peer for
// (ErrPeerConnected). While the dial is in progress addr is tracked in
// sw.dialing, keyed by IP only.
347 func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) {
348 if err := sw.checkBannedPeer(addr.IP.String()); err != nil {
351 if strings.Compare(addr.IP.String(), sw.nodeInfo.ListenHost()) == 0 {
352 return nil, ErrConnectSelf
// NOTE(review): this ranges over the PeerSet's internal list directly, while
// the rest of the file goes through sw.peers.List(); confirm that reading
// .list here is safe against concurrent Add/Remove.
354 for _, v := range sw.Peers().list {
355 if strings.Compare(v.mconn.RemoteAddress.IP.String(), addr.IP.String()) == 0 {
356 return nil, ErrPeerConnected
359 sw.dialing.Set(addr.IP.String(), addr)
360 defer sw.dialing.Delete(addr.IP.String())
362 log.Debug("Dialing peer address:", addr)
363 peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
365 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
368 peer.SetLogger(sw.Logger.With("peer", addr))
370 peer.makePersistent()
372 err = sw.AddPeer(peer)
374 log.WithFields(log.Fields{
377 }).Info("Failed to add peer")
381 log.WithFields(log.Fields{
383 }).Info("Dialed and added peer")
387 func (sw *Switch) IsDialing(addr *NetAddress) bool {
388 return sw.dialing.Has(addr.IP.String())
391 // Broadcast runs a go routine for each attempted send, which will block
392 // trying to send for defaultSendTimeoutSeconds. Returns a channel
393 // which receives success values for each attempted send (false if times out)
394 // NOTE: Broadcast uses goroutines, so order of broadcast may not be preserved.
// NOTE(review): sw.peers.List() is called twice — once to size the channel and
// once to pick the recipients. If the peer set grows in between, there are
// more senders than buffer slots and the surplus goroutines block until the
// caller drains the channel. Taking a single snapshot would avoid that.
395 func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
396 successChan := make(chan bool, len(sw.peers.List()))
397 log.WithFields(log.Fields{
400 }).Debug("Broadcast")
401 for _, peer := range sw.peers.List() {
402 go func(peer *Peer) {
403 success := peer.Send(chID, msg)
404 successChan <- success
410 // Returns the count of outbound/inbound and outbound-dialing peers.
// The dialing count is the number of entries currently held in sw.dialing.
411 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
412 peers := sw.peers.List()
413 for _, peer := range peers {
420 dialing = sw.dialing.Size()
424 func (sw *Switch) Peers() *PeerSet {
428 // Disconnect from a peer due to external error, retry if it is a persistent peer.
429 // TODO: make record depending on reason.
430 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
// The address is captured before the peer is stopped so it can be redialed.
431 addr := NewNetAddress(peer.Addr())
432 log.WithFields(log.Fields{
435 }).Info("Stopping peer due to error")
436 sw.stopAndRemovePeer(peer, reason)
438 if peer.IsPersistent() {
439 log.WithField("peer", peer).Info("Reconnecting to peer")
// NOTE(review): i runs from 1 to reconnectAttempts-1, so the
// `i == reconnectAttempts` test below can never be true: the "Giving up"
// branch is unreachable and one attempt fewer than reconnectAttempts is made.
// `i <= reconnectAttempts` is probably what was meant.
440 for i := 1; i < reconnectAttempts; i++ {
// NOTE(review): the redial passes persistent=false although this branch only
// runs for persistent peers — confirm the reconnected peer is meant to lose
// its persistent flag.
445 peer, err := sw.DialPeerWithAddress(addr, false)
447 if i == reconnectAttempts {
448 log.WithFields(log.Fields{
451 }).Info("Error reconnecting to peer. Giving up")
// Banned, already-connected, duplicate and self errors are logged separately
// from the "Trying again" case; presumably they end the retry loop — confirm.
455 if errors.Root(err) == ErrConnectBannedPeer || errors.Root(err) == ErrPeerConnected || errors.Root(err) == ErrSwitchDuplicatePeer || errors.Root(err) == ErrConnectSelf {
456 log.WithField("error", err).Info("Error reconnecting to peer. ")
460 log.WithFields(log.Fields{
463 }).Info("Error reconnecting to peer. Trying again")
// Wait reconnectInterval between attempts.
464 time.Sleep(reconnectInterval)
468 log.WithField("peer", peer).Info("Reconnected to peer")
474 // Disconnect from a peer gracefully.
475 // TODO: handle graceful disconnects.
476 func (sw *Switch) StopPeerGracefully(peer *Peer) {
477 log.Info("Stopping peer gracefully")
478 sw.stopAndRemovePeer(peer, nil)
// stopAndRemovePeer tells every reactor that peer is going away (passing
// reason along) and then drops the peer from the peer set.
481 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
482 for _, reactor := range sw.reactors {
483 reactor.RemovePeer(peer, reason)
485 sw.peers.Remove(peer)
486 log.Info("Del peer from switch.")
488 log.Info("Peer connection is closed.")
// listenerRoutine receives inbound connections from l and tries to add each
// one as a peer. OnStart runs one such routine per listener.
491 func (sw *Switch) listenerRoutine(l Listener) {
493 inConn, ok := <-l.Connections()
498 // disconnect if we already have 2 * MaxNumPeers. We do this because we want the address book to get exchanged even if
499 // the connection slots are full. The pex will disconnect the peer after the address exchange, so the number of connected
500 // peers won't be double of MaxNumPeers
501 if sw.config.MaxNumPeers*2 <= sw.peers.Size() {
504 log.WithFields(log.Fields{
505 "address": inConn.RemoteAddr().String(),
506 "numPeers": sw.peers.Size(),
507 }).Info("Ignoring inbound connection: already have enough peers")
511 // New inbound connection!
512 err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig)
514 // close the conn when an error is returned
516 log.WithFields(log.Fields{
517 "address": inConn.RemoteAddr().String(),
519 }).Info("Ignoring inbound connection: error while adding peer")
523 // NOTE: We don't yet have the listening port of the
524 // remote (if they have a listener at all).
525 // The peerHandshake will handle that
531 //-----------------------------------------------------------------------------
533 type SwitchEventNewPeer struct {
537 type SwitchEventDonePeer struct {
542 //------------------------------------------------------------------
543 // Switches connected via arbitrary net.Conn; useful for testing
545 // Returns n switches, connected according to the connect func.
546 // If connect==Connect2Switches, the switches will be fully connected.
547 // initSwitch defines how the ith switch should be initialized (ie. with what reactors).
548 // NOTE: panics if any switch fails to start.
549 func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch {
550 switches := make([]*Switch, n)
551 for i := 0; i < n; i++ {
552 switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch)
555 if err := StartSwitches(switches); err != nil {
559 for i := 0; i < n; i++ {
560 for j := i; j < n; j++ {
561 connect(switches, i, j)
// PanicOnAddPeerErr is checked by Connect2Switches together with the error
// returned from adding a peer; it is false by default.
568 var PanicOnAddPeerErr = false
570 // Will connect switches i and j via net.Pipe()
571 // Blocks until a connection is established.
572 // NOTE: caller ensures i and j are within bounds
573 func Connect2Switches(switches []*Switch, i, j int) {
574 switchI := switches[i]
575 switchJ := switches[j]
577 doneCh := make(chan struct{})
// Each switch adds its own end of the connection as a peer.
579 err := switchI.addPeerWithConnection(c1)
580 if PanicOnAddPeerErr && err != nil {
586 err := switchJ.addPeerWithConnection(c2)
587 if PanicOnAddPeerErr && err != nil {
596 func StartSwitches(switches []*Switch) error {
597 for _, s := range switches {
598 _, err := s.Start() // start switch and reactors
// makeSwitch builds the ith test switch: a fresh Ed25519 key, a switch
// initialised by initSwitch, and node info derived from the key and i.
606 func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
607 privKey := crypto.GenPrivKeyEd25519()
608 // new switch, add reactors
609 // TODO: let the config be passed in?
// NOTE(review): NewSwitch is given a nil DB here, and NewSwitch reads the ban
// list through sw.db — confirm that a nil DB is safe on that path.
610 s := initSwitch(i, NewSwitch(cfg, nil))
611 s.SetNodeInfo(&NodeInfo{
612 PubKey: privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
613 Moniker: cmn.Fmt("switch%d", i),
// The addresses are "<network>:<random port>", with the port drawn from
// 1023..65534.
616 RemoteAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
617 ListenAddr: cmn.Fmt("%v:%v", network, rand.Intn(64512)+1023),
619 s.SetNodePrivKey(privKey)
// addPeerWithConnection wraps conn in an inbound peer built with sw.config and
// adds it to the switch.
623 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
624 peer, err := newInboundPeer(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
629 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
630 if err = sw.AddPeer(peer); err != nil {
// addPeerWithConnectionAndConfig wraps conn in an inbound peer built with
// config and adds it to the switch. The remote host is checked against the
// ban list before the peer is constructed.
638 func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error {
639 fullAddr := conn.RemoteAddr().String()
// Only the host part is used for the ban lookup; the port is dropped.
640 host, _, err := net.SplitHostPort(fullAddr)
645 if err = sw.checkBannedPeer(host); err != nil {
649 peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config)
653 peer.SetLogger(sw.Logger.With("peer", conn.RemoteAddr()))
654 if err = sw.AddPeer(peer); err != nil {
// AddBannedPeer bans the remote IP of peer for defaultBanDuration and writes
// the whole ban list to the database as JSON under bannedPeerKey.
661 func (sw *Switch) AddBannedPeer(peer *Peer) error {
663 defer sw.mtx.Unlock()
// The ban is keyed by IP only, so it covers every port on that host.
667 key := peer.mconn.RemoteAddress.IP.String()
668 sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
669 datajson, err := json.Marshal(sw.bannedPeer)
673 sw.db.Set([]byte(bannedPeerKey), datajson)
677 func (sw *Switch) delBannedPeer(addr string) error {
678 delete(sw.bannedPeer, addr)
679 datajson, err := json.Marshal(sw.bannedPeer)
683 sw.db.Set([]byte(bannedPeerKey), datajson)
687 func (sw *Switch) checkBannedPeer(peer string) error {
689 defer sw.mtx.Unlock()
691 if banEnd, ok := sw.bannedPeer[peer]; ok {
692 if time.Now().Before(banEnd) {
693 return ErrConnectBannedPeer
695 sw.delBannedPeer(peer)