OSDN Git Service

Merge pull request #1472 from Bytom/mnemonic
[bytom/bytom.git] / netsync / protocol_reactor.go
index 550208f..4500f87 100644 (file)
@@ -1,83 +1,47 @@
 package netsync
 
 import (
-       "reflect"
        "time"
 
        log "github.com/sirupsen/logrus"
-       cmn "github.com/tendermint/tmlibs/common"
 
        "github.com/bytom/errors"
        "github.com/bytom/p2p"
-       "github.com/bytom/p2p/trust"
-       "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
+       "github.com/bytom/p2p/connection"
 )
 
 const (
-       // BlockchainChannel is a channel for blocks and status updates
-       BlockchainChannel        = byte(0x40)
-       protocolHandshakeTimeout = time.Second * 10
+       handshakeTimeout    = 10 * time.Second
+       handshakeCheckPerid = 500 * time.Millisecond
 )
 
 var (
-       //ErrProtocolHandshakeTimeout peers handshake timeout
-       ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+       errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+       errStatusRequest            = errors.New("Status request error")
 )
 
-// Response describes the response standard.
-type Response struct {
-       Status string      `json:"status,omitempty"`
-       Msg    string      `json:"msg,omitempty"`
-       Data   interface{} `json:"data,omitempty"`
-}
-
-type initalPeerStatus struct {
-       peerID string
-       height uint64
-       hash   *bc.Hash
-}
-
 //ProtocolReactor handles new coming protocol message.
 type ProtocolReactor struct {
        p2p.BaseReactor
 
-       chain       *protocol.Chain
-       blockKeeper *blockKeeper
-       txPool      *protocol.TxPool
-       sw          *p2p.Switch
-       fetcher     *Fetcher
-       peers       *peerSet
-
-       newPeerCh      chan struct{}
-       quitReqBlockCh chan *string
-       txSyncCh       chan *txsync
-       peerStatusCh   chan *initalPeerStatus
+       sm    *SyncManager
+       peers *peerSet
 }
 
 // NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
        pr := &ProtocolReactor{
-               chain:          chain,
-               blockKeeper:    blockPeer,
-               txPool:         txPool,
-               sw:             sw,
-               fetcher:        fetcher,
-               peers:          peers,
-               newPeerCh:      newPeerCh,
-               txSyncCh:       txSyncCh,
-               quitReqBlockCh: quitReqBlockCh,
-               peerStatusCh:   make(chan *initalPeerStatus),
+               sm:    sm,
+               peers: peers,
        }
        pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
        return pr
 }
 
 // GetChannels implements Reactor
-func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
-       return []*p2p.ChannelDescriptor{
-               &p2p.ChannelDescriptor{
+func (pr *ProtocolReactor) GetChannels() []*connection.ChannelDescriptor {
+       return []*connection.ChannelDescriptor{
+               &connection.ChannelDescriptor{
                        ID:                BlockchainChannel,
                        Priority:          5,
                        SendQueueCapacity: 100,
@@ -96,118 +60,40 @@ func (pr *ProtocolReactor) OnStop() {
        pr.BaseReactor.OnStop()
 }
 
-// syncTransactions starts sending all currently pending transactions to the given peer.
-func (pr *ProtocolReactor) syncTransactions(p *peer) {
-       pending := pr.txPool.GetTransactions()
-       if len(pending) == 0 {
-               return
-       }
-       txs := make([]*types.Tx, len(pending))
-       for i, batch := range pending {
-               txs[i] = batch.Tx
-       }
-       pr.txSyncCh <- &txsync{p, txs}
-}
-
 // AddPeer implements Reactor by sending our state to peer.
 func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
-       peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
-       handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+       if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
+               return errStatusRequest
+       }
+
+       checkTicker := time.NewTimer(handshakeCheckPerid)
+       timeoutTicker := time.NewTimer(handshakeTimeout)
        for {
                select {
-               case status := <-pr.peerStatusCh:
-                       if status.peerID == peer.Key {
-                               pr.peers.AddPeer(peer)
-                               pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
-                               pr.syncTransactions(pr.peers.Peer(peer.Key))
-                               pr.newPeerCh <- struct{}{}
+               case <-checkTicker.C:
+                       if exist := pr.peers.getPeer(peer.Key); exist != nil {
+                               pr.sm.syncTransactions(peer.Key)
                                return nil
                        }
-               case <-handshakeWait.C:
-                       return ErrProtocolHandshakeTimeout
+
+               case <-timeoutTicker.C:
+                       return errProtocolHandshakeTimeout
                }
        }
 }
 
 // RemovePeer implements Reactor by removing peer from the pool.
 func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       pr.quitReqBlockCh <- &peer.Key
-       pr.peers.RemovePeer(peer.Key)
+       pr.peers.removePeer(peer.Key)
 }
 
 // Receive implements Reactor by handling 4 types of messages (look below).
 func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
-       var tm *trust.TrustMetric
-       key := src.Connection().RemoteAddress.IP.String()
-       if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
-               log.Errorf("Can't get peer trust metric")
-               return
-       }
-
-       _, msg, err := DecodeMessage(msgBytes)
+       msgType, msg, err := DecodeMessage(msgBytes)
        if err != nil {
-               log.Errorf("Error decoding messagek %v", err)
+               log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
                return
        }
-       log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
-
-       switch msg := msg.(type) {
-       case *BlockRequestMessage:
-               var block *types.Block
-               var err error
-               if msg.Height != 0 {
-                       block, err = pr.chain.GetBlockByHeight(msg.Height)
-               } else {
-                       block, err = pr.chain.GetBlockByHash(msg.GetHash())
-               }
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage get block: %v", err)
-                       return
-               }
-               response, err := NewBlockResponseMessage(block)
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
-                       return
-               }
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
-
-       case *BlockResponseMessage:
-               log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
-               pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
-
-       case *StatusRequestMessage:
-               blockHeader := pr.chain.BestBlockHeader()
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader)})
 
-       case *StatusResponseMessage:
-               peerStatus := &initalPeerStatus{
-                       peerID: src.Key,
-                       height: msg.Height,
-                       hash:   msg.GetHash(),
-               }
-               pr.peerStatusCh <- peerStatus
-
-       case *TransactionNotifyMessage:
-               tx, err := msg.GetTransaction()
-               if err != nil {
-                       log.Errorf("Error decoding new tx %v", err)
-                       return
-               }
-               pr.blockKeeper.AddTx(tx, src.Key)
-
-       case *MineBlockMessage:
-               block, err := msg.GetMineBlock()
-               if err != nil {
-                       log.Errorf("Error decoding mined block %v", err)
-                       return
-               }
-               // Mark the peer as owning the block and schedule it for import
-               hash := block.Hash()
-               pr.peers.MarkBlock(src.Key, &hash)
-               pr.fetcher.Enqueue(src.Key, block)
-               pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
-
-       default:
-               log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
-       }
+       pr.sm.processMsg(src, msgType, msg)
 }