
Optimize p2p transfer business layer logic (#501)
author yahtoo <yahtoo.ma@gmail.com>
Sat, 7 Apr 2018 08:05:35 +0000 (16:05 +0800)
committer Paladz <yzhu101@uottawa.ca>
Sat, 7 Apr 2018 08:05:35 +0000 (16:05 +0800)
* Optimize p2p business layer logic

* Merge dev branch code

* Fix error where newly mined blocks couldn't be broadcast

* Remove unused test file

* Del unused parameter

* Add RWMutex

* Add sent message filter

* Fix request block quit bug

* Del unused function BroadcastToPeers

* Fix new peer channel bug

* Del chainHeight from blockKeeper

* Fix sync check logic bug

* Add business protocol handshake

* Del unused code

* Optimize code format

* Del unused function

* Refactor request block by hash/height function

* Refactor tx notify handle

* Refactor BlockRequestWorker into a sequential process

* Fix net bestheight error

* Refactor p2p net sync code

* Refactor p2p net sync code

* Refactor BlockRequestWorker of blockkeeper

* Make ValidateTx return an orphan flag (see the sketch after this list)

* For p2p transfer test

* For p2p transfer test

* Check whether an existing block is an orphan

* Enable the peer exchange function by default

* Add txpool sync when new peer connected

* Fix golint check error

* Change mining difficulty

* Fix request genesis block bug

* Fix address compare function

* Fix blockrequest bug

* Revert "For p2p transfer test"

This reverts commit e6a36d86d5077f6c520dbfbb6eee2ab4335012e5.

* Fix request block bug

* For test

* Fix code review problem

* Optimize log printing

* Fix CI check error
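
The ValidateTx change listed above turns transaction validation from a plain error return into an (isOrphan, error) pair, as the blockchain/txbuilder/finalize.go and netsync/block_keeper.go hunks below show. A minimal sketch of how a caller adapts to the new signature; the handler name and the scam-peer penalty around it are illustrative assumptions, not part of the patch:

package netsync

import (
	"github.com/bytom/p2p"
	"github.com/bytom/protocol"
	"github.com/bytom/protocol/bc/types"
)

// handleNewTx sketches the new call shape: only a transaction that fails
// validation and is not an orphan gets its sender marked as a scam peer.
func handleNewTx(chain *protocol.Chain, sw *p2p.Switch, src *p2p.Peer, tx *types.Tx) {
	isOrphan, err := chain.ValidateTx(tx)
	if err != nil && !isOrphan {
		sw.AddScamPeer(src)
	}
}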

37 files changed:
api/api.go
api/block_retrieve.go [moved from api/blockchain_reactor.go with 92% similarity]
api/miner.go
api/nodeinfo.go [new file with mode: 0644]
api/query.go
api/txfeeds.go
blockchain/block_keeper.go [deleted file]
blockchain/blockchain_reactor.go [deleted file]
blockchain/miner.go [deleted file]
blockchain/reactor.go [deleted file]
blockchain/transact.go [deleted file]
blockchain/txbuilder/finalize.go
blockchain/txfeeds.go [deleted file]
cmd/bytomd/commands/run_node.go
cmd/miner/main.go
config/config.go
config/genesis.go
consensus/general.go
netsync/block_keeper.go [new file with mode: 0644]
netsync/fetcher.go [new file with mode: 0644]
netsync/handle.go [new file with mode: 0644]
netsync/message.go [moved from blockchain/net.go with 61% similarity]
netsync/peer.go [new file with mode: 0644]
netsync/protocol_reactor.go [new file with mode: 0644]
netsync/sync.go [new file with mode: 0644]
node/node.go
p2p/pex_reactor.go
p2p/switch.go
protocol/block.go
protocol/tx.go
vendor/gopkg.in/fatih/set.v0/LICENSE.md [new file with mode: 0644]
vendor/gopkg.in/fatih/set.v0/README.md [new file with mode: 0644]
vendor/gopkg.in/fatih/set.v0/set.go [new file with mode: 0644]
vendor/gopkg.in/fatih/set.v0/set_nots.go [new file with mode: 0644]
vendor/gopkg.in/fatih/set.v0/set_ts.go [new file with mode: 0644]
vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go [new file with mode: 0755]
vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go [new file with mode: 0755]

index 27ac3f7..3b71e08 100644 (file)
@@ -12,14 +12,17 @@ import (
        cmn "github.com/tendermint/tmlibs/common"
 
        "github.com/bytom/accesstoken"
-       "github.com/bytom/blockchain"
+       "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
        "github.com/bytom/dashboard"
        "github.com/bytom/errors"
+       "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
        "github.com/bytom/net/http/authn"
        "github.com/bytom/net/http/gzip"
        "github.com/bytom/net/http/httpjson"
        "github.com/bytom/net/http/static"
+       "github.com/bytom/netsync"
        "github.com/bytom/protocol"
        "github.com/bytom/wallet"
 )
@@ -72,12 +75,15 @@ func (wh *waitHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
 
 // API is the scheduling center for server
 type API struct {
-       bcr          *blockchain.BlockchainReactor
-       wallet       *wallet.Wallet
-       accessTokens *accesstoken.CredentialStore
-       chain        *protocol.Chain
-       server       *http.Server
-       handler      http.Handler
+       sync          *netsync.SyncManager
+       wallet        *wallet.Wallet
+       accessTokens  *accesstoken.CredentialStore
+       chain         *protocol.Chain
+       server        *http.Server
+       handler       http.Handler
+       txFeedTracker *txfeed.Tracker
+       cpuMiner      *cpuminer.CPUMiner
+       miningPool    *miningpool.MiningPool
 }
 
 func (a *API) initServer(config *cfg.Config) {
@@ -134,12 +140,15 @@ func (a *API) StartServer(address string) {
 }
 
 // NewAPI create and initialize the API
-func NewAPI(bcr *blockchain.BlockchainReactor, wallet *wallet.Wallet, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
+func NewAPI(sync *netsync.SyncManager, wallet *wallet.Wallet, txfeeds *txfeed.Tracker, cpuMiner *cpuminer.CPUMiner, miningPool *miningpool.MiningPool, chain *protocol.Chain, config *cfg.Config, token *accesstoken.CredentialStore) *API {
        api := &API{
-               bcr:          bcr,
-               wallet:       wallet,
-               chain:        chain,
-               accessTokens: token,
+               sync:          sync,
+               wallet:        wallet,
+               chain:         chain,
+               accessTokens:  token,
+               txFeedTracker: txfeeds,
+               cpuMiner:      cpuMiner,
+               miningPool:    miningPool,
        }
        api.buildHandler()
        api.initServer(config)
@@ -197,7 +206,6 @@ func (a *API) buildHandler() {
        m.Handle("/", alwaysError(errors.New("not Found")))
        m.Handle("/error", jsonHandler(a.walletError))
 
-       m.Handle("/info", jsonHandler(a.bcr.Info))
        m.Handle("/net-info", jsonHandler(a.getNetInfo))
 
        m.Handle("/create-access-token", jsonHandler(a.createAccessToken))
similarity index 92%
rename from api/blockchain_reactor.go
rename to api/block_retrieve.go
index 0b018f8..ec5a434 100644 (file)
@@ -4,18 +4,13 @@ import (
        log "github.com/sirupsen/logrus"
 
        "github.com/bytom/blockchain/query"
-       "github.com/bytom/consensus"
+       "github.com/bytom/wallet"
        "github.com/bytom/consensus/difficulty"
        chainjson "github.com/bytom/encoding/json"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/wallet"
 )
 
-// return network infomation
-func (a *API) getNetInfo() Response {
-       return NewSuccessResponse(a.bcr.GetNodeInfo())
-}
 
 // return best block hash
 func (a *API) getBestBlockHash() Response {
@@ -172,14 +167,3 @@ func (a *API) getBlockCount() Response {
        return NewSuccessResponse(blockHeight)
 }
 
-// return is in mining or not
-func (a *API) isMining() Response {
-       IsMining := map[string]bool{"isMining": a.bcr.IsMining()}
-       return NewSuccessResponse(IsMining)
-}
-
-// return gasRate
-func (a *API) gasRate() Response {
-       gasrate := map[string]int64{"gasRate": consensus.VMGasRate}
-       return NewSuccessResponse(gasrate)
-}
index 30220b3..a6b2f8e 100644 (file)
@@ -4,22 +4,24 @@ import (
        "context"
 
        "github.com/bytom/protocol/bc/types"
+       "github.com/bytom/protocol/bc"
 )
 
 func (a *API) getWork() Response {
-       work, err := a.bcr.GetWork()
+       work, err := a.GetWork()
        if err != nil {
                return NewErrorResponse(err)
        }
        return NewSuccessResponse(work)
 }
 
+
 type SubmitWorkReq struct {
        BlockHeader *types.BlockHeader `json:"block_header"`
 }
 
 func (a *API) submitWork(ctx context.Context, req *SubmitWorkReq) Response {
-       if err := a.bcr.SubmitWork(req.BlockHeader); err != nil {
+       if err := a.SubmitWork(req.BlockHeader); err != nil {
                return NewErrorResponse(err)
        }
        return NewSuccessResponse(true)
@@ -39,3 +41,30 @@ func (a *API) getBlockHeaderByHeight(ctx context.Context, req struct {
        }
        return NewSuccessResponse(resp)
 }
+
+// GetWorkResp is resp struct for API
+type GetWorkResp struct {
+       BlockHeader *types.BlockHeader `json:"block_header"`
+       Seed        *bc.Hash           `json:"seed"`
+}
+
+func (a *API) GetWork() (*GetWorkResp, error) {
+       bh, err := a.miningPool.GetWork()
+       if err != nil {
+               return nil, err
+       }
+
+       seed, err := a.chain.GetSeed(bh.Height, &bh.PreviousBlockHash)
+       if err != nil {
+               return nil, err
+       }
+
+       return &GetWorkResp{
+               BlockHeader: bh,
+               Seed:        seed,
+       }, nil
+}
+
+func (a *API) SubmitWork(bh *types.BlockHeader) error {
+       return a.miningPool.SubmitWork(bh)
+}
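
GetWork and SubmitWork now live on the API type, and cmd/miner consumes the exported GetWorkResp directly (see the cmd/miner/main.go hunk below). A hedged miner-side sketch; fetching rawData from the node's /get-work endpoint and solving the returned header are assumed to happen elsewhere:

package example

import (
	"encoding/json"

	"github.com/bytom/api"
)

// decodeWork unmarshals a get-work response body, giving the miner the block
// header to solve and the seed used by the proof-of-work hash.
func decodeWork(rawData []byte) (*api.GetWorkResp, error) {
	resp := &api.GetWorkResp{}
	if err := json.Unmarshal(rawData, resp); err != nil {
		return nil, err
	}
	return resp, nil
}
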
diff --git a/api/nodeinfo.go b/api/nodeinfo.go
new file mode 100644 (file)
index 0000000..0a6c125
--- /dev/null
@@ -0,0 +1,40 @@
+package api
+
+type NetInfo struct {
+       Listening    bool   `json:"listening"`
+       Syncing      bool   `json:"syncing"`
+       Mining       bool   `json:"mining"`
+       PeerCount    int    `json:"peer_count"`
+       CurrentBlock uint64 `json:"current_block"`
+       HighestBlock uint64 `json:"highest_block"`
+}
+
+func (a *API) GetNodeInfo() *NetInfo {
+       info := &NetInfo{
+               Listening:    a.sync.Switch().IsListening(),
+               Syncing:      a.sync.BlockKeeper().IsCaughtUp(),
+               Mining:       a.cpuMiner.IsMining(),
+               PeerCount:    len(a.sync.Switch().Peers().List()),
+               CurrentBlock: a.chain.Height(),
+       }
+       _, info.HighestBlock = a.sync.Peers().BestPeer()
+       if info.CurrentBlock > info.HighestBlock {
+               info.HighestBlock = info.CurrentBlock
+       }
+       return info
+}
+
+// return network information
+func (a *API) getNetInfo() Response {
+       return NewSuccessResponse(a.GetNodeInfo())
+}
+
+// return is in mining or not
+func (a *API) isMining() Response {
+       IsMining := map[string]bool{"isMining": a.IsMining()}
+       return NewSuccessResponse(IsMining)
+}
+
+func (a *API) IsMining() bool {
+       return a.cpuMiner.IsMining()
+}
index c99fc0a..ebaf819 100755 (executable)
@@ -8,6 +8,7 @@ import (
 
        "github.com/bytom/account"
        "github.com/bytom/blockchain/query"
+       "github.com/bytom/consensus"
 )
 
 // POST /list-accounts
@@ -117,3 +118,9 @@ func (a *API) listUnspentOutputs(ctx context.Context, filter struct {
 
        return NewSuccessResponse(UTXOs)
 }
+
+// return gasRate
+func (a *API) gasRate() Response {
+       gasrate := map[string]int64{"gasRate": consensus.VMGasRate}
+       return NewSuccessResponse(gasrate)
+}
index c115336..2786383 100644 (file)
@@ -6,6 +6,7 @@ import (
 
        log "github.com/sirupsen/logrus"
 
+       "github.com/bytom/errors"
        "github.com/bytom/blockchain/txfeed"
 )
 
@@ -14,7 +15,7 @@ func (a *API) createTxFeed(ctx context.Context, in struct {
        Alias  string `json:"alias"`
        Filter string `json:"filter"`
 }) Response {
-       if err := a.bcr.TxFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+       if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
                log.WithField("error", err).Error("Add TxFeed Failed")
                return NewErrorResponse(err)
        }
@@ -26,7 +27,7 @@ func (a *API) getTxFeed(ctx context.Context, in struct {
        Alias string `json:"alias,omitempty"`
 }) Response {
        var tmpTxFeed interface{}
-       rawTxfeed, err := a.bcr.GetTxFeedByAlias(ctx, in.Alias)
+       rawTxfeed, err := a.GetTxFeedByAlias(ctx, in.Alias)
        if err != nil {
                return NewErrorResponse(err)
        }
@@ -42,7 +43,7 @@ func (a *API) getTxFeed(ctx context.Context, in struct {
 func (a *API) deleteTxFeed(ctx context.Context, in struct {
        Alias string `json:"alias,omitempty"`
 }) Response {
-       if err := a.bcr.TxFeedTracker.Delete(ctx, in.Alias); err != nil {
+       if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
                return NewErrorResponse(err)
        }
        return NewSuccessResponse(nil)
@@ -53,10 +54,10 @@ func (a *API) updateTxFeed(ctx context.Context, in struct {
        Alias  string `json:"alias"`
        Filter string `json:"filter"`
 }) Response {
-       if err := a.bcr.TxFeedTracker.Delete(ctx, in.Alias); err != nil {
+       if err := a.txFeedTracker.Delete(ctx, in.Alias); err != nil {
                return NewErrorResponse(err)
        }
-       if err := a.bcr.TxFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
+       if err := a.txFeedTracker.Create(ctx, in.Alias, in.Filter); err != nil {
                log.WithField("error", err).Error("Update TxFeed Failed")
                return NewErrorResponse(err)
        }
@@ -67,7 +68,7 @@ func (a *API) getTxFeeds() ([]txfeed.TxFeed, error) {
        txFeed := txfeed.TxFeed{}
        txFeeds := make([]txfeed.TxFeed, 0)
 
-       iter := a.bcr.TxFeedTracker.DB.Iterator()
+       iter := a.txFeedTracker.DB.Iterator()
        defer iter.Release()
 
        for iter.Next() {
@@ -90,3 +91,17 @@ func (a *API) listTxFeeds(ctx context.Context) Response {
 
        return NewSuccessResponse(txFeeds)
 }
+
+func (a *API) GetTxFeedByAlias(ctx context.Context, filter string) ([]byte, error) {
+       jf, err := json.Marshal(filter)
+       if err != nil {
+               return nil, err
+       }
+
+       value := a.txFeedTracker.DB.Get(jf)
+       if value == nil {
+               return nil, errors.New("No transaction feed")
+       }
+
+       return value, nil
+}
diff --git a/blockchain/block_keeper.go b/blockchain/block_keeper.go
deleted file mode 100644 (file)
index 4c9d125..0000000
+++ /dev/null
@@ -1,213 +0,0 @@
-package blockchain
-
-import (
-       "errors"
-       "sync"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/bytom/p2p"
-       "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-)
-
-type blockKeeperPeer struct {
-       mtx    sync.RWMutex
-       height uint64
-       hash   *bc.Hash
-}
-
-func newBlockKeeperPeer(height uint64, hash *bc.Hash) *blockKeeperPeer {
-       return &blockKeeperPeer{
-               height: height,
-               hash:   hash,
-       }
-}
-
-func (p *blockKeeperPeer) GetStatus() (height uint64, hash *bc.Hash) {
-       p.mtx.RLock()
-       defer p.mtx.RUnlock()
-       return p.height, p.hash
-}
-
-func (p *blockKeeperPeer) SetStatus(height uint64, hash *bc.Hash) {
-       p.mtx.Lock()
-       defer p.mtx.Unlock()
-
-       p.height = height
-       p.hash = hash
-}
-
-type pendingResponse struct {
-       block *types.Block
-       src   *p2p.Peer
-}
-
-//TODO: add retry mechanism
-type blockKeeper struct {
-       mtx           sync.RWMutex
-       chainHeight   uint64
-       maxPeerHeight uint64
-       chainUpdateCh <-chan struct{}
-       peerUpdateCh  chan struct{}
-       done          chan bool
-
-       chain            *protocol.Chain
-       sw               *p2p.Switch
-       peers            map[string]*blockKeeperPeer
-       pendingProcessCh chan *pendingResponse
-}
-
-func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch) *blockKeeper {
-       chainHeight := chain.Height()
-       bk := &blockKeeper{
-               chainHeight:   chainHeight,
-               maxPeerHeight: uint64(0),
-               chainUpdateCh: chain.BlockWaiter(chainHeight + 1),
-               peerUpdateCh:  make(chan struct{}, 1000),
-               done:          make(chan bool, 1),
-
-               chain:            chain,
-               sw:               sw,
-               peers:            make(map[string]*blockKeeperPeer),
-               pendingProcessCh: make(chan *pendingResponse),
-       }
-       go bk.blockProcessWorker()
-       go bk.blockRequestWorker()
-       return bk
-}
-
-func (bk *blockKeeper) Stop() {
-       bk.done <- true
-}
-
-func (bk *blockKeeper) AddBlock(block *types.Block, src *p2p.Peer) {
-       bk.pendingProcessCh <- &pendingResponse{block: block, src: src}
-}
-
-func (bk *blockKeeper) IsCaughtUp() bool {
-       bk.mtx.RLock()
-       defer bk.mtx.RUnlock()
-       return bk.chainHeight >= bk.maxPeerHeight
-}
-
-func (bk *blockKeeper) RemovePeer(peerID string) {
-       bk.mtx.Lock()
-       delete(bk.peers, peerID)
-       bk.mtx.Unlock()
-       log.WithField("ID", peerID).Info("Delete peer from blockKeeper")
-}
-
-func (bk *blockKeeper) requestBlockByHash(peerID string, hash *bc.Hash) error {
-       peer := bk.sw.Peers().Get(peerID)
-       if peer == nil {
-               return errors.New("can't find peer in peer pool")
-       }
-       msg := &BlockRequestMessage{RawHash: hash.Byte32()}
-       peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
-}
-
-func (bk *blockKeeper) requestBlockByHeight(peerID string, height uint64) error {
-       peer := bk.sw.Peers().Get(peerID)
-       if peer == nil {
-               return errors.New("can't find peer in peer pool")
-       }
-       msg := &BlockRequestMessage{Height: height}
-       peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
-}
-
-func (bk *blockKeeper) SetPeerHeight(peerID string, height uint64, hash *bc.Hash) {
-       bk.mtx.Lock()
-       defer bk.mtx.Unlock()
-
-       if height > bk.maxPeerHeight {
-               bk.maxPeerHeight = height
-               bk.peerUpdateCh <- struct{}{}
-       }
-
-       if peer, ok := bk.peers[peerID]; ok {
-               peer.SetStatus(height, hash)
-               return
-       }
-       peer := newBlockKeeperPeer(height, hash)
-       bk.peers[peerID] = peer
-       log.WithFields(log.Fields{"ID": peerID, "Height": height}).Info("Add new peer to blockKeeper")
-}
-
-func (bk *blockKeeper) RequestBlockByHeight(height uint64) {
-       bk.mtx.RLock()
-       defer bk.mtx.RUnlock()
-
-       for peerID, peer := range bk.peers {
-               if peerHeight, _ := peer.GetStatus(); peerHeight > bk.chainHeight {
-                       bk.requestBlockByHeight(peerID, height)
-               }
-       }
-}
-
-func (bk *blockKeeper) blockRequestWorker() {
-       for {
-               select {
-               case <-bk.chainUpdateCh:
-                       chainHeight := bk.chain.Height()
-                       bk.mtx.Lock()
-                       if bk.chainHeight < chainHeight {
-                               bk.chainHeight = chainHeight
-                       }
-                       bk.chainUpdateCh = bk.chain.BlockWaiter(bk.chainHeight + 1)
-                       bk.mtx.Unlock()
-
-               case <-bk.peerUpdateCh:
-                       bk.mtx.RLock()
-                       chainHeight := bk.chainHeight
-                       maxPeerHeight := bk.maxPeerHeight
-                       bk.mtx.RUnlock()
-
-                       for i := chainHeight + 1; i <= maxPeerHeight; i++ {
-                               bk.RequestBlockByHeight(i)
-                               waiter := bk.chain.BlockWaiter(i)
-                               retryTicker := time.Tick(15 * time.Second)
-
-                       retryLoop:
-                               for {
-                                       select {
-                                       case <-waiter:
-                                               break retryLoop
-                                       case <-retryTicker:
-                                               bk.RequestBlockByHeight(i)
-                                       }
-                               }
-                       }
-
-               case <-bk.done:
-                       return
-               }
-       }
-}
-
-func (bk *blockKeeper) blockProcessWorker() {
-       for pendingResponse := range bk.pendingProcessCh {
-
-               block := pendingResponse.block
-               blockHash := block.Hash()
-               isOrphan, err := bk.chain.ProcessBlock(block)
-               if err != nil {
-                       bk.sw.AddScamPeer(pendingResponse.src)
-                       log.WithField("hash", blockHash.String()).Errorf("blockKeeper fail process block %v", err)
-                       continue
-               }
-               log.WithFields(log.Fields{
-                       "height":   block.Height,
-                       "hash":     blockHash.String(),
-                       "isOrphan": isOrphan,
-               }).Info("blockKeeper processed block")
-
-               if isOrphan {
-                       bk.requestBlockByHash(pendingResponse.src.Key, &block.PreviousBlockHash)
-               }
-       }
-}
diff --git a/blockchain/blockchain_reactor.go b/blockchain/blockchain_reactor.go
deleted file mode 100644 (file)
index 19f9dda..0000000
+++ /dev/null
@@ -1,25 +0,0 @@
-package blockchain
-
-type NetInfo struct {
-       Listening    bool   `json:"listening"`
-       Syncing      bool   `json:"syncing"`
-       Mining       bool   `json:"mining"`
-       PeerCount    int    `json:"peer_count"`
-       CurrentBlock uint64 `json:"current_block"`
-       HighestBlock uint64 `json:"highest_block"`
-}
-
-func (bcr *BlockchainReactor) GetNodeInfo() *NetInfo {
-       return &NetInfo{
-               Listening:    bcr.sw.IsListening(),
-               Syncing:      bcr.blockKeeper.IsCaughtUp(),
-               Mining:       bcr.mining.IsMining(),
-               PeerCount:    len(bcr.sw.Peers().List()),
-               CurrentBlock: bcr.blockKeeper.chainHeight,
-               HighestBlock: bcr.blockKeeper.maxPeerHeight,
-       }
-}
-
-func (bcr *BlockchainReactor) IsMining() bool {
-       return bcr.mining.IsMining()
-}
diff --git a/blockchain/miner.go b/blockchain/miner.go
deleted file mode 100644 (file)
index aea626f..0000000
+++ /dev/null
@@ -1,33 +0,0 @@
-package blockchain
-
-import (
-       "github.com/bytom/protocol/bc"
-       "github.com/bytom/protocol/bc/types"
-)
-
-// GetWorkResp is resp struct for API
-type GetWorkResp struct {
-       BlockHeader *types.BlockHeader `json:"block_header"`
-       Seed        *bc.Hash           `json:"seed"`
-}
-
-func (bcr *BlockchainReactor) GetWork() (*GetWorkResp, error) {
-       bh, err := bcr.miningPool.GetWork()
-       if err != nil {
-               return nil, err
-       }
-
-       seed, err := bcr.chain.GetSeed(bh.Height, &bh.PreviousBlockHash)
-       if err != nil {
-               return nil, err
-       }
-
-       return &GetWorkResp{
-               BlockHeader: bh,
-               Seed:        seed,
-       }, nil
-}
-
-func (bcr *BlockchainReactor) SubmitWork(bh *types.BlockHeader) error {
-       return bcr.miningPool.SubmitWork(bh)
-}
diff --git a/blockchain/reactor.go b/blockchain/reactor.go
deleted file mode 100755 (executable)
index bf4665d..0000000
+++ /dev/null
@@ -1,226 +0,0 @@
-package blockchain
-
-import (
-       "context"
-       "reflect"
-       "time"
-
-       log "github.com/sirupsen/logrus"
-       cmn "github.com/tendermint/tmlibs/common"
-
-       "github.com/bytom/account"
-       "github.com/bytom/blockchain/txfeed"
-       "github.com/bytom/mining/cpuminer"
-       "github.com/bytom/mining/miningpool"
-       "github.com/bytom/p2p"
-       "github.com/bytom/p2p/trust"
-       "github.com/bytom/protocol"
-       "github.com/bytom/protocol/bc"
-       protocolTypes "github.com/bytom/protocol/bc/types"
-       "github.com/bytom/types"
-)
-
-const (
-       // BlockchainChannel is a channel for blocks and status updates
-       BlockchainChannel = byte(0x40)
-       maxNewBlockChSize = int(1024)
-
-       statusUpdateIntervalSeconds = 10
-       maxBlockchainResponseSize   = 22020096 + 2
-)
-
-// BlockchainReactor handles long-term catchup syncing.
-type BlockchainReactor struct {
-       p2p.BaseReactor
-
-       chain         *protocol.Chain
-       TxFeedTracker *txfeed.Tracker // TODO: move it out from BlockchainReactor
-       blockKeeper   *blockKeeper
-       txPool        *protocol.TxPool
-       mining        *cpuminer.CPUMiner
-       miningPool    *miningpool.MiningPool
-       sw            *p2p.Switch
-       evsw          types.EventSwitch
-       newBlockCh    chan *bc.Hash
-       miningEnable  bool
-}
-
-// Info return the server information
-func (bcr *BlockchainReactor) Info(ctx context.Context) (map[string]interface{}, error) {
-       return map[string]interface{}{
-               "is_configured": false,
-               "version":       "0.001",
-               "build_commit":  "----",
-               "build_date":    "------",
-               "build_config":  "---------",
-       }, nil
-}
-
-// NewBlockchainReactor returns the reactor of whole blockchain.
-func NewBlockchainReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, accountMgr *account.Manager, txfeeds *txfeed.Tracker, miningEnable bool) *BlockchainReactor {
-       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
-       bcr := &BlockchainReactor{
-               chain:         chain,
-               blockKeeper:   newBlockKeeper(chain, sw),
-               txPool:        txPool,
-               sw:            sw,
-               TxFeedTracker: txfeeds,
-               miningEnable:  miningEnable,
-               newBlockCh:    newBlockCh,
-       }
-
-       bcr.mining = cpuminer.NewCPUMiner(chain, accountMgr, txPool, newBlockCh)
-       bcr.miningPool = miningpool.NewMiningPool(chain, accountMgr, txPool, newBlockCh)
-
-       bcr.BaseReactor = *p2p.NewBaseReactor("BlockchainReactor", bcr)
-       return bcr
-}
-
-// OnStart implements BaseService
-func (bcr *BlockchainReactor) OnStart() error {
-       bcr.BaseReactor.OnStart()
-
-       if bcr.miningEnable {
-               bcr.mining.Start()
-       }
-       go bcr.syncRoutine()
-       return nil
-}
-
-// OnStop implements BaseService
-func (bcr *BlockchainReactor) OnStop() {
-       bcr.BaseReactor.OnStop()
-       if bcr.miningEnable {
-               bcr.mining.Stop()
-       }
-       bcr.blockKeeper.Stop()
-}
-
-// GetChannels implements Reactor
-func (bcr *BlockchainReactor) GetChannels() []*p2p.ChannelDescriptor {
-       return []*p2p.ChannelDescriptor{
-               {
-                       ID:                BlockchainChannel,
-                       Priority:          5,
-                       SendQueueCapacity: 100,
-               },
-       }
-}
-
-// AddPeer implements Reactor by sending our state to peer.
-func (bcr *BlockchainReactor) AddPeer(peer *p2p.Peer) {
-       peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
-}
-
-// RemovePeer implements Reactor by removing peer from the pool.
-func (bcr *BlockchainReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
-       bcr.blockKeeper.RemovePeer(peer.Key)
-}
-
-// Receive implements Reactor by handling 4 types of messages (look below).
-func (bcr *BlockchainReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
-       var tm *trust.TrustMetric
-       key := src.Connection().RemoteAddress.IP.String()
-       if tm = bcr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
-               log.Errorf("Can't get peer trust metric")
-               return
-       }
-
-       _, msg, err := DecodeMessage(msgBytes)
-       if err != nil {
-               log.Errorf("Error decoding messagek %v", err)
-               return
-       }
-       log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
-
-       switch msg := msg.(type) {
-       case *BlockRequestMessage:
-               var block *protocolTypes.Block
-               var err error
-               if msg.Height != 0 {
-                       block, err = bcr.chain.GetBlockByHeight(msg.Height)
-               } else {
-                       block, err = bcr.chain.GetBlockByHash(msg.GetHash())
-               }
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage get block: %v", err)
-                       return
-               }
-               response, err := NewBlockResponseMessage(block)
-               if err != nil {
-                       log.Errorf("Fail on BlockRequestMessage create resoinse: %v", err)
-                       return
-               }
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
-
-       case *BlockResponseMessage:
-               bcr.blockKeeper.AddBlock(msg.GetBlock(), src)
-
-       case *StatusRequestMessage:
-               block := bcr.chain.BestBlock()
-               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
-
-       case *StatusResponseMessage:
-               bcr.blockKeeper.SetPeerHeight(src.Key, msg.Height, msg.GetHash())
-
-       case *TransactionNotifyMessage:
-               tx := msg.GetTransaction()
-               if err := bcr.chain.ValidateTx(tx); err != nil {
-                       bcr.sw.AddScamPeer(src)
-               }
-
-       default:
-               log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
-       }
-}
-
-// Handle messages from the poolReactor telling the reactor what to do.
-// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
-// (Except for the SYNC_LOOP, which is the primary purpose and must be synchronous.)
-func (bcr *BlockchainReactor) syncRoutine() {
-       statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
-       newTxCh := bcr.txPool.GetNewTxCh()
-
-       for {
-               select {
-               case blockHash := <-bcr.newBlockCh:
-                       block, err := bcr.chain.GetBlockByHash(blockHash)
-                       if err != nil {
-                               log.Errorf("Error get block from newBlockCh %v", err)
-                       }
-                       log.WithFields(log.Fields{"Hash": blockHash, "height": block.Height}).Info("Boardcast my new block")
-               case newTx := <-newTxCh:
-                       bcr.TxFeedTracker.TxFilter(newTx)
-                       go bcr.BroadcastTransaction(newTx)
-               case _ = <-statusUpdateTicker.C:
-                       go bcr.BroadcastStatusResponse()
-
-                       if bcr.miningEnable {
-                               // mining if and only if block sync is finished
-                               if bcr.blockKeeper.IsCaughtUp() {
-                                       bcr.mining.Start()
-                               } else {
-                                       bcr.mining.Stop()
-                               }
-                       }
-               case <-bcr.Quit:
-                       return
-               }
-       }
-}
-
-// BroadcastStatusResponse broadcasts `BlockStore` height.
-func (bcr *BlockchainReactor) BroadcastStatusResponse() {
-       block := bcr.chain.BestBlock()
-       bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
-}
-
-// BroadcastTransaction broadcats `BlockStore` transaction.
-func (bcr *BlockchainReactor) BroadcastTransaction(tx *protocolTypes.Tx) error {
-       msg, err := NewTransactionNotifyMessage(tx)
-       if err != nil {
-               return err
-       }
-       bcr.Switch.Broadcast(BlockchainChannel, struct{ BlockchainMessage }{msg})
-       return nil
-}
diff --git a/blockchain/transact.go b/blockchain/transact.go
deleted file mode 100644 (file)
index 4a254aa..0000000
+++ /dev/null
@@ -1,78 +0,0 @@
-package blockchain
-
-import (
-       "context"
-
-       log "github.com/sirupsen/logrus"
-
-       "github.com/bytom/blockchain/txbuilder"
-       "github.com/bytom/errors"
-       "github.com/bytom/protocol/bc/types"
-)
-
-// finalizeTxWait calls FinalizeTx and then waits for confirmation of
-// the transaction.  A nil error return means the transaction is
-// confirmed on the blockchain.  ErrRejected means a conflicting tx is
-// on the blockchain.  context.DeadlineExceeded means ctx is an
-// expiring context that timed out.
-func (bcr *BlockchainReactor) finalizeTxWait(ctx context.Context, txTemplate *txbuilder.Template, waitUntil string) error {
-       // Use the current generator height as the lower bound of the block height
-       // that the transaction may appear in.
-       localHeight := bcr.chain.Height()
-       //generatorHeight := localHeight
-
-       log.WithField("localHeight", localHeight).Info("Starting to finalize transaction")
-
-       err := txbuilder.FinalizeTx(ctx, bcr.chain, txTemplate.Transaction)
-       if err != nil {
-               return err
-       }
-       if waitUntil == "none" {
-               return nil
-       }
-
-       //TODO:complete finalizeTxWait
-       //height, err := a.waitForTxInBlock(ctx, txTemplate.Transaction, generatorHeight)
-       if err != nil {
-               return err
-       }
-       if waitUntil == "confirmed" {
-               return nil
-       }
-
-       return nil
-}
-
-func (bcr *BlockchainReactor) waitForTxInBlock(ctx context.Context, tx *types.Tx, height uint64) (uint64, error) {
-       log.Printf("waitForTxInBlock function")
-       for {
-               height++
-               select {
-               case <-ctx.Done():
-                       return 0, ctx.Err()
-
-               case <-bcr.chain.BlockWaiter(height):
-                       b, err := bcr.chain.GetBlockByHeight(height)
-                       if err != nil {
-                               return 0, errors.Wrap(err, "getting block that just landed")
-                       }
-                       for _, confirmed := range b.Transactions {
-                               if confirmed.ID == tx.ID {
-                                       // confirmed
-                                       return height, nil
-                               }
-                       }
-
-                       // might still be in pool or might be rejected; we can't
-                       // tell definitively until its max time elapses.
-                       // Re-insert into the pool in case it was dropped.
-                       err = txbuilder.FinalizeTx(ctx, bcr.chain, tx)
-                       if err != nil {
-                               return 0, err
-                       }
-
-                       // TODO(jackson): Do simple rejection checks like checking if
-                       // the tx's blockchain prevouts still exist in the state tree.
-               }
-       }
-}
index 1392ec9..0104e39 100755 (executable)
@@ -28,7 +28,7 @@ func FinalizeTx(ctx context.Context, c *protocol.Chain, tx *types.Tx) error {
                return err
        }
 
-       err = c.ValidateTx(tx)
+       _, err = c.ValidateTx(tx)
        if errors.Root(err) == protocol.ErrBadTx {
                return errors.Sub(ErrRejected, err)
        }
diff --git a/blockchain/txfeeds.go b/blockchain/txfeeds.go
deleted file mode 100644 (file)
index 4e9ce30..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package blockchain
-
-import (
-       "context"
-       "encoding/json"
-
-       "github.com/bytom/errors"
-)
-
-func (bcr *BlockchainReactor) GetTxFeedByAlias(ctx context.Context, filter string) ([]byte, error) {
-       jf, err := json.Marshal(filter)
-       if err != nil {
-               return nil, err
-       }
-
-       value := bcr.TxFeedTracker.DB.Get(jf)
-       if value == nil {
-               return nil, errors.New("No transaction feed")
-       }
-
-       return value, nil
-}
index d9b7481..8fbd226 100755 (executable)
@@ -67,7 +67,7 @@ func runNode(cmd *cobra.Command, args []string) error {
        if _, err := n.Start(); err != nil {
                return fmt.Errorf("Failed to start node: %v", err)
        } else {
-               log.WithField("nodeInfo", n.Switch().NodeInfo()).Info("Started node")
+               log.WithField("nodeInfo", n.SyncManager().Switch().NodeInfo()).Info("Started node")
        }
 
        // Trap signal, run forever.
index 6b88e91..3faffa8 100644 (file)
@@ -6,7 +6,6 @@ import (
        "os"
 
        "github.com/bytom/api"
-       "github.com/bytom/blockchain"
        "github.com/bytom/consensus/difficulty"
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
@@ -62,7 +61,7 @@ func main() {
                fmt.Println(err)
                os.Exit(1)
        }
-       resp := &blockchain.GetWorkResp{}
+       resp := &api.GetWorkResp{}
        if err = json.Unmarshal(rawData, resp); err != nil {
                fmt.Println(err)
                os.Exit(1)
index a1478c9..c360a35 100644 (file)
@@ -189,6 +189,7 @@ func DefaultP2PConfig() *P2PConfig {
                MaxNumPeers:      50,
                HandshakeTimeout: 30,
                DialTimeout:      3,
+               PexReactor:       true,
        }
 }
 
index a65dc40..b5d1c34 100644 (file)
@@ -53,13 +53,13 @@ func GenerateGenesisBlock() *types.Block {
                BlockHeader: types.BlockHeader{
                        Version:   1,
                        Height:    0,
-                       Nonce:     4216085,
+                       Nonce:     4216193,
                        Timestamp: 1516788453,
                        BlockCommitment: types.BlockCommitment{
                                TransactionsMerkleRoot: merkleRoot,
                                TransactionStatusHash:  txStatusHash,
                        },
-                       Bits: 2305843009222082559,
+                       Bits: 2305843009214532812,
                },
                Transactions: []*types.Tx{genesisCoinbaseTx},
        }
index 9b032f2..afde669 100644 (file)
@@ -19,7 +19,7 @@ const (
        InitialBlockSubsidy        = uint64(1470000000000000000)
 
        // config for pow mining
-       PowMinBits            = uint64(2161727821138738707)
+       PowMinBits            = uint64(2305843009213861724)
        BlocksPerRetarget     = uint64(1024)
        TargetSecondsPerBlock = uint64(60)
        SeedPerRetarget       = uint64(128)
@@ -34,7 +34,7 @@ const (
        CoinbaseArbitrarySizeLimit = 128
 
        VMGasRate        = int64(1000)
-       StorageGasRate   = int64(10)
+       StorageGasRate   = int64(0)
        MaxGasAmount     = int64(100000)
        DefaultGasCredit = int64(80000)
 
diff --git a/netsync/block_keeper.go b/netsync/block_keeper.go
new file mode 100644 (file)
index 0000000..93b2075
--- /dev/null
@@ -0,0 +1,155 @@
+package netsync
+
+import (
+       "strings"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/errors"
+       "github.com/bytom/p2p"
+       "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc/types"
+)
+
+const (
+       maxKnownTxs    = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
+       maxKnownBlocks = 1024  // Maximum block hashes to keep in the known list (prevent DOS)
+
+       syncTimeout        = 30 * time.Second
+       requestRetryTicker = 15 * time.Second
+
+       maxBlocksPending = 1024
+       maxtxsPending    = 32768
+       maxQuitReq       = 256
+)
+
+var (
+       errGetBlockTimeout = errors.New("Get block Timeout")
+       errPeerDropped     = errors.New("Peer dropped")
+       errCommAbnorm      = errors.New("Peer communication abnormality")
+       errScamPeer        = errors.New("Scam peer")
+       errReqBlock        = errors.New("Request block error")
+)
+
+//TODO: add retry mechanism
+type blockKeeper struct {
+       chain *protocol.Chain
+       sw    *p2p.Switch
+       peers *peerSet
+
+       pendingProcessCh chan *blockPending
+       txsProcessCh     chan *txsNotify
+       quitReqBlockCh   chan *string
+}
+
+func newBlockKeeper(chain *protocol.Chain, sw *p2p.Switch, peers *peerSet, quitReqBlockCh chan *string) *blockKeeper {
+       bk := &blockKeeper{
+               chain:            chain,
+               sw:               sw,
+               peers:            peers,
+               pendingProcessCh: make(chan *blockPending, maxBlocksPending),
+               txsProcessCh:     make(chan *txsNotify, maxtxsPending),
+               quitReqBlockCh:   quitReqBlockCh,
+       }
+       go bk.txsProcessWorker()
+       return bk
+}
+
+func (bk *blockKeeper) AddBlock(block *types.Block, peerID string) {
+       bk.pendingProcessCh <- &blockPending{block: block, peerID: peerID}
+}
+
+func (bk *blockKeeper) AddTx(tx *types.Tx, peerID string) {
+       bk.txsProcessCh <- &txsNotify{tx: tx, peerID: peerID}
+}
+
+func (bk *blockKeeper) IsCaughtUp() bool {
+       _, height := bk.peers.BestPeer()
+       return bk.chain.Height() < height
+}
+
+func (bk *blockKeeper) BlockRequestWorker(peerID string, maxPeerHeight uint64) error {
+       num := bk.chain.Height() + 1
+       orphanNum := uint64(0)
+       reqNum := uint64(0)
+       isOrphan := false
+       for num <= maxPeerHeight && num > 0 {
+               if isOrphan {
+                       reqNum = orphanNum
+               } else {
+                       reqNum = num
+               }
+               block, err := bk.BlockRequest(peerID, reqNum)
+               if errors.Root(err) == errPeerDropped || errors.Root(err) == errGetBlockTimeout || errors.Root(err) == errReqBlock {
+                       log.WithField("Peer abnormality. PeerID: ", peerID).Info(err)
+                       bk.peers.DropPeer(peerID)
+                       return errCommAbnorm
+               }
+               isOrphan, err = bk.chain.ProcessBlock(block)
+               if err != nil {
+                       bk.sw.AddScamPeer(bk.peers.Peer(peerID).getPeer())
+                       log.WithField("hash: ", block.Hash()).Errorf("blockKeeper fail process block %v", err)
+                       return errScamPeer
+               }
+               if isOrphan {
+                       orphanNum = block.Height - 1
+                       continue
+               }
+               num++
+       }
+       return nil
+}
+
+func (bk *blockKeeper) blockRequest(peerID string, height uint64) error {
+       return bk.peers.requestBlockByHeight(peerID, height)
+}
+
+func (bk *blockKeeper) BlockRequest(peerID string, height uint64) (*types.Block, error) {
+       var block *types.Block
+
+       if err := bk.blockRequest(peerID, height); err != nil {
+               return nil, errReqBlock
+       }
+       retryTicker := time.Tick(requestRetryTicker)
+       syncWait := time.NewTimer(syncTimeout)
+
+       for {
+               select {
+               case pendingResponse := <-bk.pendingProcessCh:
+                       block = pendingResponse.block
+                       if strings.Compare(pendingResponse.peerID, peerID) != 0 {
+                               log.Warning("From different peer")
+                               continue
+                       }
+                       if block.Height != height {
+                               log.Warning("Block height error")
+                               continue
+                       }
+                       return block, nil
+               case <-retryTicker:
+                       if err := bk.blockRequest(peerID, height); err != nil {
+                               return nil, errReqBlock
+                       }
+               case <-syncWait.C:
+                       log.Warning("Request block timeout")
+                       return nil, errGetBlockTimeout
+               case peerid := <-bk.quitReqBlockCh:
+                       if strings.Compare(*peerid, peerID) == 0 {
+                               log.Info("Quite block request worker")
+                               return nil, errPeerDropped
+                       }
+               }
+       }
+}
+
+func (bk *blockKeeper) txsProcessWorker() {
+       for txsResponse := range bk.txsProcessCh {
+               tx := txsResponse.tx
+               log.Info("Receive new tx from remote peer. TxID:", tx.ID.String())
+               bk.peers.MarkTransaction(txsResponse.peerID, &tx.ID)
+               if isOrphan, err := bk.chain.ValidateTx(tx); err != nil && isOrphan == false {
+                       bk.sw.AddScamPeer(bk.peers.Peer(txsResponse.peerID).getPeer())
+               }
+       }
+}
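
BlockRequestWorker replaces the old broadcast-and-wait loop with a sequential one: it requests one height at a time, steps back to the parent height when ProcessBlock reports an orphan, and drops or penalizes the peer on failure. A sketch of how the sync driver (netsync/sync.go, not reproduced in this excerpt) might invoke it once a peer's best height is known; the function name and inputs are assumptions:

package netsync

import (
	log "github.com/sirupsen/logrus"
)

// syncFromPeer drives the sequential worker for one peer until the local
// chain reaches bestHeight or the worker reports a peer/communication error.
func syncFromPeer(bk *blockKeeper, peerID string, bestHeight uint64) {
	if err := bk.BlockRequestWorker(peerID, bestHeight); err != nil {
		log.WithField("peer", peerID).Errorf("block sync stopped: %v", err)
	}
}
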
diff --git a/netsync/fetcher.go b/netsync/fetcher.go
new file mode 100644 (file)
index 0000000..754e2ce
--- /dev/null
@@ -0,0 +1,170 @@
+package netsync
+
+import (
+       "errors"
+
+       log "github.com/sirupsen/logrus"
+       "gopkg.in/karalabe/cookiejar.v2/collections/prque"
+
+       "github.com/bytom/p2p"
+       core "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
+       "strings"
+)
+
+const (
+       maxQueueDist = 1024 //32 // Maximum allowed distance from the chain head to queue
+)
+
+var (
+       errTerminated = errors.New("terminated")
+)
+
+// Fetcher is responsible for accumulating block announcements from various peers
+// and scheduling them for retrieval.
+type Fetcher struct {
+       chain *core.Chain
+       sw    *p2p.Switch
+       peers *peerSet
+
+       // Various event channels
+       newMinedBlock chan *blockPending
+       quit          chan struct{}
+
+       // Block cache
+       queue  *prque.Prque              // Queue containing the import operations (block number sorted)
+       queues map[string]int            // Per peer block counts to prevent memory exhaustion
+       queued map[bc.Hash]*blockPending // Set of already queued blocks (to dedup imports)
+}
+
+//NewFetcher creates a block fetcher to retrieve newly mined blocks.
+func NewFetcher(chain *core.Chain, sw *p2p.Switch, peers *peerSet) *Fetcher {
+       return &Fetcher{
+               chain:         chain,
+               sw:            sw,
+               peers:         peers,
+               newMinedBlock: make(chan *blockPending),
+               quit:          make(chan struct{}),
+               queue:         prque.New(),
+               queues:        make(map[string]int),
+               queued:        make(map[bc.Hash]*blockPending),
+       }
+}
+
+// Start boots up the announcement based synchroniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *Fetcher) Start() {
+       go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *Fetcher) Stop() {
+       close(f.quit)
+}
+
+// Enqueue tries to fill gaps in the fetcher's future import queue.
+func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
+       op := &blockPending{
+               peerID: peer,
+               block:  block,
+       }
+       select {
+       case f.newMinedBlock <- op:
+               return nil
+       case <-f.quit:
+               return errTerminated
+       }
+}
+
+// Loop is the main fetcher loop, checking and processing various notification
+// events.
+func (f *Fetcher) loop() {
+       for {
+               // Import any queued blocks that could potentially fit
+               height := f.chain.Height()
+               for !f.queue.Empty() {
+                       op := f.queue.PopItem().(*blockPending)
+                       // If too high up the chain or phase, continue later
+                       number := op.block.Height
+                       if number > height+1 {
+                               f.queue.Push(op, -float32(op.block.Height))
+                               break
+                       }
+                       // Otherwise if fresh and still unknown, try and import
+                       hash := op.block.Hash()
+                       block, _ := f.chain.GetBlockByHash(&hash)
+                       if block != nil {
+                               f.forgetBlock(hash)
+                               continue
+                       }
+                       if strings.Compare(op.block.PreviousBlockHash.String(), f.chain.BestBlockHash().String()) != 0 {
+                               f.forgetBlock(hash)
+                               continue
+                       }
+                       f.insert(op.peerID, op.block)
+               }
+               // Wait for an outside event to occur
+               select {
+               case <-f.quit:
+                       // Fetcher terminating, abort all operations
+                       return
+
+               case op := <-f.newMinedBlock:
+                       // A direct block insertion was requested, try and fill any pending gaps
+                       f.enqueue(op.peerID, op.block)
+               }
+       }
+}
+
+// enqueue schedules a new future import operation, if the block to be imported
+// has not yet been seen.
+func (f *Fetcher) enqueue(peer string, block *types.Block) {
+       hash := block.Hash()
+
+       //TODO: Ensure the peer isn't DOSing us
+       // Discard any past or too distant blocks
+       if dist := int64(block.Height) - int64(f.chain.Height()); dist < 0 || dist > maxQueueDist {
+               log.Info("Discarded propagated block, too far away", " peer: ", peer, "number: ", block.Height, "distance: ", dist)
+               return
+       }
+       // Schedule the block for future importing
+       if _, ok := f.queued[hash]; !ok {
+               op := &blockPending{
+                       peerID: peer,
+                       block:  block,
+               }
+               f.queued[hash] = op
+               f.queue.Push(op, -float32(block.Height))
+               log.Info("Queued propagated block.", " peer: ", peer, "number: ", block.Height, "queued: ", f.queue.Size())
+       }
+}
+
+// insert spawns a new goroutine to run a block insertion into the chain. If the
+// block's number is at the same height as the current import phase, it updates
+// the phase states accordingly.
+func (f *Fetcher) insert(peer string, block *types.Block) {
+       // Run the import on a new thread
+       log.Info("Importing propagated block", " from peer: ", peer, " height: ", block.Height)
+       // Run the actual import and log any issues
+       if _, err := f.chain.ProcessBlock(block); err != nil {
+               log.Info("Propagated block import failed", " from peer: ", peer, " height: ", block.Height, "err: ", err)
+               return
+       }
+       // If import succeeded, broadcast the block
+       log.Info("success process a block from new mined blocks cache. block height: ", block.Height)
+       go f.peers.BroadcastMinedBlock(block)
+}
+
+// forgetBlock removes all traces of a queued block from the fetcher's internal
+// state.
+func (f *Fetcher) forgetBlock(hash bc.Hash) {
+       if insert := f.queued[hash]; insert != nil {
+               f.queues[insert.peerID]--
+               if f.queues[insert.peerID] == 0 {
+                       delete(f.queues, insert.peerID)
+               }
+               delete(f.queued, hash)
+       }
+}
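
The fetcher queues blocks announced by peers and imports one only when it extends the current best block; everything else is forgotten or re-queued. A sketch of the hand-off from the message handler (which lives in netsync/protocol_reactor.go, not shown in this excerpt), with the handler name assumed:

package netsync

import (
	log "github.com/sirupsen/logrus"

	"github.com/bytom/protocol/bc/types"
)

// handleMinedBlock forwards a peer-announced block into the fetcher's queue;
// Enqueue returns errTerminated once the fetcher has been stopped.
func handleMinedBlock(f *Fetcher, peerID string, block *types.Block) {
	if err := f.Enqueue(peerID, block); err != nil {
		log.Errorf("enqueue mined block from %s failed: %v", peerID, err)
	}
}
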
diff --git a/netsync/handle.go b/netsync/handle.go
new file mode 100644 (file)
index 0000000..e2b05af
--- /dev/null
@@ -0,0 +1,220 @@
+package netsync
+
+import (
+       "strings"
+
+       log "github.com/sirupsen/logrus"
+       "github.com/tendermint/go-crypto"
+       "github.com/tendermint/go-wire"
+       cmn "github.com/tendermint/tmlibs/common"
+       dbm "github.com/tendermint/tmlibs/db"
+
+       cfg "github.com/bytom/config"
+       "github.com/bytom/p2p"
+       core "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/version"
+)
+
+//SyncManager is responsible for business-layer information synchronization
+type SyncManager struct {
+       networkID uint64
+       sw        *p2p.Switch
+       addrBook  *p2p.AddrBook // known peers
+
+       privKey     crypto.PrivKeyEd25519 // local node's p2p key
+       chain       *core.Chain
+       txPool      *core.TxPool
+       fetcher     *Fetcher
+       blockKeeper *blockKeeper
+       peers       *peerSet
+
+       newBlockCh    chan *bc.Hash
+       newPeerCh     chan struct{}
+       txSyncCh      chan *txsync
+       dropPeerCh    chan *string
+       quitSync      chan struct{}
+       config        *cfg.Config
+       synchronising int32
+}
+
+//NewSyncManager create a sync manager
+func NewSyncManager(config *cfg.Config, chain *core.Chain, txPool *core.TxPool, newBlockCh chan *bc.Hash) (*SyncManager, error) {
+       // Create the protocol manager with the base fields
+       manager := &SyncManager{
+               txPool:     txPool,
+               chain:      chain,
+               privKey:    crypto.GenPrivKeyEd25519(),
+               config:     config,
+               quitSync:   make(chan struct{}),
+               newBlockCh: newBlockCh,
+               newPeerCh:  make(chan struct{}),
+               txSyncCh:    make(chan *txsync),
+               dropPeerCh: make(chan *string, maxQuitReq),
+               peers:      newPeerSet(),
+       }
+
+       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
+       manager.sw = p2p.NewSwitch(config.P2P, trustHistoryDB)
+
+       manager.blockKeeper = newBlockKeeper(manager.chain, manager.sw, manager.peers, manager.dropPeerCh)
+       manager.fetcher = NewFetcher(chain, manager.sw, manager.peers)
+
+       protocolReactor := NewProtocolReactor(chain, txPool, manager.sw, manager.blockKeeper, manager.fetcher, manager.peers, manager.newPeerCh, manager.txSyncCh, manager.dropPeerCh)
+       manager.sw.AddReactor("PROTOCOL", protocolReactor)
+
+       // Optionally, start the pex reactor
+       //var addrBook *p2p.AddrBook
+       if config.P2P.PexReactor {
+               manager.addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
+               pexReactor := p2p.NewPEXReactor(manager.addrBook)
+               manager.sw.AddReactor("PEX", pexReactor)
+       }
+
+       return manager, nil
+}
+
+// Defaults to tcp
+func protocolAndAddress(listenAddr string) (string, string) {
+       p, address := "tcp", listenAddr
+       parts := strings.SplitN(address, "://", 2)
+       if len(parts) == 2 {
+               p, address = parts[0], parts[1]
+       }
+       return p, address
+}
+
+func (sm *SyncManager) makeNodeInfo() *p2p.NodeInfo {
+       nodeInfo := &p2p.NodeInfo{
+               PubKey:  sm.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
+               Moniker: sm.config.Moniker,
+               Network: "bytom",
+               Version: version.Version,
+               Other: []string{
+                       cmn.Fmt("wire_version=%v", wire.Version),
+                       cmn.Fmt("p2p_version=%v", p2p.Version),
+               },
+       }
+
+       if !sm.sw.IsListening() {
+               return nodeInfo
+       }
+
+       p2pListener := sm.sw.Listeners()[0]
+       p2pHost := p2pListener.ExternalAddress().IP.String()
+       p2pPort := p2pListener.ExternalAddress().Port
+
+       // We assume that the rpcListener has the same ExternalAddress.
+       // This is probably true because both P2P and RPC listeners use UPnP,
+       // except of course if the rpc is only bound to localhost
+       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
+       return nodeInfo
+}
+
+func (sm *SyncManager) netStart() error {
+       // Create & add listener
+       p, address := protocolAndAddress(sm.config.P2P.ListenAddress)
+
+       l := p2p.NewDefaultListener(p, address, sm.config.P2P.SkipUPNP, nil)
+
+       sm.sw.AddListener(l)
+
+       // Start the switch
+       sm.sw.SetNodeInfo(sm.makeNodeInfo())
+       sm.sw.SetNodePrivKey(sm.privKey)
+       _, err := sm.sw.Start()
+       if err != nil {
+               return err
+       }
+
+       // If seeds exist, add them to the address book and dial out
+       if sm.config.P2P.Seeds != "" {
+               // dial out
+               seeds := strings.Split(sm.config.P2P.Seeds, ",")
+               if err := sm.DialSeeds(seeds); err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+//Start start sync manager service
+func (sm *SyncManager) Start() {
+       sm.netStart()
+       // broadcast transactions
+       go sm.txBroadcastLoop()
+
+       // broadcast mined blocks
+       go sm.minedBroadcastLoop()
+
+       // start sync handlers
+       go sm.syncer()
+
+       go sm.txsyncLoop()
+}
+
+//Stop stop sync manager
+func (sm *SyncManager) Stop() {
+       close(sm.quitSync)
+       sm.sw.Stop()
+}
+
+func (sm *SyncManager) txBroadcastLoop() {
+       newTxCh := sm.txPool.GetNewTxCh()
+       for {
+               select {
+               case newTx := <-newTxCh:
+                       sm.peers.BroadcastTx(newTx)
+
+               case <-sm.quitSync:
+                       return
+               }
+       }
+}
+
+func (sm *SyncManager) minedBroadcastLoop() {
+       for {
+               select {
+               case blockHash := <-sm.newBlockCh:
+                       block, err := sm.chain.GetBlockByHash(blockHash)
+                       if err != nil {
+                               log.Errorf("Failed to get block in mined broadcast loop: %v", err)
+                               return
+                       }
+                       sm.peers.BroadcastMinedBlock(block)
+               case <-sm.quitSync:
+                       return
+               }
+       }
+}
+
+//NodeInfo get P2P peer node info
+func (sm *SyncManager) NodeInfo() *p2p.NodeInfo {
+       return sm.sw.NodeInfo()
+}
+
+//BlockKeeper get block keeper
+func (sm *SyncManager) BlockKeeper() *blockKeeper {
+       return sm.blockKeeper
+}
+
+//Peers get sync manager peer set
+func (sm *SyncManager) Peers() *peerSet {
+       return sm.peers
+}
+
+//DialSeeds dial seed peers
+func (sm *SyncManager) DialSeeds(seeds []string) error {
+       return sm.sw.DialSeeds(sm.addrBook, seeds)
+}
+
+//Switch get sync manager switch
+func (sm *SyncManager) Switch() *p2p.Switch {
+       return sm.sw
+}
+
+func (sm *SyncManager) removePeer(peerID string) {
+       sm.peers.DropPeer(peerID)
+       log.Debug("Removing peer", "peerID:", peerID)
+}
similarity index 61%
rename from blockchain/net.go
rename to netsync/message.go
index 7b9018b..4a17f81 100644 (file)
@@ -1,27 +1,26 @@
-package blockchain
+package netsync
 
 import (
        "bytes"
        "errors"
        "fmt"
 
-       wire "github.com/tendermint/go-wire"
+       "github.com/tendermint/go-wire"
 
        "github.com/bytom/protocol/bc"
        "github.com/bytom/protocol/bc/types"
 )
 
+//protocol msg
 const (
-       // BlockRequestByte means block request message
-       BlockRequestByte = byte(0x10)
-       // BlockResponseByte means block response message
-       BlockResponseByte = byte(0x11)
-       // StatusRequestByte means status request message
-       StatusRequestByte = byte(0x20)
-       // StatusResponseByte means status response message
+       BlockRequestByte   = byte(0x10)
+       BlockResponseByte  = byte(0x11)
+       StatusRequestByte  = byte(0x20)
        StatusResponseByte = byte(0x21)
-       // NewTransactionByte means transaction notify message
        NewTransactionByte = byte(0x30)
+       NewMineBlockByte   = byte(0x40)
+
+       maxBlockchainResponseSize = 22020096 + 2
 )
 
 // BlockchainMessage is a generic message for this reactor.
@@ -34,9 +33,20 @@ var _ = wire.RegisterInterface(
        wire.ConcreteType{&StatusRequestMessage{}, StatusRequestByte},
        wire.ConcreteType{&StatusResponseMessage{}, StatusResponseByte},
        wire.ConcreteType{&TransactionNotifyMessage{}, NewTransactionByte},
+       wire.ConcreteType{&MineBlockMessage{}, NewMineBlockByte},
 )
 
-// DecodeMessage decode receive messages
+type blockPending struct {
+       block  *types.Block
+       peerID string
+}
+
+type txsNotify struct {
+       tx     *types.Tx
+       peerID string
+}
+
+//DecodeMessage decode msg
 func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
        msgType = bz[0]
        n := int(0)
@@ -48,18 +58,19 @@ func DecodeMessage(bz []byte) (msgType byte, msg BlockchainMessage, err error) {
        return
 }
 
-// BlockRequestMessage is block request message struct
+//BlockRequestMessage request blocks from remote peers by height/hash
 type BlockRequestMessage struct {
        Height  uint64
        RawHash [32]byte
 }
 
-// GetHash return block hash
+//GetHash get hash
 func (m *BlockRequestMessage) GetHash() *bc.Hash {
        hash := bc.NewHash(m.RawHash)
        return &hash
 }
 
+//String convert msg to string
 func (m *BlockRequestMessage) String() string {
        if m.Height > 0 {
                return fmt.Sprintf("BlockRequestMessage{Height: %d}", m.Height)
@@ -68,12 +79,12 @@ func (m *BlockRequestMessage) String() string {
        return fmt.Sprintf("BlockRequestMessage{Hash: %s}", hash.String())
 }
 
-// BlockResponseMessage is block response message struct
+//BlockResponseMessage response get block msg
 type BlockResponseMessage struct {
        RawBlock []byte
 }
 
-// NewBlockResponseMessage produce new BlockResponseMessage instance
+//NewBlockResponseMessage construct bock response msg
 func NewBlockResponseMessage(block *types.Block) (*BlockResponseMessage, error) {
        rawBlock, err := block.MarshalText()
        if err != nil {
@@ -82,7 +93,7 @@ func NewBlockResponseMessage(block *types.Block) (*BlockResponseMessage, error)
        return &BlockResponseMessage{RawBlock: rawBlock}, nil
 }
 
-// GetBlock return block struct
+//GetBlock get block from msg
 func (m *BlockResponseMessage) GetBlock() *types.Block {
        block := &types.Block{
                BlockHeader:  types.BlockHeader{},
@@ -92,16 +103,17 @@ func (m *BlockResponseMessage) GetBlock() *types.Block {
        return block
 }
 
+//String convert msg to string
 func (m *BlockResponseMessage) String() string {
        return fmt.Sprintf("BlockResponseMessage{Size: %d}", len(m.RawBlock))
 }
 
-// TransactionNotifyMessage is transaction notify message struct
+//TransactionNotifyMessage notify new tx msg
 type TransactionNotifyMessage struct {
        RawTx []byte
 }
 
-// NewTransactionNotifyMessage produce new TransactionNotifyMessage instance
+//NewTransactionNotifyMessage construct notify new tx msg
 func NewTransactionNotifyMessage(tx *types.Tx) (*TransactionNotifyMessage, error) {
        rawTx, err := tx.TxData.MarshalText()
        if err != nil {
@@ -110,31 +122,35 @@ func NewTransactionNotifyMessage(tx *types.Tx) (*TransactionNotifyMessage, error
        return &TransactionNotifyMessage{RawTx: rawTx}, nil
 }
 
-// GetTransaction return Tx struct
-func (m *TransactionNotifyMessage) GetTransaction() *types.Tx {
+//GetTransaction get tx from msg
+func (m *TransactionNotifyMessage) GetTransaction() (*types.Tx, error) {
        tx := &types.Tx{}
-       tx.UnmarshalText(m.RawTx)
-       return tx
+       if err := tx.UnmarshalText(m.RawTx); err != nil {
+               return nil, err
+       }
+       return tx, nil
 }
 
+//String convert msg to string
 func (m *TransactionNotifyMessage) String() string {
        return fmt.Sprintf("TransactionNotifyMessage{Size: %d}", len(m.RawTx))
 }
 
-// StatusRequestMessage is status request message struct
+//StatusRequestMessage status request msg
 type StatusRequestMessage struct{}
 
+//String convert msg to string
 func (m *StatusRequestMessage) String() string {
        return "StatusRequestMessage"
 }
 
-// StatusResponseMessage is status response message struct
+//StatusResponseMessage get status response msg
 type StatusResponseMessage struct {
        Height  uint64
        RawHash [32]byte
 }
 
-// NewStatusResponseMessage produce new StatusResponseMessage instance
+//NewStatusResponseMessage construct get status response msg
 func NewStatusResponseMessage(block *types.Block) *StatusResponseMessage {
        return &StatusResponseMessage{
                Height:  block.Height,
@@ -142,13 +158,42 @@ func NewStatusResponseMessage(block *types.Block) *StatusResponseMessage {
        }
 }
 
-// GetHash return hash pointer
+//GetHash get hash from msg
 func (m *StatusResponseMessage) GetHash() *bc.Hash {
        hash := bc.NewHash(m.RawHash)
        return &hash
 }
 
+//String convert msg to string
 func (m *StatusResponseMessage) String() string {
        hash := m.GetHash()
        return fmt.Sprintf("StatusResponseMessage{Height: %d, Hash: %s}", m.Height, hash.String())
 }
+
+//MineBlockMessage new mined block msg
+type MineBlockMessage struct {
+       RawBlock []byte
+}
+
+//NewMinedBlockMessage construct new mined block msg
+func NewMinedBlockMessage(block *types.Block) (*MineBlockMessage, error) {
+       rawBlock, err := block.MarshalText()
+       if err != nil {
+               return nil, err
+       }
+       return &MineBlockMessage{RawBlock: rawBlock}, nil
+}
+
+//GetMineBlock get mine block from msg
+func (m *MineBlockMessage) GetMineBlock() (*types.Block, error) {
+       block := &types.Block{}
+       if err := block.UnmarshalText(m.RawBlock); err != nil {
+               return nil, err
+       }
+       return block, nil
+}
+
+//String convert msg to string
+func (m *MineBlockMessage) String() string {
+       return fmt.Sprintf("NewMineBlockMessage{Size: %d}", len(m.RawBlock))
+}
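All of the notify-style messages above carry their payload as marshalled text and re-parse it on receipt. A small sketch of that round trip for the mined-block message, assuming the netsync and types packages are importable at these paths and that an empty placeholder block marshals cleanly:

```go
package main

import (
	"fmt"

	"github.com/bytom/netsync"
	"github.com/bytom/protocol/bc/types"
)

func main() {
	// wrap a placeholder block into the wire message ...
	msg, err := netsync.NewMinedBlockMessage(&types.Block{})
	if err != nil {
		panic(err)
	}
	// ... and decode it back out, as the receiving reactor does
	block, err := msg.GetMineBlock()
	if err != nil {
		panic(err)
	}
	fmt.Println(msg.String(), "height:", block.Height)
}
```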
diff --git a/netsync/peer.go b/netsync/peer.go
new file mode 100644 (file)
index 0000000..9ca4d1f
--- /dev/null
@@ -0,0 +1,347 @@
+package netsync
+
+import (
+       "sync"
+
+       log "github.com/sirupsen/logrus"
+       "gopkg.in/fatih/set.v0"
+
+       "github.com/bytom/errors"
+       "github.com/bytom/p2p"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
+)
+
+var (
+       errClosed            = errors.New("peer set is closed")
+       errAlreadyRegistered = errors.New("peer is already registered")
+       errNotRegistered     = errors.New("peer is not registered")
+)
+
+const defaultVersion = 1
+
+type peer struct {
+       mtx     sync.RWMutex
+       version int // Protocol version negotiated
+       id      string
+       height  uint64
+       hash    *bc.Hash
+       *p2p.Peer
+
+       knownTxs    *set.Set // Set of transaction hashes known to be known by this peer
+       knownBlocks *set.Set // Set of block hashes known to be known by this peer
+}
+
+func newPeer(height uint64, hash *bc.Hash, Peer *p2p.Peer) *peer {
+       return &peer{
+               version:     defaultVersion,
+               height:      height,
+               hash:        hash,
+               Peer:        Peer,
+               knownTxs:    set.New(),
+               knownBlocks: set.New(),
+       }
+}
+
+func (p *peer) GetStatus() (height uint64, hash *bc.Hash) {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+       return p.height, p.hash
+}
+
+func (p *peer) SetStatus(height uint64, hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       p.height = height
+       p.hash = hash
+}
+
+func (p *peer) requestBlockByHash(hash *bc.Hash) error {
+       msg := &BlockRequestMessage{RawHash: hash.Byte32()}
+       p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return nil
+}
+
+func (p *peer) requestBlockByHeight(height uint64) error {
+       msg := &BlockRequestMessage{Height: height}
+       p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       return nil
+}
+
+func (p *peer) SendTransactions(txs []*types.Tx) error {
+       for _, tx := range txs {
+               msg, err := NewTransactionNotifyMessage(tx)
+               if err != nil {
+                       return errors.New("failed to construct tx msg")
+               }
+               hash := &tx.ID
+               p.knownTxs.Add(hash.String())
+               p.Peer.TrySend(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       }
+       return nil
+}
+
+func (p *peer) getPeer() *p2p.Peer {
+       p.mtx.RLock()
+       defer p.mtx.RUnlock()
+
+       return p.Peer
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (p *peer) MarkTransaction(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       // If we reached the memory allowance, drop a previously known transaction hash
+       for p.knownTxs.Size() >= maxKnownTxs {
+               p.knownTxs.Pop()
+       }
+       p.knownTxs.Add(hash.String())
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (p *peer) MarkBlock(hash *bc.Hash) {
+       p.mtx.Lock()
+       defer p.mtx.Unlock()
+
+       // If we reached the memory allowance, drop a previously known block hash
+       for p.knownBlocks.Size() >= maxKnownBlocks {
+               p.knownBlocks.Pop()
+       }
+       p.knownBlocks.Add(hash.String())
+}
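Both Mark helpers bound the per-peer known sets by popping arbitrary old entries once the cap is reached. A standalone sketch of that eviction pattern using the same set package; the cap of 3 is only for the demo, the real code relies on maxKnownTxs/maxKnownBlocks:

```go
package main

import (
	"fmt"

	"gopkg.in/fatih/set.v0"
)

// markKnown records hash, dropping arbitrary old entries once the cap is hit.
func markKnown(known *set.Set, hash string, max int) {
	for known.Size() >= max {
		known.Pop()
	}
	known.Add(hash)
}

func main() {
	known := set.New()
	for _, h := range []string{"h1", "h2", "h3", "h4", "h5"} {
		markKnown(known, h, 3)
	}
	fmt.Println("tracked:", known.Size(), "hashes") // tracked: 3 hashes
}
```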
+
+type peerSet struct {
+       peers  map[string]*peer
+       lock   sync.RWMutex
+       closed bool
+}
+
+// newPeerSet creates a new peer set to track the active participants.
+func newPeerSet() *peerSet {
+       return &peerSet{
+               peers: make(map[string]*peer),
+       }
+}
+
+// Register injects a new peer into the working set, or returns an error if the
+// peer is already known.
+func (ps *peerSet) Register(p *peer) error {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       if ps.closed {
+               return errClosed
+       }
+       if _, ok := ps.peers[p.id]; ok {
+               return errAlreadyRegistered
+       }
+       ps.peers[p.id] = p
+       return nil
+}
+
+// Unregister removes a remote peer from the active set, disabling any further
+// actions to/from that particular entity.
+func (ps *peerSet) Unregister(id string) error {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       if _, ok := ps.peers[id]; !ok {
+               return errNotRegistered
+       }
+       delete(ps.peers, id)
+       return nil
+}
+
+func (ps *peerSet) DropPeer(id string) error {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       peer, ok := ps.peers[id]
+       if !ok {
+               return errNotRegistered
+       }
+       delete(ps.peers, id)
+       peer.CloseConn()
+       return nil
+}
+
+// Peer retrieves the registered peer with the given id.
+func (ps *peerSet) Peer(id string) *peer {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       return ps.peers[id]
+}
+
+// Len returns the current number of peers in the set.
+func (ps *peerSet) Len() int {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       return len(ps.peers)
+}
+
+// MarkTransaction marks a transaction as known for the peer, ensuring that it
+// will never be propagated to this particular peer.
+func (ps *peerSet) MarkTransaction(peerID string, hash *bc.Hash) {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       if peer, ok := ps.peers[peerID]; ok {
+               peer.MarkTransaction(hash)
+       }
+}
+
+// MarkBlock marks a block as known for the peer, ensuring that the block will
+// never be propagated to this particular peer.
+func (ps *peerSet) MarkBlock(peerID string, hash *bc.Hash) {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       if peer, ok := ps.peers[peerID]; ok {
+               peer.MarkBlock(hash)
+       }
+}
+
+// PeersWithoutBlock retrieves a list of peers that do not have a given block in
+// their set of known hashes.
+func (ps *peerSet) PeersWithoutBlock(hash *bc.Hash) []*peer {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       list := make([]*peer, 0, len(ps.peers))
+       for _, p := range ps.peers {
+               if !p.knownBlocks.Has(hash.String()) {
+                       list = append(list, p)
+               }
+       }
+       return list
+}
+
+// PeersWithoutTx retrieves a list of peers that do not have a given transaction
+// in their set of known hashes.
+func (ps *peerSet) PeersWithoutTx(hash *bc.Hash) []*peer {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       list := make([]*peer, 0, len(ps.peers))
+       for _, p := range ps.peers {
+               if !p.knownTxs.Has(hash.String()) {
+                       list = append(list, p)
+               }
+       }
+       return list
+}
+
+// BestPeer retrieves the known peer with the currently highest reported height.
+func (ps *peerSet) BestPeer() (*p2p.Peer, uint64) {
+       ps.lock.RLock()
+       defer ps.lock.RUnlock()
+
+       var bestPeer *p2p.Peer
+       var bestHeight uint64
+
+       for _, p := range ps.peers {
+               if bestPeer == nil || p.height > bestHeight {
+                       bestPeer, bestHeight = p.Peer, p.height
+               }
+       }
+
+       return bestPeer, bestHeight
+}
+
+// Close disconnects all peers.
+// No new peers can be registered after Close has returned.
+func (ps *peerSet) Close() {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       for _, p := range ps.peers {
+               p.CloseConn()
+       }
+       ps.closed = true
+}
+
+func (ps *peerSet) AddPeer(peer *p2p.Peer) {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       if _, ok := ps.peers[peer.Key]; !ok {
+               keeperPeer := newPeer(0, nil, peer)
+               ps.peers[peer.Key] = keeperPeer
+               log.WithFields(log.Fields{"ID": peer.Key}).Info("Add new peer to peerSet")
+               return
+       }
+       log.WithField("ID", peer.Key).Warning("Peer already exists in peerSet")
+}
+
+func (ps *peerSet) RemovePeer(peerID string) {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       delete(ps.peers, peerID)
+       log.WithField("ID", peerID).Info("Delete peer from peerset")
+}
+
+func (ps *peerSet) SetPeerStatus(peerID string, height uint64, hash *bc.Hash) {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       if peer, ok := ps.peers[peerID]; ok {
+               peer.SetStatus(height, hash)
+       }
+}
+
+func (ps *peerSet) requestBlockByHash(peerID string, hash *bc.Hash) error {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       peer, ok := ps.peers[peerID]
+       if !ok {
+               return errors.New("can't find peer")
+       }
+       return peer.requestBlockByHash(hash)
+}
+
+func (ps *peerSet) requestBlockByHeight(peerID string, height uint64) error {
+       ps.lock.Lock()
+       defer ps.lock.Unlock()
+
+       peer, ok := ps.peers[peerID]
+       if !ok {
+               return errors.New("can't find peer")
+       }
+       return peer.requestBlockByHeight(height)
+}
+
+func (ps *peerSet) BroadcastMinedBlock(block *types.Block) error {
+       msg, err := NewMinedBlockMessage(block)
+       if err != nil {
+               return errors.New("failed to construct block msg")
+       }
+       hash := block.Hash()
+       peers := ps.PeersWithoutBlock(&hash)
+       for _, peer := range peers {
+               ps.MarkBlock(peer.Key, &hash)
+               peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       }
+       return nil
+}
+
+func (ps *peerSet) BroadcastTx(tx *types.Tx) error {
+       msg, err := NewTransactionNotifyMessage(tx)
+       if err != nil {
+               return errors.New("failed to construct tx msg")
+       }
+       peers := ps.PeersWithoutTx(&tx.ID)
+       for _, peer := range peers {
+               ps.peers[peer.Key].MarkTransaction(&tx.ID)
+               peer.Send(BlockchainChannel, struct{ BlockchainMessage }{msg})
+       }
+       return nil
+}
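The two broadcast helpers filter by the per-peer known sets so a hash is sent to each peer at most once. A standalone sketch of that sent-message filter; demoPeer and broadcast are hypothetical stand-ins for the real peer and Send calls:

```go
package main

import (
	"fmt"

	"gopkg.in/fatih/set.v0"
)

type demoPeer struct {
	id    string
	known *set.Set
}

// broadcast sends hash only to peers that have not seen it yet and records
// it in their known sets, mirroring BroadcastTx/BroadcastMinedBlock above.
func broadcast(peers []*demoPeer, hash string) (sent int) {
	for _, p := range peers {
		if p.known.Has(hash) {
			continue
		}
		p.known.Add(hash)
		sent++ // stand-in for peer.Send(BlockchainChannel, ...)
	}
	return sent
}

func main() {
	peers := []*demoPeer{{"a", set.New()}, {"b", set.New()}}
	fmt.Println(broadcast(peers, "h1")) // 2
	fmt.Println(broadcast(peers, "h1")) // 0 -- filtered the second time
}
```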
diff --git a/netsync/protocol_reactor.go b/netsync/protocol_reactor.go
new file mode 100644 (file)
index 0000000..1728859
--- /dev/null
@@ -0,0 +1,214 @@
+package netsync
+
+import (
+       "reflect"
+       "strings"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+       cmn "github.com/tendermint/tmlibs/common"
+
+       "github.com/bytom/errors"
+       "github.com/bytom/p2p"
+       "github.com/bytom/p2p/trust"
+       "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
+       "github.com/bytom/protocol/bc/types"
+)
+
+const (
+       // BlockchainChannel is a channel for blocks and status updates
+       BlockchainChannel        = byte(0x40)
+       protocolHandshakeTimeout = time.Second * 10
+)
+
+var (
+       //ErrProtocolHandshakeTimeout peers handshake timeout
+       ErrProtocolHandshakeTimeout = errors.New("Protocol handshake timeout")
+)
+
+// Response describes the response standard.
+type Response struct {
+       Status string      `json:"status,omitempty"`
+       Msg    string      `json:"msg,omitempty"`
+       Data   interface{} `json:"data,omitempty"`
+}
+
+type initalPeerStatus struct {
+       peerID string
+       height uint64
+       hash   *bc.Hash
+}
+
+//ProtocolReactor handles new coming protocol message.
+type ProtocolReactor struct {
+       p2p.BaseReactor
+
+       chain       *protocol.Chain
+       blockKeeper *blockKeeper
+       txPool      *protocol.TxPool
+       sw          *p2p.Switch
+       fetcher     *Fetcher
+       peers       *peerSet
+
+       newPeerCh      chan struct{}
+       quitReqBlockCh chan *string
+       txSyncCh       chan *txsync
+       peerStatusCh   chan *initalPeerStatus
+}
+
+// NewProtocolReactor returns the reactor of whole blockchain.
+func NewProtocolReactor(chain *protocol.Chain, txPool *protocol.TxPool, sw *p2p.Switch, blockPeer *blockKeeper, fetcher *Fetcher, peers *peerSet, newPeerCh chan struct{}, txSyncCh chan *txsync, quitReqBlockCh chan *string) *ProtocolReactor {
+       pr := &ProtocolReactor{
+               chain:          chain,
+               blockKeeper:    blockPeer,
+               txPool:         txPool,
+               sw:             sw,
+               fetcher:        fetcher,
+               peers:          peers,
+               newPeerCh:      newPeerCh,
+               txSyncCh:       txSyncCh,
+               quitReqBlockCh: quitReqBlockCh,
+               peerStatusCh:   make(chan *initalPeerStatus),
+       }
+       pr.BaseReactor = *p2p.NewBaseReactor("ProtocolReactor", pr)
+       return pr
+}
+
+// GetChannels implements Reactor
+func (pr *ProtocolReactor) GetChannels() []*p2p.ChannelDescriptor {
+       return []*p2p.ChannelDescriptor{
+               &p2p.ChannelDescriptor{
+                       ID:                BlockchainChannel,
+                       Priority:          5,
+                       SendQueueCapacity: 100,
+               },
+       }
+}
+
+// OnStart implements BaseService
+func (pr *ProtocolReactor) OnStart() error {
+       pr.BaseReactor.OnStart()
+       return nil
+}
+
+// OnStop implements BaseService
+func (pr *ProtocolReactor) OnStop() {
+       pr.BaseReactor.OnStop()
+}
+
+// syncTransactions starts sending all currently pending transactions to the given peer.
+func (pr *ProtocolReactor) syncTransactions(p *peer) {
+       pending := pr.txPool.GetTransactions()
+       if len(pending) == 0 {
+               return
+       }
+       txs := make([]*types.Tx, len(pending))
+       for i, batch := range pending {
+               txs[i] = batch.Tx
+       }
+       pr.txSyncCh <- &txsync{p, txs}
+}
+
+// AddPeer implements Reactor by sending our state to peer.
+func (pr *ProtocolReactor) AddPeer(peer *p2p.Peer) error {
+       peer.Send(BlockchainChannel, struct{ BlockchainMessage }{&StatusRequestMessage{}})
+       handshakeWait := time.NewTimer(protocolHandshakeTimeout)
+       for {
+               select {
+               case status := <-pr.peerStatusCh:
+                       if strings.Compare(status.peerID, peer.Key) == 0 {
+                               pr.peers.AddPeer(peer)
+                               pr.peers.SetPeerStatus(status.peerID, status.height, status.hash)
+                               pr.syncTransactions(pr.peers.Peer(peer.Key))
+                               pr.newPeerCh <- struct{}{}
+                               return nil
+                       }
+               case <-handshakeWait.C:
+                       return ErrProtocolHandshakeTimeout
+               }
+       }
+}
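AddPeer implements the business-level handshake: it sends a status request, then blocks on peerStatusCh until the matching status arrives or the ten-second timer fires. A self-contained sketch of that select-with-deadline pattern, with plain string IDs standing in for the status messages:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errHandshakeTimeout = errors.New("protocol handshake timeout")

// waitForStatus keeps reading statuses until one matches the peer being
// handshaked, or gives up once the timeout fires.
func waitForStatus(statusCh <-chan string, peerID string, timeout time.Duration) error {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()
	for {
		select {
		case id := <-statusCh:
			if id == peerID {
				return nil
			}
		case <-deadline.C:
			return errHandshakeTimeout
		}
	}
}

func main() {
	statusCh := make(chan string, 1)
	statusCh <- "peer-1"
	fmt.Println(waitForStatus(statusCh, "peer-1", time.Second)) // <nil>
}
```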
+
+// RemovePeer implements Reactor by removing peer from the pool.
+func (pr *ProtocolReactor) RemovePeer(peer *p2p.Peer, reason interface{}) {
+       pr.quitReqBlockCh <- &peer.Key
+       pr.peers.RemovePeer(peer.Key)
+}
+
+// Receive implements Reactor by handling messages of each supported type (see the switch below).
+func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) {
+       var tm *trust.TrustMetric
+       key := src.Connection().RemoteAddress.IP.String()
+       if tm = pr.sw.TrustMetricStore.GetPeerTrustMetric(key); tm == nil {
+               log.Errorf("Can't get peer trust metric")
+               return
+       }
+
+       _, msg, err := DecodeMessage(msgBytes)
+       if err != nil {
+               log.Errorf("Error decoding message: %v", err)
+               return
+       }
+       log.WithFields(log.Fields{"peerID": src.Key, "msg": msg}).Info("Receive request")
+
+       switch msg := msg.(type) {
+       case *BlockRequestMessage:
+               var block *types.Block
+               var err error
+               if msg.Height != 0 {
+                       block, err = pr.chain.GetBlockByHeight(msg.Height)
+               } else {
+                       block, err = pr.chain.GetBlockByHash(msg.GetHash())
+               }
+               if err != nil {
+                       log.Errorf("Failed to get block for BlockRequestMessage: %v", err)
+                       return
+               }
+               response, err := NewBlockResponseMessage(block)
+               if err != nil {
+                       log.Errorf("Failed to create response for BlockRequestMessage: %v", err)
+                       return
+               }
+               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response})
+
+       case *BlockResponseMessage:
+               log.Info("BlockResponseMessage height:", msg.GetBlock().Height)
+               pr.blockKeeper.AddBlock(msg.GetBlock(), src.Key)
+
+       case *StatusRequestMessage:
+               block := pr.chain.BestBlock()
+               src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{NewStatusResponseMessage(block)})
+
+       case *StatusResponseMessage:
+               peerStatus := &initalPeerStatus{
+                       peerID: src.Key,
+                       height: msg.Height,
+                       hash:   msg.GetHash(),
+               }
+               pr.peerStatusCh <- peerStatus
+
+       case *TransactionNotifyMessage:
+               tx, err := msg.GetTransaction()
+               if err != nil {
+                       log.Errorf("Error decoding new tx %v", err)
+                       return
+               }
+               pr.blockKeeper.AddTx(tx, src.Key)
+
+       case *MineBlockMessage:
+               block, err := msg.GetMineBlock()
+               if err != nil {
+                       log.Errorf("Error decoding mined block %v", err)
+                       return
+               }
+               // Mark the peer as owning the block and schedule it for import
+               hash := block.Hash()
+               pr.peers.MarkBlock(src.Key, &hash)
+               pr.fetcher.Enqueue(src.Key, block)
+               pr.peers.SetPeerStatus(src.Key, block.Height, &hash)
+
+       default:
+               log.Error(cmn.Fmt("Unknown message type %v", reflect.TypeOf(msg)))
+       }
+}
diff --git a/netsync/sync.go b/netsync/sync.go
new file mode 100644 (file)
index 0000000..223883a
--- /dev/null
@@ -0,0 +1,164 @@
+// Copyright 2015 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
+
+package netsync
+
+import (
+       "math/rand"
+       "sync/atomic"
+       "time"
+
+       log "github.com/sirupsen/logrus"
+
+       "github.com/bytom/common"
+       "github.com/bytom/protocol/bc/types"
+)
+
+const (
+       forceSyncCycle      = 10 * time.Second // Time interval to force syncs, even if few peers are available
+       minDesiredPeerCount = 5                // Amount of peers desired to start syncing
+
+       // This is the target size for the packs of transactions sent by txsyncLoop.
+       // A pack can get larger than this if a single transaction exceeds this size.
+       txsyncPackSize = 100 * 1024
+)
+
+type txsync struct {
+       p   *peer
+       txs []*types.Tx
+}
+
+// syncer is responsible for periodically synchronising with the network, both
+// downloading hashes and blocks as well as processing block announcements.
+func (sm *SyncManager) syncer() {
+       // Start and ensure cleanup of sync mechanisms
+       sm.fetcher.Start()
+       defer sm.fetcher.Stop()
+       //defer sm.downloader.Terminate()
+
+       // Wait for different events to fire synchronisation operations
+       forceSync := time.NewTicker(forceSyncCycle)
+       defer forceSync.Stop()
+
+       for {
+               select {
+               case <-sm.newPeerCh:
+                       log.Info("New peer connected.")
+                       // Make sure we have peers to select from, then sync
+                       if sm.sw.Peers().Size() < minDesiredPeerCount {
+                               break
+                       }
+                       go sm.synchronise()
+
+               case <-forceSync.C:
+                       // Force a sync even if not enough peers are present
+                       go sm.synchronise()
+
+               case <-sm.quitSync:
+                       return
+               }
+       }
+}
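The syncer loop reacts to two triggers: a new peer (once enough peers are connected) and the periodic force-sync ticker. A self-contained sketch of that pattern; the interval, peer threshold and callbacks are demo stand-ins:

```go
package main

import (
	"fmt"
	"time"
)

// runSyncer mirrors the loop above: sync when a new peer shows up (and the
// peer count is high enough) or when the force-sync ticker fires.
func runSyncer(newPeer <-chan struct{}, peerCount func() int, quit <-chan struct{}, sync func(reason string)) {
	force := time.NewTicker(100 * time.Millisecond) // forceSyncCycle stand-in
	defer force.Stop()
	for {
		select {
		case <-newPeer:
			if peerCount() < 5 { // minDesiredPeerCount stand-in
				break
			}
			sync("new peer")
		case <-force.C:
			sync("force")
		case <-quit:
			return
		}
	}
}

func main() {
	quit := make(chan struct{})
	go func() { time.Sleep(250 * time.Millisecond); close(quit) }()
	runSyncer(make(chan struct{}), func() int { return 0 }, quit, func(r string) { fmt.Println("sync:", r) })
}
```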
+
+// synchronise tries to sync up our local block chain with a remote peer.
+func (sm *SyncManager) synchronise() {
+       // Make sure only one goroutine is ever allowed past this point at once
+       if !atomic.CompareAndSwapInt32(&sm.synchronising, 0, 1) {
+               log.Info("Synchronisation is already in progress")
+               return
+       }
+       defer atomic.StoreInt32(&sm.synchronising, 0)
+
+       peer, bestHeight := sm.peers.BestPeer()
+       // Short circuit if no peers are available
+       if peer == nil {
+               return
+       }
+       if bestHeight > sm.chain.Height() {
+               sm.blockKeeper.BlockRequestWorker(peer.Key, bestHeight)
+       }
+}
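The compare-and-swap on sm.synchronising makes the sync single-flight: only the caller that wins the CAS proceeds, everyone else returns immediately. A minimal sketch of that guard:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

var synchronising int32

// trySync returns true only for the single caller that wins the CAS; the
// flag is reset when the sync work finishes.
func trySync() bool {
	if !atomic.CompareAndSwapInt32(&synchronising, 0, 1) {
		return false // a sync is already in flight
	}
	defer atomic.StoreInt32(&synchronising, 0)
	// ... do the actual sync work here ...
	return true
}

func main() {
	fmt.Println(trySync()) // true
}
```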
+
+// txsyncLoop takes care of the initial transaction sync for each new
+// connection. When a new peer appears, we relay all currently pending
+// transactions. In order to minimise egress bandwidth usage, we send
+// the transactions in small packs to one peer at a time.
+func (sm *SyncManager) txsyncLoop() {
+       var (
+               pending = make(map[string]*txsync)
+               sending = false               // whether a send is active
+               pack    = new(txsync)         // the pack that is being sent
+               done    = make(chan error, 1) // result of the send
+       )
+
+       // send starts sending a pack of transactions from the sync.
+       send := func(s *txsync) {
+               // Fill pack with transactions up to the target size.
+               size := common.StorageSize(0)
+               pack.p = s.p
+               pack.txs = pack.txs[:0]
+               for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+                       pack.txs = append(pack.txs, s.txs[i])
+                       size += common.StorageSize(s.txs[i].SerializedSize)
+               }
+               // Remove the transactions that will be sent.
+               s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+               if len(s.txs) == 0 {
+                       delete(pending, s.p.Key)
+               }
+               // Send the pack in the background.
+               log.Info("Sending batch of transactions. ", "count:", len(pack.txs), " bytes:", size)
+               sending = true
+               go func() { done <- pack.p.SendTransactions(pack.txs) }()
+       }
+
+       // pick chooses the next pending sync.
+       pick := func() *txsync {
+               if len(pending) == 0 {
+                       return nil
+               }
+               n := rand.Intn(len(pending)) + 1
+               for _, s := range pending {
+                       if n--; n == 0 {
+                               return s
+                       }
+               }
+               return nil
+       }
+
+       for {
+               select {
+               case s := <-sm.txSyncCh:
+                       pending[s.p.Key] = s
+                       if !sending {
+                               send(s)
+                       }
+               case err := <-done:
+                       sending = false
+                       // Stop tracking peers that cause send failures.
+                       if err != nil {
+                               log.Info("Transaction send failed", "err", err)
+                               delete(pending, pack.p.Key)
+                       }
+                       // Schedule the next send.
+                       if s := pick(); s != nil {
+                               send(s)
+                       }
+               case <-sm.quitSync:
+                       return
+               }
+       }
+}
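The send closure keeps adding transactions while the running size is still below txsyncPackSize, so a pack may overshoot the target by its last transaction. A standalone sketch of that batching rule, with plain byte counts standing in for tx.SerializedSize:

```go
package main

import "fmt"

const packSize = 100 * 1024 // mirrors txsyncPackSize

// splitPacks groups sizes into packs, adding items while the running total
// is still below packSize, so each pack holds at least one item and may
// exceed the target by the last item added.
func splitPacks(sizes []int) [][]int {
	var packs [][]int
	for len(sizes) > 0 {
		var pack []int
		total := 0
		for len(sizes) > 0 && total < packSize {
			total += sizes[0]
			pack = append(pack, sizes[0])
			sizes = sizes[1:]
		}
		packs = append(packs, pack)
	}
	return packs
}

func main() {
	packs := splitPacks([]int{60 * 1024, 60 * 1024, 10 * 1024})
	fmt.Println("packs:", len(packs)) // packs: 2
}
```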
index d8bdaa5..f8f1382 100755 (executable)
@@ -4,12 +4,9 @@ import (
        "context"
        "net/http"
        _ "net/http/pprof"
-       "strings"
        "time"
 
        log "github.com/sirupsen/logrus"
-       "github.com/tendermint/go-crypto"
-       "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
        dbm "github.com/tendermint/tmlibs/db"
        browser "github.com/toqueteos/webbrowser"
@@ -18,23 +15,25 @@ import (
        "github.com/bytom/account"
        "github.com/bytom/api"
        "github.com/bytom/asset"
-       bc "github.com/bytom/blockchain"
        "github.com/bytom/blockchain/pseudohsm"
        "github.com/bytom/blockchain/txfeed"
        cfg "github.com/bytom/config"
        "github.com/bytom/crypto/ed25519/chainkd"
        "github.com/bytom/database/leveldb"
        "github.com/bytom/env"
-       "github.com/bytom/p2p"
+       "github.com/bytom/mining/cpuminer"
+       "github.com/bytom/mining/miningpool"
+       "github.com/bytom/netsync"
        "github.com/bytom/protocol"
+       "github.com/bytom/protocol/bc"
        "github.com/bytom/types"
-       "github.com/bytom/version"
        w "github.com/bytom/wallet"
 )
 
 const (
        webAddress               = "http://127.0.0.1:9888"
        expireReservationsPeriod = time.Second
+       maxNewBlockChSize        = 1024
 )
 
 type Node struct {
@@ -43,17 +42,18 @@ type Node struct {
        // config
        config *cfg.Config
 
-       // network
-       privKey  crypto.PrivKeyEd25519 // local node's p2p key
-       sw       *p2p.Switch           // p2p connections
-       addrBook *p2p.AddrBook         // known peers
+       syncManager *netsync.SyncManager
 
-       evsw         types.EventSwitch // pub/sub for services
-       bcReactor    *bc.BlockchainReactor
+       evsw types.EventSwitch // pub/sub for services
+       //bcReactor    *bc.BlockchainReactor
        wallet       *w.Wallet
        accessTokens *accesstoken.CredentialStore
        api          *api.API
        chain        *protocol.Chain
+       txfeed       *txfeed.Tracker
+       cpuMiner     *cpuminer.CPUMiner
+       miningPool   *miningpool.MiningPool
+       miningEnable bool
 }
 
 func NewNode(config *cfg.Config) *Node {
@@ -66,8 +66,6 @@ func NewNode(config *cfg.Config) *Node {
        tokenDB := dbm.NewDB("accesstoken", config.DBBackend, config.DBDir())
        accessTokens := accesstoken.NewStore(tokenDB)
 
-       privKey := crypto.GenPrivKeyEd25519()
-
        // Make event switch
        eventSwitch := types.NewEventSwitch()
        _, err := eventSwitch.Start()
@@ -75,10 +73,6 @@ func NewNode(config *cfg.Config) *Node {
                cmn.Exit(cmn.Fmt("Failed to start switch: %v", err))
        }
 
-       trustHistoryDB := dbm.NewDB("trusthistory", config.DBBackend, config.DBDir())
-
-       sw := p2p.NewSwitch(config.P2P, trustHistoryDB)
-
        genesisBlock := cfg.GenerateGenesisBlock()
 
        txPool := protocol.NewTxPool()
@@ -130,18 +124,9 @@ func NewNode(config *cfg.Config) *Node {
                // Clean up expired UTXO reservations periodically.
                go accounts.ExpireReservations(ctx, expireReservationsPeriod)
        }
+       newBlockCh := make(chan *bc.Hash, maxNewBlockChSize)
 
-       bcReactor := bc.NewBlockchainReactor(chain, txPool, sw, accounts, txFeed, config.Mining)
-
-       sw.AddReactor("BLOCKCHAIN", bcReactor)
-
-       // Optionally, start the pex reactor
-       var addrBook *p2p.AddrBook
-       if config.P2P.PexReactor {
-               addrBook = p2p.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
-               pexReactor := p2p.NewPEXReactor(addrBook)
-               sw.AddReactor("PEX", pexReactor)
-       }
+       syncManager, _ := netsync.NewSyncManager(config, chain, txPool, newBlockCh)
 
        // run the profile server
        profileHost := config.ProfListenAddress
@@ -154,18 +139,19 @@ func NewNode(config *cfg.Config) *Node {
        }
 
        node := &Node{
-               config: config,
-
-               privKey:  privKey,
-               sw:       sw,
-               addrBook: addrBook,
-
+               config:       config,
+               syncManager:  syncManager,
                evsw:         eventSwitch,
-               bcReactor:    bcReactor,
                accessTokens: accessTokens,
                wallet:       wallet,
                chain:        chain,
+               txfeed:       txFeed,
+               miningEnable: config.Mining,
        }
+
+       node.cpuMiner = cpuminer.NewCPUMiner(chain, accounts, txPool, newBlockCh)
+       node.miningPool = miningpool.NewMiningPool(chain, accounts, txPool, newBlockCh)
+
        node.BaseService = *cmn.NewBaseService(nil, "Node", node)
 
        return node
@@ -211,35 +197,17 @@ func lanchWebBroser() {
 }
 
 func (n *Node) initAndstartApiServer() {
-       n.api = api.NewAPI(n.bcReactor, n.wallet, n.chain, n.config, n.accessTokens)
+       n.api = api.NewAPI(n.syncManager, n.wallet, n.txfeed, n.cpuMiner, n.miningPool, n.chain, n.config, n.accessTokens)
 
        listenAddr := env.String("LISTEN", n.config.ApiAddress)
        n.api.StartServer(*listenAddr)
 }
 
 func (n *Node) OnStart() error {
-       // Create & add listener
-       p, address := ProtocolAndAddress(n.config.P2P.ListenAddress)
-       l := p2p.NewDefaultListener(p, address, n.config.P2P.SkipUPNP, nil)
-       n.sw.AddListener(l)
-
-       // Start the switch
-       n.sw.SetNodeInfo(n.makeNodeInfo())
-       n.sw.SetNodePrivKey(n.privKey)
-       _, err := n.sw.Start()
-       if err != nil {
-               return err
+       if n.miningEnable {
+               n.cpuMiner.Start()
        }
-
-       // If seeds exist, add them to the address book and dial out
-       if n.config.P2P.Seeds != "" {
-               // dial out
-               seeds := strings.Split(n.config.P2P.Seeds, ",")
-               if err := n.DialSeeds(seeds); err != nil {
-                       return err
-               }
-       }
-
+       n.syncManager.Start()
        n.initAndstartApiServer()
        if !n.config.Web.Closed {
                lanchWebBroser()
@@ -250,11 +218,12 @@ func (n *Node) OnStart() error {
 
 func (n *Node) OnStop() {
        n.BaseService.OnStop()
-
+       if n.miningEnable {
+               n.cpuMiner.Stop()
+       }
+       n.syncManager.Stop()
        log.Info("Stopping Node")
        // TODO: gracefully disconnect from peers.
-       n.sw.Stop()
-
 }
 
 func (n *Node) RunForever() {
@@ -264,68 +233,16 @@ func (n *Node) RunForever() {
        })
 }
 
-// Add a Listener to accept inbound peer connections.
-// Add listeners before starting the Node.
-// The first listener is the primary listener (in NodeInfo)
-func (n *Node) AddListener(l p2p.Listener) {
-       n.sw.AddListener(l)
-}
-
-func (n *Node) Switch() *p2p.Switch {
-       return n.sw
-}
-
 func (n *Node) EventSwitch() types.EventSwitch {
        return n.evsw
 }
 
-func (n *Node) makeNodeInfo() *p2p.NodeInfo {
-       nodeInfo := &p2p.NodeInfo{
-               PubKey:  n.privKey.PubKey().Unwrap().(crypto.PubKeyEd25519),
-               Moniker: n.config.Moniker,
-               Network: "bytom",
-               Version: version.Version,
-               Other: []string{
-                       cmn.Fmt("wire_version=%v", wire.Version),
-                       cmn.Fmt("p2p_version=%v", p2p.Version),
-               },
-       }
-
-       if !n.sw.IsListening() {
-               return nodeInfo
-       }
-
-       p2pListener := n.sw.Listeners()[0]
-       p2pHost := p2pListener.ExternalAddress().IP.String()
-       p2pPort := p2pListener.ExternalAddress().Port
-       //rpcListenAddr := n.config.RPC.ListenAddress
-
-       // We assume that the rpcListener has the same ExternalAddress.
-       // This is probably true because both P2P and RPC listeners use UPnP,
-       // except of course if the rpc is only bound to localhost
-       nodeInfo.ListenAddr = cmn.Fmt("%v:%v", p2pHost, p2pPort)
-       //nodeInfo.Other = append(nodeInfo.Other, cmn.Fmt("rpc_addr=%v", rpcListenAddr))
-       return nodeInfo
-}
-
-//------------------------------------------------------------------------------
-
-func (n *Node) NodeInfo() *p2p.NodeInfo {
-       return n.sw.NodeInfo()
-}
-
-func (n *Node) DialSeeds(seeds []string) error {
-       return n.sw.DialSeeds(n.addrBook, seeds)
+func (n *Node) SyncManager() *netsync.SyncManager {
+       return n.syncManager
 }
 
-// Defaults to tcp
-func ProtocolAndAddress(listenAddr string) (string, string) {
-       p, address := "tcp", listenAddr
-       parts := strings.SplitN(address, "://", 2)
-       if len(parts) == 2 {
-               p, address = parts[0], parts[1]
-       }
-       return p, address
+func (n *Node) MiningPool() *miningpool.MiningPool {
+       return n.miningPool
 }
 
 //------------------------------------------------------------------------------
index 75c383f..b5c6683 100644 (file)
@@ -6,10 +6,12 @@ import (
        "math/rand"
        "reflect"
        "time"
+       "strings"
 
        log "github.com/sirupsen/logrus"
        wire "github.com/tendermint/go-wire"
        cmn "github.com/tendermint/tmlibs/common"
+       "github.com/bytom/errors"
 )
 
 const (
@@ -94,7 +96,7 @@ func (r *PEXReactor) GetChannels() []*ChannelDescriptor {
 
 // AddPeer implements Reactor by adding peer to the address book (if inbound)
 // or by requesting more addresses (if outbound).
-func (r *PEXReactor) AddPeer(p *Peer) {
+func (r *PEXReactor) AddPeer(p *Peer) error {
        if p.IsOutbound() {
                // For outbound peers, the address is already in the books.
                // Either it was added in DialSeeds or when we
@@ -110,10 +112,11 @@ func (r *PEXReactor) AddPeer(p *Peer) {
                                "addr":  p.ListenAddr,
                                "error": err,
                        }).Error("Error in AddPeer: Invalid peer address")
-                       return
+                       return errors.New("Error in AddPeer: Invalid peer address")
                }
                r.book.AddAddress(addr, addr)
        }
+       return nil
 }
 
 // RemovePeer implements Reactor.
@@ -263,7 +266,13 @@ func (r *PEXReactor) ensurePeers() {
                        }
                        _, alreadySelected := toDial[try.IP.String()]
                        alreadyDialing := r.Switch.IsDialing(try)
-                       alreadyConnected := r.Switch.Peers().Has(try.IP.String())
+                       var alreadyConnected bool
+                       for _, v := range r.Switch.Peers().list {
+                               if strings.Compare(v.mconn.RemoteAddress.String(), try.String()) == 0 {
+                                       alreadyConnected = true
+                                       break
+                               }
+                       }
                        if alreadySelected || alreadyDialing || alreadyConnected {
                                continue
                        } else {
index b74b532..4559a30 100644 (file)
@@ -34,7 +34,7 @@ type Reactor interface {
 
        SetSwitch(*Switch)
        GetChannels() []*ChannelDescriptor
-       AddPeer(peer *Peer)
+       AddPeer(peer *Peer) error
        RemovePeer(peer *Peer, reason interface{})
        Receive(chID byte, peer *Peer, msgBytes []byte)
 }
@@ -261,7 +261,9 @@ func (sw *Switch) AddPeer(peer *Peer) error {
 
        // Start peer
        if sw.IsRunning() {
-               sw.startInitPeer(peer)
+               if err := sw.startInitPeer(peer); err != nil {
+                       return err
+               }
        }
 
        // Add the peer to .peers.
@@ -303,11 +305,14 @@ func (sw *Switch) SetPubKeyFilter(f func(crypto.PubKeyEd25519) error) {
        sw.filterConnByPubKey = f
 }
 
-func (sw *Switch) startInitPeer(peer *Peer) {
+func (sw *Switch) startInitPeer(peer *Peer) error {
        peer.Start() // spawn send/recv routines
        for _, reactor := range sw.reactors {
-               reactor.AddPeer(peer)
+               if err := reactor.AddPeer(peer); err != nil {
+                       return err
+               }
        }
+       return nil
 }
 
 // Dial a list of seeds asynchronously in random order
@@ -427,7 +432,7 @@ func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
        return
 }
 
-func (sw *Switch) Peers() IPeerSet {
+func (sw *Switch) Peers() *PeerSet {
        return sw.peers
 }
 
index 92c6c11..a83bb37 100755 (executable)
@@ -123,6 +123,8 @@ func (c *Chain) reorganizeChain(block *types.Block) error {
                        return err
                }
                chainChanges[a.Height] = &attachBlock.ID
+               // TODO: put this action in a better place
+               c.orphanManage.Delete(&attachBlock.ID)
        }
 
        return c.setState(block, utxoView, chainChanges)
@@ -189,7 +191,7 @@ func (c *Chain) ProcessBlock(block *types.Block) (bool, error) {
        blockHash := block.Hash()
        if c.BlockExist(&blockHash) {
                log.WithField("hash", blockHash.String()).Info("Skip process due to block already been handled")
-               return false, nil
+               return c.orphanManage.BlockExist(&blockHash), nil
        }
        if !c.store.BlockExist(&block.PreviousBlockHash) {
                c.orphanManage.Add(block)
index 75e1c1a..55fcd86 100644 (file)
@@ -13,22 +13,21 @@ var ErrBadTx = errors.New("invalid transaction")
 // ValidateTx validates the given transaction. A cache holds
 // per-transaction validation results and is consulted before
 // performing full validation.
-func (c *Chain) ValidateTx(tx *types.Tx) error {
+func (c *Chain) ValidateTx(tx *types.Tx) (bool, error) {
        newTx := tx.Tx
        block := types.MapBlock(c.BestBlock())
        if ok := c.txPool.HaveTransaction(&newTx.ID); ok {
-               return c.txPool.GetErrCache(&newTx.ID)
+               return false, c.txPool.GetErrCache(&newTx.ID)
        }
 
        // validate the UTXO
        view := c.txPool.GetTransactionUTXO(tx.Tx)
        if err := c.GetTransactionsUtxo(view, []*bc.Tx{newTx}); err != nil {
                c.txPool.AddErrCache(&newTx.ID, err)
-               return err
+               return false, err
        }
        if err := view.ApplyTransaction(block, newTx, false); err != nil {
-               c.txPool.AddErrCache(&newTx.ID, err)
-               return err
+               return true, err
        }
 
        // validate the BVM contract
@@ -37,11 +36,11 @@ func (c *Chain) ValidateTx(tx *types.Tx) error {
        if err != nil {
                if gasStatus == nil || !gasStatus.GasVaild {
                        c.txPool.AddErrCache(&newTx.ID, err)
-                       return err
+                       return false, err
                }
                gasOnlyTx = true
        }
 
        _, err = c.txPool.AddTransaction(tx, gasOnlyTx, block.BlockHeader.Height, gasStatus.BTMValue)
-       return err
+       return false, err
 }
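With this change the extra boolean reports whether validation failed because the transaction spends outputs that are not yet known, i.e. the orphan case. A hypothetical caller sketch under that assumption; validate is a stub standing in for Chain.ValidateTx:

```go
package main

import (
	"errors"
	"fmt"
)

var errMissingInputs = errors.New("transaction spends unknown outputs")

// validate stands in for Chain.ValidateTx: the bool flags an orphan tx.
func validate(txID string) (bool, error) {
	if txID == "orphan-tx" {
		return true, errMissingInputs
	}
	return false, nil
}

func main() {
	orphanPool := map[string]bool{}
	if isOrphan, err := validate("orphan-tx"); err != nil && isOrphan {
		// hold the tx until its parent transactions show up
		orphanPool["orphan-tx"] = true
	}
	fmt.Println("orphans held:", len(orphanPool)) // orphans held: 1
}
```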
diff --git a/vendor/gopkg.in/fatih/set.v0/LICENSE.md b/vendor/gopkg.in/fatih/set.v0/LICENSE.md
new file mode 100644 (file)
index 0000000..25fdaf6
--- /dev/null
@@ -0,0 +1,20 @@
+The MIT License (MIT)
+
+Copyright (c) 2013 Fatih Arslan
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/vendor/gopkg.in/fatih/set.v0/README.md b/vendor/gopkg.in/fatih/set.v0/README.md
new file mode 100644 (file)
index 0000000..23afdd9
--- /dev/null
@@ -0,0 +1,245 @@
+# Set [![GoDoc](http://img.shields.io/badge/go-documentation-blue.svg?style=flat-square)](https://godoc.org/gopkg.in/fatih/set.v0) [![Build Status](http://img.shields.io/travis/fatih/set.svg?style=flat-square)](https://travis-ci.org/fatih/set)
+
+Set is a basic and simple, hash-based, **Set** data structure implementation
+in Go (Golang).
+
+Set provides both threadsafe and non-threadsafe implementations of a generic
+set data structure. The thread safety encompasses all operations on one set.
+Operations on multiple sets are consistent in that the elements of each set
+used was valid at exactly one point in time between the start and the end of
+the operation. Because it's thread safe, you can use it concurrently with your
+goroutines.
+
+For usage see examples below or click on the godoc badge.
+
+## Install and Usage
+
+Install the package with:
+
+```bash
+go get gopkg.in/fatih/set.v0
+```
+
+Import it with:
+
+```go
+import "gopkg.in/fatih/set.v0"
+```
+
+and use `set` as the package name inside the code.
+
+## Examples
+
+#### Initialization of a new Set
+
+```go
+
+// create a set with zero items
+s := set.New()
+s := set.NewNonTS() // non thread-safe version
+
+// ... or with some initial values
+s := set.New("istanbul", "frankfurt", 30.123, "san francisco", 1234)
+s := set.NewNonTS("kenya", "ethiopia", "sumatra")
+
+```
+
+#### Basic Operations
+
+```go
+// add items
+s.Add("istanbul")
+s.Add("istanbul") // nothing happens if you add duplicate item
+
+// add multiple items
+s.Add("ankara", "san francisco", 3.14)
+
+// remove item
+s.Remove("frankfurt")
+s.Remove("frankfurt") // nothing happes if you remove a nonexisting item
+
+// remove multiple items
+s.Remove("barcelona", 3.14, "ankara")
+
+// removes an arbitrary item and returns it
+item := s.Pop()
+
+// create a new copy
+other := s.Copy()
+
+// remove all items
+s.Clear()
+
+// number of items in the set
+len := s.Size()
+
+// return a list of items
+items := s.List()
+
+// string representation of set
+fmt.Printf("set is %s", s.String())
+
+```
+
+#### Check Operations
+
+```go
+// check for set emptiness, returns true if set is empty
+s.IsEmpty()
+
+// check for a single item exist
+s.Has("istanbul")
+
+// ... or for multiple items. This will return true if all of the items exist.
+s.Has("istanbul", "san francisco", 3.14)
+
+// create two sets for the following checks...
+s := s.New("1", "2", "3", "4", "5")
+t := s.New("1", "2", "3")
+
+
+// check if they are the same
+if !s.IsEqual(t) {
+    fmt.Println("s is not equal to t")
+}
+
+// if s contains all elements of t
+if s.IsSubset(t) {
+       fmt.Println("t is a subset of s")
+}
+
+// ... or if s is a superset of t
+if t.IsSuperset(s) {
+       fmt.Println("s is a superset of t")
+}
+
+
+```
+
+#### Set Operations
+
+
+```go
+// let us initialize two sets with some values
+a := set.New("ankara", "berlin", "san francisco")
+b := set.New("frankfurt", "berlin")
+
+// creates a new set with the items in a and b combined.
+// [frankfurt, berlin, ankara, san francisco]
+c := set.Union(a, b)
+
+// contains items which is in both a and b
+// [berlin]
+c := set.Intersection(a, b)
+
+// contains items which are in a but not in b
+// [ankara, san francisco]
+c := set.Difference(a, b)
+
+// contains items which are in one of either, but not in both.
+// [frankfurt, ankara, san francisco]
+c := set.SymmetricDifference(a, b)
+
+```
+
+```go
+// like Union but saves the result back into a.
+a.Merge(b)
+
+// removes the set items which are in b from a and saves the result back into a.
+a.Separate(b)
+
+```
+
+#### Multiple Set Operations
+
+```go
+a := set.New("1", "3", "4", "5")
+b := set.New("2", "3", "4", "5")
+c := set.New("4", "5", "6", "7")
+
+// creates a new set with items in a, b and c
+// [1 2 3 4 5 6 7]
+u := set.Union(a, b, c)
+
+// creates a new set with items in a but not in b and c
+// [1]
+u := set.Difference(a, b, c)
+
+// creates a new set with items that are common to a, b and c
+// [5]
+u := set.Intersection(a, b, c)
+```
+
+#### Helper methods
+
+The Slice functions below are a convenient way to extract or convert your Set data
+into basic data types.
+
+
+```go
+// create a set of mixed types
+s := set.New("ankara", "5", "8", "san francisco", 13, 21)
+
+
+// convert s into a slice of strings (type is []string)
+// [ankara 5 8 san francisco]
+t := set.StringSlice(s)
+
+
+// u contains a slice of ints (type is []int)
+// [13, 21]
+u := set.IntSlice(s)
+
+```
+
+#### Concurrent safe usage
+
+Below is an example of a concurrent way that uses set. We call ten functions
+concurrently and wait until they are finished. It basically creates a new
+string for each goroutine and adds it to our set.
+
+```go
+package main
+
+import (
+       "fmt"
+       "github.com/fatih/set"
+       "strconv"
+       "sync"
+)
+
+func main() {
+       var wg sync.WaitGroup // this is just for waiting until all goroutines finish
+
+       // Initialize our thread safe Set
+       s := set.New()
+
+       // Add items concurrently (item1, item2, and so on)
+       for i := 0; i < 10; i++ {
+               wg.Add(1)
+               go func(i int) {
+                       item := "item" + strconv.Itoa(i)
+                       fmt.Println("adding", item)
+                       s.Add(item)
+                       wg.Done()
+               }(i)
+       }
+
+       // Wait until all concurrent calls finished and print our set
+       wg.Wait()
+       fmt.Println(s)
+}
+```
+
+## Credits
+
+ * [Fatih Arslan](https://github.com/fatih)
+ * [Arne Hormann](https://github.com/arnehormann)
+ * [Sam Boyer](https://github.com/sdboyer)
+ * [Ralph Loizzo](https://github.com/friartech)
+
+## License
+
+The MIT License (MIT) - see LICENSE.md for more details
+
diff --git a/vendor/gopkg.in/fatih/set.v0/set.go b/vendor/gopkg.in/fatih/set.v0/set.go
new file mode 100644 (file)
index 0000000..ac0240c
--- /dev/null
@@ -0,0 +1,121 @@
+// Package set provides both threadsafe and non-threadsafe implementations of
+// a generic set data structure. In the threadsafe set, safety encompasses all
+// operations on one set. Operations on multiple sets are consistent in that
+// the elements of each set used was valid at exactly one point in time
+// between the start and the end of the operation.
+package set
+
+// Interface is describing a Set. Sets are an unordered, unique list of values.
+type Interface interface {
+       New(items ...interface{}) Interface
+       Add(items ...interface{})
+       Remove(items ...interface{})
+       Pop() interface{}
+       Has(items ...interface{}) bool
+       Size() int
+       Clear()
+       IsEmpty() bool
+       IsEqual(s Interface) bool
+       IsSubset(s Interface) bool
+       IsSuperset(s Interface) bool
+       Each(func(interface{}) bool)
+       String() string
+       List() []interface{}
+       Copy() Interface
+       Merge(s Interface)
+       Separate(s Interface)
+}
+
+// helpful to not write everywhere struct{}{}
+var keyExists = struct{}{}
+
+// Union is the merger of multiple sets. It returns a new set with all the
+// elements present in all the sets that are passed.
+//
+// The dynamic type of the returned set is determined by the first passed set's
+// implementation of the New() method.
+func Union(set1, set2 Interface, sets ...Interface) Interface {
+       u := set1.Copy()
+       set2.Each(func(item interface{}) bool {
+               u.Add(item)
+               return true
+       })
+       for _, set := range sets {
+               set.Each(func(item interface{}) bool {
+                       u.Add(item)
+                       return true
+               })
+       }
+
+       return u
+}
+
+// Difference returns a new set which contains items that are in the first
+// set but not in the others. Unlike the Difference() method, this function
+// can be used with multiple sets.
+func Difference(set1, set2 Interface, sets ...Interface) Interface {
+       s := set1.Copy()
+       s.Separate(set2)
+       for _, set := range sets {
+               s.Separate(set) // Separate is thread safe
+       }
+       return s
+}
+
+// Intersection returns a new set which contains only the items that exist in all given sets.
+func Intersection(set1, set2 Interface, sets ...Interface) Interface {
+       all := Union(set1, set2, sets...)
+       result := Union(set1, set2, sets...)
+
+       all.Each(func(item interface{}) bool {
+               if !set1.Has(item) || !set2.Has(item) {
+                       result.Remove(item)
+               }
+
+               for _, set := range sets {
+                       if !set.Has(item) {
+                               result.Remove(item)
+                       }
+               }
+               return true
+       })
+       return result
+}
+
+// SymmetricDifference returns a new set containing the items that are in
+// either s or t, but not in both.
+func SymmetricDifference(s Interface, t Interface) Interface {
+       u := Difference(s, t)
+       v := Difference(t, s)
+       return Union(u, v)
+}
+
+// StringSlice is a helper function that returns a slice of strings of s. If
+// the set contains mixed types of items, only items of type string are returned.
+func StringSlice(s Interface) []string {
+       slice := make([]string, 0)
+       for _, item := range s.List() {
+               v, ok := item.(string)
+               if !ok {
+                       continue
+               }
+
+               slice = append(slice, v)
+       }
+       return slice
+}
+
+// IntSlice is a helper function that returns a slice of ints of s. If
+// the set contains mixed types of items, only items of type int are returned.
+func IntSlice(s Interface) []int {
+       slice := make([]int, 0)
+       for _, item := range s.List() {
+               v, ok := item.(int)
+               if !ok {
+                       continue
+               }
+
+               slice = append(slice, v)
+       }
+       return slice
+}
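
The package-level Union, Difference, Intersection, and SymmetricDifference above implement the usual set algebra over any Interface value. A minimal sketch of how they compose, assuming the vendored import path gopkg.in/fatih/set.v0 and some illustrative integer items (print order is not guaranteed, since sets are backed by maps):

```go
package main

import (
	"fmt"

	set "gopkg.in/fatih/set.v0"
)

func main() {
	a := set.New(1, 2, 3, 4)
	b := set.New(3, 4, 5)

	// All items found in either set: 1 2 3 4 5 (order not guaranteed).
	fmt.Println(set.Union(a, b))

	// Items in a that are not in b: 1 2.
	fmt.Println(set.Difference(a, b))

	// Items present in every given set: 3 4.
	fmt.Println(set.Intersection(a, b))

	// Items in exactly one of the two sets: 1 2 5.
	fmt.Println(set.SymmetricDifference(a, b))
}
```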
diff --git a/vendor/gopkg.in/fatih/set.v0/set_nots.go b/vendor/gopkg.in/fatih/set.v0/set_nots.go
new file mode 100644 (file)
index 0000000..ec1ab22
--- /dev/null
@@ -0,0 +1,195 @@
+package set
+
+import (
+       "fmt"
+       "strings"
+)
+
+// Provides a common set baseline for both threadsafe and non-ts Sets.
+type set struct {
+       m map[interface{}]struct{} // struct{} doesn't take up space
+}
+
+// SetNonTS defines a non-thread safe set data structure.
+type SetNonTS struct {
+       set
+}
+
+// NewNonTS creates and initializes a new non-threadsafe Set.
+// It accepts a variable number of arguments to populate the initial set.
+// If nothing is passed a SetNonTS with zero size is created.
+func NewNonTS(items ...interface{}) *SetNonTS {
+       s := &SetNonTS{}
+       s.m = make(map[interface{}]struct{})
+
+       // Ensure interface compliance
+       var _ Interface = s
+
+       s.Add(items...)
+       return s
+}
+
+// New creates and initializes a new Set interface. It accepts a variable
+// number of arguments to populate the initial set. If nothing is passed a
+// zero size Set based on the struct is created.
+func (s *set) New(items ...interface{}) Interface {
+       return NewNonTS(items...)
+}
+
+// Add includes the specified items (one or more) to the set. The underlying
+// Set s is modified. If passed nothing it silently returns.
+func (s *set) Add(items ...interface{}) {
+       if len(items) == 0 {
+               return
+       }
+
+       for _, item := range items {
+               s.m[item] = keyExists
+       }
+}
+
+// Remove deletes the specified items from the set.  The underlying Set s is
+// modified. If passed nothing it silently returns.
+func (s *set) Remove(items ...interface{}) {
+       if len(items) == 0 {
+               return
+       }
+
+       for _, item := range items {
+               delete(s.m, item)
+       }
+}
+
+// Pop deletes and returns an item from the set. The underlying Set s is
+// modified. If the set is empty, nil is returned.
+func (s *set) Pop() interface{} {
+       for item := range s.m {
+               delete(s.m, item)
+               return item
+       }
+       return nil
+}
+
+// Has looks for the existence of items passed. It returns false if nothing is
+// passed. For multiple items it returns true only if all of the items exist.
+func (s *set) Has(items ...interface{}) bool {
+       // nothing passed is treated as not existing
+       if len(items) == 0 {
+               return false
+       }
+
+       has := true
+       for _, item := range items {
+               if _, has = s.m[item]; !has {
+                       break
+               }
+       }
+       return has
+}
+
+// Size returns the number of items in a set.
+func (s *set) Size() int {
+       return len(s.m)
+}
+
+// Clear removes all items from the set.
+func (s *set) Clear() {
+       s.m = make(map[interface{}]struct{})
+}
+
+// IsEmpty reports whether the Set is empty.
+func (s *set) IsEmpty() bool {
+       return s.Size() == 0
+}
+
+// IsEqual tests whether s and t are the same in size and have the same items.
+func (s *set) IsEqual(t Interface) bool {
+       // Force locking only if given set is threadsafe.
+       if conv, ok := t.(*Set); ok {
+               conv.l.RLock()
+               defer conv.l.RUnlock()
+       }
+
+       // return false if they are not the same size
+       if sameSize := len(s.m) == t.Size(); !sameSize {
+               return false
+       }
+
+       equal := true
+       t.Each(func(item interface{}) bool {
+               _, equal = s.m[item]
+               return equal // if false, Each() will end
+       })
+
+       return equal
+}
+
+// IsSubset tests whether t is a subset of s.
+func (s *set) IsSubset(t Interface) (subset bool) {
+       subset = true
+
+       t.Each(func(item interface{}) bool {
+               _, subset = s.m[item]
+               return subset
+       })
+
+       return
+}
+
+// IsSuperset tests whether t is a superset of s.
+func (s *set) IsSuperset(t Interface) bool {
+       return t.IsSubset(s)
+}
+
+// Each traverses the items in the Set, calling the provided function for each
+// set member. Traversal will continue until all items in the Set have been
+// visited, or until the closure returns false.
+func (s *set) Each(f func(item interface{}) bool) {
+       for item := range s.m {
+               if !f(item) {
+                       break
+               }
+       }
+}
+
+// String returns a string representation of s
+func (s *set) String() string {
+       t := make([]string, 0, len(s.List()))
+       for _, item := range s.List() {
+               t = append(t, fmt.Sprintf("%v", item))
+       }
+
+       return fmt.Sprintf("[%s]", strings.Join(t, ", "))
+}
+
+// List returns a slice of all items. There are also the StringSlice() and
+// IntSlice() helper functions for returning slices of type string or int.
+func (s *set) List() []interface{} {
+       list := make([]interface{}, 0, len(s.m))
+
+       for item := range s.m {
+               list = append(list, item)
+       }
+
+       return list
+}
+
+// Copy returns a new Set with a copy of s.
+func (s *set) Copy() Interface {
+       return NewNonTS(s.List()...)
+}
+
+// Merge is like Union, however it modifies the current set it's applied on
+// with the given t set.
+func (s *set) Merge(t Interface) {
+       t.Each(func(item interface{}) bool {
+               s.m[item] = keyExists
+               return true
+       })
+}
+
+// Separate removes the set items contained in t from set s. Please be aware
+// that it is not the opposite of Merge.
+func (s *set) Separate(t Interface) {
+       s.Remove(t.List()...)
+}
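
For state that is confined to a single goroutine, the non-threadsafe SetNonTS above skips the mutex entirely. A short usage sketch under that assumption, with the same vendored import path; the string items are placeholders only:

```go
package main

import (
	"fmt"

	set "gopkg.in/fatih/set.v0"
)

func main() {
	// NewNonTS builds the variant without the RWMutex; safe only for
	// single-goroutine use.
	s := set.NewNonTS("alpha", "beta", "gamma")
	s.Add("delta")

	fmt.Println(s.Has("beta")) // true
	fmt.Println(s.Size())      // 4

	// Separate removes every item of t from s in place; note that it is
	// not the inverse of Merge.
	s.Separate(set.NewNonTS("beta", "delta"))
	fmt.Println(s.List()) // alpha and gamma, in no particular order
}
```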
diff --git a/vendor/gopkg.in/fatih/set.v0/set_ts.go b/vendor/gopkg.in/fatih/set.v0/set_ts.go
new file mode 100644 (file)
index 0000000..50f5325
--- /dev/null
@@ -0,0 +1,200 @@
+package set
+
+import (
+       "sync"
+)
+
+// Set defines a thread safe set data structure.
+type Set struct {
+       set
+       l sync.RWMutex // we name it because we don't want to expose it
+}
+
+// New creates and initializes a new thread-safe Set. It accepts a variable
+// number of arguments to populate the initial set. If nothing is passed, a
+// Set with zero size is created.
+func New(items ...interface{}) *Set {
+       s := &Set{}
+       s.m = make(map[interface{}]struct{})
+
+       // Ensure interface compliance
+       var _ Interface = s
+
+       s.Add(items...)
+       return s
+}
+
+// New creates and initializes a new Set interface. It accepts a variable
+// number of arguments to populate the initial set. If nothing is passed a
+// zero size Set based on the struct is created.
+func (s *Set) New(items ...interface{}) Interface {
+       return New(items...)
+}
+
+// Add includes the specified items (one or more) to the set. The underlying
+// Set s is modified. If passed nothing it silently returns.
+func (s *Set) Add(items ...interface{}) {
+       if len(items) == 0 {
+               return
+       }
+
+       s.l.Lock()
+       defer s.l.Unlock()
+
+       for _, item := range items {
+               s.m[item] = keyExists
+       }
+}
+
+// Remove deletes the specified items from the set.  The underlying Set s is
+// modified. If passed nothing it silently returns.
+func (s *Set) Remove(items ...interface{}) {
+       if len(items) == 0 {
+               return
+       }
+
+       s.l.Lock()
+       defer s.l.Unlock()
+
+       for _, item := range items {
+               delete(s.m, item)
+       }
+}
+
+// Pop deletes and returns an item from the set. The underlying Set s is
+// modified. If the set is empty, nil is returned.
+func (s *Set) Pop() interface{} {
+       s.l.RLock()
+       for item := range s.m {
+               s.l.RUnlock()
+               s.l.Lock()
+               delete(s.m, item)
+               s.l.Unlock()
+               return item
+       }
+       s.l.RUnlock()
+       return nil
+}
+
+// Has looks for the existence of items passed. It returns false if nothing is
+// passed. For multiple items it returns true only if all of the items exist.
+func (s *Set) Has(items ...interface{}) bool {
+       // nothing passed is treated as not existing
+       if len(items) == 0 {
+               return false
+       }
+
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       has := true
+       for _, item := range items {
+               if _, has = s.m[item]; !has {
+                       break
+               }
+       }
+       return has
+}
+
+// Size returns the number of items in a set.
+func (s *Set) Size() int {
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       l := len(s.m)
+       return l
+}
+
+// Clear removes all items from the set.
+func (s *Set) Clear() {
+       s.l.Lock()
+       defer s.l.Unlock()
+
+       s.m = make(map[interface{}]struct{})
+}
+
+// IsEqual tests whether s and t are the same in size and have the same items.
+func (s *Set) IsEqual(t Interface) bool {
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       // Force locking only if given set is threadsafe.
+       if conv, ok := t.(*Set); ok {
+               conv.l.RLock()
+               defer conv.l.RUnlock()
+       }
+
+       // return false if they are not the same size
+       if sameSize := len(s.m) == t.Size(); !sameSize {
+               return false
+       }
+
+       equal := true
+       t.Each(func(item interface{}) bool {
+               _, equal = s.m[item]
+               return equal // if false, Each() will end
+       })
+
+       return equal
+}
+
+// IsSubset tests whether t is a subset of s.
+func (s *Set) IsSubset(t Interface) (subset bool) {
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       subset = true
+
+       t.Each(func(item interface{}) bool {
+               _, subset = s.m[item]
+               return subset
+       })
+
+       return
+}
+
+// Each traverses the items in the Set, calling the provided function for each
+// set member. Traversal will continue until all items in the Set have been
+// visited, or until the closure returns false.
+func (s *Set) Each(f func(item interface{}) bool) {
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       for item := range s.m {
+               if !f(item) {
+                       break
+               }
+       }
+}
+
+// List returns a slice of all items. There are also the StringSlice() and
+// IntSlice() helper functions for returning slices of type string or int.
+func (s *Set) List() []interface{} {
+       s.l.RLock()
+       defer s.l.RUnlock()
+
+       list := make([]interface{}, 0, len(s.m))
+
+       for item := range s.m {
+               list = append(list, item)
+       }
+
+       return list
+}
+
+// Copy returns a new Set with a copy of s.
+func (s *Set) Copy() Interface {
+       return New(s.List()...)
+}
+
+// Merge is like Union, however it modifies the current set it's applied on
+// with the given t set.
+func (s *Set) Merge(t Interface) {
+       s.l.Lock()
+       defer s.l.Unlock()
+
+       t.Each(func(item interface{}) bool {
+               s.m[item] = keyExists
+               return true
+       })
+}
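
The thread-safe Set above guards every operation with its embedded sync.RWMutex, and Merge folds another set in place (unlike the package-level Union, which builds a new set). A brief sketch under those assumptions; the tx-style item names are illustrative only:

```go
package main

import (
	"fmt"
	"sync"

	set "gopkg.in/fatih/set.v0"
)

func main() {
	seen := set.New() // thread-safe: every method locks the embedded RWMutex

	var wg sync.WaitGroup
	for _, batch := range [][]interface{}{{"tx1", "tx2"}, {"tx2", "tx3"}} {
		wg.Add(1)
		go func(items []interface{}) {
			defer wg.Done()
			seen.Add(items...) // safe to call from several goroutines
		}(batch)
	}
	wg.Wait()

	// Merge modifies seen in place; the argument may be a non-threadsafe set.
	seen.Merge(set.NewNonTS("tx4"))
	fmt.Println(seen.Size()) // 4
}
```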
diff --git a/vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go b/vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/prque.go
new file mode 100755 (executable)
index 0000000..5c1967c
--- /dev/null
@@ -0,0 +1,66 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
+// Package prque implements a priority queue data structure supporting arbitrary
+// value types and float priorities.
+//
+// The reasoning behind using floats for the priorities vs. ints or interfaces
+// was larger flexibility without sacrificing too much performance or code
+// complexity.
+//
+// If you would like to use a min-priority queue, simply negate the priorities.
+//
+// Internally the queue is based on the standard heap package working on a
+// sortable version of the block based stack.
+package prque
+
+import (
+       "container/heap"
+)
+
+// Priority queue data structure.
+type Prque struct {
+       cont *sstack
+}
+
+// Creates a new priority queue.
+func New() *Prque {
+       return &Prque{newSstack()}
+}
+
+// Pushes a value with a given priority into the queue, expanding if necessary.
+func (p *Prque) Push(data interface{}, priority float32) {
+       heap.Push(p.cont, &item{data, priority})
+}
+
+// Pops the value with the greatest priority off the stack and returns it.
+// Currently no shrinking is done.
+func (p *Prque) Pop() (interface{}, float32) {
+       item := heap.Pop(p.cont).(*item)
+       return item.value, item.priority
+}
+
+// Pops only the item from the queue, dropping the associated priority value.
+func (p *Prque) PopItem() interface{} {
+       return heap.Pop(p.cont).(*item).value
+}
+
+// Checks whether the priority queue is empty.
+func (p *Prque) Empty() bool {
+       return p.cont.Len() == 0
+}
+
+// Returns the number of elements in the priority queue.
+func (p *Prque) Size() int {
+       return p.cont.Len()
+}
+
+// Clears the contents of the priority queue.
+func (p *Prque) Reset() {
+       *p = *New()
+}
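
Prque above is a max-priority queue keyed by float32 priorities; as its package comment notes, negating the priorities gives min-priority behaviour. A minimal usage sketch, assuming the vendored import path gopkg.in/karalabe/cookiejar.v2/collections/prque and arbitrary string payloads:

```go
package main

import (
	"fmt"

	"gopkg.in/karalabe/cookiejar.v2/collections/prque"
)

func main() {
	q := prque.New()

	// Higher priority is popped first.
	q.Push("low", 1)
	q.Push("high", 3)
	q.Push("mid", 2)

	for !q.Empty() {
		value, prio := q.Pop()
		fmt.Println(value, prio) // high 3, then mid 2, then low 1
	}

	// For min-priority behaviour, negate the priority on Push,
	// e.g. q.Push(item, -float32(height)) would pop the lowest height first.
}
```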
diff --git a/vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go b/vendor/gopkg.in/karalabe/cookiejar.v2/collections/prque/sstack.go
new file mode 100755 (executable)
index 0000000..9f39319
--- /dev/null
@@ -0,0 +1,91 @@
+// CookieJar - A contestant's algorithm toolbox
+// Copyright (c) 2013 Peter Szilagyi. All rights reserved.
+//
+// CookieJar is dual licensed: use of this source code is governed by a BSD
+// license that can be found in the LICENSE file. Alternatively, the CookieJar
+// toolbox may be used in accordance with the terms and conditions contained
+// in a signed written agreement between you and the author(s).
+
+package prque
+
+// The size of a block of data
+const blockSize = 4096
+
+// A prioritized item in the sorted stack.
+type item struct {
+       value    interface{}
+       priority float32
+}
+
+// Internal sortable stack data structure. Implements the Push and Pop ops for
+// the stack (heap) functionality and the Len, Less and Swap methods for the
+// sortability requirements of the heaps.
+type sstack struct {
+       size     int
+       capacity int
+       offset   int
+
+       blocks [][]*item
+       active []*item
+}
+
+// Creates a new, empty stack.
+func newSstack() *sstack {
+       result := new(sstack)
+       result.active = make([]*item, blockSize)
+       result.blocks = [][]*item{result.active}
+       result.capacity = blockSize
+       return result
+}
+
+// Pushes a value onto the stack, expanding it if necessary. Required by
+// heap.Interface.
+func (s *sstack) Push(data interface{}) {
+       if s.size == s.capacity {
+               s.active = make([]*item, blockSize)
+               s.blocks = append(s.blocks, s.active)
+               s.capacity += blockSize
+               s.offset = 0
+       } else if s.offset == blockSize {
+               s.active = s.blocks[s.size/blockSize]
+               s.offset = 0
+       }
+       s.active[s.offset] = data.(*item)
+       s.offset++
+       s.size++
+}
+
+// Pops a value off the stack and returns it. Currently no shrinking is done.
+// Required by heap.Interface.
+func (s *sstack) Pop() (res interface{}) {
+       s.size--
+       s.offset--
+       if s.offset < 0 {
+               s.offset = blockSize - 1
+               s.active = s.blocks[s.size/blockSize]
+       }
+       res, s.active[s.offset] = s.active[s.offset], nil
+       return
+}
+
+// Returns the length of the stack. Required by sort.Interface.
+func (s *sstack) Len() int {
+       return s.size
+}
+
+// Compares the priority of two elements of the stack (higher is first).
+// Required by sort.Interface.
+func (s *sstack) Less(i, j int) bool {
+       return s.blocks[i/blockSize][i%blockSize].priority > s.blocks[j/blockSize][j%blockSize].priority
+}
+
+// Swaps two elements in the stack. Required by sort.Interface.
+func (s *sstack) Swap(i, j int) {
+       ib, io, jb, jo := i/blockSize, i%blockSize, j/blockSize, j%blockSize
+       s.blocks[ib][io], s.blocks[jb][jo] = s.blocks[jb][jo], s.blocks[ib][io]
+}
+
+// Resets the stack, effectively clearing its contents.
+func (s *sstack) Reset() {
+       *s = *newSstack()
+}
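
sstack above stores items in fixed 4096-entry blocks and addresses a flat heap index i as block i/blockSize with offset i%blockSize (see Less and Swap). A standalone sketch of that indexing arithmetic, with blockSize copied from the file above and a hypothetical locate helper:

```go
package main

import "fmt"

// blockSize mirrors the constant in sstack.go above.
const blockSize = 4096

// locate splits a flat index into the (block, offset) pair that
// sstack's Less and Swap use to address items across blocks.
func locate(i int) (block, offset int) {
	return i / blockSize, i % blockSize
}

func main() {
	for _, i := range []int{0, 4095, 4096, 10000} {
		b, o := locate(i)
		fmt.Printf("index %5d -> block %d, offset %d\n", i, b, o)
	}
}
```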