OSDN Git Service

Merge pull request #1041 from Bytom/dev-utxo-tx
[bytom/bytom.git] / p2p / switch.go
1 package p2p
2
3 import (
4         "encoding/json"
5         "fmt"
6         "net"
7         "sync"
8         "time"
9
10         log "github.com/sirupsen/logrus"
11         "github.com/tendermint/go-crypto"
12         cmn "github.com/tendermint/tmlibs/common"
13         dbm "github.com/tendermint/tmlibs/db"
14
15         cfg "github.com/bytom/config"
16         "github.com/bytom/errors"
17         "github.com/bytom/p2p/connection"
18         "github.com/bytom/p2p/trust"
19 )
20
21 const (
22         bannedPeerKey      = "BannedPeer"
23         defaultBanDuration = time.Hour * 1
24 )
25
26 //pre-define errors for connecting fail
27 var (
28         ErrDuplicatePeer     = errors.New("Duplicate peer")
29         ErrConnectSelf       = errors.New("Connect self")
30         ErrConnectBannedPeer = errors.New("Connect banned peer")
31 )
32
33 // Switch handles peer connections and exposes an API to receive incoming messages
34 // on `Reactors`.  Each `Reactor` is responsible for handling incoming messages of one
35 // or more `Channels`.  So while sending outgoing messages is typically performed on the peer,
36 // incoming messages are received on the reactor.
37 type Switch struct {
38         cmn.BaseService
39
40         Config       *cfg.Config
41         peerConfig   *PeerConfig
42         listeners    []Listener
43         reactors     map[string]Reactor
44         chDescs      []*connection.ChannelDescriptor
45         reactorsByCh map[byte]Reactor
46         peers        *PeerSet
47         dialing      *cmn.CMap
48         nodeInfo     *NodeInfo             // our node info
49         nodePrivKey  crypto.PrivKeyEd25519 // our node privkey
50         bannedPeer   map[string]time.Time
51         db           dbm.DB
52         mtx          sync.Mutex
53 }
54
55 // NewSwitch creates a new Switch with the given config.
56 func NewSwitch(config *cfg.Config) *Switch {
57         sw := &Switch{
58                 Config:       config,
59                 peerConfig:   DefaultPeerConfig(config.P2P),
60                 reactors:     make(map[string]Reactor),
61                 chDescs:      make([]*connection.ChannelDescriptor, 0),
62                 reactorsByCh: make(map[byte]Reactor),
63                 peers:        NewPeerSet(),
64                 dialing:      cmn.NewCMap(),
65                 nodeInfo:     nil,
66                 db:           dbm.NewDB("trusthistory", config.DBBackend, config.DBDir()),
67         }
68         sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw)
69         sw.bannedPeer = make(map[string]time.Time)
70         if datajson := sw.db.Get([]byte(bannedPeerKey)); datajson != nil {
71                 if err := json.Unmarshal(datajson, &sw.bannedPeer); err != nil {
72                         return nil
73                 }
74         }
75         trust.Init()
76         return sw
77 }
78
79 // OnStart implements BaseService. It starts all the reactors, peers, and listeners.
80 func (sw *Switch) OnStart() error {
81         for _, reactor := range sw.reactors {
82                 if _, err := reactor.Start(); err != nil {
83                         return err
84                 }
85         }
86         for _, listener := range sw.listeners {
87                 go sw.listenerRoutine(listener)
88         }
89         return nil
90 }
91
92 // OnStop implements BaseService. It stops all listeners, peers, and reactors.
93 func (sw *Switch) OnStop() {
94         for _, listener := range sw.listeners {
95                 listener.Stop()
96         }
97         sw.listeners = nil
98
99         for _, peer := range sw.peers.List() {
100                 peer.Stop()
101                 sw.peers.Remove(peer)
102         }
103
104         for _, reactor := range sw.reactors {
105                 reactor.Stop()
106         }
107 }
108
109 //AddBannedPeer add peer to blacklist
110 func (sw *Switch) AddBannedPeer(peer *Peer) error {
111         sw.mtx.Lock()
112         defer sw.mtx.Unlock()
113
114         key := peer.NodeInfo.RemoteAddrHost()
115         sw.bannedPeer[key] = time.Now().Add(defaultBanDuration)
116         datajson, err := json.Marshal(sw.bannedPeer)
117         if err != nil {
118                 return err
119         }
120
121         sw.db.Set([]byte(bannedPeerKey), datajson)
122         return nil
123 }
124
125 // AddPeer performs the P2P handshake with a peer
126 // that already has a SecretConnection. If all goes well,
127 // it starts the peer and adds it to the switch.
128 // NOTE: This performs a blocking handshake before the peer is added.
129 // CONTRACT: If error is returned, peer is nil, and conn is immediately closed.
130 func (sw *Switch) AddPeer(pc *peerConn) error {
131         peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.peerConfig.HandshakeTimeout*time.Second))
132         if err != nil {
133                 return err
134         }
135
136         if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
137                 return err
138         }
139
140         peer := newPeer(pc, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
141         if err := sw.filterConnByPeer(peer); err != nil {
142                 return err
143         }
144
145         // Start peer
146         if sw.IsRunning() {
147                 if err := sw.startInitPeer(peer); err != nil {
148                         return err
149                 }
150         }
151         return sw.peers.Add(peer)
152 }
153
154 // AddReactor adds the given reactor to the switch.
155 // NOTE: Not goroutine safe.
156 func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
157         // Validate the reactor.
158         // No two reactors can share the same channel.
159         for _, chDesc := range reactor.GetChannels() {
160                 chID := chDesc.ID
161                 if sw.reactorsByCh[chID] != nil {
162                         cmn.PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
163                 }
164                 sw.chDescs = append(sw.chDescs, chDesc)
165                 sw.reactorsByCh[chID] = reactor
166         }
167         sw.reactors[name] = reactor
168         reactor.SetSwitch(sw)
169         return reactor
170 }
171
172 // AddListener adds the given listener to the switch for listening to incoming peer connections.
173 // NOTE: Not goroutine safe.
174 func (sw *Switch) AddListener(l Listener) {
175         sw.listeners = append(sw.listeners, l)
176 }
177
178 //DialPeerWithAddress dial node from net address
179 func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error {
180         log.Debug("Dialing peer address:", addr)
181         sw.dialing.Set(addr.IP.String(), addr)
182         defer sw.dialing.Delete(addr.IP.String())
183         if err := sw.filterConnByIP(addr.IP.String()); err != nil {
184                 return err
185         }
186
187         pc, err := newOutboundPeerConn(addr, sw.nodePrivKey, sw.peerConfig)
188         if err != nil {
189                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on newOutboundPeerConn")
190                 return err
191         }
192
193         if err = sw.AddPeer(pc); err != nil {
194                 log.WithFields(log.Fields{"address": addr, " err": err}).Debug("DialPeer fail on switch AddPeer")
195                 pc.CloseConn()
196                 return err
197         }
198         log.Debug("DialPeer added peer:", addr)
199         return nil
200 }
201
202 //IsDialing prevent duplicate dialing
203 func (sw *Switch) IsDialing(addr *NetAddress) bool {
204         return sw.dialing.Has(addr.IP.String())
205 }
206
207 // IsListening returns true if the switch has at least one listener.
208 // NOTE: Not goroutine safe.
209 func (sw *Switch) IsListening() bool {
210         return len(sw.listeners) > 0
211 }
212
213 // Listeners returns the list of listeners the switch listens on.
214 // NOTE: Not goroutine safe.
215 func (sw *Switch) Listeners() []Listener {
216         return sw.listeners
217 }
218
219 // NumPeers Returns the count of outbound/inbound and outbound-dialing peers.
220 func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
221         peers := sw.peers.List()
222         for _, peer := range peers {
223                 if peer.outbound {
224                         outbound++
225                 } else {
226                         inbound++
227                 }
228         }
229         dialing = sw.dialing.Size()
230         return
231 }
232
233 // NodeInfo returns the switch's NodeInfo.
234 // NOTE: Not goroutine safe.
235 func (sw *Switch) NodeInfo() *NodeInfo {
236         return sw.nodeInfo
237 }
238
239 //Peers return switch peerset
240 func (sw *Switch) Peers() *PeerSet {
241         return sw.peers
242 }
243
244 // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
245 // NOTE: Not goroutine safe.
246 func (sw *Switch) SetNodeInfo(nodeInfo *NodeInfo) {
247         sw.nodeInfo = nodeInfo
248 }
249
250 // SetNodePrivKey sets the switch's private key for authenticated encryption.
251 // NOTE: Not goroutine safe.
252 func (sw *Switch) SetNodePrivKey(nodePrivKey crypto.PrivKeyEd25519) {
253         sw.nodePrivKey = nodePrivKey
254         if sw.nodeInfo != nil {
255                 sw.nodeInfo.PubKey = nodePrivKey.PubKey().Unwrap().(crypto.PubKeyEd25519)
256         }
257 }
258
259 // StopPeerForError disconnects from a peer due to external error.
260 func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
261         log.WithFields(log.Fields{"peer": peer, " err": reason}).Debug("stopping peer for error")
262         sw.stopAndRemovePeer(peer, reason)
263 }
264
265 // StopPeerGracefully disconnect from a peer gracefully.
266 func (sw *Switch) StopPeerGracefully(peer *Peer) {
267         sw.stopAndRemovePeer(peer, nil)
268 }
269
270 func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
271         peerConn, err := newInboundPeerConn(conn, sw.nodePrivKey, sw.Config.P2P)
272         if err != nil {
273                 conn.Close()
274                 return err
275         }
276
277         if err = sw.AddPeer(peerConn); err != nil {
278                 conn.Close()
279                 return err
280         }
281         return nil
282 }
283
284 func (sw *Switch) checkBannedPeer(peer string) error {
285         sw.mtx.Lock()
286         defer sw.mtx.Unlock()
287
288         if banEnd, ok := sw.bannedPeer[peer]; ok {
289                 if time.Now().Before(banEnd) {
290                         return ErrConnectBannedPeer
291                 }
292                 sw.delBannedPeer(peer)
293         }
294         return nil
295 }
296
297 func (sw *Switch) delBannedPeer(addr string) error {
298         sw.mtx.Lock()
299         defer sw.mtx.Unlock()
300
301         delete(sw.bannedPeer, addr)
302         datajson, err := json.Marshal(sw.bannedPeer)
303         if err != nil {
304                 return err
305         }
306
307         sw.db.Set([]byte(bannedPeerKey), datajson)
308         return nil
309 }
310
311 func (sw *Switch) filterConnByIP(ip string) error {
312         if ip == sw.nodeInfo.ListenHost() {
313                 return ErrConnectSelf
314         }
315         return sw.checkBannedPeer(ip)
316 }
317
318 func (sw *Switch) filterConnByPeer(peer *Peer) error {
319         if err := sw.checkBannedPeer(peer.RemoteAddrHost()); err != nil {
320                 return err
321         }
322
323         if sw.nodeInfo.PubKey.Equals(peer.PubKey().Wrap()) {
324                 return ErrConnectSelf
325         }
326
327         if sw.peers.Has(peer.Key) {
328                 return ErrDuplicatePeer
329         }
330         return nil
331 }
332
333 func (sw *Switch) listenerRoutine(l Listener) {
334         for {
335                 inConn, ok := <-l.Connections()
336                 if !ok {
337                         break
338                 }
339
340                 // disconnect if we alrady have 2 * MaxNumPeers, we do this because we wanna address book get exchanged even if
341                 // the connect is full. The pex will disconnect the peer after address exchange, the max connected peer won't
342                 // be double of MaxNumPeers
343                 if sw.peers.Size() >= sw.Config.P2P.MaxNumPeers*2 {
344                         inConn.Close()
345                         log.Info("Ignoring inbound connection: already have enough peers.")
346                         continue
347                 }
348
349                 // New inbound connection!
350                 if err := sw.addPeerWithConnection(inConn); err != nil {
351                         log.Info("Ignoring inbound connection: error while adding peer.", " address:", inConn.RemoteAddr().String(), " error:", err)
352                         continue
353                 }
354         }
355 }
356
357 func (sw *Switch) startInitPeer(peer *Peer) error {
358         peer.Start() // spawn send/recv routines
359         for _, reactor := range sw.reactors {
360                 if err := reactor.AddPeer(peer); err != nil {
361                         return err
362                 }
363         }
364         return nil
365 }
366
367 func (sw *Switch) stopAndRemovePeer(peer *Peer, reason interface{}) {
368         for _, reactor := range sw.reactors {
369                 reactor.RemovePeer(peer, reason)
370         }
371         sw.peers.Remove(peer)
372         peer.Stop()
373 }