11 log "github.com/sirupsen/logrus"
12 crypto "github.com/tendermint/go-crypto"
13 cmn "github.com/tendermint/tmlibs/common"
14 dbm "github.com/tendermint/tmlibs/db"
16 cfg "github.com/bytom/config"
17 "github.com/bytom/errors"
18 "github.com/bytom/p2p/trust"
// bannedPeerKey is the database key under which the banned-peer map is stored as JSON.
22 bannedPeerKey = "BannedPeer"
// defaultBanDuration is how long a ban recorded by AddBannedPeer lasts.
23 defaultBanDuration = time.Hour * 1
26 // Predefined errors returned when a connection is rejected.
// ErrDuplicatePeer is returned when the peer's key is already in the peer set.
28 ErrDuplicatePeer = errors.New("Duplicate peer")
// ErrConnectSelf is returned when the remote host or public key is our own.
29 ErrConnectSelf = errors.New("Connect self")
// ErrConnectBannedPeer is returned when the remote host is currently banned.
30 ErrConnectBannedPeer = errors.New("Connect banned peer")
33 //-----------------------------------------------------------------------------
35 // Switch handles peer connections and exposes an API to receive incoming messages
36 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
37 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
38 // incoming messages are received on the reactor.
43 peerConfig *PeerConfig // built by DefaultPeerConfig in NewSwitch
45 reactors map[string]Reactor // reactors keyed by the name passed to AddReactor
46 chDescs []*ChannelDescriptor // channel descriptors collected from every added reactor
47 reactorsByCh map[byte]Reactor // owning reactor for each channel ID
50 nodeInfo *NodeInfo // our node info
51 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
53 bannedPeer map[string]time.Time // remote host -> time at which its ban expires
58 // NewSwitch creates a new Switch with the given config.
// When config.PexReactor is set, it also creates the address book and registers
// a PEX reactor under the name "PEX". Finally it initialises the banned-peer map
// and, if data is stored under bannedPeerKey, decodes that JSON into it.
59 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
62 peerConfig: DefaultPeerConfig(config),
63 reactors: make(map[string]Reactor),
64 chDescs: make([]*ChannelDescriptor, 0),
65 reactorsByCh: make(map[byte]Reactor),
67 dialing: cmn.NewCMap(),
71 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
73 // Optionally, create the address book and register the PEX reactor
74 if config.PexReactor {
75 sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
76 pexReactor := NewPEXReactor(sw.addrBook, sw)
77 sw.AddReactor("PEX", pexReactor)
// Start from an empty ban list, then overlay whatever was persisted earlier.
80 sw.bannedPeer = make(map[string]time.Time)
81 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
82 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
90 // AddReactor adds the given reactor to the switch under the given name.
// Each channel the reactor reports via GetChannels is recorded in sw.chDescs and
// mapped to the reactor in sw.reactorsByCh; cmn.PanicSanity is called if a channel
// already has a reactor. The reactor is then given a reference to the switch.
91 // NOTE: Not goroutine safe.
92 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
93 // Validate the reactor.
94 // No two reactors can share the same channel.
95 reactorChannels := reactor.GetChannels()
96 for _, chDesc := range reactorChannels {
98 if sw.reactorsByCh[chID] != nil {
99 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
101 sw.chDescs = append(sw.chDescs, chDesc)
102 sw.reactorsByCh[chID] = reactor
104 sw.reactors[name] = reactor
105 reactor.SetSwitch(sw)
109 // Reactors returns the map of reactors registered on the switch, keyed by reactor name.
110 // NOTE: Not goroutine safe.
111 func (sw *Switch) Reactors() map[string]Reactor {
115 // Reactor returns the reactor registered under the given name.
// The result is the map's zero value when no reactor has that name.
116 // NOTE: Not goroutine safe.
117 func (sw *Switch) Reactor(name string) Reactor {
118 return sw.reactors[name]
121 // AddListener appends the given listener to the switch's listeners, which
// are used for accepting incoming peer connections.
122 // NOTE: Not goroutine safe.
123 func (sw *Switch) AddListener(l Listener) {
124 sw.listeners = append(sw.listeners, l)
127 // Listeners returns the listeners registered on the switch (see AddListener).
128 // NOTE: Not goroutine safe.
129 func (sw *Switch) Listeners() []Listener {
133 // IsListening reports whether the switch has at least one listener registered.
134 // NOTE: Not goroutine safe.
135 func (sw *Switch) IsListening() bool {
136 return len(sw.listeners) > 0
139 // SetNodeInfo stores the switch's own NodeInfo, which is used for compatibility
// checks and for handshaking with other nodes.
140 // NOTE: Not goroutine safe.
141 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
142 sw.nodeInfo = nodeInfo
145 // NodeInfo returns the switch's NodeInfo (the value stored by SetNodeInfo).
146 // NOTE: Not goroutine safe.
147 func (sw *Switch) NodeInfo() *NodeInfo {
151 // SetNodePrivKey sets the switch's private key for authenticated encryption.
// If a NodeInfo has already been set, its PubKey is updated to the public key
// derived from nodePrivKey.
152 // NOTE: Not goroutine safe.
153 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
154 sw.nodePrivKey = nodePrivKey
155 if sw.nodeInfo != nil {
156 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
160 // OnStart implements BaseService. It starts the registered reactors and then
// launches one listenerRoutine goroutine per listener.
161 func (sw *Switch) OnStart() error {
163 for _, reactor := range sw.reactors {
164 _, err := reactor.Start()
170 for _, listener := range sw.listeners {
171 go sw.listenerRoutine(listener)
176 // OnStop implements BaseService. It stops all listeners, peers, and reactors,
// in that order; each peer is also removed from the peer set.
177 func (sw *Switch) OnStop() {
179 for _, listener := range sw.listeners {
184 for _, peer := range sw.peers.List() {
186 sw.peers.Remove(peer)
189 for _, reactor := range sw.reactors {
194 // AddPeer performs the P2P handshake with a peer
195 // that already has a SecretConnection. If all goes well,
196 // it starts the peer and adds it to the switch.
// Steps, in order: handshake, NodeInfo compatibility check, filterConnByPeer
// (banned / self / duplicate), startInitPeer, and finally insertion into sw.peers.
197 // NOTE: This performs a blocking handshake before the peer is added.
198 // CONTRACT (per the original note): on error the conn is closed immediately — TODO confirm against the full body; only an error is returned.
199 func (sw *Switch) AddPeer(pc *peerConn) error {
200 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
// NOTE(review): this return directly follows the handshake call — verify that a
// handshake failure is really meant to be reported as ErrConnectBannedPeer.
202 return ErrConnectBannedPeer
204 // Check version, chain id
205 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
209 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
212 if err := sw.filterConnByPeer(peer); err != nil {
218 if err := sw.startInitPeer(peer); err != nil {
223 // Add the peer to .peers.
224 // We start it first so that a peer in the list is safe to Stop.
225 // It should not err since we already checked peers.Has()
226 if err := sw.peers.Add(peer); err != nil {
230 log.Info("Added peer:", peer)
// startInitPeer starts the peer and then passes it to every registered
// reactor through Reactor.AddPeer.
234 func (sw *Switch) startInitPeer(peer *Peer) error {
235 peer.Start() // spawn send/recv routines
236 for _, reactor := range sw.reactors {
237 if err := reactor.AddPeer(peer); err != nil {
244 // DialSeeds parses the given seed addresses, records them in the address book
// (when one is configured) and dials seeds picked from a random permutation.
// NOTE(review): dialSeed is invoked directly, not in a goroutine, so the dials
// visible here run sequentially.
245 func (sw *Switch) DialSeeds(seeds []string) error {
246 netAddrs, err := NewNetAddressStrings(seeds)
251 if sw.addrBook != nil {
252 // add seeds to `addrBook`
// The parse error for our own listen address is deliberately discarded here.
253 ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
254 for _, netAddr := range netAddrs {
255 // do not add ourselves
256 if netAddr.Equals(ourAddr) {
259 sw.addrBook.AddAddress(netAddr, ourAddr)
265 // Permute the list and dial in random order.
266 perm := rand.Perm(len(netAddrs))
// NOTE(review): the step of 2 visits only every other entry of perm, so roughly
// half of the seeds are dialed per call — confirm this is intended.
267 for i := 0; i < len(perm); i += 2 {
269 sw.dialSeed(netAddrs[j])
// dialSeed dials a single seed address through DialPeerWithAddress. Its log
// line carries only the address; the error value itself is not logged.
275 func (sw *Switch) dialSeed(addr *NetAddress) {
276 err := sw.DialPeerWithAddress(addr)
278 log.Info("Error dialing seed:", addr.String())
// addrBookDelSelf parses our own listen address, removes it from the address
// book's regular entries and registers it as one of our own addresses.
282 func (sw *Switch) addrBookDelSelf() error {
283 addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
287 // remove our address from the address book in case we added it earlier
288 sw.addrBook.RemoveAddress(addr)
289 // record the address as our own (public) address so that we avoid
290 // dialing ourselves again
291 sw.addrBook.AddOurAddress(addr)
// filterConnByIP vets a remote IP string. It returns ErrConnectBannedPeer if the
// IP is currently banned and ErrConnectSelf if it equals our own listen host.
295 func (sw *Switch) filterConnByIP(ip string) error {
296 if err := sw.checkBannedPeer(ip); err != nil {
297 return ErrConnectBannedPeer
300 if ip == sw.nodeInfo.ListenHost() {
302 return ErrConnectSelf
// filterConnByPeer vets a handshaked peer. It returns ErrConnectBannedPeer if the
// peer's remote host is banned, ErrConnectSelf if the peer's public key equals
// ours, and ErrDuplicatePeer if the peer's key is already in the peer set.
308 func (sw *Switch) filterConnByPeer(peer *Peer) error {
309 if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
310 return ErrConnectBannedPeer
// Same public key as ours means we connected to ourselves.
313 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
315 return ErrConnectSelf
318 // Check for duplicate peer
319 if sw.peers.Has(peer.Key) {
320 return ErrDuplicatePeer
325 // DialPeerWithAddress dials the node at the given net address and adds it as a peer.
// The address is first vetted with filterConnByIP. While the dial is in progress
// the IP is tracked in sw.dialing; the entry is removed when the function returns.
326 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
327 log.Debug("Dialing peer address:", addr)
329 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
// Mark the IP as being dialed so IsDialing can report it.
333 sw.dialing.Set(addr.IP.String(), addr)
334 defer sw.dialing.Delete(addr.IP.String())
336 pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
338 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
344 log.Info("Failed to add peer:", addr, " err:", err)
348 log.Info("Dialed and added peer:", addr)
352 // IsDialing reports whether a dial to addr's IP is currently in progress, so
// callers can avoid dialing the same address twice.
353 func (sw *Switch) IsDialing(addr *NetAddress) bool {
354 return sw.dialing.Has(addr.IP.String())
357 // NumPeers returns the number of outbound peers, inbound peers, and addresses
// currently being dialed (the size of sw.dialing).
358 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
359 peers := sw.peers.List()
360 for _, peer := range peers {
367 dialing = sw.dialing.Size()
371 // Peers returns the switch's peer set.
372 func (sw *Switch) Peers() *PeerSet {
376 // StopPeerForError disconnects from a peer due to an external error.
// It logs the peer and reason, then delegates to stopAndRemovePeer with that reason.
377 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
378 log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
379 sw.stopAndRemovePeer(peer, reason)
382 // StopPeerGracefully disconnects from a peer gracefully.
// It delegates to stopAndRemovePeer with a nil reason.
383 func (sw *Switch) StopPeerGracefully(peer *Peer) {
384 log.Info("Stopping peer gracefully")
385 sw.stopAndRemovePeer(peer, nil)
// stopAndRemovePeer notifies every registered reactor through RemovePeer
// (forwarding reason) and removes the peer from the switch's peer set.
388 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
389 for _, reactor := range sw.reactors {
390 reactor.RemovePeer(peer, reason)
392 sw.peers.Remove(peer)
// listenerRoutine receives inbound connections from the listener's Connections
// channel and tries to add each one as a peer, unless the peer set is full.
396 func (sw *Switch) listenerRoutine(l Listener) {
398 inConn, ok := <-l.Connections()
403 // Reject the connection once we already have 2 * MaxNumPeers peers. The limit is doubled so that
404 // address books can still be exchanged when the regular slots are full. Per the original note, PEX
405 // disconnects such peers after the address exchange, so the peer count should not stay at double MaxNumPeers.
406 if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
408 log.Info("Ignoring inbound connection: already have enough peers.")
412 // New inbound connection!
413 err := sw.addPeerWithConnection(inConn)
415 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// addPeerWithConnection wraps an accepted net.Conn in an inbound peer
// connection and passes it to AddPeer.
421 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
422 peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
427 if err = sw.AddPeer(peerConn); err != nil {
435 // AddBannedPeer adds the peer's remote host to the ban list for
// defaultBanDuration from now, then writes the whole ban list to the database
// as JSON under bannedPeerKey. sw.mtx is released when the function returns.
436 func (sw *Switch) AddBannedPeer(peer *Peer) error {
438 defer sw.mtx.Unlock()
// Bans are keyed by remote host, not by peer key.
439 key := peer.NodeInfo.RemoteAddrHost()
440 sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
441 datajson, err := json.Marshal(sw.bannedPeer)
445 sw.db.Set([]byte(bannedPeerKey), datajson)
// delBannedPeer removes addr from the ban list and writes the updated list to
// the database as JSON under bannedPeerKey. It does not lock sw.mtx itself;
// checkBannedPeer calls it while its own deferred Unlock is still pending.
449 func (sw *Switch) delBannedPeer(addr string) error {
450 delete(sw.bannedPeer, addr)
451 datajson, err := json.Marshal(sw.bannedPeer)
455 sw.db.Set([]byte(bannedPeerKey), datajson)
// checkBannedPeer looks up the given host in the ban list. It returns
// ErrConnectBannedPeer while the ban is still active; an expired entry is
// deleted via delBannedPeer instead.
459 func (sw *Switch) checkBannedPeer(peer string) error {
461 defer sw.mtx.Unlock()
463 if banEnd, ok := sw.bannedPeer[peer]; ok {
464 if time.Now().Before(banEnd) {
465 return ErrConnectBannedPeer
// Ban has expired: drop the entry. NOTE(review): the error returned by
// delBannedPeer is discarded here.
467 sw.delBannedPeer(peer)