mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-21 22:43:52 +00:00
Merge pull request #1710 from c9s/c9s/xmaker/stb-improvements
IMPROVE: [xmaker] improve stability
This commit is contained in:
commit
80949bf0e1
25
.travis.yml
25
.travis.yml
|
@ -1,25 +0,0 @@
|
|||
---
|
||||
language: go
|
||||
go:
|
||||
- 1.14
|
||||
- 1.15
|
||||
|
||||
services:
|
||||
- redis-server
|
||||
- mysql
|
||||
|
||||
before_install:
|
||||
- mysql -e 'CREATE DATABASE bbgo;'
|
||||
- mysql -e 'CREATE DATABASE bbgo_dev;'
|
||||
|
||||
install:
|
||||
- go get github.com/c9s/rockhopper/cmd/rockhopper
|
||||
|
||||
before_script:
|
||||
- go mod download
|
||||
- make migrations
|
||||
|
||||
script:
|
||||
- bash scripts/test-sqlite3-migrations.sh
|
||||
- bash scripts/test-mysql-migrations.sh
|
||||
- go test -v ./pkg/...
|
|
@ -60,6 +60,7 @@ crossExchangeStrategies:
|
|||
# 18002.00
|
||||
pips: 10
|
||||
circuitBreaker:
|
||||
enabled: true
|
||||
maximumConsecutiveTotalLoss: 36.0
|
||||
maximumConsecutiveLossTimes: 10
|
||||
maximumLossPerRound: 15.0
|
||||
|
|
|
@ -48,7 +48,7 @@ var rootCmd = &cobra.Command{
|
|||
stream.SetPublicOnly()
|
||||
stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{})
|
||||
|
||||
streamBook := types.NewStreamBook(symbol)
|
||||
streamBook := types.NewStreamBook(symbol, exchange.Name())
|
||||
streamBook.BindStream(stream)
|
||||
|
||||
go func() {
|
||||
|
|
|
@ -464,7 +464,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ
|
|||
for _, sub := range session.Subscriptions {
|
||||
switch sub.Channel {
|
||||
case types.BookChannel:
|
||||
book := types.NewStreamBook(sub.Symbol)
|
||||
book := types.NewStreamBook(sub.Symbol, session.ExchangeName)
|
||||
book.BindStream(session.MarketDataStream)
|
||||
session.orderBooks[sub.Symbol] = book
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ var orderbookCmd = &cobra.Command{
|
|||
return fmt.Errorf("session %s not found", sessionName)
|
||||
}
|
||||
|
||||
orderBook := types.NewMutexOrderBook(symbol)
|
||||
orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name())
|
||||
|
||||
s := session.Exchange.NewStream()
|
||||
s.SetPublicOnly()
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package priceresolver
|
||||
package pricesolver
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
@ -9,8 +9,8 @@ import (
|
|||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
// SimplePriceResolver implements a map-structure-based price index
|
||||
type SimplePriceResolver struct {
|
||||
// SimplePriceSolver implements a map-structure-based price index
|
||||
type SimplePriceSolver struct {
|
||||
// symbolPrices stores the latest trade price by mapping symbol to price
|
||||
symbolPrices map[string]fixedpoint.Value
|
||||
markets types.MarketMap
|
||||
|
@ -28,8 +28,8 @@ type SimplePriceResolver struct {
|
|||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceResolver {
|
||||
return &SimplePriceResolver{
|
||||
func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceSolver {
|
||||
return &SimplePriceSolver{
|
||||
markets: markets,
|
||||
symbolPrices: make(map[string]fixedpoint.Value),
|
||||
pricesByBase: make(map[string]map[string]fixedpoint.Value),
|
||||
|
@ -37,7 +37,7 @@ func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceResolver {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *SimplePriceResolver) Update(symbol string, price fixedpoint.Value) {
|
||||
func (m *SimplePriceSolver) Update(symbol string, price fixedpoint.Value) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
|
@ -65,11 +65,11 @@ func (m *SimplePriceResolver) Update(symbol string, price fixedpoint.Value) {
|
|||
baseMap[market.BaseCurrency] = price
|
||||
}
|
||||
|
||||
func (m *SimplePriceResolver) UpdateFromTrade(trade types.Trade) {
|
||||
func (m *SimplePriceSolver) UpdateFromTrade(trade types.Trade) {
|
||||
m.Update(trade.Symbol, trade.Price)
|
||||
}
|
||||
|
||||
func (m *SimplePriceResolver) inferencePrice(asset string, assetPrice fixedpoint.Value, preferredFiats ...string) (fixedpoint.Value, bool) {
|
||||
func (m *SimplePriceSolver) inferencePrice(asset string, assetPrice fixedpoint.Value, preferredFiats ...string) (fixedpoint.Value, bool) {
|
||||
// log.Infof("inferencePrice %s = %f", asset, assetPrice.Float64())
|
||||
quotePrices, ok := m.pricesByBase[asset]
|
||||
if ok {
|
||||
|
@ -112,7 +112,7 @@ func (m *SimplePriceResolver) inferencePrice(asset string, assetPrice fixedpoint
|
|||
return fixedpoint.Zero, false
|
||||
}
|
||||
|
||||
func (m *SimplePriceResolver) ResolvePrice(asset string, preferredFiats ...string) (fixedpoint.Value, bool) {
|
||||
func (m *SimplePriceSolver) ResolvePrice(asset string, preferredFiats ...string) (fixedpoint.Value, bool) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
return m.inferencePrice(asset, fixedpoint.One, preferredFiats...)
|
|
@ -1,4 +1,4 @@
|
|||
package priceresolver
|
||||
package pricesolver
|
||||
|
||||
import (
|
||||
"testing"
|
|
@ -2,6 +2,7 @@ package audacitymaker
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/datatype/floats"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
|
@ -38,7 +39,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener
|
|||
position := orderExecutor.Position()
|
||||
symbol := position.Symbol
|
||||
// ger best bid/ask, not used yet
|
||||
s.StreamBook = types.NewStreamBook(symbol)
|
||||
s.StreamBook = types.NewStreamBook(symbol, session.ExchangeName)
|
||||
s.StreamBook.BindStream(session.MarketDataStream)
|
||||
|
||||
// use queue to do time-series rolling
|
||||
|
@ -59,7 +60,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener
|
|||
|
||||
session.MarketDataStream.OnMarketTrade(func(trade types.Trade) {
|
||||
|
||||
//log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64())
|
||||
// log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64())
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
|
@ -80,10 +81,10 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener
|
|||
sellTradesNumber.Update(1)
|
||||
}
|
||||
|
||||
//canceled := s.orderExecutor.GracefulCancel(ctx)
|
||||
//if canceled != nil {
|
||||
// canceled := s.orderExecutor.GracefulCancel(ctx)
|
||||
// if canceled != nil {
|
||||
// _ = s.orderExecutor.GracefulCancel(ctx)
|
||||
//}
|
||||
// }
|
||||
|
||||
sizeFraction := buyTradeSize.Sum() / sellTradeSize.Sum()
|
||||
numberFraction := buyTradesNumber.Sum() / sellTradesNumber.Sum()
|
||||
|
@ -112,15 +113,15 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener
|
|||
if outlier(orderFlowSizeMinMax.Tail(100), threshold) > 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) > 0 {
|
||||
_ = s.orderExecutor.GracefulCancel(ctx)
|
||||
log.Infof("long!!")
|
||||
//_ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol)
|
||||
// _ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol)
|
||||
_ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price, symbol)
|
||||
//_ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol)
|
||||
// _ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol)
|
||||
} else if outlier(orderFlowSizeMinMax.Tail(100), threshold) < 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) < 0 {
|
||||
_ = s.orderExecutor.GracefulCancel(ctx)
|
||||
log.Infof("short!!")
|
||||
//_ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol)
|
||||
// _ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol)
|
||||
_ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price, symbol)
|
||||
//_ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol)
|
||||
// _ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -138,7 +139,9 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener
|
|||
}
|
||||
}
|
||||
|
||||
func (s *PerTrade) placeOrder(ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string) error {
|
||||
func (s *PerTrade) placeOrder(
|
||||
ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string,
|
||||
) error {
|
||||
market, _ := s.session.Market(symbol)
|
||||
_, err := s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
|
||||
Symbol: symbol,
|
||||
|
|
|
@ -98,7 +98,7 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
|
|||
func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
|
||||
s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID())
|
||||
|
||||
s.book = types.NewStreamBook(s.Symbol)
|
||||
s.book = types.NewStreamBook(s.Symbol, session.Exchange.Name())
|
||||
s.book.BindStream(session.MarketDataStream)
|
||||
|
||||
s.liquidityOrderBook = bbgo.NewActiveOrderBook(s.Symbol)
|
||||
|
|
|
@ -513,7 +513,9 @@ func notifyUsdPnL(profit fixedpoint.Value) {
|
|||
bbgo.Notify(title)
|
||||
}
|
||||
|
||||
func (s *Strategy) iocOrderExecution(ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64) (types.OrderSlice, error) {
|
||||
func (s *Strategy) iocOrderExecution(
|
||||
ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64,
|
||||
) (types.OrderSlice, error) {
|
||||
service, ok := session.Exchange.(types.ExchangeOrderQueryService)
|
||||
if !ok {
|
||||
return nil, errors.New("exchange does not support ExchangeOrderQueryService")
|
||||
|
@ -700,7 +702,9 @@ func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, t
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Strategy) waitOrdersAndCollectTrades(ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice) (map[uint64][]types.Trade, types.OrderSlice, error) {
|
||||
func (s *Strategy) waitOrdersAndCollectTrades(
|
||||
ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice,
|
||||
) (map[uint64][]types.Trade, types.OrderSlice, error) {
|
||||
var err error
|
||||
var orderTrades = make(map[uint64][]types.Trade)
|
||||
var updatedOrders types.OrderSlice
|
||||
|
@ -763,7 +767,9 @@ func (s *Strategy) analyzeOrders(orders types.OrderSlice) {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan) (map[string]*ArbMarket, error) {
|
||||
func (s *Strategy) buildArbMarkets(
|
||||
session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan,
|
||||
) (map[string]*ArbMarket, error) {
|
||||
markets := make(map[string]*ArbMarket)
|
||||
// build market object
|
||||
for _, symbol := range symbols {
|
||||
|
@ -790,7 +796,7 @@ func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []stri
|
|||
Speed: types.SpeedHigh,
|
||||
})
|
||||
|
||||
book := types.NewStreamBook(symbol)
|
||||
book := types.NewStreamBook(symbol, session.ExchangeName)
|
||||
priceUpdater := func(_ types.SliceOrderBook) {
|
||||
bestBid, bestAsk, _ := book.BestBidAndAsk()
|
||||
if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) {
|
||||
|
|
|
@ -14,7 +14,7 @@ import (
|
|||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/core"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/priceresolver"
|
||||
"github.com/c9s/bbgo/pkg/pricesolver"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
)
|
||||
|
||||
|
@ -57,7 +57,7 @@ type Strategy struct {
|
|||
|
||||
faultBalanceRecords map[string][]TimeBalance
|
||||
|
||||
priceResolver *priceresolver.SimplePriceResolver
|
||||
priceResolver *pricesolver.SimplePriceSolver
|
||||
|
||||
sessions map[string]*bbgo.ExchangeSession
|
||||
orderBooks map[string]*bbgo.ActiveOrderBook
|
||||
|
@ -372,7 +372,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
|
|||
// session.Market(symbol)
|
||||
}
|
||||
|
||||
s.priceResolver = priceresolver.NewSimplePriceResolver(markets)
|
||||
s.priceResolver = pricesolver.NewSimplePriceResolver(markets)
|
||||
|
||||
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
|
||||
defer wg.Done()
|
||||
|
|
|
@ -393,7 +393,7 @@ func (s *Strategy) CrossRun(
|
|||
return err
|
||||
}
|
||||
|
||||
s.pricingBook = types.NewStreamBook(s.HedgeSymbol)
|
||||
s.pricingBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName)
|
||||
s.pricingBook.BindStream(s.hedgeSession.MarketDataStream)
|
||||
|
||||
s.stopC = make(chan struct{})
|
||||
|
|
|
@ -44,7 +44,7 @@ func TestStrategy_generateMakerOrders(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
pricingBook := types.NewStreamBook("BTCUSDT")
|
||||
pricingBook := types.NewStreamBook("BTCUSDT", types.ExchangeBinance)
|
||||
pricingBook.Load(types.SliceOrderBook{
|
||||
Symbol: "BTCUSDT",
|
||||
Bids: types.PriceVolumeSlice{
|
||||
|
|
|
@ -153,11 +153,11 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
|
|||
})
|
||||
|
||||
if s.SourceExchange != "" {
|
||||
s.sourceBook = types.NewStreamBook(s.Symbol)
|
||||
s.sourceBook = types.NewStreamBook(s.Symbol, sourceSession.ExchangeName)
|
||||
s.sourceBook.BindStream(s.sourceSession.MarketDataStream)
|
||||
}
|
||||
|
||||
s.tradingBook = types.NewStreamBook(s.Symbol)
|
||||
s.tradingBook = types.NewStreamBook(s.Symbol, tradingSession.ExchangeName)
|
||||
s.tradingBook.BindStream(s.tradingSession.MarketDataStream)
|
||||
|
||||
s.tradingSession.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
|
||||
|
|
43
pkg/strategy/xmaker/metrics.go
Normal file
43
pkg/strategy/xmaker/metrics.go
Normal file
|
@ -0,0 +1,43 @@
|
|||
package xmaker
|
||||
|
||||
import "github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
var openOrderBidExposureInUsdMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "xmaker_open_order_bid_exposure_in_usd",
|
||||
Help: "",
|
||||
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
|
||||
|
||||
var openOrderAskExposureInUsdMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "xmaker_open_order_ask_exposure_in_usd",
|
||||
Help: "",
|
||||
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
|
||||
|
||||
var makerBestBidPriceMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "xmaker_maker_best_bid_price",
|
||||
Help: "",
|
||||
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
|
||||
|
||||
var makerBestAskPriceMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "xmaker_maker_best_ask_price",
|
||||
Help: "",
|
||||
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
|
||||
|
||||
var numOfLayersMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "xmaker_num_of_layers",
|
||||
Help: "",
|
||||
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(
|
||||
openOrderBidExposureInUsdMetrics,
|
||||
openOrderAskExposureInUsdMetrics,
|
||||
makerBestBidPriceMetrics,
|
||||
makerBestAskPriceMetrics,
|
||||
numOfLayersMetrics,
|
||||
)
|
||||
}
|
|
@ -7,13 +7,15 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/sirupsen/logrus"
|
||||
"golang.org/x/time/rate"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/bbgo"
|
||||
"github.com/c9s/bbgo/pkg/core"
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
"github.com/c9s/bbgo/pkg/indicator"
|
||||
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
|
||||
"github.com/c9s/bbgo/pkg/pricesolver"
|
||||
"github.com/c9s/bbgo/pkg/risk/circuitbreaker"
|
||||
"github.com/c9s/bbgo/pkg/types"
|
||||
"github.com/c9s/bbgo/pkg/util"
|
||||
|
@ -95,10 +97,11 @@ type Strategy struct {
|
|||
makerMarket, sourceMarket types.Market
|
||||
|
||||
// boll is the BOLLINGER indicator we used for predicting the price.
|
||||
boll *indicator.BOLL
|
||||
boll *indicatorv2.BOLLStream
|
||||
|
||||
state *State
|
||||
|
||||
priceSolver *pricesolver.SimplePriceSolver
|
||||
CircuitBreaker *circuitbreaker.BasicCircuitBreaker `json:"circuitBreaker"`
|
||||
|
||||
// persistence fields
|
||||
|
@ -214,6 +217,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
// use mid-price for the last price
|
||||
s.lastPrice = bestBid.Price.Add(bestAsk.Price).Div(Two)
|
||||
|
||||
s.priceSolver.Update(s.Symbol, s.lastPrice)
|
||||
|
||||
bookLastUpdateTime := s.book.LastUpdateTime()
|
||||
|
||||
if _, err := s.bidPriceHeartBeat.Update(bestBid); err != nil {
|
||||
|
@ -380,6 +385,16 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
}
|
||||
}
|
||||
|
||||
labels := prometheus.Labels{
|
||||
"strategy_type": ID,
|
||||
"strategy_id": s.InstanceID(),
|
||||
"exchange": s.MakerExchange,
|
||||
"symbol": s.Symbol,
|
||||
}
|
||||
|
||||
bidExposureInUsd := fixedpoint.Zero
|
||||
askExposureInUsd := fixedpoint.Zero
|
||||
|
||||
bidPrice := bestBidPrice
|
||||
askPrice := bestAskPrice
|
||||
for i := 0; i < s.NumLayers; i++ {
|
||||
|
@ -413,6 +428,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
Mul(s.makerMarket.TickSize)))
|
||||
}
|
||||
|
||||
makerBestBidPriceMetrics.With(labels).Set(bidPrice.Float64())
|
||||
|
||||
if makerQuota.QuoteAsset.Lock(bidQuantity.Mul(bidPrice)) && hedgeQuota.BaseAsset.Lock(bidQuantity) {
|
||||
// if we bought, then we need to sell the base from the hedge session
|
||||
submitOrders = append(submitOrders, types.SubmitOrder{
|
||||
|
@ -427,6 +444,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
|
||||
makerQuota.Commit()
|
||||
hedgeQuota.Commit()
|
||||
bidExposureInUsd = bidExposureInUsd.Add(bidQuantity.Mul(bidPrice))
|
||||
} else {
|
||||
makerQuota.Rollback()
|
||||
hedgeQuota.Rollback()
|
||||
|
@ -466,7 +484,10 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
askPrice = askPrice.Add(pips.Mul(fixedpoint.NewFromInt(int64(i)).Mul(s.makerMarket.TickSize)))
|
||||
}
|
||||
|
||||
makerBestAskPriceMetrics.With(labels).Set(askPrice.Float64())
|
||||
|
||||
if makerQuota.BaseAsset.Lock(askQuantity) && hedgeQuota.QuoteAsset.Lock(askQuantity.Mul(askPrice)) {
|
||||
|
||||
// if we bought, then we need to sell the base from the hedge session
|
||||
submitOrders = append(submitOrders, types.SubmitOrder{
|
||||
Symbol: s.Symbol,
|
||||
|
@ -480,6 +501,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
})
|
||||
makerQuota.Commit()
|
||||
hedgeQuota.Commit()
|
||||
|
||||
askExposureInUsd = askExposureInUsd.Add(askQuantity.Mul(askPrice))
|
||||
} else {
|
||||
makerQuota.Rollback()
|
||||
hedgeQuota.Rollback()
|
||||
|
@ -496,14 +519,28 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
|||
return
|
||||
}
|
||||
|
||||
makerOrders, err := orderExecutionRouter.SubmitOrdersTo(ctx, s.MakerExchange, submitOrders...)
|
||||
formattedOrders, err := s.makerSession.FormatOrders(submitOrders)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("order error: %s", err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
s.activeMakerOrders.Add(makerOrders...)
|
||||
s.orderStore.Add(makerOrders...)
|
||||
orderCreateCallback := func(createdOrder types.Order) {
|
||||
s.orderStore.Add(createdOrder)
|
||||
s.activeMakerOrders.Add(createdOrder)
|
||||
}
|
||||
|
||||
defer s.tradeCollector.Process()
|
||||
|
||||
createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, s.makerSession.Exchange, orderCreateCallback, formattedOrders...)
|
||||
if err != nil {
|
||||
log.WithError(err).Errorf("unable to place maker orders: %+v", formattedOrders)
|
||||
}
|
||||
|
||||
openOrderBidExposureInUsdMetrics.With(labels).Set(bidExposureInUsd.Float64())
|
||||
openOrderAskExposureInUsdMetrics.With(labels).Set(askExposureInUsd.Float64())
|
||||
|
||||
_ = errIdx
|
||||
_ = createdOrders
|
||||
}
|
||||
|
||||
var lastPriceModifier = fixedpoint.NewFromFloat(1.001)
|
||||
|
@ -744,6 +781,10 @@ func (s *Strategy) CrossRun(
|
|||
|
||||
s.sourceSession = sourceSession
|
||||
|
||||
// initialize the price resolver
|
||||
sourceMarkets := s.sourceSession.Markets()
|
||||
s.priceSolver = pricesolver.NewSimplePriceResolver(sourceMarkets)
|
||||
|
||||
makerSession, ok := sessions[s.MakerExchange]
|
||||
if !ok {
|
||||
return fmt.Errorf("maker exchange session %s is not defined", s.MakerExchange)
|
||||
|
@ -761,24 +802,16 @@ func (s *Strategy) CrossRun(
|
|||
return fmt.Errorf("maker session market %s is not defined", s.Symbol)
|
||||
}
|
||||
|
||||
standardIndicatorSet := s.sourceSession.StandardIndicatorSet(s.Symbol)
|
||||
indicators := s.sourceSession.Indicators(s.Symbol)
|
||||
if !ok {
|
||||
return fmt.Errorf("%s standard indicator set not found", s.Symbol)
|
||||
}
|
||||
|
||||
s.boll = standardIndicatorSet.BOLL(types.IntervalWindow{
|
||||
s.boll = indicators.BOLL(types.IntervalWindow{
|
||||
Interval: s.BollBandInterval,
|
||||
Window: 21,
|
||||
}, 1.0)
|
||||
|
||||
if store, ok := s.sourceSession.MarketDataStore(s.Symbol); ok {
|
||||
if klines, ok2 := store.KLinesOfInterval(s.BollBandInterval); ok2 {
|
||||
for i := 0; i < len(*klines); i++ {
|
||||
s.boll.CalculateAndUpdate((*klines)[0 : i+1])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// restore state
|
||||
instanceID := s.InstanceID()
|
||||
s.groupID = util.FNV32(instanceID)
|
||||
|
@ -819,7 +852,7 @@ func (s *Strategy) CrossRun(
|
|||
})
|
||||
}
|
||||
|
||||
s.book = types.NewStreamBook(s.Symbol)
|
||||
s.book = types.NewStreamBook(s.Symbol, s.sourceSession.ExchangeName)
|
||||
s.book.BindStream(s.sourceSession.MarketDataStream)
|
||||
|
||||
s.activeMakerOrders = bbgo.NewActiveOrderBook(s.Symbol)
|
||||
|
@ -948,10 +981,7 @@ func (s *Strategy) CrossRun(
|
|||
// wait for the quoter to stop
|
||||
time.Sleep(s.UpdateInterval.Duration())
|
||||
|
||||
shutdownCtx, cancelShutdown := context.WithTimeout(context.TODO(), time.Minute)
|
||||
defer cancelShutdown()
|
||||
|
||||
if err := s.activeMakerOrders.GracefulCancel(shutdownCtx, s.makerSession.Exchange); err != nil {
|
||||
if err := s.activeMakerOrders.GracefulCancel(ctx, s.makerSession.Exchange); err != nil {
|
||||
log.WithError(err).Errorf("graceful cancel error")
|
||||
}
|
||||
|
||||
|
|
|
@ -396,7 +396,7 @@ func (e *StreamExecutor) Run(parentCtx context.Context) error {
|
|||
e.marketDataStream.SetPublicOnly()
|
||||
e.marketDataStream.Subscribe(types.BookChannel, e.Symbol, types.SubscribeOptions{})
|
||||
|
||||
e.orderBook = types.NewStreamBook(e.Symbol)
|
||||
e.orderBook = types.NewStreamBook(e.Symbol, e.Session.ExchangeName)
|
||||
e.orderBook.BindStream(e.marketDataStream)
|
||||
|
||||
e.userDataStream = e.Session.Exchange.NewStream()
|
||||
|
|
|
@ -88,7 +88,7 @@ func NewFixedQuantityExecutor(
|
|||
Depth: types.DepthLevelMedium,
|
||||
})
|
||||
|
||||
orderBook := types.NewStreamBook(symbol)
|
||||
orderBook := types.NewStreamBook(symbol, exchange.Name())
|
||||
orderBook.BindStream(marketDataStream)
|
||||
|
||||
userDataStream := exchange.NewStream()
|
||||
|
|
|
@ -143,6 +143,7 @@ func TestNewStreamExecutor(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
mockEx.EXPECT().Name().Return(exchangeName)
|
||||
mockEx.EXPECT().NewStream().Return(mockMarketDataStream)
|
||||
mockEx.EXPECT().NewStream().Return(mockUserDataStream)
|
||||
mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil)
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
|
||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||
)
|
||||
|
||||
|
@ -26,12 +28,13 @@ type OrderBook interface {
|
|||
type MutexOrderBook struct {
|
||||
sync.Mutex
|
||||
|
||||
Symbol string
|
||||
Symbol string
|
||||
Exchange ExchangeName
|
||||
|
||||
orderBook OrderBook
|
||||
}
|
||||
|
||||
func NewMutexOrderBook(symbol string) *MutexOrderBook {
|
||||
func NewMutexOrderBook(symbol string, exchangeName ExchangeName) *MutexOrderBook {
|
||||
var book OrderBook = NewSliceOrderBook(symbol)
|
||||
|
||||
if v, _ := strconv.ParseBool(os.Getenv("ENABLE_RBT_ORDERBOOK")); v {
|
||||
|
@ -40,6 +43,7 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook {
|
|||
|
||||
return &MutexOrderBook{
|
||||
Symbol: symbol,
|
||||
Exchange: exchangeName,
|
||||
orderBook: book,
|
||||
}
|
||||
}
|
||||
|
@ -134,6 +138,46 @@ type BookSignal struct {
|
|||
Time time.Time
|
||||
}
|
||||
|
||||
var streamOrderBookBestBidPriceMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "bbgo_stream_order_book_best_bid_price",
|
||||
Help: "",
|
||||
}, []string{"symbol", "exchange"})
|
||||
|
||||
var streamOrderBookBestAskPriceMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "bbgo_stream_order_book_best_ask_price",
|
||||
Help: "",
|
||||
}, []string{"symbol", "exchange"})
|
||||
|
||||
var streamOrderBookBestBidVolumeMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "bbgo_stream_order_book_best_bid_volume",
|
||||
Help: "",
|
||||
}, []string{"symbol", "exchange"})
|
||||
|
||||
var streamOrderBookBestAskVolumeMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "bbgo_stream_order_book_best_ask_volume",
|
||||
Help: "",
|
||||
}, []string{"symbol", "exchange"})
|
||||
|
||||
var streamOrderBookUpdateTimeMetrics = prometheus.NewGaugeVec(
|
||||
prometheus.GaugeOpts{
|
||||
Name: "bbgo_stream_order_book_update_time_milliseconds",
|
||||
Help: "",
|
||||
}, []string{"symbol", "exchange"})
|
||||
|
||||
func init() {
|
||||
prometheus.MustRegister(
|
||||
streamOrderBookBestBidPriceMetrics,
|
||||
streamOrderBookBestAskPriceMetrics,
|
||||
streamOrderBookBestBidVolumeMetrics,
|
||||
streamOrderBookBestAskVolumeMetrics,
|
||||
streamOrderBookUpdateTimeMetrics,
|
||||
)
|
||||
}
|
||||
|
||||
// StreamOrderBook receives streaming data from websocket connection and
|
||||
// update the order book with mutex lock, so you can safely access it.
|
||||
//
|
||||
|
@ -147,13 +191,25 @@ type StreamOrderBook struct {
|
|||
snapshotCallbacks []func(snapshot SliceOrderBook)
|
||||
}
|
||||
|
||||
func NewStreamBook(symbol string) *StreamOrderBook {
|
||||
func NewStreamBook(symbol string, exchangeName ExchangeName) *StreamOrderBook {
|
||||
return &StreamOrderBook{
|
||||
MutexOrderBook: NewMutexOrderBook(symbol),
|
||||
MutexOrderBook: NewMutexOrderBook(symbol, exchangeName),
|
||||
C: make(chan *BookSignal, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *StreamOrderBook) updateMetrics(t time.Time) {
|
||||
bestBid, bestAsk, ok := sb.BestBidAndAsk()
|
||||
if ok {
|
||||
exchangeName := string(sb.Exchange)
|
||||
streamOrderBookBestAskPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Price.Float64())
|
||||
streamOrderBookBestBidPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Price.Float64())
|
||||
streamOrderBookBestAskVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Volume.Float64())
|
||||
streamOrderBookBestBidVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Volume.Float64())
|
||||
streamOrderBookUpdateTimeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(float64(t.UnixMilli()))
|
||||
}
|
||||
}
|
||||
|
||||
func (sb *StreamOrderBook) BindStream(stream Stream) {
|
||||
stream.OnBookSnapshot(func(book SliceOrderBook) {
|
||||
if sb.MutexOrderBook.Symbol != book.Symbol {
|
||||
|
@ -163,6 +219,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) {
|
|||
sb.Load(book)
|
||||
sb.EmitSnapshot(book)
|
||||
sb.emitChange(BookSignalSnapshot, book.Time)
|
||||
sb.updateMetrics(book.Time)
|
||||
})
|
||||
|
||||
stream.OnBookUpdate(func(book SliceOrderBook) {
|
||||
|
@ -173,6 +230,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) {
|
|||
sb.Update(book)
|
||||
sb.EmitUpdate(book)
|
||||
sb.emitChange(BookSignalUpdate, book.Time)
|
||||
sb.updateMetrics(book.Time)
|
||||
})
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user