diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index 0653603a7..000000000 --- a/.travis.yml +++ /dev/null @@ -1,25 +0,0 @@ ---- -language: go -go: -- 1.14 -- 1.15 - -services: -- redis-server -- mysql - -before_install: -- mysql -e 'CREATE DATABASE bbgo;' -- mysql -e 'CREATE DATABASE bbgo_dev;' - -install: -- go get github.com/c9s/rockhopper/cmd/rockhopper - -before_script: -- go mod download -- make migrations - -script: -- bash scripts/test-sqlite3-migrations.sh -- bash scripts/test-mysql-migrations.sh -- go test -v ./pkg/... diff --git a/config/xmaker.yaml b/config/xmaker.yaml index dddf31166..b4682f35e 100644 --- a/config/xmaker.yaml +++ b/config/xmaker.yaml @@ -60,6 +60,7 @@ crossExchangeStrategies: # 18002.00 pips: 10 circuitBreaker: + enabled: true maximumConsecutiveTotalLoss: 36.0 maximumConsecutiveLossTimes: 10 maximumLossPerRound: 15.0 diff --git a/examples/exchange-api/binance-book/main.go b/examples/exchange-api/binance-book/main.go index 42d669a07..6577dad85 100644 --- a/examples/exchange-api/binance-book/main.go +++ b/examples/exchange-api/binance-book/main.go @@ -48,7 +48,7 @@ var rootCmd = &cobra.Command{ stream.SetPublicOnly() stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) - streamBook := types.NewStreamBook(symbol) + streamBook := types.NewStreamBook(symbol, exchange.Name()) streamBook.BindStream(stream) go func() { diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index e8902a1d2..5e564de8b 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -464,7 +464,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ for _, sub := range session.Subscriptions { switch sub.Channel { case types.BookChannel: - book := types.NewStreamBook(sub.Symbol) + book := types.NewStreamBook(sub.Symbol, session.ExchangeName) book.BindStream(session.MarketDataStream) session.orderBooks[sub.Symbol] = book diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 87b8b0087..4d6e80464 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -54,7 +54,7 @@ var orderbookCmd = &cobra.Command{ return fmt.Errorf("session %s not found", sessionName) } - orderBook := types.NewMutexOrderBook(symbol) + orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name()) s := session.Exchange.NewStream() s.SetPublicOnly() diff --git a/pkg/priceresolver/simple.go b/pkg/pricesolver/simple.go similarity index 84% rename from pkg/priceresolver/simple.go rename to pkg/pricesolver/simple.go index 65b6ec51f..6e16700d2 100644 --- a/pkg/priceresolver/simple.go +++ b/pkg/pricesolver/simple.go @@ -1,4 +1,4 @@ -package priceresolver +package pricesolver import ( "sync" @@ -9,8 +9,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -// SimplePriceResolver implements a map-structure-based price index -type SimplePriceResolver struct { +// SimplePriceSolver implements a map-structure-based price index +type SimplePriceSolver struct { // symbolPrices stores the latest trade price by mapping symbol to price symbolPrices map[string]fixedpoint.Value markets types.MarketMap @@ -28,8 +28,8 @@ type SimplePriceResolver struct { mu sync.Mutex } -func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceResolver { - return &SimplePriceResolver{ +func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceSolver { + return &SimplePriceSolver{ markets: markets, symbolPrices: make(map[string]fixedpoint.Value), pricesByBase: make(map[string]map[string]fixedpoint.Value), @@ -37,7 +37,7 @@ func NewSimplePriceResolver(markets types.MarketMap) *SimplePriceResolver { } } -func (m *SimplePriceResolver) Update(symbol string, price fixedpoint.Value) { +func (m *SimplePriceSolver) Update(symbol string, price fixedpoint.Value) { m.mu.Lock() defer m.mu.Unlock() @@ -65,11 +65,11 @@ func (m *SimplePriceResolver) Update(symbol string, price fixedpoint.Value) { baseMap[market.BaseCurrency] = price } -func (m *SimplePriceResolver) UpdateFromTrade(trade types.Trade) { +func (m *SimplePriceSolver) UpdateFromTrade(trade types.Trade) { m.Update(trade.Symbol, trade.Price) } -func (m *SimplePriceResolver) inferencePrice(asset string, assetPrice fixedpoint.Value, preferredFiats ...string) (fixedpoint.Value, bool) { +func (m *SimplePriceSolver) inferencePrice(asset string, assetPrice fixedpoint.Value, preferredFiats ...string) (fixedpoint.Value, bool) { // log.Infof("inferencePrice %s = %f", asset, assetPrice.Float64()) quotePrices, ok := m.pricesByBase[asset] if ok { @@ -112,7 +112,7 @@ func (m *SimplePriceResolver) inferencePrice(asset string, assetPrice fixedpoint return fixedpoint.Zero, false } -func (m *SimplePriceResolver) ResolvePrice(asset string, preferredFiats ...string) (fixedpoint.Value, bool) { +func (m *SimplePriceSolver) ResolvePrice(asset string, preferredFiats ...string) (fixedpoint.Value, bool) { m.mu.Lock() defer m.mu.Unlock() return m.inferencePrice(asset, fixedpoint.One, preferredFiats...) diff --git a/pkg/priceresolver/simple_test.go b/pkg/pricesolver/simple_test.go similarity index 99% rename from pkg/priceresolver/simple_test.go rename to pkg/pricesolver/simple_test.go index a5e9c68f2..1714b73ac 100644 --- a/pkg/priceresolver/simple_test.go +++ b/pkg/pricesolver/simple_test.go @@ -1,4 +1,4 @@ -package priceresolver +package pricesolver import ( "testing" diff --git a/pkg/strategy/audacitymaker/orderflow.go b/pkg/strategy/audacitymaker/orderflow.go index 94de68880..6326c8360 100644 --- a/pkg/strategy/audacitymaker/orderflow.go +++ b/pkg/strategy/audacitymaker/orderflow.go @@ -2,6 +2,7 @@ package audacitymaker import ( "context" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/datatype/floats" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -38,7 +39,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener position := orderExecutor.Position() symbol := position.Symbol // ger best bid/ask, not used yet - s.StreamBook = types.NewStreamBook(symbol) + s.StreamBook = types.NewStreamBook(symbol, session.ExchangeName) s.StreamBook.BindStream(session.MarketDataStream) // use queue to do time-series rolling @@ -59,7 +60,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener session.MarketDataStream.OnMarketTrade(func(trade types.Trade) { - //log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64()) + // log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64()) ctx := context.Background() @@ -80,10 +81,10 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener sellTradesNumber.Update(1) } - //canceled := s.orderExecutor.GracefulCancel(ctx) - //if canceled != nil { + // canceled := s.orderExecutor.GracefulCancel(ctx) + // if canceled != nil { // _ = s.orderExecutor.GracefulCancel(ctx) - //} + // } sizeFraction := buyTradeSize.Sum() / sellTradeSize.Sum() numberFraction := buyTradesNumber.Sum() / sellTradesNumber.Sum() @@ -112,15 +113,15 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener if outlier(orderFlowSizeMinMax.Tail(100), threshold) > 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) > 0 { _ = s.orderExecutor.GracefulCancel(ctx) log.Infof("long!!") - //_ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol) + // _ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol) _ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price, symbol) - //_ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol) + // _ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol) } else if outlier(orderFlowSizeMinMax.Tail(100), threshold) < 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) < 0 { _ = s.orderExecutor.GracefulCancel(ctx) log.Infof("short!!") - //_ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol) + // _ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol) _ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price, symbol) - //_ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol) + // _ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol) } } @@ -138,7 +139,9 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener } } -func (s *PerTrade) placeOrder(ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string) error { +func (s *PerTrade) placeOrder( + ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string, +) error { market, _ := s.session.Market(symbol) _, err := s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Symbol: symbol, diff --git a/pkg/strategy/scmaker/strategy.go b/pkg/strategy/scmaker/strategy.go index 1039c3431..053bf20b1 100644 --- a/pkg/strategy/scmaker/strategy.go +++ b/pkg/strategy/scmaker/strategy.go @@ -98,7 +98,7 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID()) - s.book = types.NewStreamBook(s.Symbol) + s.book = types.NewStreamBook(s.Symbol, session.Exchange.Name()) s.book.BindStream(session.MarketDataStream) s.liquidityOrderBook = bbgo.NewActiveOrderBook(s.Symbol) diff --git a/pkg/strategy/tri/strategy.go b/pkg/strategy/tri/strategy.go index c4d8b39e3..429d9e480 100644 --- a/pkg/strategy/tri/strategy.go +++ b/pkg/strategy/tri/strategy.go @@ -513,7 +513,9 @@ func notifyUsdPnL(profit fixedpoint.Value) { bbgo.Notify(title) } -func (s *Strategy) iocOrderExecution(ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64) (types.OrderSlice, error) { +func (s *Strategy) iocOrderExecution( + ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64, +) (types.OrderSlice, error) { service, ok := session.Exchange.(types.ExchangeOrderQueryService) if !ok { return nil, errors.New("exchange does not support ExchangeOrderQueryService") @@ -700,7 +702,9 @@ func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, t } } -func (s *Strategy) waitOrdersAndCollectTrades(ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice) (map[uint64][]types.Trade, types.OrderSlice, error) { +func (s *Strategy) waitOrdersAndCollectTrades( + ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice, +) (map[uint64][]types.Trade, types.OrderSlice, error) { var err error var orderTrades = make(map[uint64][]types.Trade) var updatedOrders types.OrderSlice @@ -763,7 +767,9 @@ func (s *Strategy) analyzeOrders(orders types.OrderSlice) { } } -func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan) (map[string]*ArbMarket, error) { +func (s *Strategy) buildArbMarkets( + session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan, +) (map[string]*ArbMarket, error) { markets := make(map[string]*ArbMarket) // build market object for _, symbol := range symbols { @@ -790,7 +796,7 @@ func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []stri Speed: types.SpeedHigh, }) - book := types.NewStreamBook(symbol) + book := types.NewStreamBook(symbol, session.ExchangeName) priceUpdater := func(_ types.SliceOrderBook) { bestBid, bestAsk, _ := book.BestBidAndAsk() if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) { diff --git a/pkg/strategy/xalign/strategy.go b/pkg/strategy/xalign/strategy.go index 40b0a6f01..2f5776622 100644 --- a/pkg/strategy/xalign/strategy.go +++ b/pkg/strategy/xalign/strategy.go @@ -14,7 +14,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/priceresolver" + "github.com/c9s/bbgo/pkg/pricesolver" "github.com/c9s/bbgo/pkg/types" ) @@ -57,7 +57,7 @@ type Strategy struct { faultBalanceRecords map[string][]TimeBalance - priceResolver *priceresolver.SimplePriceResolver + priceResolver *pricesolver.SimplePriceSolver sessions map[string]*bbgo.ExchangeSession orderBooks map[string]*bbgo.ActiveOrderBook @@ -372,7 +372,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se // session.Market(symbol) } - s.priceResolver = priceresolver.NewSimplePriceResolver(markets) + s.priceResolver = pricesolver.NewSimplePriceResolver(markets) bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 103c50bf4..8863a98d1 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -393,7 +393,7 @@ func (s *Strategy) CrossRun( return err } - s.pricingBook = types.NewStreamBook(s.HedgeSymbol) + s.pricingBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName) s.pricingBook.BindStream(s.hedgeSession.MarketDataStream) s.stopC = make(chan struct{}) diff --git a/pkg/strategy/xdepthmaker/strategy_test.go b/pkg/strategy/xdepthmaker/strategy_test.go index 2df6424c0..3ba221425 100644 --- a/pkg/strategy/xdepthmaker/strategy_test.go +++ b/pkg/strategy/xdepthmaker/strategy_test.go @@ -44,7 +44,7 @@ func TestStrategy_generateMakerOrders(t *testing.T) { }, } - pricingBook := types.NewStreamBook("BTCUSDT") + pricingBook := types.NewStreamBook("BTCUSDT", types.ExchangeBinance) pricingBook.Load(types.SliceOrderBook{ Symbol: "BTCUSDT", Bids: types.PriceVolumeSlice{ diff --git a/pkg/strategy/xgap/strategy.go b/pkg/strategy/xgap/strategy.go index 2470bf71c..ca812551f 100644 --- a/pkg/strategy/xgap/strategy.go +++ b/pkg/strategy/xgap/strategy.go @@ -153,11 +153,11 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se }) if s.SourceExchange != "" { - s.sourceBook = types.NewStreamBook(s.Symbol) + s.sourceBook = types.NewStreamBook(s.Symbol, sourceSession.ExchangeName) s.sourceBook.BindStream(s.sourceSession.MarketDataStream) } - s.tradingBook = types.NewStreamBook(s.Symbol) + s.tradingBook = types.NewStreamBook(s.Symbol, tradingSession.ExchangeName) s.tradingBook.BindStream(s.tradingSession.MarketDataStream) s.tradingSession.UserDataStream.OnTradeUpdate(func(trade types.Trade) { diff --git a/pkg/strategy/xmaker/metrics.go b/pkg/strategy/xmaker/metrics.go new file mode 100644 index 000000000..b61c6fc85 --- /dev/null +++ b/pkg/strategy/xmaker/metrics.go @@ -0,0 +1,43 @@ +package xmaker + +import "github.com/prometheus/client_golang/prometheus" + +var openOrderBidExposureInUsdMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmaker_open_order_bid_exposure_in_usd", + Help: "", + }, []string{"strategy_type", "strategy_id", "exchange", "symbol"}) + +var openOrderAskExposureInUsdMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmaker_open_order_ask_exposure_in_usd", + Help: "", + }, []string{"strategy_type", "strategy_id", "exchange", "symbol"}) + +var makerBestBidPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmaker_maker_best_bid_price", + Help: "", + }, []string{"strategy_type", "strategy_id", "exchange", "symbol"}) + +var makerBestAskPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmaker_maker_best_ask_price", + Help: "", + }, []string{"strategy_type", "strategy_id", "exchange", "symbol"}) + +var numOfLayersMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "xmaker_num_of_layers", + Help: "", + }, []string{"strategy_type", "strategy_id", "exchange", "symbol"}) + +func init() { + prometheus.MustRegister( + openOrderBidExposureInUsdMetrics, + openOrderAskExposureInUsdMetrics, + makerBestBidPriceMetrics, + makerBestAskPriceMetrics, + numOfLayersMetrics, + ) +} diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 071d59600..3de63260a 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -7,13 +7,15 @@ import ( "time" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/indicator" + indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2" + "github.com/c9s/bbgo/pkg/pricesolver" "github.com/c9s/bbgo/pkg/risk/circuitbreaker" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -95,10 +97,11 @@ type Strategy struct { makerMarket, sourceMarket types.Market // boll is the BOLLINGER indicator we used for predicting the price. - boll *indicator.BOLL + boll *indicatorv2.BOLLStream state *State + priceSolver *pricesolver.SimplePriceSolver CircuitBreaker *circuitbreaker.BasicCircuitBreaker `json:"circuitBreaker"` // persistence fields @@ -214,6 +217,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or // use mid-price for the last price s.lastPrice = bestBid.Price.Add(bestAsk.Price).Div(Two) + s.priceSolver.Update(s.Symbol, s.lastPrice) + bookLastUpdateTime := s.book.LastUpdateTime() if _, err := s.bidPriceHeartBeat.Update(bestBid); err != nil { @@ -380,6 +385,16 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or } } + labels := prometheus.Labels{ + "strategy_type": ID, + "strategy_id": s.InstanceID(), + "exchange": s.MakerExchange, + "symbol": s.Symbol, + } + + bidExposureInUsd := fixedpoint.Zero + askExposureInUsd := fixedpoint.Zero + bidPrice := bestBidPrice askPrice := bestAskPrice for i := 0; i < s.NumLayers; i++ { @@ -413,6 +428,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or Mul(s.makerMarket.TickSize))) } + makerBestBidPriceMetrics.With(labels).Set(bidPrice.Float64()) + if makerQuota.QuoteAsset.Lock(bidQuantity.Mul(bidPrice)) && hedgeQuota.BaseAsset.Lock(bidQuantity) { // if we bought, then we need to sell the base from the hedge session submitOrders = append(submitOrders, types.SubmitOrder{ @@ -427,6 +444,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or makerQuota.Commit() hedgeQuota.Commit() + bidExposureInUsd = bidExposureInUsd.Add(bidQuantity.Mul(bidPrice)) } else { makerQuota.Rollback() hedgeQuota.Rollback() @@ -466,7 +484,10 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or askPrice = askPrice.Add(pips.Mul(fixedpoint.NewFromInt(int64(i)).Mul(s.makerMarket.TickSize))) } + makerBestAskPriceMetrics.With(labels).Set(askPrice.Float64()) + if makerQuota.BaseAsset.Lock(askQuantity) && hedgeQuota.QuoteAsset.Lock(askQuantity.Mul(askPrice)) { + // if we bought, then we need to sell the base from the hedge session submitOrders = append(submitOrders, types.SubmitOrder{ Symbol: s.Symbol, @@ -480,6 +501,8 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or }) makerQuota.Commit() hedgeQuota.Commit() + + askExposureInUsd = askExposureInUsd.Add(askQuantity.Mul(askPrice)) } else { makerQuota.Rollback() hedgeQuota.Rollback() @@ -496,14 +519,28 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or return } - makerOrders, err := orderExecutionRouter.SubmitOrdersTo(ctx, s.MakerExchange, submitOrders...) + formattedOrders, err := s.makerSession.FormatOrders(submitOrders) if err != nil { - log.WithError(err).Errorf("order error: %s", err.Error()) return } - s.activeMakerOrders.Add(makerOrders...) - s.orderStore.Add(makerOrders...) + orderCreateCallback := func(createdOrder types.Order) { + s.orderStore.Add(createdOrder) + s.activeMakerOrders.Add(createdOrder) + } + + defer s.tradeCollector.Process() + + createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, s.makerSession.Exchange, orderCreateCallback, formattedOrders...) + if err != nil { + log.WithError(err).Errorf("unable to place maker orders: %+v", formattedOrders) + } + + openOrderBidExposureInUsdMetrics.With(labels).Set(bidExposureInUsd.Float64()) + openOrderAskExposureInUsdMetrics.With(labels).Set(askExposureInUsd.Float64()) + + _ = errIdx + _ = createdOrders } var lastPriceModifier = fixedpoint.NewFromFloat(1.001) @@ -744,6 +781,10 @@ func (s *Strategy) CrossRun( s.sourceSession = sourceSession + // initialize the price resolver + sourceMarkets := s.sourceSession.Markets() + s.priceSolver = pricesolver.NewSimplePriceResolver(sourceMarkets) + makerSession, ok := sessions[s.MakerExchange] if !ok { return fmt.Errorf("maker exchange session %s is not defined", s.MakerExchange) @@ -761,24 +802,16 @@ func (s *Strategy) CrossRun( return fmt.Errorf("maker session market %s is not defined", s.Symbol) } - standardIndicatorSet := s.sourceSession.StandardIndicatorSet(s.Symbol) + indicators := s.sourceSession.Indicators(s.Symbol) if !ok { return fmt.Errorf("%s standard indicator set not found", s.Symbol) } - s.boll = standardIndicatorSet.BOLL(types.IntervalWindow{ + s.boll = indicators.BOLL(types.IntervalWindow{ Interval: s.BollBandInterval, Window: 21, }, 1.0) - if store, ok := s.sourceSession.MarketDataStore(s.Symbol); ok { - if klines, ok2 := store.KLinesOfInterval(s.BollBandInterval); ok2 { - for i := 0; i < len(*klines); i++ { - s.boll.CalculateAndUpdate((*klines)[0 : i+1]) - } - } - } - // restore state instanceID := s.InstanceID() s.groupID = util.FNV32(instanceID) @@ -819,7 +852,7 @@ func (s *Strategy) CrossRun( }) } - s.book = types.NewStreamBook(s.Symbol) + s.book = types.NewStreamBook(s.Symbol, s.sourceSession.ExchangeName) s.book.BindStream(s.sourceSession.MarketDataStream) s.activeMakerOrders = bbgo.NewActiveOrderBook(s.Symbol) @@ -948,10 +981,7 @@ func (s *Strategy) CrossRun( // wait for the quoter to stop time.Sleep(s.UpdateInterval.Duration()) - shutdownCtx, cancelShutdown := context.WithTimeout(context.TODO(), time.Minute) - defer cancelShutdown() - - if err := s.activeMakerOrders.GracefulCancel(shutdownCtx, s.makerSession.Exchange); err != nil { + if err := s.activeMakerOrders.GracefulCancel(ctx, s.makerSession.Exchange); err != nil { log.WithError(err).Errorf("graceful cancel error") } diff --git a/pkg/twap/stream_executor.go b/pkg/twap/stream_executor.go index 839d42670..ff1fc8af1 100644 --- a/pkg/twap/stream_executor.go +++ b/pkg/twap/stream_executor.go @@ -396,7 +396,7 @@ func (e *StreamExecutor) Run(parentCtx context.Context) error { e.marketDataStream.SetPublicOnly() e.marketDataStream.Subscribe(types.BookChannel, e.Symbol, types.SubscribeOptions{}) - e.orderBook = types.NewStreamBook(e.Symbol) + e.orderBook = types.NewStreamBook(e.Symbol, e.Session.ExchangeName) e.orderBook.BindStream(e.marketDataStream) e.userDataStream = e.Session.Exchange.NewStream() diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 1cbea8806..e521ad2ad 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -88,7 +88,7 @@ func NewFixedQuantityExecutor( Depth: types.DepthLevelMedium, }) - orderBook := types.NewStreamBook(symbol) + orderBook := types.NewStreamBook(symbol, exchange.Name()) orderBook.BindStream(marketDataStream) userDataStream := exchange.NewStream() diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index f0aa79485..fe3f05cdb 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -143,6 +143,7 @@ func TestNewStreamExecutor(t *testing.T) { }, } + mockEx.EXPECT().Name().Return(exchangeName) mockEx.EXPECT().NewStream().Return(mockMarketDataStream) mockEx.EXPECT().NewStream().Return(mockUserDataStream) mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 4e2c8bb29..0653fab68 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/c9s/bbgo/pkg/fixedpoint" ) @@ -26,12 +28,13 @@ type OrderBook interface { type MutexOrderBook struct { sync.Mutex - Symbol string + Symbol string + Exchange ExchangeName orderBook OrderBook } -func NewMutexOrderBook(symbol string) *MutexOrderBook { +func NewMutexOrderBook(symbol string, exchangeName ExchangeName) *MutexOrderBook { var book OrderBook = NewSliceOrderBook(symbol) if v, _ := strconv.ParseBool(os.Getenv("ENABLE_RBT_ORDERBOOK")); v { @@ -40,6 +43,7 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook { return &MutexOrderBook{ Symbol: symbol, + Exchange: exchangeName, orderBook: book, } } @@ -134,6 +138,46 @@ type BookSignal struct { Time time.Time } +var streamOrderBookBestBidPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_bid_price", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestAskPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_ask_price", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestBidVolumeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_bid_volume", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestAskVolumeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_ask_volume", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookUpdateTimeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_update_time_milliseconds", + Help: "", + }, []string{"symbol", "exchange"}) + +func init() { + prometheus.MustRegister( + streamOrderBookBestBidPriceMetrics, + streamOrderBookBestAskPriceMetrics, + streamOrderBookBestBidVolumeMetrics, + streamOrderBookBestAskVolumeMetrics, + streamOrderBookUpdateTimeMetrics, + ) +} + // StreamOrderBook receives streaming data from websocket connection and // update the order book with mutex lock, so you can safely access it. // @@ -147,13 +191,25 @@ type StreamOrderBook struct { snapshotCallbacks []func(snapshot SliceOrderBook) } -func NewStreamBook(symbol string) *StreamOrderBook { +func NewStreamBook(symbol string, exchangeName ExchangeName) *StreamOrderBook { return &StreamOrderBook{ - MutexOrderBook: NewMutexOrderBook(symbol), + MutexOrderBook: NewMutexOrderBook(symbol, exchangeName), C: make(chan *BookSignal, 1), } } +func (sb *StreamOrderBook) updateMetrics(t time.Time) { + bestBid, bestAsk, ok := sb.BestBidAndAsk() + if ok { + exchangeName := string(sb.Exchange) + streamOrderBookBestAskPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Price.Float64()) + streamOrderBookBestBidPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Price.Float64()) + streamOrderBookBestAskVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Volume.Float64()) + streamOrderBookBestBidVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Volume.Float64()) + streamOrderBookUpdateTimeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(float64(t.UnixMilli())) + } +} + func (sb *StreamOrderBook) BindStream(stream Stream) { stream.OnBookSnapshot(func(book SliceOrderBook) { if sb.MutexOrderBook.Symbol != book.Symbol { @@ -163,6 +219,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) { sb.Load(book) sb.EmitSnapshot(book) sb.emitChange(BookSignalSnapshot, book.Time) + sb.updateMetrics(book.Time) }) stream.OnBookUpdate(func(book SliceOrderBook) { @@ -173,6 +230,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) { sb.Update(book) sb.EmitUpdate(book) sb.emitChange(BookSignalUpdate, book.Time) + sb.updateMetrics(book.Time) }) }