11 log "github.com/sirupsen/logrus"
12 "github.com/tendermint/go-crypto"
13 cmn "github.com/tendermint/tmlibs/common"
14 dbm "github.com/tendermint/tmlibs/db"
16 cfg "github.com/bytom/config"
17 "github.com/bytom/consensus"
18 "github.com/bytom/crypto/ed25519"
19 "github.com/bytom/errors"
20 "github.com/bytom/p2p/connection"
21 "github.com/bytom/p2p/discover"
22 "github.com/bytom/p2p/netutil"
23 "github.com/bytom/p2p/trust"
24 "github.com/bytom/version"
// bannedPeerKey is the database key under which the banned-peer map is stored as JSON.
28 bannedPeerKey = "BannedPeer"
// defaultBanDuration is how long a ban added by AddBannedPeer lasts.
29 defaultBanDuration = time.Hour * 1
// minNumOutboundPeers is the outbound-plus-dialing peer count ensureOutboundPeers tries to reach.
32 minNumOutboundPeers = 4
35 // Predefined errors returned when a connection attempt is rejected.
// ErrDuplicatePeer: the peer set already contains the peer's key (filterConnByPeer).
37 ErrDuplicatePeer = errors.New("Duplicate peer")
// ErrConnectSelf: the remote host or public key is this node's own.
38 ErrConnectSelf = errors.New("Connect self")
// ErrConnectBannedPeer: the peer has a ban that has not expired yet (checkBannedPeer).
39 ErrConnectBannedPeer = errors.New("Connect banned peer")
// ErrConnectSpvPeer: an outbound peer does not advertise consensus.SFFullNode (AddPeer).
40 ErrConnectSpvPeer = errors.New("Outbound connect spv peer")
// discv is the node-discovery dependency of the Switch; ensureOutboundPeers uses
// it to obtain candidate nodes to dial.
43 type discv interface {
// ReadRandomNodes fills buf with nodes and returns how many entries were written;
// callers only read buf[:n]. Presumably the nodes are picked at random, per the
// name — confirm against the discover package.
44 ReadRandomNodes(buf []*discover.Node) (n int)
47 // Switch handles peer connections and exposes an API to receive incoming messages
48 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
49 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
50 // incoming messages are received on the reactor.
// peerConfig holds the peer settings built by DefaultPeerConfig(config.P2P) in newSwitch.
55 peerConfig *PeerConfig
// reactors maps a reactor name to the reactor registered under it by AddReactor.
57 reactors map[string]Reactor
// chDescs collects the channel descriptors of every registered reactor.
58 chDescs []*connection.ChannelDescriptor
// reactorsByCh maps a channel ID to the reactor that owns it; AddReactor panics
// when two reactors claim the same ID.
59 reactorsByCh map[byte]Reactor
62 nodeInfo *NodeInfo // our node info
63 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
// bannedPeer maps a peer IP/host string to the time its ban ends; it is persisted
// as JSON under bannedPeerKey.
65 bannedPeer map[string]time.Time
70 // NewSwitch creates a new Switch from config. Unless config.VaultMode is set it
// also creates the listener and the discovery network used to find peers.
71 func NewSwitch(config *cfg.Config) (*Switch, error) {
// Stays nil in vault mode. NOTE(review): it is then passed to newSwitch as the
// discv interface, which yields a non-nil interface holding a nil pointer.
75 var discv *discover.Network
// Database handed to newSwitch as blacklistDB.
77 blacklistDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
// The node key is stored on the P2P config and hex-decoded below.
78 config.P2P.PrivateKey, err = config.NodeKey()
83 bytes, err := hex.DecodeString(config.P2P.PrivateKey)
// copy transfers at most min(len(newKey), len(bytes)) bytes; no length check on
// the decoded key is visible here.
89 copy(newKey[:], bytes)
90 privKey := crypto.PrivKeyEd25519(newKey)
// The listener and discovery are only set up outside vault mode.
91 if !config.VaultMode {
93 l, listenAddr = GetListener(config.P2P)
// Discovery is given the same key bytes and the listener's external port.
94 discv, err = discover.NewDiscover(config, ed25519.PrivateKey(bytes), l.ExternalAddress().Port)
100 return newSwitch(config, discv, blacklistDB, l, privKey, listenAddr)
103 // newSwitch creates a new Switch with the given config. It builds empty
// reactor/channel registries, derives the node info from priv and listenAddr, and
// loads the persisted banned peers before returning.
104 func newSwitch(config *cfg.Config, discv discv, blacklistDB dbm.DB, l Listener, priv crypto.PrivKeyEd25519, listenAddr string) (*Switch, error) {
107 peerConfig: DefaultPeerConfig(config.P2P),
108 reactors: make(map[string]Reactor),
109 chDescs: make([]*connection.ChannelDescriptor, 0),
110 reactorsByCh: make(map[byte]Reactor),
112 dialing: cmn.NewCMap(),
// The single-value type assertion panics if the unwrapped public key is not a
// crypto.PubKeyEd25519.
116 nodeInfo: NewNodeInfo(config, priv.PubKey().Unwrap().(crypto.PubKeyEd25519), listenAddr),
117 bannedPeer: make(map[string]time.Time),
// Restore bans persisted under bannedPeerKey.
119 if err := sw.loadBannedPeers(); err != nil {
// sw itself is passed as the service implementation (see OnStart/OnStop).
124 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
129 // OnStart implements BaseService. It starts every registered reactor, launches one
// accept loop per listener, and launches the outbound-peer maintenance loop.
130 func (sw *Switch) OnStart() error {
// Reactors are started first, one after another.
131 for _, reactor := range sw.reactors {
132 if _, err := reactor.Start(); err != nil {
// Each listener gets its own goroutine running listenerRoutine.
136 for _, listener := range sw.listeners {
137 go sw.listenerRoutine(listener)
// Background routine that keeps dialing keep-dial addresses and discovered nodes.
139 go sw.ensureOutboundPeersRoutine()
143 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
144 func (sw *Switch) OnStop() {
// Listeners are handled first, then peers, then reactors.
145 for _, listener := range sw.listeners {
// Every peer returned by peers.List() is removed from the peer set.
150 for _, peer := range sw.peers.List() {
152 sw.peers.Remove(peer)
155 for _, reactor := range sw.reactors {
160 // AddBannedPeer bans ip for defaultBanDuration starting now and persists the
// whole ban map as JSON under bannedPeerKey.
161 func (sw *Switch) AddBannedPeer(ip string) error {
// sw.mtx guards bannedPeer for the rest of the call.
163 defer sw.mtx.Unlock()
// An existing entry for ip is overwritten, which restarts its ban window.
165 sw.bannedPeer[ip] = time.Now().Add(defaultBanDuration)
166 dataJSON, err := json.Marshal(sw.bannedPeer)
171 sw.db.Set([]byte(bannedPeerKey), dataJSON)
175 // AddPeer performs the P2P handshake with a peer
176 // that already has a SecretConnection. If all goes well,
177 // it starts the peer and adds it to the switch.
178 // NOTE: This performs a blocking handshake before the peer is added.
179 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
// NOTE(review): no close of pc is visible on the error paths in this function;
// addPeerWithConnection closes the underlying conn itself when AddPeer fails —
// confirm every caller does the same.
180 func (sw *Switch) AddPeer(pc *peerConn) error {
// Exchange NodeInfo with the remote side, bounded by the configured handshake timeout.
181 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, sw.peerConfig.HandshakeTimeout)
// Version check of our version against the one the peer advertised.
186 if err := version.Status.CheckUpdate(sw.nodeInfo.Version, peerNodeInfo.Version, peerNodeInfo.RemoteAddr); err != nil {
189 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
// StopPeerForError is handed to the peer as its error callback (presumably
// invoked on connection errors — TODO confirm in newPeer).
193 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Checks for banned peers, ourselves and duplicates.
194 if err := sw.filterConnByPeer(peer); err != nil {
// Outbound connections are refused when the peer does not advertise the
// full-node service flag.
198 if pc.outbound && !peer.ServiceFlag().IsEnable(consensus.SFFullNode) {
199 return ErrConnectSpvPeer
204 if err := sw.startInitPeer(peer); err != nil {
// Register the peer in the peer set; Add's error is returned as-is.
209 return sw.peers.Add(peer)
212 // AddReactor adds the given reactor to the switch under name and records which
// channels it serves.
213 // NOTE: Not goroutine safe.
214 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
215 // Validate the reactor.
216 // No two reactors can share the same channel.
217 for _, chDesc := range reactor.GetChannels() {
// A channel ID that already has an owner is a programming error: panic.
219 if sw.reactorsByCh[chID] != nil {
220 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
222 sw.chDescs = append(sw.chDescs, chDesc)
223 sw.reactorsByCh[chID] = reactor
// A reactor registered earlier under the same name is replaced in the name map.
225 sw.reactors[name] = reactor
// Give the reactor a back-reference to this switch.
226 reactor.SetSwitch(sw)
230 // AddListener adds the given listener to the switch for listening to incoming peer connections.
231 // NOTE: Not goroutine safe.
232 func (sw *Switch) AddListener(l Listener) {
233 sw.listeners = append(sw.listeners, l)
236 // DialPeerWithAddress dials addr and, once the outbound connection is
// established, hands it to AddPeer.
237 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
238 log.WithFields(log.Fields{"module": logModule, "address": addr}).Debug("Dialing peer")
// The IP is marked as being dialed for the duration of the call so that
// IsDialing can report it.
239 sw.dialing.Set(addr.IP.String(), addr)
240 defer sw.dialing.Delete(addr.IP.String())
// Checks that the IP is neither our own listen host nor currently banned.
241 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
245 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
// NOTE(review): the log key " err" carries a leading space here and below,
// unlike the "err" key used in dialPeerWorker.
247 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on newOutboundPeerConn")
251 if err = sw.AddPeer(pc); err != nil {
252 log.WithFields(log.Fields{"module": logModule, "address": addr, " err": err}).Error("DialPeer fail on switch AddPeer")
256 log.WithFields(log.Fields{"module": logModule, "address": addr, "peer num": sw.peers.Size()}).Debug("DialPeer added peer")
260 //IsDialing prevent duplicate dialing
261 func (sw *Switch) IsDialing(addr *NetAddress) bool {
262 return sw.dialing.Has(addr.IP.String())
265 // IsListening returns true if the switch has at least one listener.
266 // NOTE: Not goroutine safe.
267 func (sw *Switch) IsListening() bool {
268 return len(sw.listeners) > 0
271 // loadBannedPeers restores sw.bannedPeer from the JSON stored in the database
// under bannedPeerKey, if there is any.
272 func (sw *Switch) loadBannedPeers() error {
// A nil value skips decoding and leaves bannedPeer untouched.
273 if dataJSON := sw.db.Get([]byte(bannedPeerKey)); dataJSON != nil {
274 if err := json.Unmarshal(dataJSON, &sw.bannedPeer); err != nil {
282 // Listeners returns the list of listeners the switch listens on.
283 // NOTE: Not goroutine safe.
// NOTE(review): presumably this hands out the internal slice rather than a copy —
// confirm before letting callers modify the result.
284 func (sw *Switch) Listeners() []Listener {
288 // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
289 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
// Connected peers are classified one by one from the current peer list.
290 peers := sw.peers.List()
291 for _, peer := range peers {
// dialing is the number of entries in sw.dialing, i.e. dials currently in
// progress (entries are added and removed by DialPeerWithAddress).
298 dialing = sw.dialing.Size()
302 // NodeInfo returns the switch's NodeInfo.
303 // NOTE: Not goroutine safe.
// The result is a pointer, so callers share whatever NodeInfo value it refers to.
304 func (sw *Switch) NodeInfo() *NodeInfo {
308 // Peers returns the switch's peer set.
309 func (sw *Switch) Peers() *PeerSet {
313 // StopPeerForError disconnects from a peer due to external error.
314 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
315 log.WithFields(log.Fields{"module": logModule, "peer": peer, " err": reason}).Debug("stopping peer for error")
316 sw.stopAndRemovePeer(peer, reason)
319 // StopPeerGracefully disconnect from a peer gracefully.
320 func (sw *Switch) StopPeerGracefully(peerID string) {
321 if peer := sw.peers.Get(peerID); peer != nil {
322 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted inbound conn in a peer connection and
// passes it to AddPeer. conn is closed here when either step fails.
326 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
327 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
// A failure to close is only logged; the inner err shadows the outer one.
329 if err := conn.Close(); err != nil {
330 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
// Handshake, filtering and registration all happen inside AddPeer.
335 if err = sw.AddPeer(peerConn); err != nil {
336 if err := conn.Close(); err != nil {
337 log.WithFields(log.Fields{"module": logModule, "remote peer:": conn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
342 log.WithFields(log.Fields{"module": logModule, "address": conn.RemoteAddr().String(), "peer num": sw.peers.Size()}).Debug("add inbound peer")
346 func (sw *Switch) checkBannedPeer(peer string) error {
348 defer sw.mtx.Unlock()
350 if banEnd, ok := sw.bannedPeer[peer]; ok {
351 if time.Now().Before(banEnd) {
352 return ErrConnectBannedPeer
355 if err := sw.delBannedPeer(peer); err != nil {
362 func (sw *Switch) delBannedPeer(addr string) error {
364 defer sw.mtx.Unlock()
366 delete(sw.bannedPeer, addr)
367 datajson, err := json.Marshal(sw.bannedPeer)
372 sw.db.Set([]byte(bannedPeerKey), datajson)
376 func (sw *Switch) filterConnByIP(ip string) error {
377 if ip == sw.nodeInfo.listenHost() {
378 return ErrConnectSelf
380 return sw.checkBannedPeer(ip)
// filterConnByPeer runs the post-handshake admission checks on peer, in order:
// ban list, connection to ourselves, and duplicate peer.
383 func (sw *Switch) filterConnByPeer(peer *Peer) error {
// The ban list is keyed by the peer's remote host.
384 if err := sw.checkBannedPeer(peer.remoteAddrHost()); err != nil {
// A peer presenting our own public key is ourselves.
388 if sw.nodeInfo.getPubkey().Equals(peer.PubKey().Wrap()) {
389 return ErrConnectSelf
// The peer set already holds a peer with this key.
392 if sw.peers.Has(peer.Key) {
393 return ErrDuplicatePeer
// listenerRoutine receives inbound connections from l and tries to add each one
// as a peer. OnStart runs it in its own goroutine, one per listener.
398 func (sw *Switch) listenerRoutine(l Listener) {
// ok is false once the Connections channel is closed.
400 inConn, ok := <-l.Connections()
405 // disconnect if we already have MaxNumPeers
406 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers {
407 if err := inConn.Close(); err != nil {
408 log.WithFields(log.Fields{"module": logModule, "remote peer:": inConn.RemoteAddr().String(), " err:": err}).Error("closes connection err")
410 log.Info("Ignoring inbound connection: already have enough peers.")
414 // New inbound connection!
// addPeerWithConnection closes inConn itself when it fails, so the error is
// only logged here.
415 if err := sw.addPeerWithConnection(inConn); err != nil {
416 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// dialPeerWorker dials a and logs a failure instead of returning it; it is run as
// a goroutine by the ensure* functions, which pass in their WaitGroup as wg.
422 func (sw *Switch) dialPeerWorker(a *NetAddress, wg *sync.WaitGroup) {
423 if err := sw.DialPeerWithAddress(a); err != nil {
424 log.WithFields(log.Fields{"module": logModule, "addr": a, "err": err}).Error("dialPeerWorker fail on dial peer")
// ensureKeepConnectPeers goes through the configured keep-dial addresses
// (Config.P2P.KeepDial) and starts a dial worker for addresses that pass the
// checks below: parseable, not our own listen address, not being dialed, and not
// already connected.
429 func (sw *Switch) ensureKeepConnectPeers() {
430 keepDials := netutil.CheckAndSplitAddresses(sw.Config.P2P.KeepDial)
// Set of hosts of the currently connected peers.
431 connectedPeers := make(map[string]struct{})
432 for _, peer := range sw.Peers().List() {
433 connectedPeers[peer.remoteAddrHost()] = struct{}{}
// wg is handed to every dialPeerWorker goroutine started below.
436 var wg sync.WaitGroup
437 for _, keepDial := range keepDials {
438 try, err := NewNetAddressString(keepDial)
// An address that fails to parse is reported with a warning.
440 log.WithFields(log.Fields{"module": logModule, "err": err, "address": keepDial}).Warn("parse address to NetAddress")
// Check: is this our own listen address?
444 if sw.NodeInfo().ListenAddr == try.String() {
// Check: is a dial to this IP already in progress?
447 if dialling := sw.IsDialing(try); dialling {
// Check: do we already have a peer on this IP?
450 if _, ok := connectedPeers[try.IP.String()]; ok {
455 go sw.dialPeerWorker(try, &wg)
// ensureOutboundPeers asks discovery for candidate nodes and starts dial workers
// for them, to bring outbound plus dialing peers up towards minNumOutboundPeers.
460 func (sw *Switch) ensureOutboundPeers() {
461 numOutPeers, _, numDialing := sw.NumPeers()
// Shortfall relative to the target; zero or negative when the target is met.
462 numToDial := (minNumOutboundPeers - (numOutPeers + numDialing))
463 log.WithFields(log.Fields{"module": logModule, "numOutPeers": numOutPeers, "numDialing": numDialing, "numToDial": numToDial}).Debug("ensure peers")
// Set of hosts of the currently connected peers.
468 connectedPeers := make(map[string]struct{})
469 for _, peer := range sw.Peers().List() {
470 connectedPeers[peer.remoteAddrHost()] = struct{}{}
473 var wg sync.WaitGroup
// NOTE(review): make panics on a negative length, so numToDial has to be checked
// before this line — confirm the guard exists.
474 nodes := make([]*discover.Node, numToDial)
// n is how many slots of nodes discovery filled; only nodes[:n] are used.
475 n := sw.discv.ReadRandomNodes(nodes)
476 for i := 0; i < n; i++ {
477 try := NewNetAddressIPPort(nodes[i].IP, nodes[i].TCP)
// Same three checks as for keep-dial addresses: own address, dial in
// progress, already connected.
478 if sw.NodeInfo().ListenAddr == try.String() {
481 if dialling := sw.IsDialing(try); dialling {
484 if _, ok := connectedPeers[try.IP.String()]; ok {
489 go sw.dialPeerWorker(try, &wg)
// ensureOutboundPeersRoutine is the peer-maintenance loop started by OnStart. It
// runs both ensure passes once immediately and creates a 10-second ticker; the
// same two passes appear again below, presumably once per tick — confirm.
494 func (sw *Switch) ensureOutboundPeersRoutine() {
495 sw.ensureKeepConnectPeers()
496 sw.ensureOutboundPeers()
498 ticker := time.NewTicker(10 * time.Second)
504 sw.ensureKeepConnectPeers()
505 sw.ensureOutboundPeers()
// startInitPeer starts peer and then announces it to every registered reactor.
512 func (sw *Switch) startInitPeer(peer *Peer) error {
513 // spawn send/recv routines
514 if _, err := peer.Start(); err != nil {
// A start failure is logged.
515 log.WithFields(log.Fields{"module": logModule, "remote peer:": peer.RemoteAddr, " err:": err}).Error("init peer err")
// reactor.AddPeer's error is checked for every reactor.
518 for _, reactor := range sw.reactors {
519 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer removes peer from the peer set, tells every reactor that the
// peer is gone (passing reason along), and logs the peer's traffic statistics.
526 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
527 sw.peers.Remove(peer)
528 for _, reactor := range sw.reactors {
529 reactor.RemovePeer(peer, reason)
// Sent and received traffic counters for the disconnect log entry below.
533 sentStatus, receivedStatus := peer.TrafficStatus()
534 log.WithFields(log.Fields{
536 "address": peer.Addr().String(),
// duration is taken from the sent-side status.
538 "duration": sentStatus.Duration.String(),
539 "total_sent": sentStatus.Bytes,
540 "total_received": receivedStatus.Bytes,
541 "average_sent_rate": sentStatus.AvgRate,
542 "average_received_rate": receivedStatus.AvgRate,
// Peer count after the removal above.
543 "peer num": sw.peers.Size(),
544 }).Info("disconnect with peer")