OSDN Git Service

Merge pull request #970 from Bytom/p2p_test
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "math/rand"
7         "net"
8         "sync"
9         "time"
10
11         log "github.com/sirupsen/logrus"
12         crypto "github.com/tendermint/go-crypto"
13         cmn "github.com/tendermint/tmlibs/common"
14         dbm "github.com/tendermint/tmlibs/db"
15
16         cfg "github.com/bytom/config"
17         "github.com/bytom/errors"
18         "github.com/bytom/p2p/trust"
19 )
20
21 const (
22         bannedPeerKey      = "BannedPeer"
23         defaultBanDuration = time.Hour * 1
24 )
25
26 var (
27         ErrDuplicatePeer     = errors.New("Duplicate peer")
28         ErrConnectSelf       = errors.New("Connect self")
29         ErrConnectBannedPeer = errors.New("Connect banned peer")
30 )
31
32 //-----------------------------------------------------------------------------
33
34 // Switch handles peer connections and exposes an API to receive incoming messages
35 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
36 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
37 // incoming messages are received on the reactor.
38 type Switch struct {
39         cmn.BaseService
40
41         config       *cfg.P2PConfig
42         peerConfig   *PeerConfig
43         listeners    []Listener
44         reactors     map[string]Reactor
45         chDescs      []*ChannelDescriptor
46         reactorsByCh map[byte]Reactor
47         peers        *PeerSet
48         dialing      *cmn.CMap
49         nodeInfo     *NodeInfo             // our node info
50         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
51         addrBook     *AddrBook
52         bannedPeer   map[string]time.Time
53         db           dbm.DB
54         mtx          sync.Mutex
55 }
56
57 // NewSwitch creates a new Switch with the given config.
58 func NewSwitch(config *cfg.P2PConfig, trustHistoryDB dbm.DB) *Switch {
59         sw := &Switch{
60                 config:       config,
61                 peerConfig:   DefaultPeerConfig(config),
62                 reactors:     make(map[string]Reactor),
63                 chDescs:      make([]*ChannelDescriptor, 0),
64                 reactorsByCh: make(map[byte]Reactor),
65                 peers:        NewPeerSet(),
66                 dialing:      cmn.NewCMap(),
67                 nodeInfo:     nil,
68                 db:           trustHistoryDB,
69         }
70         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
71
72         // Optionally, start the pex reactor
73         if config.PexReactor {
74                 sw.addrBook = NewAddrBook(config.AddrBookFile(), config.AddrBookStrict)
75                 pexReactor := NewPEXReactor(sw.addrBook, sw)
76                 sw.AddReactor("PEX", pexReactor)
77         }
78
79         sw.bannedPeer = make(map[string]time.Time)
80         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
81                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
82                         return nil
83                 }
84         }
85         trust.Init()
86         return sw
87 }
88
89 // AddReactor adds the given reactor to the switch.
90 // NOTE: Not goroutine safe.
91 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
92         // Validate the reactor.
93         // No two reactors can share the same channel.
94         reactorChannels := reactor.GetChannels()
95         for _, chDesc := range reactorChannels {
96                 chID := chDesc.ID
97                 if sw.reactorsByCh[chID] != nil {
98                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
99                 }
100                 sw.chDescs = append(sw.chDescs, chDesc)
101                 sw.reactorsByCh[chID] = reactor
102         }
103         sw.reactors[name] = reactor
104         reactor.SetSwitch(sw)
105         return reactor
106 }
107
108 // Reactors returns a map of reactors registered on the switch.
109 // NOTE: Not goroutine safe.
110 func (sw *Switch) Reactors() map[string]Reactor {
111         return sw.reactors
112 }
113
114 // Reactor returns the reactor with the given name.
115 // NOTE: Not goroutine safe.
116 func (sw *Switch) Reactor(name string) Reactor {
117         return sw.reactors[name]
118 }
119
120 // AddListener adds the given listener to the switch for listening to incoming peer connections.
121 // NOTE: Not goroutine safe.
122 func (sw *Switch) AddListener(l Listener) {
123         sw.listeners = append(sw.listeners, l)
124 }
125
126 // Listeners returns the list of listeners the switch listens on.
127 // NOTE: Not goroutine safe.
128 func (sw *Switch) Listeners() []Listener {
129         return sw.listeners
130 }
131
132 // IsListening returns true if the switch has at least one listener.
133 // NOTE: Not goroutine safe.
134 func (sw *Switch) IsListening() bool {
135         return len(sw.listeners) > 0
136 }
137
138 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
139 // NOTE: Not goroutine safe.
140 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
141         sw.nodeInfo = nodeInfo
142 }
143
144 // NodeInfo returns the switch's NodeInfo.
145 // NOTE: Not goroutine safe.
146 func (sw *Switch) NodeInfo() *NodeInfo {
147         return sw.nodeInfo
148 }
149
150 // SetNodeKey sets the switch's private key for authenticated encryption.
151 // NOTE: Not goroutine safe.
152 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
153         sw.nodePrivKey = nodePrivKey
154         if sw.nodeInfo != nil {
155                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
156         }
157 }
158
159 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
160 func (sw *Switch) OnStart() error {
161         // Start reactors
162         for _, reactor := range sw.reactors {
163                 _, err := reactor.Start()
164                 if err != nil {
165                         return err
166                 }
167         }
168         // Start listeners
169         for _, listener := range sw.listeners {
170                 go sw.listenerRoutine(listener)
171         }
172         return nil
173 }
174
175 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
176 func (sw *Switch) OnStop() {
177         // Stop listeners
178         for _, listener := range sw.listeners {
179                 listener.Stop()
180         }
181         sw.listeners = nil
182         // Stop peers
183         for _, peer := range sw.peers.List() {
184                 peer.Stop()
185                 sw.peers.Remove(peer)
186         }
187         // Stop reactors
188         for _, reactor := range sw.reactors {
189                 reactor.Stop()
190         }
191 }
192
193 // addPeer performs the P2P handshake with a peer
194 // that already has a SecretConnection. If all goes well,
195 // it starts the peer and adds it to the switch.
196 // NOTE: This performs a blocking handshake before the peer is added.
197 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
198 func (sw *Switch) AddPeer(pc *peerConn) error {
199         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
200         if err != nil {
201                 return ErrConnectBannedPeer
202         }
203         // Check version, chain id
204         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
205                 return err
206         }
207
208         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
209
210         //filter peer
211         if err := sw.filterConnByPeer(peer); err != nil {
212                 return err
213         }
214
215         // Start peer
216         if sw.IsRunning() {
217                 if err := sw.startInitPeer(peer); err != nil {
218                         return err
219                 }
220         }
221
222         // Add the peer to .peers.
223         // We start it first so that a peer in the list is safe to Stop.
224         // It should not err since we already checked peers.Has()
225         if err := sw.peers.Add(peer); err != nil {
226                 return err
227         }
228
229         log.Info("Added peer:", peer)
230         return nil
231 }
232
233 func (sw *Switch) startInitPeer(peer *Peer) error {
234         peer.Start() // spawn send/recv routines
235         for _, reactor := range sw.reactors {
236                 if err := reactor.AddPeer(peer); err != nil {
237                         return err
238                 }
239         }
240         return nil
241 }
242
243 // Dial a list of seeds asynchronously in random order
244 func (sw *Switch) DialSeeds(seeds []string) error {
245         netAddrs, err := NewNetAddressStrings(seeds)
246         if err != nil {
247                 return err
248         }
249
250         if sw.addrBook != nil {
251                 // add seeds to `addrBook`
252                 ourAddr, _ := NewNetAddressString(sw.nodeInfo.ListenAddr)
253                 for _, netAddr := range netAddrs {
254                         // do not add ourselves
255                         if netAddr.Equals(ourAddr) {
256                                 continue
257                         }
258                         sw.addrBook.AddAddress(netAddr, ourAddr)
259                 }
260
261                 sw.addrBook.Save()
262         }
263
264         //permute the list, dial them in random order.
265         perm := rand.Perm(len(netAddrs))
266         for i := 0; i < len(perm); i += 2 {
267                 j := perm[i]
268                 sw.dialSeed(netAddrs[j])
269         }
270
271         return nil
272 }
273
274 func (sw *Switch) dialSeed(addr *NetAddress) {
275         err := sw.DialPeerWithAddress(addr)
276         if err != nil {
277                 log.Info("Error dialing seed:", addr.String())
278         }
279 }
280
281 func (sw *Switch) addrBookDelSelf() error {
282         addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
283         if err != nil {
284                 return err
285         }
286         // remove the given address from the address book if we're added it earlier
287         sw.addrBook.RemoveAddress(addr)
288         // add the given address to the address book to avoid dialing ourselves
289         // again this is our public address
290         sw.addrBook.AddOurAddress(addr)
291         return nil
292 }
293
294 func (sw *Switch) filterConnByIP(ip string) error {
295         if err := sw.checkBannedPeer(ip); err != nil {
296                 return ErrConnectBannedPeer
297         }
298
299         if ip == sw.nodeInfo.ListenHost() {
300                 sw.addrBookDelSelf()
301                 return ErrConnectSelf
302         }
303
304         return nil
305 }
306
307 func (sw *Switch) filterConnByPeer(peer *Peer) error {
308         if err := sw.checkBannedPeer(peer.NodeInfo.RemoteAddrHost()); err != nil {
309                 return ErrConnectBannedPeer
310         }
311
312         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
313                 sw.addrBookDelSelf()
314                 return ErrConnectSelf
315         }
316
317         // Check for duplicate peer
318         if sw.peers.Has(peer.Key) {
319                 return ErrDuplicatePeer
320         }
321         return nil
322 }
323
324 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
325         log.Debug("Dialing peer address:", addr)
326
327         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
328                 return err
329         }
330
331         sw.dialing.Set(addr.IP.String(), addr)
332         defer sw.dialing.Delete(addr.IP.String())
333
334         pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
335         if err != nil {
336                 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
337                 return err
338         }
339
340         err = sw.AddPeer(pc)
341         if err != nil {
342                 log.Info("Failed to add peer:", addr, " err:", err)
343                 pc.CloseConn()
344                 return err
345         }
346         log.Info("Dialed and added peer:", addr)
347         return nil
348 }
349
350 func (sw *Switch) IsDialing(addr *NetAddress) bool {
351         return sw.dialing.Has(addr.IP.String())
352 }
353
354 // Returns the count of outbound/inbound and outbound-dialing peers.
355 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
356         peers := sw.peers.List()
357         for _, peer := range peers {
358                 if peer.outbound {
359                         outbound++
360                 } else {
361                         inbound++
362                 }
363         }
364         dialing = sw.dialing.Size()
365         return
366 }
367
368 func (sw *Switch) Peers() *PeerSet {
369         return sw.peers
370 }
371
372 // StopPeerForError disconnects from a peer due to external error.
373 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
374         log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
375         sw.stopAndRemovePeer(peer, reason)
376 }
377
378 // Disconnect from a peer gracefully.
379 func (sw *Switch) StopPeerGracefully(peer *Peer) {
380         log.Info("Stopping peer gracefully")
381         sw.stopAndRemovePeer(peer, nil)
382 }
383
384 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
385         for _, reactor := range sw.reactors {
386                 reactor.RemovePeer(peer, reason)
387         }
388         sw.peers.Remove(peer)
389         peer.Stop()
390 }
391
392 func (sw *Switch) listenerRoutine(l Listener) {
393         for {
394                 inConn, ok := <-l.Connections()
395                 if !ok {
396                         break
397                 }
398
399                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
400                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
401                 // be double of MaxNumPeers
402                 if sw.peers.Size() >= sw.config.MaxNumPeers*2 {
403                         inConn.Close()
404                         log.Info("Ignoring inbound connection: already have enough peers.")
405                         continue
406                 }
407
408                 // New inbound connection!
409                 err := sw.addPeerWithConnection(inConn)
410                 if err != nil {
411                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
412                         continue
413                 }
414         }
415 }
416
417 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
418         peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.config)
419         if err != nil {
420                 conn.Close()
421                 return err
422         }
423         if err = sw.AddPeer(peerConn); err != nil {
424                 conn.Close()
425                 return err
426         }
427
428         return nil
429 }
430
431 func (sw *Switch) AddBannedPeer(peer *Peer) error {
432         sw.mtx.Lock()
433         defer sw.mtx.Unlock()
434         key := peer.NodeInfo.RemoteAddrHost()
435         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
436         datajson, err := json.Marshal(sw.bannedPeer)
437         if err != nil {
438                 return err
439         }
440         sw.db.Set([]byte(bannedPeerKey), datajson)
441         return nil
442 }
443
444 func (sw *Switch) delBannedPeer(addr string) error {
445         delete(sw.bannedPeer, addr)
446         datajson, err := json.Marshal(sw.bannedPeer)
447         if err != nil {
448                 return err
449         }
450         sw.db.Set([]byte(bannedPeerKey), datajson)
451         return nil
452 }
453
454 func (sw *Switch) checkBannedPeer(peer string) error {
455         sw.mtx.Lock()
456         defer sw.mtx.Unlock()
457
458         if banEnd, ok := sw.bannedPeer[peer]; ok {
459                 if time.Now().Before(banEnd) {
460                         return ErrConnectBannedPeer
461                 }
462                 sw.delBannedPeer(peer)
463         }
464         return nil
465 }