package netsync
import (
- "reflect"
- "strings"
- "sync"
"time"
log "github.com/sirupsen/logrus"
- cmn "github.com/tendermint/tmlibs/common"
"github.com/bytom/errors"
"github.com/bytom/p2p"
"github.com/bytom/p2p/connection"
- "github.com/bytom/protocol"
- "github.com/bytom/protocol/bc"
- "github.com/bytom/protocol/bc/types"
)
const (
- // BlockchainChannel is a channel for blocks and status updates
- BlockchainChannel = byte(0x40)
- protocolHandshakeTimeout = time.Second * 10
- handshakeRetryTicker = 4 * time.Second
+ handshakeTimeout = 10 * time.Second
+ handshakeCheckPerid = 500 * time.Millisecond
)
var (
- //ErrProtocolHandshakeTimeout peers handshake timeout
- ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
- ErrStatusRequest = errors.New("Status request error")
- ErrDiffGenesisHash = errors.New("Different genesis hash")
+ errProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+ errStatusRequest = errors.New("Status request error")
)
-// Response describes the response standard.
-type Response struct {
- Status string `json:"status,omitempty"`
- Msg string `json:"msg,omitempty"`
- Data interface{} `json:"data,omitempty"`
-}
-
-type initalPeerStatus struct {
- peerID string
- height uint64
- hash *bc.Hash
- genesisHash *bc.Hash
-}
-
//ProtocolReactor handles new coming protocol message.
type ProtocolReactor struct {
p2p.BaseReactor
- chain *protocol.Chain
- blockKeeper *blockKeeper
- txPool *protocol.TxPool
- sw *p2p.Switch
- fetcher *Fetcher
- peers *peerSet
- handshakeMu sync.Mutex
- genesisHash bc.Hash
-
- newPeerCh chan struct{}
- quitReqBlockCh chan *string
- txSyncCh chan *txsync
- peerStatusCh chan *initalPeerStatus
+ sm *SyncManager
+ peers *peerSet
}
// NewProtocolReactor returns the reactor of whole blockchain.
-func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+func NewProtocolReactor(sm *SyncManager, peers *peerSet) *ProtocolReactor {
pr := &ProtocolReactor{
- chain: chain,
- blockKeeper: blockPeer,
- txPool: txPool,
- sw: sw,
- fetcher: fetcher,
- peers: peers,
- newPeerCh: newPeerCh,
- txSyncCh: txSyncCh,
- quitReqBlockCh: quitReqBlockCh,
- peerStatusCh: make(chan *initalPeerStatus),
+ sm: sm,
+ peers: peers,
}
pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
- genesisBlock, _ := pr.chain.GetBlockByHeight(0)
- pr.genesisHash = genesisBlock.Hash()
-
return pr
}
pr.BaseReactor.OnStop()
}
-// syncTransactions starts sending all currently pending transactions to the given peer.
-func (pr *ProtocolReactor) syncTransactions(p *peer) {
- if p == nil {
- return
- }
- pending := pr.txPool.GetTransactions()
- if len(pending) == 0 {
- return
- }
- txs := make([]*types.Tx, len(pending))
- for i, batch := range pending {
- txs[i] = batch.Tx
- }
- pr.txSyncCh <- &txsync{p, txs}
-}
-
// AddPeer implements Reactor by sending our state to peer.
func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
- pr.handshakeMu.Lock()
- defer pr.handshakeMu.Unlock()
- if peer == nil {
- return errPeerDropped
- }
if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
- return ErrStatusRequest
+ return errStatusRequest
}
- retryTicker := time.Tick(handshakeRetryTicker)
- handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+
+ checkTicker := time.NewTimer(handshakeCheckPerid)
+ timeoutTicker := time.NewTimer(handshakeTimeout)
for {
select {
- case status := <-pr.peerStatusCh:
- if status.peerID == peer.Key {
- if strings.Compare(pr.genesisHash.String(), status.genesisHash.String()) != 0 {
- log.Info("Remote peer genesis block hash:", status.genesisHash.String(), " local hash:", pr.genesisHash.String())
- return ErrDiffGenesisHash
- }
- pr.peers.AddPeer(peer)
- pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
- prPeer, ok := pr.peers.Peer(peer.Key)
- if !ok {
- return errPeerDropped
- }
- pr.syncTransactions(prPeer)
- pr.newPeerCh <- struct{}{}
+ case <-checkTicker.C:
+ if exist := pr.peers.getPeer(peer.Key); exist != nil {
+ pr.sm.syncTransactions(peer.Key)
return nil
}
- case <-retryTicker:
- if peer == nil {
- return errPeerDropped
- }
- if ok := peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}}); !ok {
- return ErrStatusRequest
- }
- case <-handshakeWait.C:
- return ErrProtocolHandshakeTimeout
+
+ case <-timeoutTicker.C:
+ return errProtocolHandshakeTimeout
}
}
}
// RemovePeer implements Reactor by removing peer from the pool.
func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
- select {
- case pr.quitReqBlockCh <- &peer.Key:
- default:
- log.Warning("quitReqBlockCh is full")
- }
- pr.peers.RemovePeer(peer.Key)
+ pr.peers.removePeer(peer.Key)
}
// Receive implements Reactor by handling 4 types of messages (look below).
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
- _, msg, err := DecodeMessage(msgBytes)
+ msgType, msg, err := DecodeMessage(msgBytes)
if err != nil {
- log.Errorf("Error decoding message %v", err)
+ log.WithFields(log.Fields{"module": logModule, "err": err}).Error("fail on reactor decoding message")
return
}
- log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
-
- switch msg := msg.(type) {
- case *BlockRequestMessage:
- var block *types.Block
- var err error
- if msg.Height != 0 {
- block, err = pr.chain.GetBlockByHeight(msg.Height)
- } else {
- block, err = pr.chain.GetBlockByHash(msg.GetHash())
- }
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage get block: %v", err)
- return
- }
- response, err := NewBlockResponseMessage(block)
- if err != nil {
- log.Errorf("Fail on BlockRequestMessage create response: %v", err)
- return
- }
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
-
- case *BlockResponseMessage:
- log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
- pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
-
- case *StatusRequestMessage:
- blockHeader := pr.chain.BestBlockHeader()
- src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(blockHeader, &pr.genesisHash)})
-
- case *StatusResponseMessage:
- peerStatus := &initalPeerStatus{
- peerID: src.Key,
- height: msg.Height,
- hash: msg.GetHash(),
- genesisHash: msg.GetGenesisHash(),
- }
- pr.peerStatusCh <- peerStatus
-
- case *TransactionNotifyMessage:
- tx, err := msg.GetTransaction()
- if err != nil {
- log.Errorf("Error decoding new tx %v", err)
- return
- }
- pr.blockKeeper.AddTx(tx, src.Key)
- case *MineBlockMessage:
- block, err := msg.GetMineBlock()
- if err != nil {
- log.Errorf("Error decoding mined block %v", err)
- return
- }
- // Mark the peer as owning the block and schedule it for import
- hash := block.Hash()
- pr.peers.MarkBlock(src.Key, &hash)
- pr.fetcher.Enqueue(src.Key, block)
- pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
-
- default:
- log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
- }
+ pr.sm.processMsg(src, msgType, msg)
}