10 log "github.com/sirupsen/logrus"
11 "github.com/tendermint/go-crypto"
12 cmn "github.com/tendermint/tmlibs/common"
13 dbm "github.com/tendermint/tmlibs/db"
15 cfg "github.com/bytom/config"
16 "github.com/bytom/errors"
17 "github.com/bytom/p2p/connection"
18 "github.com/bytom/p2p/trust"
// bannedPeerKey is the database key under which the serialized banned-peer map
// is read and written.
22 bannedPeerKey = "BannedPeer"
// defaultBanDuration is how long a host stays banned after AddBannedPeer.
23 defaultBanDuration = time.Hour * 1
26 // Predefined errors returned when a connection attempt is rejected.
// ErrDuplicatePeer is returned when the peer set already contains the peer's key.
28 ErrDuplicatePeer = errors.New("Duplicate peer")
// ErrConnectSelf is returned when the remote side turns out to be this node.
29 ErrConnectSelf = errors.New("Connect self")
// ErrConnectBannedPeer is returned when the remote host is under an active ban.
30 ErrConnectBannedPeer = errors.New("Connect banned peer")
33 // Switch handles peer connections and exposes an API to receive incoming messages
34 // on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
35 // or more `Channels`. So while sending outgoing messages is typically performed on the peer,
36 // incoming messages are received on the reactor.
// NOTE(review): the struct header and several fields are not visible in this
// view; only the fields shown below are documented.
// peerConfig is used for the handshake timeout and for outbound peer connections.
41 peerConfig *PeerConfig
// reactors maps the name passed to AddReactor to the registered reactor.
43 reactors map[string]Reactor
// chDescs accumulates the channel descriptors of every registered reactor.
44 chDescs []*connection.ChannelDescriptor
// reactorsByCh maps a channel ID to the single reactor that owns it.
45 reactorsByCh map[byte]Reactor
48 nodeInfo *NodeInfo // our node info
49 nodePrivKey crypto.PrivKeyEd25519 // our node privkey
// bannedPeer maps a remote host to the time at which its ban expires.
50 bannedPeer map[string]time.Time
55 // NewSwitch creates a new Switch with the given config.
// It derives the peer config from config.P2P, opens the "trusthistory" database
// under config.DBDir(), and loads any banned-peer map previously stored under
// bannedPeerKey.
// NOTE(review): parts of this function are not visible in this view, including
// the remaining struct fields and how a JSON decoding failure is handled.
56 func NewSwitch(config *cfg.Config) *Switch {
59 peerConfig: DefaultPeerConfig(config.P2P),
60 reactors: make(map[string]Reactor),
61 chDescs: make([]*connection.ChannelDescriptor, 0),
62 reactorsByCh: make(map[byte]Reactor),
64 dialing: cmn.NewCMap(),
66 db: dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
68 sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
// Start from an empty ban list, then overlay whatever was persisted earlier.
69 sw.bannedPeer = make(map[string]time.Time)
70 if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
71 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
79 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
// In the code visible here it calls Start on every registered reactor and then
// launches one listenerRoutine goroutine per listener.
// NOTE(review): the handling of a reactor start error is not visible in this view.
80 func (sw *Switch) OnStart() error {
// Reactors are started before any listener goroutine is launched.
81 for _, reactor := range sw.reactors {
82 if _, err := reactor.Start(); err != nil {
// One accept loop per listener, each on its own goroutine.
86 for _, listener := range sw.listeners {
87 go sw.listenerRoutine(listener)
92 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
// NOTE(review): only the loop headers and the peer removal are visible in this
// view; the individual stop calls are presumably on lines not shown — confirm.
93 func (sw *Switch) OnStop() {
94 for _, listener := range sw.listeners {
// Peers are removed from the set while ranging over the result of List();
// presumably List() returns a copy so this is safe — confirm.
99 for _, peer := range sw.peers.List() {
101 sw.peers.Remove(peer)
104 for _, reactor := range sw.reactors {
109 // AddBannedPeer bans the peer's remote host for defaultBanDuration and
// writes the updated ban map to the database under bannedPeerKey.
// NOTE(review): the lock acquisition and the json.Marshal error handling are on
// lines not visible in this view.
110 func (sw *Switch) AddBannedPeer(peer *Peer) error {
112 defer sw.mtx.Unlock()
// Bans are keyed by the peer's remote host.
114 key := peer.NodeInfo.RemoteAddrHost()
// The stored value is the expiry time of the ban, not its start.
115 sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
// The whole map is re-serialized and written on every change.
116 datajson, err := json.Marshal(sw.bannedPeer)
121 sw.db.Set([]byte(bannedPeerKey), datajson)
125 // AddPeer performs the P2P handshake with a peer
126 // that already has a SecretConnection. If all goes well,
127 // it starts the peer and adds it to the switch.
128 // NOTE: This performs a blocking handshake before the peer is added.
129 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
// NOTE(review): the error branches are on lines not visible in this view, so
// the CONTRACT above cannot be confirmed from here.
130 func (sw *Switch) AddPeer(pc *peerConn) error {
// Exchange node info with the remote side, bounded by the configured handshake timeout.
131 peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
// Check the remote node info for compatibility with ours.
136 if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
140 peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Filter out banned hosts, ourselves, and peers already in the peer set.
141 if err := sw.filterConnByPeer(peer); err != nil {
// Start the peer and hand it to every reactor before it joins the peer set.
147 if err := sw.startInitPeer(peer); err != nil {
151 return sw.peers.Add(peer)
154 // AddReactor adds the given reactor to the switch.
// It registers each of the reactor's channels, records the reactor under name,
// and hands the reactor a reference to the switch via SetSwitch.
155 // NOTE: Not goroutine safe.
// NOTE(review): the definition of chID and the return statement are on lines
// not visible in this view.
156 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
157 // Validate the reactor.
158 // No two reactors can share the same channel.
159 for _, chDesc := range reactor.GetChannels() {
161 if sw.reactorsByCh[chID] != nil {
// A channel that already has an owner is reported through cmn.PanicSanity.
162 cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
164 sw.chDescs = append(sw.chDescs, chDesc)
165 sw.reactorsByCh[chID] = reactor
167 sw.reactors[name] = reactor
168 reactor.SetSwitch(sw)
172 // AddListener adds the given listener to the switch for listening to incoming peer connections.
173 // NOTE: Not goroutine safe.
174 func (sw *Switch) AddListener(l Listener) {
175 sw.listeners = append(sw.listeners, l)
178 // DialPeerWithAddress dials the node at the given net address and, on
// success, adds it to the switch through AddPeer.
// NOTE(review): the error returns are on lines not visible in this view.
179 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
180 log.Debug("Dialing peer address:", addr)
// Track the IP as "being dialed" for the duration of this call (see IsDialing).
181 sw.dialing.Set(addr.IP.String(), addr)
182 defer sw.dialing.Delete(addr.IP.String())
// Refuse to dial ourselves or a banned IP before opening a connection.
183 if err := sw.filterConnByIP(addr.IP.String()); err != nil {
187 pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
189 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
193 if err = sw.AddPeer(pc); err != nil {
194 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
198 log.Debug("DialPeer added peer:", addr)
202 //IsDialing prevent duplicate dialing
203 func (sw *Switch) IsDialing(addr *NetAddress) bool {
204 return sw.dialing.Has(addr.IP.String())
207 // IsListening returns true if the switch has at least one listener.
208 // NOTE: Not goroutine safe.
209 func (sw *Switch) IsListening() bool {
210 return len(sw.listeners) > 0
213 // Listeners returns the list of listeners the switch listens on.
214 // NOTE: Not goroutine safe.
// NOTE(review): the function body is not visible in this view.
215 func (sw *Switch) Listeners() []Listener {
219 // NumPeers returns the count of outbound/inbound and outbound-dialing peers.
// NOTE(review): the per-peer classification inside the loop is not visible in
// this view.
220 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
221 peers := sw.peers.List()
222 for _, peer := range peers {
// dialing is the number of entries currently tracked in sw.dialing.
229 dialing = sw.dialing.Size()
233 // NodeInfo returns the switch's NodeInfo.
234 // NOTE: Not goroutine safe.
// NOTE(review): the function body is not visible in this view.
235 func (sw *Switch) NodeInfo() *NodeInfo {
239 // Peers returns the switch's peer set.
// NOTE(review): the function body is not visible in this view.
240 func (sw *Switch) Peers() *PeerSet {
244 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
245 // NOTE: Not goroutine safe.
246 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
247 sw.nodeInfo = nodeInfo
250 // SetNodePrivKey sets the switch's private key for authenticated encryption.
251 // NOTE: Not goroutine safe.
252 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
253 sw.nodePrivKey = nodePrivKey
254 if sw.nodeInfo != nil {
255 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
259 // StopPeerForError disconnects from a peer due to external error.
260 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
261 log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
262 sw.stopAndRemovePeer(peer, reason)
265 // StopPeerGracefully disconnect from a peer gracefully.
266 func (sw *Switch) StopPeerGracefully(peer *Peer) {
267 sw.stopAndRemovePeer(peer, nil)
// addPeerWithConnection wraps an accepted net.Conn in an inbound peer
// connection and passes it to AddPeer.
// NOTE(review): the error branches are on lines not visible in this view.
270 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
271 peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
277 if err = sw.AddPeer(peerConn); err != nil {
284 func (sw *Switch) checkBannedPeer(peer string) error {
286 defer sw.mtx.Unlock()
288 if banEnd, ok := sw.bannedPeer[peer]; ok {
289 if time.Now().Before(banEnd) {
290 return ErrConnectBannedPeer
292 sw.delBannedPeer(peer)
// delBannedPeer removes addr from the banned-peer map and writes the updated
// map to the database under bannedPeerKey.
// This method releases sw.mtx on return (the matching Lock is presumably on a
// line not visible in this view), so it must not be called by code that
// already holds sw.mtx.
// NOTE(review): the json.Marshal error handling is not visible in this view.
297 func (sw *Switch) delBannedPeer(addr string) error {
299 defer sw.mtx.Unlock()
301 delete(sw.bannedPeer, addr)
// The whole map is re-serialized and written on every change.
302 datajson, err := json.Marshal(sw.bannedPeer)
307 sw.db.Set([]byte(bannedPeerKey), datajson)
311 func (sw *Switch) filterConnByIP(ip string) error {
312 if ip == sw.nodeInfo.ListenHost() {
313 return ErrConnectSelf
315 return sw.checkBannedPeer(ip)
// filterConnByPeer decides whether a peer that completed the handshake may be
// added to the switch. The visible checks are, in order: banned host, our own
// public key, and an already-known peer key.
// NOTE(review): some return statements are on lines not visible in this view.
318 func (sw *Switch) filterConnByPeer(peer *Peer) error {
// 1. The peer's remote host is checked against the ban list.
319 if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
// 2. A peer presenting our own public key is ourselves.
323 if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
324 return ErrConnectSelf
// 3. A peer key already present in the peer set is a duplicate.
327 if sw.peers.Has(peer.Key) {
328 return ErrDuplicatePeer
// listenerRoutine receives inbound connections from l.Connections() and tries
// to add each one as a peer via addPeerWithConnection.
// NOTE(review): the loop structure, the handling of a closed channel, and what
// happens to a rejected connection are on lines not visible in this view.
333 func (sw *Switch) listenerRoutine(l Listener) {
335 inConn, ok := <-l.Connections()
340 // Disconnect if we already have 2 * MaxNumPeers. We do this because we want the address book to be exchanged even if
341 // the connection slots are full. The pex will disconnect the peer after address exchange, so the number of connected
342 // peers won't stay at double of MaxNumPeers.
343 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers*2 {
345 log.Info("Ignoring inbound connection: already have enough peers.")
349 // New inbound connection!
350 if err := sw.addPeerWithConnection(inConn); err != nil {
351 log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
// startInitPeer starts the peer and then offers it to every registered reactor
// through AddPeer.
// NOTE(review): the handling of a reactor AddPeer error is not visible in this
// view.
357 func (sw *Switch) startInitPeer(peer *Peer) error {
358 peer.Start() // spawn send/recv routines
359 for _, reactor := range sw.reactors {
360 if err := reactor.AddPeer(peer); err != nil {
// stopAndRemovePeer tells every reactor to drop the peer, passing along the
// reason, and then removes the peer from the switch's peer set.
// NOTE(review): this function continues past the last line visible in this
// view; any further steps are not documented here.
367 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
368 for _, reactor := range sw.reactors {
369 reactor.RemovePeer(peer, reason)
371 sw.peers.Remove(peer)