OSDN Git Service

Merge branch 'dev' into edit_addressbook
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "math/rand"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         crypto "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14         dbm "github.com/tendermint/tmlibs/db"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/errors"
18         "github.com/bytom/p2p/trust"
19 )
20
21 const (
22         bannedPeerKey      = "BannedPeer"
23         defaultBanDuration = time.Hour * 1
24 )
25
26 //pre-define errors for connecting fail
27 var (
28         ErrDuplicatePeer     = errors.New("Duplicate peer")
29         ErrConnectSelf       = errors.New("Connect self")
30         ErrConnectBannedPeer = errors.New("Connect banned peer")
31 )
32
33 //-----------------------------------------------------------------------------
34
35 // Switch handles peer connections and exposes an API to receive incoming messages
36 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
37 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
38 // incoming messages are received on the reactor.
39 type Switch struct {
40         cmn.BaseService
41
42         config       *cfg.P2PConfig
43         peerConfig   *PeerConfig
44         listeners    []Listener
45         reactors     map[string]Reactor
46         chDescs      []*ChannelDescriptor
47         reactorsByCh map[byte]Reactor
48         peers        *PeerSet
49         dialing      *cmn.CMap
50         nodeInfo     *NodeInfo             // our node info
51         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
52         addrBook     *AddrBook
53         bannedPeer   map[string]time.Time
54         db           dbm.DB
55         mtx          sync.Mutex
56 }
57
58 // NewSwitch creates a new Switch with the given config.
59 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
60         sw := &Switch{
61                 config:       config,
62                 peerConfig:   DefaultPeerConfig(config),
63                 reactors:     make(map[string]Reactor),
64                 chDescs:      make([]*ChannelDescriptor, 0),
65                 reactorsByCh: make(map[byte]Reactor),
66                 peers:        NewPeerSet(),
67                 dialing:      cmn.NewCMap(),
68                 nodeInfo:     nil,
69                 db:           trustHistoryDB,
70         }
71         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
72
73         // Optionally, start the pex reactor
74         if config.PexReactor {
75                 sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
76                 pexReactor := NewPEXReactor(sw.addrBook, sw)
77                 sw.AddReactor("PEX", pexReactor)
78         }
79
80         sw.bannedPeer = make(map[string]time.Time)
81         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
82                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
83                         return nil
84                 }
85         }
86         trust.Init()
87         return sw
88 }
89
90 // AddReactor adds the given reactor to the switch.
91 // NOTE: Not goroutine safe.
92 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
93         // Validate the reactor.
94         // No two reactors can share the same channel.
95         reactorChannels := reactor.GetChannels()
96         for _, chDesc := range reactorChannels {
97                 chID := chDesc.ID
98                 if sw.reactorsByCh[chID] != nil {
99                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
100                 }
101                 sw.chDescs = append(sw.chDescs, chDesc)
102                 sw.reactorsByCh[chID] = reactor
103         }
104         sw.reactors[name] = reactor
105         reactor.SetSwitch(sw)
106         return reactor
107 }
108
109 // Reactors returns a map of reactors registered on the switch.
110 // NOTE: Not goroutine safe.
111 func (sw *Switch) Reactors() map[string]Reactor {
112         return sw.reactors
113 }
114
115 // Reactor returns the reactor with the given name.
116 // NOTE: Not goroutine safe.
117 func (sw *Switch) Reactor(name string) Reactor {
118         return sw.reactors[name]
119 }
120
121 // AddListener adds the given listener to the switch for listening to incoming peer connections.
122 // NOTE: Not goroutine safe.
123 func (sw *Switch) AddListener(l Listener) {
124         sw.listeners = append(sw.listeners, l)
125 }
126
127 // Listeners returns the list of listeners the switch listens on.
128 // NOTE: Not goroutine safe.
129 func (sw *Switch) Listeners() []Listener {
130         return sw.listeners
131 }
132
133 // IsListening returns true if the switch has at least one listener.
134 // NOTE: Not goroutine safe.
135 func (sw *Switch) IsListening() bool {
136         return len(sw.listeners) > 0
137 }
138
139 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
140 // NOTE: Not goroutine safe.
141 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
142         sw.nodeInfo = nodeInfo
143 }
144
145 // NodeInfo returns the switch's NodeInfo.
146 // NOTE: Not goroutine safe.
147 func (sw *Switch) NodeInfo() *NodeInfo {
148         return sw.nodeInfo
149 }
150
151 // SetNodePrivKey sets the switch's private key for authenticated encryption.
152 // NOTE: Not goroutine safe.
153 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
154         sw.nodePrivKey = nodePrivKey
155         if sw.nodeInfo != nil {
156                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
157         }
158 }
159
160 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
161 func (sw *Switch) OnStart() error {
162         // Start reactors
163         for _, reactor := range sw.reactors {
164                 _, err := reactor.Start()
165                 if err != nil {
166                         return err
167                 }
168         }
169         // Start listeners
170         for _, listener := range sw.listeners {
171                 go sw.listenerRoutine(listener)
172         }
173         return nil
174 }
175
176 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
177 func (sw *Switch) OnStop() {
178         // Stop listeners
179         for _, listener := range sw.listeners {
180                 listener.Stop()
181         }
182         sw.listeners = nil
183         // Stop peers
184         for _, peer := range sw.peers.List() {
185                 peer.Stop()
186                 sw.peers.Remove(peer)
187         }
188         // Stop reactors
189         for _, reactor := range sw.reactors {
190                 reactor.Stop()
191         }
192 }
193
194 // AddPeer performs the P2P handshake with a peer
195 // that already has a SecretConnection. If all goes well,
196 // it starts the peer and adds it to the switch.
197 // NOTE: This performs a blocking handshake before the peer is added.
198 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
199 func (sw *Switch) AddPeer(pc *peerConn) error {
200         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
201         if err != nil {
202                 return ErrConnectBannedPeer
203         }
204         // Check version, chain id
205         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
206                 return err
207         }
208
209         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
210
211         //filter peer
212         if err := sw.filterConnByPeer(peer); err != nil {
213                 return err
214         }
215
216         // Start peer
217         if sw.IsRunning() {
218                 if err := sw.startInitPeer(peer); err != nil {
219                         return err
220                 }
221         }
222
223         // Add the peer to .peers.
224         // We start it first so that a peer in the list is safe to Stop.
225         // It should not err since we already checked peers.Has()
226         if err := sw.peers.Add(peer); err != nil {
227                 return err
228         }
229
230         log.Info("Added peer:", peer)
231         return nil
232 }
233
234 func (sw *Switch) startInitPeer(peer *Peer) error {
235         peer.Start() // spawn send/recv routines
236         for _, reactor := range sw.reactors {
237                 if err := reactor.AddPeer(peer); err != nil {
238                         return err
239                 }
240         }
241         return nil
242 }
243
244 // DialSeeds a list of seeds asynchronously in random order
245 func (sw *Switch) DialSeeds(seeds []string) error {
246         netAddrs, err := NewNetAddressStrings(seeds)
247         if err != nil {
248                 return err
249         }
250
251         if sw.addrBook != nil {
252                 // add seeds to `addrBook`
253                 ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
254                 for _, netAddr := range netAddrs {
255                         // do not add ourselves
256                         if netAddr.Equals(ourAddr) {
257                                 continue
258                         }
259                         sw.addrBook.AddAddress(netAddr, ourAddr)
260                 }
261
262                 sw.addrBook.Save()
263         }
264
265         //permute the list, dial them in random order.
266         perm := rand.Perm(len(netAddrs))
267         for i := 0; i < len(perm); i += 2 {
268                 j := perm[i]
269                 sw.dialSeed(netAddrs[j])
270         }
271
272         return nil
273 }
274
275 func (sw *Switch) dialSeed(addr *NetAddress) {
276         err := sw.DialPeerWithAddress(addr)
277         if err != nil {
278                 log.Info("Error dialing seed:", addr.String())
279         }
280 }
281
282 func (sw *Switch) addrBookDelSelf() error {
283         addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
284         if err != nil {
285                 return err
286         }
287         // remove the given address from the address book if we're added it earlier
288         sw.addrBook.RemoveAddress(addr)
289         // add the given address to the address book to avoid dialing ourselves
290         // again this is our public address
291         sw.addrBook.AddOurAddress(addr)
292         return nil
293 }
294
295 func (sw *Switch) filterConnByIP(ip string) error {
296         if err := sw.checkBannedPeer(ip); err != nil {
297                 return ErrConnectBannedPeer
298         }
299
300         if ip == sw.nodeInfo.ListenHost() {
301                 sw.addrBookDelSelf()
302                 return ErrConnectSelf
303         }
304
305         return nil
306 }
307
308 func (sw *Switch) filterConnByPeer(peer *Peer) error {
309         if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
310                 return ErrConnectBannedPeer
311         }
312
313         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
314                 sw.addrBookDelSelf()
315                 return ErrConnectSelf
316         }
317
318         // Check for duplicate peer
319         if sw.peers.Has(peer.Key) {
320                 return ErrDuplicatePeer
321         }
322         return nil
323 }
324
325 //DialPeerWithAddress dial node from net address
326 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
327         log.Debug("Dialing peer address:", addr)
328
329         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
330                 return err
331         }
332
333         sw.dialing.Set(addr.IP.String(), addr)
334         defer sw.dialing.Delete(addr.IP.String())
335
336         pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
337         if err != nil {
338                 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
339                 return err
340         }
341
342         err = sw.AddPeer(pc)
343         if err != nil {
344                 log.Info("Failed to add peer:", addr, " err:", err)
345                 pc.CloseConn()
346                 return err
347         }
348         log.Info("Dialed and added peer:", addr)
349         return nil
350 }
351
352 //IsDialing prevent duplicate dialing
353 func (sw *Switch) IsDialing(addr *NetAddress) bool {
354         return sw.dialing.Has(addr.IP.String())
355 }
356
357 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
358 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
359         peers := sw.peers.List()
360         for _, peer := range peers {
361                 if peer.outbound {
362                         outbound++
363                 } else {
364                         inbound++
365                 }
366         }
367         dialing = sw.dialing.Size()
368         return
369 }
370
371 //Peers return switch peerset
372 func (sw *Switch) Peers() *PeerSet {
373         return sw.peers
374 }
375
376 // StopPeerForError disconnects from a peer due to external error.
377 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
378         log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
379         sw.stopAndRemovePeer(peer, reason)
380 }
381
382 // StopPeerGracefully disconnect from a peer gracefully.
383 func (sw *Switch) StopPeerGracefully(peer *Peer) {
384         log.Info("Stopping peer gracefully")
385         sw.stopAndRemovePeer(peer, nil)
386 }
387
388 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
389         for _, reactor := range sw.reactors {
390                 reactor.RemovePeer(peer, reason)
391         }
392         sw.peers.Remove(peer)
393         peer.Stop()
394 }
395
396 func (sw *Switch) listenerRoutine(l Listener) {
397         for {
398                 inConn, ok := <-l.Connections()
399                 if !ok {
400                         break
401                 }
402
403                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
404                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
405                 // be double of MaxNumPeers
406                 if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
407                         inConn.Close()
408                         log.Info("Ignoring inbound connection: already have enough peers.")
409                         continue
410                 }
411
412                 // New inbound connection!
413                 err := sw.addPeerWithConnection(inConn)
414                 if err != nil {
415                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
416                         continue
417                 }
418         }
419 }
420
421 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
422         peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
423         if err != nil {
424                 conn.Close()
425                 return err
426         }
427         if err = sw.AddPeer(peerConn); err != nil {
428                 conn.Close()
429                 return err
430         }
431
432         return nil
433 }
434
435 //AddBannedPeer add peer to blacklist
436 func (sw *Switch) AddBannedPeer(peer *Peer) error {
437         sw.mtx.Lock()
438         defer sw.mtx.Unlock()
439         key := peer.NodeInfo.RemoteAddrHost()
440         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
441         datajson, err := json.Marshal(sw.bannedPeer)
442         if err != nil {
443                 return err
444         }
445         sw.db.Set([]byte(bannedPeerKey), datajson)
446         return nil
447 }
448
449 func (sw *Switch) delBannedPeer(addr string) error {
450         delete(sw.bannedPeer, addr)
451         datajson, err := json.Marshal(sw.bannedPeer)
452         if err != nil {
453                 return err
454         }
455         sw.db.Set([]byte(bannedPeerKey), datajson)
456         return nil
457 }
458
459 func (sw *Switch) checkBannedPeer(peer string) error {
460         sw.mtx.Lock()
461         defer sw.mtx.Unlock()
462
463         if banEnd, ok := sw.bannedPeer[peer]; ok {
464                 if time.Now().Before(banEnd) {
465                         return ErrConnectBannedPeer
466                 }
467                 sw.delBannedPeer(peer)
468         }
469         return nil
470 }