diff --git a/examples/exchange-api/binance-book/main.go b/examples/exchange-api/binance-book/main.go index 42d669a07..6577dad85 100644 --- a/examples/exchange-api/binance-book/main.go +++ b/examples/exchange-api/binance-book/main.go @@ -48,7 +48,7 @@ var rootCmd = &cobra.Command{ stream.SetPublicOnly() stream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{}) - streamBook := types.NewStreamBook(symbol) + streamBook := types.NewStreamBook(symbol, exchange.Name()) streamBook.BindStream(stream) go func() { diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index e8902a1d2..5e564de8b 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -464,7 +464,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ for _, sub := range session.Subscriptions { switch sub.Channel { case types.BookChannel: - book := types.NewStreamBook(sub.Symbol) + book := types.NewStreamBook(sub.Symbol, session.ExchangeName) book.BindStream(session.MarketDataStream) session.orderBooks[sub.Symbol] = book diff --git a/pkg/cmd/orderbook.go b/pkg/cmd/orderbook.go index 87b8b0087..4d6e80464 100644 --- a/pkg/cmd/orderbook.go +++ b/pkg/cmd/orderbook.go @@ -54,7 +54,7 @@ var orderbookCmd = &cobra.Command{ return fmt.Errorf("session %s not found", sessionName) } - orderBook := types.NewMutexOrderBook(symbol) + orderBook := types.NewMutexOrderBook(symbol, session.Exchange.Name()) s := session.Exchange.NewStream() s.SetPublicOnly() diff --git a/pkg/strategy/audacitymaker/orderflow.go b/pkg/strategy/audacitymaker/orderflow.go index 94de68880..6326c8360 100644 --- a/pkg/strategy/audacitymaker/orderflow.go +++ b/pkg/strategy/audacitymaker/orderflow.go @@ -2,6 +2,7 @@ package audacitymaker import ( "context" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/datatype/floats" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -38,7 +39,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener position := orderExecutor.Position() symbol := position.Symbol // ger best bid/ask, not used yet - s.StreamBook = types.NewStreamBook(symbol) + s.StreamBook = types.NewStreamBook(symbol, session.ExchangeName) s.StreamBook.BindStream(session.MarketDataStream) // use queue to do time-series rolling @@ -59,7 +60,7 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener session.MarketDataStream.OnMarketTrade(func(trade types.Trade) { - //log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64()) + // log.Infof("%s trade @ %f", trade.Side, trade.Price.Float64()) ctx := context.Background() @@ -80,10 +81,10 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener sellTradesNumber.Update(1) } - //canceled := s.orderExecutor.GracefulCancel(ctx) - //if canceled != nil { + // canceled := s.orderExecutor.GracefulCancel(ctx) + // if canceled != nil { // _ = s.orderExecutor.GracefulCancel(ctx) - //} + // } sizeFraction := buyTradeSize.Sum() / sellTradeSize.Sum() numberFraction := buyTradesNumber.Sum() / sellTradesNumber.Sum() @@ -112,15 +113,15 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener if outlier(orderFlowSizeMinMax.Tail(100), threshold) > 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) > 0 { _ = s.orderExecutor.GracefulCancel(ctx) log.Infof("long!!") - //_ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol) + // _ = s.placeTrade(ctx, types.SideTypeBuy, s.Quantity, symbol) _ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price, symbol) - //_ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol) + // _ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price.Mul(fixedpoint.NewFromFloat(1.0005)), symbol) } else if outlier(orderFlowSizeMinMax.Tail(100), threshold) < 0 && outlier(orderFlowNumberMinMax.Tail(100), threshold) < 0 { _ = s.orderExecutor.GracefulCancel(ctx) log.Infof("short!!") - //_ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol) + // _ = s.placeTrade(ctx, types.SideTypeSell, s.Quantity, symbol) _ = s.placeOrder(ctx, types.SideTypeSell, s.Quantity, ask.Price, symbol) - //_ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol) + // _ = s.placeOrder(ctx, types.SideTypeBuy, s.Quantity, bid.Price.Mul(fixedpoint.NewFromFloat(0.9995)), symbol) } } @@ -138,7 +139,9 @@ func (s *PerTrade) Bind(session *bbgo.ExchangeSession, orderExecutor *bbgo.Gener } } -func (s *PerTrade) placeOrder(ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string) error { +func (s *PerTrade) placeOrder( + ctx context.Context, side types.SideType, quantity fixedpoint.Value, price fixedpoint.Value, symbol string, +) error { market, _ := s.session.Market(symbol) _, err := s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Symbol: symbol, diff --git a/pkg/strategy/scmaker/strategy.go b/pkg/strategy/scmaker/strategy.go index 1039c3431..053bf20b1 100644 --- a/pkg/strategy/scmaker/strategy.go +++ b/pkg/strategy/scmaker/strategy.go @@ -98,7 +98,7 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { s.Strategy.Initialize(ctx, s.Environment, session, s.Market, ID, s.InstanceID()) - s.book = types.NewStreamBook(s.Symbol) + s.book = types.NewStreamBook(s.Symbol, session.Exchange.Name()) s.book.BindStream(session.MarketDataStream) s.liquidityOrderBook = bbgo.NewActiveOrderBook(s.Symbol) diff --git a/pkg/strategy/tri/strategy.go b/pkg/strategy/tri/strategy.go index c4d8b39e3..429d9e480 100644 --- a/pkg/strategy/tri/strategy.go +++ b/pkg/strategy/tri/strategy.go @@ -513,7 +513,9 @@ func notifyUsdPnL(profit fixedpoint.Value) { bbgo.Notify(title) } -func (s *Strategy) iocOrderExecution(ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64) (types.OrderSlice, error) { +func (s *Strategy) iocOrderExecution( + ctx context.Context, session *bbgo.ExchangeSession, orders [3]types.SubmitOrder, ratio float64, +) (types.OrderSlice, error) { service, ok := session.Exchange.(types.ExchangeOrderQueryService) if !ok { return nil, errors.New("exchange does not support ExchangeOrderQueryService") @@ -700,7 +702,9 @@ func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, t } } -func (s *Strategy) waitOrdersAndCollectTrades(ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice) (map[uint64][]types.Trade, types.OrderSlice, error) { +func (s *Strategy) waitOrdersAndCollectTrades( + ctx context.Context, service types.ExchangeOrderQueryService, createdOrders types.OrderSlice, +) (map[uint64][]types.Trade, types.OrderSlice, error) { var err error var orderTrades = make(map[uint64][]types.Trade) var updatedOrders types.OrderSlice @@ -763,7 +767,9 @@ func (s *Strategy) analyzeOrders(orders types.OrderSlice) { } } -func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan) (map[string]*ArbMarket, error) { +func (s *Strategy) buildArbMarkets( + session *bbgo.ExchangeSession, symbols []string, separateStream bool, sigC sigchan.Chan, +) (map[string]*ArbMarket, error) { markets := make(map[string]*ArbMarket) // build market object for _, symbol := range symbols { @@ -790,7 +796,7 @@ func (s *Strategy) buildArbMarkets(session *bbgo.ExchangeSession, symbols []stri Speed: types.SpeedHigh, }) - book := types.NewStreamBook(symbol) + book := types.NewStreamBook(symbol, session.ExchangeName) priceUpdater := func(_ types.SliceOrderBook) { bestBid, bestAsk, _ := book.BestBidAndAsk() if bestAsk.Equals(m.bestAsk) && bestBid.Equals(m.bestBid) { diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index 103c50bf4..8863a98d1 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -393,7 +393,7 @@ func (s *Strategy) CrossRun( return err } - s.pricingBook = types.NewStreamBook(s.HedgeSymbol) + s.pricingBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName) s.pricingBook.BindStream(s.hedgeSession.MarketDataStream) s.stopC = make(chan struct{}) diff --git a/pkg/strategy/xdepthmaker/strategy_test.go b/pkg/strategy/xdepthmaker/strategy_test.go index 2df6424c0..3ba221425 100644 --- a/pkg/strategy/xdepthmaker/strategy_test.go +++ b/pkg/strategy/xdepthmaker/strategy_test.go @@ -44,7 +44,7 @@ func TestStrategy_generateMakerOrders(t *testing.T) { }, } - pricingBook := types.NewStreamBook("BTCUSDT") + pricingBook := types.NewStreamBook("BTCUSDT", types.ExchangeBinance) pricingBook.Load(types.SliceOrderBook{ Symbol: "BTCUSDT", Bids: types.PriceVolumeSlice{ diff --git a/pkg/strategy/xgap/strategy.go b/pkg/strategy/xgap/strategy.go index 2470bf71c..ca812551f 100644 --- a/pkg/strategy/xgap/strategy.go +++ b/pkg/strategy/xgap/strategy.go @@ -153,11 +153,11 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se }) if s.SourceExchange != "" { - s.sourceBook = types.NewStreamBook(s.Symbol) + s.sourceBook = types.NewStreamBook(s.Symbol, sourceSession.ExchangeName) s.sourceBook.BindStream(s.sourceSession.MarketDataStream) } - s.tradingBook = types.NewStreamBook(s.Symbol) + s.tradingBook = types.NewStreamBook(s.Symbol, tradingSession.ExchangeName) s.tradingBook.BindStream(s.tradingSession.MarketDataStream) s.tradingSession.UserDataStream.OnTradeUpdate(func(trade types.Trade) { diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index e6862c6fa..ecd604a6e 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -819,7 +819,7 @@ func (s *Strategy) CrossRun( }) } - s.book = types.NewStreamBook(s.Symbol) + s.book = types.NewStreamBook(s.Symbol, s.sourceSession.ExchangeName) s.book.BindStream(s.sourceSession.MarketDataStream) s.activeMakerOrders = bbgo.NewActiveOrderBook(s.Symbol) diff --git a/pkg/twap/stream_executor.go b/pkg/twap/stream_executor.go index 839d42670..ff1fc8af1 100644 --- a/pkg/twap/stream_executor.go +++ b/pkg/twap/stream_executor.go @@ -396,7 +396,7 @@ func (e *StreamExecutor) Run(parentCtx context.Context) error { e.marketDataStream.SetPublicOnly() e.marketDataStream.Subscribe(types.BookChannel, e.Symbol, types.SubscribeOptions{}) - e.orderBook = types.NewStreamBook(e.Symbol) + e.orderBook = types.NewStreamBook(e.Symbol, e.Session.ExchangeName) e.orderBook.BindStream(e.marketDataStream) e.userDataStream = e.Session.Exchange.NewStream() diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 1cbea8806..e521ad2ad 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -88,7 +88,7 @@ func NewFixedQuantityExecutor( Depth: types.DepthLevelMedium, }) - orderBook := types.NewStreamBook(symbol) + orderBook := types.NewStreamBook(symbol, exchange.Name()) orderBook.BindStream(marketDataStream) userDataStream := exchange.NewStream() diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index f0aa79485..fe3f05cdb 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -143,6 +143,7 @@ func TestNewStreamExecutor(t *testing.T) { }, } + mockEx.EXPECT().Name().Return(exchangeName) mockEx.EXPECT().NewStream().Return(mockMarketDataStream) mockEx.EXPECT().NewStream().Return(mockUserDataStream) mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) diff --git a/pkg/types/orderbook.go b/pkg/types/orderbook.go index 4e2c8bb29..0653fab68 100644 --- a/pkg/types/orderbook.go +++ b/pkg/types/orderbook.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/c9s/bbgo/pkg/fixedpoint" ) @@ -26,12 +28,13 @@ type OrderBook interface { type MutexOrderBook struct { sync.Mutex - Symbol string + Symbol string + Exchange ExchangeName orderBook OrderBook } -func NewMutexOrderBook(symbol string) *MutexOrderBook { +func NewMutexOrderBook(symbol string, exchangeName ExchangeName) *MutexOrderBook { var book OrderBook = NewSliceOrderBook(symbol) if v, _ := strconv.ParseBool(os.Getenv("ENABLE_RBT_ORDERBOOK")); v { @@ -40,6 +43,7 @@ func NewMutexOrderBook(symbol string) *MutexOrderBook { return &MutexOrderBook{ Symbol: symbol, + Exchange: exchangeName, orderBook: book, } } @@ -134,6 +138,46 @@ type BookSignal struct { Time time.Time } +var streamOrderBookBestBidPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_bid_price", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestAskPriceMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_ask_price", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestBidVolumeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_bid_volume", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookBestAskVolumeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_best_ask_volume", + Help: "", + }, []string{"symbol", "exchange"}) + +var streamOrderBookUpdateTimeMetrics = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_stream_order_book_update_time_milliseconds", + Help: "", + }, []string{"symbol", "exchange"}) + +func init() { + prometheus.MustRegister( + streamOrderBookBestBidPriceMetrics, + streamOrderBookBestAskPriceMetrics, + streamOrderBookBestBidVolumeMetrics, + streamOrderBookBestAskVolumeMetrics, + streamOrderBookUpdateTimeMetrics, + ) +} + // StreamOrderBook receives streaming data from websocket connection and // update the order book with mutex lock, so you can safely access it. // @@ -147,13 +191,25 @@ type StreamOrderBook struct { snapshotCallbacks []func(snapshot SliceOrderBook) } -func NewStreamBook(symbol string) *StreamOrderBook { +func NewStreamBook(symbol string, exchangeName ExchangeName) *StreamOrderBook { return &StreamOrderBook{ - MutexOrderBook: NewMutexOrderBook(symbol), + MutexOrderBook: NewMutexOrderBook(symbol, exchangeName), C: make(chan *BookSignal, 1), } } +func (sb *StreamOrderBook) updateMetrics(t time.Time) { + bestBid, bestAsk, ok := sb.BestBidAndAsk() + if ok { + exchangeName := string(sb.Exchange) + streamOrderBookBestAskPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Price.Float64()) + streamOrderBookBestBidPriceMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Price.Float64()) + streamOrderBookBestAskVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestAsk.Volume.Float64()) + streamOrderBookBestBidVolumeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(bestBid.Volume.Float64()) + streamOrderBookUpdateTimeMetrics.WithLabelValues(sb.Symbol, exchangeName).Set(float64(t.UnixMilli())) + } +} + func (sb *StreamOrderBook) BindStream(stream Stream) { stream.OnBookSnapshot(func(book SliceOrderBook) { if sb.MutexOrderBook.Symbol != book.Symbol { @@ -163,6 +219,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) { sb.Load(book) sb.EmitSnapshot(book) sb.emitChange(BookSignalSnapshot, book.Time) + sb.updateMetrics(book.Time) }) stream.OnBookUpdate(func(book SliceOrderBook) { @@ -173,6 +230,7 @@ func (sb *StreamOrderBook) BindStream(stream Stream) { sb.Update(book) sb.EmitUpdate(book) sb.emitChange(BookSignalUpdate, book.Time) + sb.updateMetrics(book.Time) }) }