OSDN Git Service

Merge pull request #993 from Bytom/connection
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/p2p/connection"
18         "github.com/bytom/p2p/trust"
19 )
20
21 const (
22         bannedPeerKey      = "BannedPeer"
23         defaultBanDuration = time.Hour * 1
24 )
25
26 //pre-define errors for connecting fail
27 var (
28         ErrDuplicatePeer     = errors.New("Duplicate peer")
29         ErrConnectSelf       = errors.New("Connect self")
30         ErrConnectBannedPeer = errors.New("Connect banned peer")
31 )
32
33 // An AddrBook represents an address book from the pex package, which is used to store peer addresses.
34 type AddrBook interface {
35         AddAddress(*NetAddress, *NetAddress) error
36         AddOurAddress(*NetAddress)
37         MarkGood(*NetAddress)
38         RemoveAddress(*NetAddress)
39         SaveToFile() error
40 }
41
42 //-----------------------------------------------------------------------------
43
44 // Switch handles peer connections and exposes an API to receive incoming messages
45 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
46 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
47 // incoming messages are received on the reactor.
48 type Switch struct {
49         cmn.BaseService
50
51         Config       *cfg.P2PConfig
52         peerConfig   *PeerConfig
53         listeners    []Listener
54         reactors     map[string]Reactor
55         chDescs      []*connection.ChannelDescriptor
56         reactorsByCh map[byte]Reactor
57         peers        *PeerSet
58         dialing      *cmn.CMap
59         nodeInfo     *NodeInfo             // our node info
60         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
61         addrBook     AddrBook
62         bannedPeer   map[string]time.Time
63         db           dbm.DB
64         mtx          sync.Mutex
65 }
66
67 // NewSwitch creates a new Switch with the given config.
68 func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
69         sw := &Switch{
70                 Config:       config,
71                 peerConfig:   DefaultPeerConfig(config),
72                 reactors:     make(map[string]Reactor),
73                 chDescs:      make([]*connection.ChannelDescriptor, 0),
74                 reactorsByCh: make(map[byte]Reactor),
75                 peers:        NewPeerSet(),
76                 dialing:      cmn.NewCMap(),
77                 nodeInfo:     nil,
78                 addrBook:     addrBook,
79                 db:           trustHistoryDB,
80         }
81         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
82         sw.bannedPeer = make(map[string]time.Time)
83         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
84                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
85                         return nil
86                 }
87         }
88         trust.Init()
89         return sw
90 }
91
92 // AddReactor adds the given reactor to the switch.
93 // NOTE: Not goroutine safe.
94 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
95         // Validate the reactor.
96         // No two reactors can share the same channel.
97         reactorChannels := reactor.GetChannels()
98         for _, chDesc := range reactorChannels {
99                 chID := chDesc.ID
100                 if sw.reactorsByCh[chID] != nil {
101                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
102                 }
103                 sw.chDescs = append(sw.chDescs, chDesc)
104                 sw.reactorsByCh[chID] = reactor
105         }
106         sw.reactors[name] = reactor
107         reactor.SetSwitch(sw)
108         return reactor
109 }
110
111 // Reactors returns a map of reactors registered on the switch.
112 // NOTE: Not goroutine safe.
113 func (sw *Switch) Reactors() map[string]Reactor {
114         return sw.reactors
115 }
116
117 // Reactor returns the reactor with the given name.
118 // NOTE: Not goroutine safe.
119 func (sw *Switch) Reactor(name string) Reactor {
120         return sw.reactors[name]
121 }
122
123 // AddListener adds the given listener to the switch for listening to incoming peer connections.
124 // NOTE: Not goroutine safe.
125 func (sw *Switch) AddListener(l Listener) {
126         sw.listeners = append(sw.listeners, l)
127 }
128
129 // Listeners returns the list of listeners the switch listens on.
130 // NOTE: Not goroutine safe.
131 func (sw *Switch) Listeners() []Listener {
132         return sw.listeners
133 }
134
135 // IsListening returns true if the switch has at least one listener.
136 // NOTE: Not goroutine safe.
137 func (sw *Switch) IsListening() bool {
138         return len(sw.listeners) > 0
139 }
140
141 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
142 // NOTE: Not goroutine safe.
143 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
144         sw.nodeInfo = nodeInfo
145 }
146
147 // NodeInfo returns the switch's NodeInfo.
148 // NOTE: Not goroutine safe.
149 func (sw *Switch) NodeInfo() *NodeInfo {
150         return sw.nodeInfo
151 }
152
153 // SetNodePrivKey sets the switch's private key for authenticated encryption.
154 // NOTE: Not goroutine safe.
155 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
156         sw.nodePrivKey = nodePrivKey
157         if sw.nodeInfo != nil {
158                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
159         }
160 }
161
162 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
163 func (sw *Switch) OnStart() error {
164         // Start reactors
165         for _, reactor := range sw.reactors {
166                 _, err := reactor.Start()
167                 if err != nil {
168                         return err
169                 }
170         }
171         // Start listeners
172         for _, listener := range sw.listeners {
173                 go sw.listenerRoutine(listener)
174         }
175         return nil
176 }
177
178 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
179 func (sw *Switch) OnStop() {
180         // Stop listeners
181         for _, listener := range sw.listeners {
182                 listener.Stop()
183         }
184         sw.listeners = nil
185         // Stop peers
186         for _, peer := range sw.peers.List() {
187                 peer.Stop()
188                 sw.peers.Remove(peer)
189         }
190         // Stop reactors
191         for _, reactor := range sw.reactors {
192                 reactor.Stop()
193         }
194 }
195
196 // AddPeer performs the P2P handshake with a peer
197 // that already has a SecretConnection. If all goes well,
198 // it starts the peer and adds it to the switch.
199 // NOTE: This performs a blocking handshake before the peer is added.
200 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
201 func (sw *Switch) AddPeer(pc *peerConn) error {
202         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
203         if err != nil {
204                 return err
205         }
206         // Check version, chain id
207         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
208                 return err
209         }
210
211         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
212
213         //filter peer
214         if err := sw.filterConnByPeer(peer); err != nil {
215                 return err
216         }
217
218         // Start peer
219         if sw.IsRunning() {
220                 if err := sw.startInitPeer(peer); err != nil {
221                         return err
222                 }
223         }
224
225         // Add the peer to .peers.
226         // We start it first so that a peer in the list is safe to Stop.
227         // It should not err since we already checked peers.Has()
228         if err := sw.peers.Add(peer); err != nil {
229                 return err
230         }
231
232         log.Info("Added peer:", peer)
233         return nil
234 }
235
236 func (sw *Switch) startInitPeer(peer *Peer) error {
237         peer.Start() // spawn send/recv routines
238         for _, reactor := range sw.reactors {
239                 if err := reactor.AddPeer(peer); err != nil {
240                         return err
241                 }
242         }
243         return nil
244 }
245
246 func (sw *Switch) dialSeed(addr *NetAddress) {
247         err := sw.DialPeerWithAddress(addr)
248         if err != nil {
249                 log.Info("Error dialing seed:", addr.String())
250         }
251 }
252
253 func (sw *Switch) addrBookDelSelf() error {
254         addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
255         if err != nil {
256                 return err
257         }
258         // remove the given address from the address book if we're added it earlier
259         sw.addrBook.RemoveAddress(addr)
260         // add the given address to the address book to avoid dialing ourselves
261         // again this is our public address
262         sw.addrBook.AddOurAddress(addr)
263         return nil
264 }
265
266 func (sw *Switch) filterConnByIP(ip string) error {
267         if err := sw.checkBannedPeer(ip); err != nil {
268                 return ErrConnectBannedPeer
269         }
270
271         if ip == sw.nodeInfo.ListenHost() {
272                 sw.addrBookDelSelf()
273                 return ErrConnectSelf
274         }
275
276         return nil
277 }
278
279 func (sw *Switch) filterConnByPeer(peer *Peer) error {
280         if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
281                 return ErrConnectBannedPeer
282         }
283
284         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
285                 sw.addrBookDelSelf()
286                 return ErrConnectSelf
287         }
288
289         // Check for duplicate peer
290         if sw.peers.Has(peer.Key) {
291                 return ErrDuplicatePeer
292         }
293         return nil
294 }
295
296 //DialPeerWithAddress dial node from net address
297 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
298         log.Debug("Dialing peer address:", addr)
299
300         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
301                 return err
302         }
303
304         sw.dialing.Set(addr.IP.String(), addr)
305         defer sw.dialing.Delete(addr.IP.String())
306
307         pc, err := newOutboundPeerConn(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig)
308         if err != nil {
309                 log.Debug("Failed to dial peer", " address:", addr, " error:", err)
310                 return err
311         }
312
313         err = sw.AddPeer(pc)
314         if err != nil {
315                 log.Info("Failed to add peer:", addr, " err:", err)
316                 pc.CloseConn()
317                 return err
318         }
319         log.Info("Dialed and added peer:", addr)
320         return nil
321 }
322
323 //IsDialing prevent duplicate dialing
324 func (sw *Switch) IsDialing(addr *NetAddress) bool {
325         return sw.dialing.Has(addr.IP.String())
326 }
327
328 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
329 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
330         peers := sw.peers.List()
331         for _, peer := range peers {
332                 if peer.outbound {
333                         outbound++
334                 } else {
335                         inbound++
336                 }
337         }
338         dialing = sw.dialing.Size()
339         return
340 }
341
342 //Peers return switch peerset
343 func (sw *Switch) Peers() *PeerSet {
344         return sw.peers
345 }
346
347 // StopPeerForError disconnects from a peer due to external error.
348 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
349         log.Info("Stopping peer for error.", " peer:", peer, " err:", reason)
350         sw.stopAndRemovePeer(peer, reason)
351 }
352
353 // StopPeerGracefully disconnect from a peer gracefully.
354 func (sw *Switch) StopPeerGracefully(peer *Peer) {
355         log.Info("Stopping peer gracefully")
356         sw.stopAndRemovePeer(peer, nil)
357 }
358
359 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
360         for _, reactor := range sw.reactors {
361                 reactor.RemovePeer(peer, reason)
362         }
363         sw.peers.Remove(peer)
364         peer.Stop()
365 }
366
367 func (sw *Switch) listenerRoutine(l Listener) {
368         for {
369                 inConn, ok := <-l.Connections()
370                 if !ok {
371                         break
372                 }
373
374                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
375                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
376                 // be double of MaxNumPeers
377                 if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
378                         inConn.Close()
379                         log.Info("Ignoring inbound connection: already have enough peers.")
380                         continue
381                 }
382
383                 // New inbound connection!
384                 err := sw.addPeerWithConnection(inConn)
385                 if err != nil {
386                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
387                         continue
388                 }
389         }
390 }
391
392 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
393         peerConn, err := newInboundPeerConn(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.Config)
394         if err != nil {
395                 conn.Close()
396                 return err
397         }
398         if err = sw.AddPeer(peerConn); err != nil {
399                 conn.Close()
400                 return err
401         }
402
403         return nil
404 }
405
406 //AddBannedPeer add peer to blacklist
407 func (sw *Switch) AddBannedPeer(peer *Peer) error {
408         sw.mtx.Lock()
409         defer sw.mtx.Unlock()
410         key := peer.NodeInfo.RemoteAddrHost()
411         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
412         datajson, err := json.Marshal(sw.bannedPeer)
413         if err != nil {
414                 return err
415         }
416         sw.db.Set([]byte(bannedPeerKey), datajson)
417         return nil
418 }
419
420 func (sw *Switch) delBannedPeer(addr string) error {
421         delete(sw.bannedPeer, addr)
422         datajson, err := json.Marshal(sw.bannedPeer)
423         if err != nil {
424                 return err
425         }
426         sw.db.Set([]byte(bannedPeerKey), datajson)
427         return nil
428 }
429
430 func (sw *Switch) checkBannedPeer(peer string) error {
431         sw.mtx.Lock()
432         defer sw.mtx.Unlock()
433
434         if banEnd, ok := sw.bannedPeer[peer]; ok {
435                 if time.Now().Before(banEnd) {
436                         return ErrConnectBannedPeer
437                 }
438                 sw.delBannedPeer(peer)
439         }
440         return nil
441 }