OSDN Git Service

Merge pull request #981 from Bytom/edit_addressbook
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/p2p/trust"
18 )
19
20 const (
21         bannedPeerKey      = "BannedPeer"
22         defaultBanDuration = time.Hour * 1
23 )
24
25 //pre-define errors for connecting fail
26 var (
27         ErrDuplicatePeer     = errors.New("Duplicate peer")
28         ErrConnectSelf       = errors.New("Connect self")
29         ErrConnectBannedPeer = errors.New("Connect banned peer")
30 )
31
32 // An AddrBook represents an address book from the pex package, which is used to store peer addresses.
33 type AddrBook interface {
34         AddAddress(*NetAddress, *NetAddress) error
35         AddOurAddress(*NetAddress)
36         MarkGood(*NetAddress)
37         RemoveAddress(*NetAddress)
38         SaveToFile() error
39 }
40
41 //-----------------------------------------------------------------------------
42
43 // Switch handles peer connections and exposes an API to receive incoming messages
44 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
45 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
46 // incoming messages are received on the reactor.
47 type Switch struct {
48         cmn.BaseService
49
50         Config       *cfg.P2PConfig
51         peerConfig   *PeerConfig
52         listeners    []Listener
53         reactors     map[string]Reactor
54         chDescs      []*ChannelDescriptor
55         reactorsByCh map[byte]Reactor
56         peers        *PeerSet
57         dialing      *cmn.CMap
58         nodeInfo     *NodeInfo             // our node info
59         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
60         addrBook     AddrBook
61         bannedPeer   map[string]time.Time
62         db           dbm.DB
63         mtx          sync.Mutex
64 }
65
66 // NewSwitch creates a new Switch with the given config.
67 func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
68         sw := &Switch{
69                 Config:       config,
70                 peerConfig:   DefaultPeerConfig(config),
71                 reactors:     make(map[string]Reactor),
72                 chDescs:      make([]*ChannelDescriptor, 0),
73                 reactorsByCh: make(map[byte]Reactor),
74                 peers:        NewPeerSet(),
75                 dialing:      cmn.NewCMap(),
76                 nodeInfo:     nil,
77                 addrBook:     addrBook,
78                 db:           trustHistoryDB,
79         }
80         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
81         sw.bannedPeer = make(map[string]time.Time)
82         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
83                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
84                         return nil
85                 }
86         }
87         trust.Init()
88         return sw
89 }
90
91 // AddReactor adds the given reactor to the switch.
92 // NOTE: Not goroutine safe.
93 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
94         // Validate the reactor.
95         // No two reactors can share the same channel.
96         reactorChannels := reactor.GetChannels()
97         for _, chDesc := range reactorChannels {
98                 chID := chDesc.ID
99                 if sw.reactorsByCh[chID] != nil {
100                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
101                 }
102                 sw.chDescs = append(sw.chDescs, chDesc)
103                 sw.reactorsByCh[chID] = reactor
104         }
105         sw.reactors[name] = reactor
106         reactor.SetSwitch(sw)
107         return reactor
108 }
109
110 // Reactors returns a map of reactors registered on the switch.
111 // NOTE: Not goroutine safe.
112 func (sw *Switch) Reactors() map[string]Reactor {
113         return sw.reactors
114 }
115
116 // Reactor returns the reactor with the given name.
117 // NOTE: Not goroutine safe.
118 func (sw *Switch) Reactor(name string) Reactor {
119         return sw.reactors[name]
120 }
121
122 // AddListener adds the given listener to the switch for listening to incoming peer connections.
123 // NOTE: Not goroutine safe.
124 func (sw *Switch) AddListener(l Listener) {
125         sw.listeners = append(sw.listeners, l)
126 }
127
128 // Listeners returns the list of listeners the switch listens on.
129 // NOTE: Not goroutine safe.
130 func (sw *Switch) Listeners() []Listener {
131         return sw.listeners
132 }
133
134 // IsListening returns true if the switch has at least one listener.
135 // NOTE: Not goroutine safe.
136 func (sw *Switch) IsListening() bool {
137         return len(sw.listeners) > 0
138 }
139
140 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
141 // NOTE: Not goroutine safe.
142 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
143         sw.nodeInfo = nodeInfo
144 }
145
146 // NodeInfo returns the switch's NodeInfo.
147 // NOTE: Not goroutine safe.
148 func (sw *Switch) NodeInfo() *NodeInfo {
149         return sw.nodeInfo
150 }
151
152 // SetNodePrivKey sets the switch's private key for authenticated encryption.
153 // NOTE: Not goroutine safe.
154 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
155         sw.nodePrivKey = nodePrivKey
156         if sw.nodeInfo != nil {
157                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
158         }
159 }
160
161 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
162 func (sw *Switch) OnStart() error {
163         // Start reactors
164         for _, reactor := range sw.reactors {
165                 _, err := reactor.Start()
166                 if err != nil {
167                         return err
168                 }
169         }
170         // Start listeners
171         for _, listener := range sw.listeners {
172                 go sw.listenerRoutine(listener)
173         }
174         return nil
175 }
176
177 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
178 func (sw *Switch) OnStop() {
179         // Stop listeners
180         for _, listener := range sw.listeners {
181                 listener.Stop()
182         }
183         sw.listeners = nil
184         // Stop peers
185         for _, peer := range sw.peers.List() {
186                 peer.Stop()
187                 sw.peers.Remove(peer)
188         }
189         // Stop reactors
190         for _, reactor := range sw.reactors {
191                 reactor.Stop()
192         }
193 }
194
195 // AddPeer performs the P2P handshake with a peer
196 // that already has a SecretConnection. If all goes well,
197 // it starts the peer and adds it to the switch.
198 // NOTE: This performs a blocking handshake before the peer is added.
199 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
200 func (sw *Switch) AddPeer(pc *peerConn) error {
201         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
202         if err != nil {
203                 return ErrConnectBannedPeer
204         }
205         // Check version, chain id
206         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
207                 return err
208         }
209
210         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
211
212         //filter peer
213         if err := sw.filterConnByPeer(peer); err != nil {
214                 return err
215         }
216
217         // Start peer
218         if sw.IsRunning() {
219                 if err := sw.startInitPeer(peer); err != nil {
220                         return err
221                 }
222         }
223
224         // Add the peer to .peers.
225         // We start it first so that a peer in the list is safe to Stop.
226         // It should not err since we already checked peers.Has()
227         if err := sw.peers.Add(peer); err != nil {
228                 return err
229         }
230
231         log.Info("Added peer:", peer)
232         return nil
233 }
234
235 func (sw *Switch) startInitPeer(peer *Peer) error {
236         peer.Start() // spawn send/recv routines
237         for _, reactor := range sw.reactors {
238                 if err := reactor.AddPeer(peer); err != nil {
239                         return err
240                 }
241         }
242         return nil
243 }
244
245 func (sw *Switch) dialSeed(addr *NetAddress) {
246         err := sw.DialPeerWithAddress(addr)
247         if err != nil {
248                 log.Info("Error dialing seed:", addr.String())
249         }
250 }
251
252 func (sw *Switch) addrBookDelSelf() error {
253         addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
254         if err != nil {
255                 return err
256         }
257         // remove the given address from the address book if we're added it earlier
258         sw.addrBook.RemoveAddress(addr)
259         // add the given address to the address book to avoid dialing ourselves
260         // again this is our public address
261         sw.addrBook.AddOurAddress(addr)
262         return nil
263 }
264
265 func (sw *Switch) filterConnByIP(ip string) error {
266         if err := sw.checkBannedPeer(ip); err != nil {
267                 return ErrConnectBannedPeer
268         }
269
270         if ip == sw.nodeInfo.ListenHost() {
271                 sw.addrBookDelSelf()
272                 return ErrConnectSelf
273         }
274
275         return nil
276 }
277
278 func (sw *Switch) filterConnByPeer(peer *Peer) error {
279         if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
280                 return ErrConnectBannedPeer
281         }
282
283         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
284                 sw.addrBookDelSelf()
285                 return ErrConnectSelf
286         }
287
288         // Check for duplicate peer
289         if sw.peers.Has(peer.Key) {
290                 return ErrDuplicatePeer
291         }
292         return nil
293 }
294
295 //DialPeerWithAddress dial node from net address
296 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
297         log.Debug("Dialing peer address:", addr)
298
299         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
300                 return err
301         }
302
303         sw.dialing.Set(addr.IP.String(), addr)
304         defer sw.dialing.Delete(addr.IP.String())
305
306         pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
307         if err != nil {
308                 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
309                 return err
310         }
311
312         err = sw.AddPeer(pc)
313         if err != nil {
314                 log.Info("Failed to add peer:", addr, " err:", err)
315                 pc.CloseConn()
316                 return err
317         }
318         log.Info("Dialed and added peer:", addr)
319         return nil
320 }
321
322 //IsDialing prevent duplicate dialing
323 func (sw *Switch) IsDialing(addr *NetAddress) bool {
324         return sw.dialing.Has(addr.IP.String())
325 }
326
327 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
328 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
329         peers := sw.peers.List()
330         for _, peer := range peers {
331                 if peer.outbound {
332                         outbound++
333                 } else {
334                         inbound++
335                 }
336         }
337         dialing = sw.dialing.Size()
338         return
339 }
340
341 //Peers return switch peerset
342 func (sw *Switch) Peers() *PeerSet {
343         return sw.peers
344 }
345
346 // StopPeerForError disconnects from a peer due to external error.
347 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
348         log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
349         sw.stopAndRemovePeer(peer, reason)
350 }
351
352 // StopPeerGracefully disconnect from a peer gracefully.
353 func (sw *Switch) StopPeerGracefully(peer *Peer) {
354         log.Info("Stopping peer gracefully")
355         sw.stopAndRemovePeer(peer, nil)
356 }
357
358 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
359         for _, reactor := range sw.reactors {
360                 reactor.RemovePeer(peer, reason)
361         }
362         sw.peers.Remove(peer)
363         peer.Stop()
364 }
365
366 func (sw *Switch) listenerRoutine(l Listener) {
367         for {
368                 inConn, ok := <-l.Connections()
369                 if !ok {
370                         break
371                 }
372
373                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
374                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
375                 // be double of MaxNumPeers
376                 if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
377                         inConn.Close()
378                         log.Info("Ignoring inbound connection: already have enough peers.")
379                         continue
380                 }
381
382                 // New inbound connection!
383                 err := sw.addPeerWithConnection(inConn)
384                 if err != nil {
385                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
386                         continue
387                 }
388         }
389 }
390
391 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
392         peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
393         if err != nil {
394                 conn.Close()
395                 return err
396         }
397         if err = sw.AddPeer(peerConn); err != nil {
398                 conn.Close()
399                 return err
400         }
401
402         return nil
403 }
404
405 //AddBannedPeer add peer to blacklist
406 func (sw *Switch) AddBannedPeer(peer *Peer) error {
407         sw.mtx.Lock()
408         defer sw.mtx.Unlock()
409         key := peer.NodeInfo.RemoteAddrHost()
410         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
411         datajson, err := json.Marshal(sw.bannedPeer)
412         if err != nil {
413                 return err
414         }
415         sw.db.Set([]byte(bannedPeerKey), datajson)
416         return nil
417 }
418
419 func (sw *Switch) delBannedPeer(addr string) error {
420         delete(sw.bannedPeer, addr)
421         datajson, err := json.Marshal(sw.bannedPeer)
422         if err != nil {
423                 return err
424         }
425         sw.db.Set([]byte(bannedPeerKey), datajson)
426         return nil
427 }
428
429 func (sw *Switch) checkBannedPeer(peer string) error {
430         sw.mtx.Lock()
431         defer sw.mtx.Unlock()
432
433         if banEnd, ok := sw.bannedPeer[peer]; ok {
434                 if time.Now().Before(banEnd) {
435                         return ErrConnectBannedPeer
436                 }
437                 sw.delBannedPeer(peer)
438         }
439         return nil
440 }