OSDN Git Service

Merge pull request #1046 from Bytom/dev
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         crypto "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/p2p/connection"
18         "github.com/bytom/p2p/trust"
19 )
20
21 const (
22         bannedPeerKey      = "BannedPeer"
23         defaultBanDuration = time.Hour * 1
24 )
25
26 //pre-define errors for connecting fail
27 var (
28         ErrDuplicatePeer     = errors.New("Duplicate peer")
29         ErrConnectSelf       = errors.New("Connect self")
30         ErrConnectBannedPeer = errors.New("Connect banned peer")
31 )
32
33 // An AddrBook represents an address book from the pex package, which is used to store peer addresses.
34 type AddrBook interface {
35         AddAddress(*NetAddress, *NetAddress) error
36         AddOurAddress(*NetAddress)
37         MarkGood(*NetAddress)
38         RemoveAddress(*NetAddress)
39         SaveToFile() error
40 }
41
42 //-----------------------------------------------------------------------------
43
44 // Switch handles peer connections and exposes an API to receive incoming messages
45 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
46 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
47 // incoming messages are received on the reactor.
48 type Switch struct {
49         cmn.BaseService
50
51         Config       *cfg.P2PConfig
52         peerConfig   *PeerConfig
53         listeners    []Listener
54         reactors     map[string]Reactor
55         chDescs      []*connection.ChannelDescriptor
56         reactorsByCh map[byte]Reactor
57         peers        *PeerSet
58         dialing      *cmn.CMap
59         nodeInfo     *NodeInfo             // our node info
60         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
61         addrBook     AddrBook
62         bannedPeer   map[string]time.Time
63         db           dbm.DB
64         mtx          sync.Mutex
65 }
66
67 // NewSwitch creates a new Switch with the given config.
68 func NewSwitch(config *cfg.P2PConfig, addrBook AddrBook, trustHistoryDB dbm.DB) *Switch {
69         sw := &Switch{
70                 Config:       config,
71                 peerConfig:   DefaultPeerConfig(config),
72                 reactors:     make(map[string]Reactor),
73                 chDescs:      make([]*connection.ChannelDescriptor, 0),
74                 reactorsByCh: make(map[byte]Reactor),
75                 peers:        NewPeerSet(),
76                 dialing:      cmn.NewCMap(),
77                 nodeInfo:     nil,
78                 addrBook:     addrBook,
79                 db:           trustHistoryDB,
80         }
81         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
82         sw.bannedPeer = make(map[string]time.Time)
83         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
84                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
85                         return nil
86                 }
87         }
88         trust.Init()
89         return sw
90 }
91
92 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
93 func (sw *Switch) OnStart() error {
94         for _, reactor := range sw.reactors {
95                 if _, err := reactor.Start(); err != nil {
96                         return err
97                 }
98         }
99         for _, listener := range sw.listeners {
100                 go sw.listenerRoutine(listener)
101         }
102         return nil
103 }
104
105 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
106 func (sw *Switch) OnStop() {
107         for _, listener := range sw.listeners {
108                 listener.Stop()
109         }
110         sw.listeners = nil
111
112         for _, peer := range sw.peers.List() {
113                 peer.Stop()
114                 sw.peers.Remove(peer)
115         }
116
117         for _, reactor := range sw.reactors {
118                 reactor.Stop()
119         }
120 }
121
122 //AddBannedPeer add peer to blacklist
123 func (sw *Switch) AddBannedPeer(peer *Peer) error {
124         sw.mtx.Lock()
125         defer sw.mtx.Unlock()
126
127         key := peer.NodeInfo.RemoteAddrHost()
128         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
129         datajson, err := json.Marshal(sw.bannedPeer)
130         if err != nil {
131                 return err
132         }
133
134         sw.db.Set([]byte(bannedPeerKey), datajson)
135         return nil
136 }
137
138 // AddPeer performs the P2P handshake with a peer
139 // that already has a SecretConnection. If all goes well,
140 // it starts the peer and adds it to the switch.
141 // NOTE: This performs a blocking handshake before the peer is added.
142 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
143 func (sw *Switch) AddPeer(pc *peerConn) error {
144         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
145         if err != nil {
146                 return err
147         }
148
149         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
150                 return err
151         }
152
153         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
154         if err := sw.filterConnByPeer(peer); err != nil {
155                 return err
156         }
157
158         // Start peer
159         if sw.IsRunning() {
160                 if err := sw.startInitPeer(peer); err != nil {
161                         return err
162                 }
163         }
164         return sw.peers.Add(peer)
165 }
166
167 // AddReactor adds the given reactor to the switch.
168 // NOTE: Not goroutine safe.
169 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
170         // Validate the reactor.
171         // No two reactors can share the same channel.
172         for _, chDesc := range reactor.GetChannels() {
173                 chID := chDesc.ID
174                 if sw.reactorsByCh[chID] != nil {
175                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
176                 }
177                 sw.chDescs = append(sw.chDescs, chDesc)
178                 sw.reactorsByCh[chID] = reactor
179         }
180         sw.reactors[name] = reactor
181         reactor.SetSwitch(sw)
182         return reactor
183 }
184
185 // AddListener adds the given listener to the switch for listening to incoming peer connections.
186 // NOTE: Not goroutine safe.
187 func (sw *Switch) AddListener(l Listener) {
188         sw.listeners = append(sw.listeners, l)
189 }
190
191 //DialPeerWithAddress dial node from net address
192 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
193         log.Debug("Dialing peer address:", addr)
194         sw.dialing.Set(addr.IP.String(), addr)
195         defer sw.dialing.Delete(addr.IP.String())
196         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
197                 return err
198         }
199
200         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
201         if err != nil {
202                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
203                 return err
204         }
205
206         if err = sw.AddPeer(pc); err != nil {
207                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
208                 pc.CloseConn()
209                 return err
210         }
211         log.Debug("DialPeer added peer:", addr)
212         return nil
213 }
214
215 //IsDialing prevent duplicate dialing
216 func (sw *Switch) IsDialing(addr *NetAddress) bool {
217         return sw.dialing.Has(addr.IP.String())
218 }
219
220 // IsListening returns true if the switch has at least one listener.
221 // NOTE: Not goroutine safe.
222 func (sw *Switch) IsListening() bool {
223         return len(sw.listeners) > 0
224 }
225
226 // Listeners returns the list of listeners the switch listens on.
227 // NOTE: Not goroutine safe.
228 func (sw *Switch) Listeners() []Listener {
229         return sw.listeners
230 }
231
232 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
233 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
234         peers := sw.peers.List()
235         for _, peer := range peers {
236                 if peer.outbound {
237                         outbound++
238                 } else {
239                         inbound++
240                 }
241         }
242         dialing = sw.dialing.Size()
243         return
244 }
245
246 // NodeInfo returns the switch's NodeInfo.
247 // NOTE: Not goroutine safe.
248 func (sw *Switch) NodeInfo() *NodeInfo {
249         return sw.nodeInfo
250 }
251
252 //Peers return switch peerset
253 func (sw *Switch) Peers() *PeerSet {
254         return sw.peers
255 }
256
257 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
258 // NOTE: Not goroutine safe.
259 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
260         sw.nodeInfo = nodeInfo
261 }
262
263 // SetNodePrivKey sets the switch's private key for authenticated encryption.
264 // NOTE: Not goroutine safe.
265 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
266         sw.nodePrivKey = nodePrivKey
267         if sw.nodeInfo != nil {
268                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
269         }
270 }
271
272 // StopPeerForError disconnects from a peer due to external error.
273 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
274         log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
275         sw.stopAndRemovePeer(peer, reason)
276 }
277
278 // StopPeerGracefully disconnect from a peer gracefully.
279 func (sw *Switch) StopPeerGracefully(peer *Peer) {
280         sw.stopAndRemovePeer(peer, nil)
281 }
282
283 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
284         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config)
285         if err != nil {
286                 conn.Close()
287                 return err
288         }
289
290         if err = sw.AddPeer(peerConn); err != nil {
291                 conn.Close()
292                 return err
293         }
294         return nil
295 }
296
297 func (sw *Switch) addrBookDelSelf() error {
298         addr, err := NewNetAddressString(sw.nodeInfo.ListenAddr)
299         if err != nil {
300                 return err
301         }
302
303         sw.addrBook.RemoveAddress(addr)
304         sw.addrBook.AddOurAddress(addr)
305         return nil
306 }
307
308 func (sw *Switch) checkBannedPeer(peer string) error {
309         sw.mtx.Lock()
310         defer sw.mtx.Unlock()
311
312         if banEnd, ok := sw.bannedPeer[peer]; ok {
313                 if time.Now().Before(banEnd) {
314                         return ErrConnectBannedPeer
315                 }
316                 sw.delBannedPeer(peer)
317         }
318         return nil
319 }
320
321 func (sw *Switch) delBannedPeer(addr string) error {
322         sw.mtx.Lock()
323         defer sw.mtx.Unlock()
324
325         delete(sw.bannedPeer, addr)
326         datajson, err := json.Marshal(sw.bannedPeer)
327         if err != nil {
328                 return err
329         }
330
331         sw.db.Set([]byte(bannedPeerKey), datajson)
332         return nil
333 }
334
335 func (sw *Switch) filterConnByIP(ip string) error {
336         if ip == sw.nodeInfo.ListenHost() {
337                 sw.addrBookDelSelf()
338                 return ErrConnectSelf
339         }
340         return sw.checkBannedPeer(ip)
341 }
342
343 func (sw *Switch) filterConnByPeer(peer *Peer) error {
344         if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
345                 return err
346         }
347
348         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
349                 sw.addrBookDelSelf()
350                 return ErrConnectSelf
351         }
352
353         if sw.peers.Has(peer.Key) {
354                 return ErrDuplicatePeer
355         }
356         return nil
357 }
358
359 func (sw *Switch) listenerRoutine(l Listener) {
360         for {
361                 inConn, ok := <-l.Connections()
362                 if !ok {
363                         break
364                 }
365
366                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
367                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
368                 // be double of MaxNumPeers
369                 if sw.peers.Size() >= sw.Config.MaxNumPeers*2 {
370                         inConn.Close()
371                         log.Info("Ignoring inbound connection: already have enough peers.")
372                         continue
373                 }
374
375                 // New inbound connection!
376                 if err := sw.addPeerWithConnection(inConn); err != nil {
377                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
378                         continue
379                 }
380         }
381 }
382
383 func (sw *Switch) startInitPeer(peer *Peer) error {
384         peer.Start() // spawn send/recv routines
385         for _, reactor := range sw.reactors {
386                 if err := reactor.AddPeer(peer); err != nil {
387                         return err
388                 }
389         }
390         return nil
391 }
392
393 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
394         for _, reactor := range sw.reactors {
395                 reactor.RemovePeer(peer, reason)
396         }
397         sw.peers.Remove(peer)
398         peer.Stop()
399 }