10 log "github.com/sirupsen/logrus"
11 crypto "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
13 dbm "github.com/tendermint/tmlibs/db"
15 cfg "github.com/bytom/config"
16 "github.com/bytom/errors"
17 "github.com/bytom/p2p/connection"
18 "github.com/bytom/p2p/trust"
22 bannedPeerKey = "BannedPeer" // DB key under which the JSON-encoded banned-peer map is stored
23 defaultBanDuration = time.Hour * 1 // how long AddBannedPeer bans a host
26 // Predefined errors returned when a connection to a peer is rejected.
28 ErrDuplicatePeer = errors.New("Duplicate peer") // the peer's key is already in the peer set
29 ErrConnectSelf = errors.New("Connect self") // the remote side is this node itself
30 ErrConnectBannedPeer = errors.New("Connect banned peer") // the remote host is currently banned
33 // AddrBook represents an address book from the pex package, which is used to store peer addresses.
// The Switch itself calls RemoveAddress and AddOurAddress (see addrBookDelSelf).
34 type AddrBook interface {
// NOTE(review): the roles of the two addresses are not visible here — presumably
// (address, source of the address); confirm against the pex implementation.
35 AddAddress(*NetAddress, *NetAddress) error
36 AddOurAddress(*NetAddress) // records one of our own addresses so that we avoid dialing ourselves
38 RemoveAddress(*NetAddress) // removes the given address from the address book
42 //-----------------------------------------------------------------------------
44 // Switch handles peer connections and exposes an API to receive incoming messages
45 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
46 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
47 // incoming messages are received on the reactor.
52 peerConfig *PeerConfig // built from the P2P config by DefaultPeerConfig in NewSwitch
54 reactors map[string]Reactor // registered reactors, keyed by the name given to AddReactor
55 chDescs []*connection.ChannelDescriptor // channel descriptors of all registered reactors
56 reactorsByCh map[byte]Reactor // reactor that owns each channel ID
59 nodeInfo *NodeInfo // our node info
60 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
62 bannedPeer map[string]time.Time // remote host -> time at which its ban ends
67 // NewSwitch creates a new Switch with the given config.
// The switch starts with empty reactor and channel tables. The banned-peer map
// is restored from the JSON stored under bannedPeerKey in the switch DB, if
// such an entry exists.
68 func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
71 peerConfig: DefaultPeerConfig(config),
72 reactors: make(map[string]Reactor),
73 chDescs: make([]*connection.ChannelDescriptor, 0),
74 reactorsByCh: make(map[byte]Reactor),
76 dialing: cmn.NewCMap(),
81 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
// Start from an empty ban list, then overlay whatever was persisted earlier.
82 sw.bannedPeer = make(map[string]time.Time)
83 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
84 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
92 // AddReactor adds the given reactor to the switch.
// It records every channel descriptor of the reactor, maps each channel ID to
// the reactor, stores the reactor under name and passes the switch to the
// reactor via SetSwitch. It panics (cmn.PanicSanity) if one of the reactor's
// channels is already owned by a previously added reactor. A reactor added
// under an already used name replaces the earlier entry in sw.reactors.
93 // NOTE: Not goroutine safe.
94 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
95 // Validate the reactor.
96 // No two reactors can share the same channel.
97 reactorChannels := reactor.GetChannels()
98 for _, chDesc := range reactorChannels {
100 if sw.reactorsByCh[chID] != nil {
101 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
103 sw.chDescs = append(sw.chDescs, chDesc)
104 sw.reactorsByCh[chID] = reactor
106 sw.reactors[name] = reactor
107 reactor.SetSwitch(sw)
111 // Reactors returns a map of reactors registered on the switch.
112 // NOTE: Not goroutine safe.
113 func (sw *Switch) Reactors() map[string]Reactor {
117 // Reactor returns the reactor with the given name.
// The result is nil if no reactor has been added under that name.
118 // NOTE: Not goroutine safe.
119 func (sw *Switch) Reactor(name string) Reactor {
120 return sw.reactors[name]
123 // AddListener adds the given listener to the switch for listening to incoming peer connections.
// The listener is only appended to sw.listeners here; the accept loops
// (listenerRoutine) are launched by OnStart for the listeners known at that time.
124 // NOTE: Not goroutine safe.
125 func (sw *Switch) AddListener(l Listener) {
126 sw.listeners = append(sw.listeners, l)
129 // Listeners returns the list of listeners the switch listens on.
130 // NOTE: Not goroutine safe.
131 func (sw *Switch) Listeners() []Listener {
135 // IsListening reports whether the switch has at least one listener.
136 // NOTE: Not goroutine safe.
137 func (sw *Switch) IsListening() bool {
138 return len(sw.listeners) > 0
141 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
// AddPeer passes this NodeInfo to the handshake and to CompatibleWith.
142 // NOTE: Not goroutine safe.
143 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
144 sw.nodeInfo = nodeInfo
147 // NodeInfo returns the switch's NodeInfo.
148 // NOTE: Not goroutine safe.
149 func (sw *Switch) NodeInfo() *NodeInfo {
153 // SetNodePrivKey sets the switch's private key for authenticated encryption.
// If a NodeInfo has already been set, its PubKey is overwritten with the
// public key derived from nodePrivKey; when no NodeInfo is set yet, only the
// private key is stored.
154 // NOTE: Not goroutine safe.
155 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
156 sw.nodePrivKey = nodePrivKey
157 if sw.nodeInfo != nil {
// Keep the advertised public key in sync with the private key.
158 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
162 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
163 func (sw *Switch) OnStart() error {
// sw.reactors is a map, so the order in which reactors are started is unspecified.
165 for _, reactor := range sw.reactors {
166 _, err := reactor.Start()
// One accept-loop goroutine is spawned per registered listener.
172 for _, listener := range sw.listeners {
173 go sw.listenerRoutine(listener)
178 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// The three groups are visited in that order; every peer is also removed from
// the peer set.
179 func (sw *Switch) OnStop() {
181 for _, listener := range sw.listeners {
// Iterates over the slice returned by List() while removing entries from sw.peers.
186 for _, peer := range sw.peers.List() {
188 sw.peers.Remove(peer)
191 for _, reactor := range sw.reactors {
196 // AddPeer performs the P2P handshake with a peer
197 // that already has a SecretConnection. If all goes well,
198 // it starts the peer and adds it to the switch.
199 // NOTE: This performs a blocking handshake before the peer is added.
200 // CONTRACT: If error is returned, conn is immediately closed (only an error is returned; there is no peer result).
// NOTE(review): the closing of the connection is not among the lines visible
// here — confirm that every error path actually closes it.
201 func (sw *Switch) AddPeer(pc *peerConn) error {
// NOTE(review): for this expression to compile HandshakeTimeout must already
// be a time.Duration, so it is multiplied by time.Second (1e9) and the outer
// conversion is redundant. This is only correct if the field holds a bare
// count of seconds — confirm against DefaultPeerConfig.
202 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
206 // Check version, chain id
207 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
211 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Reject banned hosts, connections to ourselves and duplicate peers.
214 if err := sw.filterConnByPeer(peer); err != nil {
// Start the peer and register it with every reactor.
220 if err := sw.startInitPeer(peer); err != nil {
225 // Add the peer to .peers.
226 // We start it first so that a peer in the list is safe to Stop.
227 // It should not err since we already checked peers.Has()
228 if err := sw.peers.Add(peer); err != nil {
232 log.Info("Added peer:", peer)
// startInitPeer starts the peer and then registers it with every reactor of
// the switch via Reactor.AddPeer.
// NOTE(review): any result of peer.Start() is discarded here, whereas OnStart
// checks the error returned by reactor.Start() — confirm this is intended.
236 func (sw *Switch) startInitPeer(peer *Peer) error {
237 peer.Start() // spawn send/recv routines
238 for _, reactor := range sw.reactors {
239 if err := reactor.AddPeer(peer); err != nil {
// dialSeed dials the given seed address through DialPeerWithAddress and logs
// the address at Info level (presumably only when the dial failed — the
// guarding condition is not visible here). It returns nothing to the caller.
246 func (sw *Switch) dialSeed(addr *NetAddress) {
247 err := sw.DialPeerWithAddress(addr)
// NOTE(review): only the address is logged; err itself is not part of the message.
249 log.Info("Error dialing seed:", addr.String())
// addrBookDelSelf parses our own listen address (sw.nodeInfo.ListenAddr),
// removes it from the address book and registers it there as one of our own
// addresses.
253 func (sw *Switch) addrBookDelSelf() error {
254 addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
258 // remove the given address from the address book if we've added it earlier
259 sw.addrBook.RemoveAddress(addr)
260 // add the given address to the address book as our own public address,
261 // so that we avoid dialing ourselves again
262 sw.addrBook.AddOurAddress(addr)
// filterConnByIP rejects a connection based on the remote IP alone. It returns
// ErrConnectBannedPeer if the IP is currently banned and ErrConnectSelf if the
// IP equals our own listen host.
266 func (sw *Switch) filterConnByIP(ip string) error {
267 if err := sw.checkBannedPeer(ip); err != nil {
268 return ErrConnectBannedPeer
271 if ip == sw.nodeInfo.ListenHost() {
273 return ErrConnectSelf
// filterConnByPeer rejects a peer after the handshake. It returns
// ErrConnectBannedPeer if the peer's remote host is currently banned,
// ErrConnectSelf if the peer presents our own public key, and ErrDuplicatePeer
// if a peer with the same key is already in the peer set.
279 func (sw *Switch) filterConnByPeer(peer *Peer) error {
280 if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
281 return ErrConnectBannedPeer
// Same public key as ours: we have connected to ourselves.
284 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
286 return ErrConnectSelf
289 // Check for duplicate peer
290 if sw.peers.Has(peer.Key) {
291 return ErrDuplicatePeer
296 // DialPeerWithAddress dials the node at the given net address.
// The address is first checked with filterConnByIP. While the call is running,
// the address is tracked in sw.dialing so that IsDialing can report it; the
// entry is removed again when the function returns.
297 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
298 log.Debug("Dialing peer address:", addr)
300 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
// The dialing set is keyed by IP only, so the port is not part of the key.
304 sw.dialing.Set(addr.IP.String(), addr)
305 defer sw.dialing.Delete(addr.IP.String())
307 pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
309 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
315 log.Info("Failed to add peer:", addr, " err:", err)
319 log.Info("Dialed and added peer:", addr)
323 // IsDialing reports whether a dial to the IP of addr is currently in progress;
// callers can use it to prevent duplicate dialing. Only the IP is compared,
// the port of addr is ignored.
324 func (sw *Switch) IsDialing(addr *NetAddress) bool {
325 return sw.dialing.Has(addr.IP.String())
328 // NumPeers returns the number of outbound peers, inbound peers and outbound dials currently in progress.
329 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
330 peers := sw.peers.List()
331 for _, peer := range peers {
// In-progress dials are counted from the dialing set, not from the peer list.
338 dialing = sw.dialing.Size()
342 // Peers returns the peer set of the switch.
343 func (sw *Switch) Peers() *PeerSet {
347 // StopPeerForError disconnects from a peer due to external error.
// The reason is logged and forwarded to stopAndRemovePeer, which hands it to
// every reactor's RemovePeer.
348 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
349 log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
350 sw.stopAndRemovePeer(peer, reason)
353 // StopPeerGracefully disconnects from a peer gracefully.
// Reactors are notified with a nil reason.
354 func (sw *Switch) StopPeerGracefully(peer *Peer) {
355 log.Info("Stopping peer gracefully")
356 sw.stopAndRemovePeer(peer, nil)
// stopAndRemovePeer notifies every reactor through RemovePeer(peer, reason)
// and then removes the peer from the switch's peer set. reason is nil for a
// graceful stop (see StopPeerGracefully).
359 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
360 for _, reactor := range sw.reactors {
361 reactor.RemovePeer(peer, reason)
363 sw.peers.Remove(peer)
// listenerRoutine receives inbound connections from the listener's
// Connections() channel and tries to add each one as a peer through
// addPeerWithConnection. OnStart runs it in its own goroutine per listener.
367 func (sw *Switch) listenerRoutine(l Listener) {
369 inConn, ok := <-l.Connections()
374 // Disconnect if we already have 2 * MaxNumPeers peers. The limit is doubled on purpose so that address
375 // books can still be exchanged when the node is full: pex disconnects the peer after the address
376 // exchange, so the number of lasting connections will not actually be double MaxNumPeers.
377 if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
379 log.Info("Ignoring inbound connection: already have enough peers.")
383 // New inbound connection!
384 err := sw.addPeerWithConnection(inConn)
386 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// addPeerWithConnection wraps an accepted connection in an inbound peerConn
// (newInboundPeerConn) and passes it to AddPeer.
392 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
393 peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
398 if err = sw.AddPeer(peerConn); err != nil {
406 // AddBannedPeer adds the peer to the blacklist: its remote host is banned for
// defaultBanDuration starting now, and the whole banned-peer map is persisted
// as JSON under bannedPeerKey. The ban is keyed by host, not by peer key.
407 func (sw *Switch) AddBannedPeer(peer *Peer) error {
409 defer sw.mtx.Unlock()
410 key := peer.NodeInfo.RemoteAddrHost()
// Banning an already banned host overwrites the previous expiry time.
411 sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
412 datajson, err := json.Marshal(sw.bannedPeer)
416 sw.db.Set([]byte(bannedPeerKey), datajson)
// delBannedPeer removes addr from the banned-peer map and persists the updated
// map as JSON under bannedPeerKey. It does not lock sw.mtx itself; its visible
// caller, checkBannedPeer, invokes it while holding the mutex.
420 func (sw *Switch) delBannedPeer(addr string) error {
421 delete(sw.bannedPeer, addr)
422 datajson, err := json.Marshal(sw.bannedPeer)
426 sw.db.Set([]byte(bannedPeerKey), datajson)
// checkBannedPeer looks up the given host in the banned-peer map. It returns
// ErrConnectBannedPeer while the recorded ban has not yet expired; an entry
// whose ban end has passed is deleted via delBannedPeer.
430 func (sw *Switch) checkBannedPeer(peer string) error {
432 defer sw.mtx.Unlock()
434 if banEnd, ok := sw.bannedPeer[peer]; ok {
435 if time.Now().Before(banEnd) {
436 return ErrConnectBannedPeer
// NOTE(review): the error returned by delBannedPeer is discarded here.
438 sw.delBannedPeer(peer)