diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 8de07c9e3..2046b2ffa 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -27,7 +27,9 @@ var quantityReduceDelta = fixedpoint.NewFromFloat(0.005) // This is for the maximum retries const submitOrderRetryLimit = 5 +// BaseOrderExecutor provides the common accessors for order executor type BaseOrderExecutor struct { + exchange types.Exchange session *ExchangeSession activeMakerOrders *ActiveOrderBook orderStore *core.OrderStore @@ -43,8 +45,8 @@ func (e *BaseOrderExecutor) ActiveMakerOrders() *ActiveOrderBook { // GracefulCancel cancels all active maker orders if orders are not given, otherwise cancel all the given orders func (e *BaseOrderExecutor) GracefulCancel(ctx context.Context, orders ...types.Order) error { - if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange, orders...); err != nil { - return errors.Wrap(err, "graceful cancel error") + if err := e.activeMakerOrders.GracefulCancel(ctx, e.exchange, orders...); err != nil { + return errors.Wrap(err, "graceful cancel order error") } return nil @@ -84,6 +86,7 @@ func NewGeneralOrderExecutor( executor := &GeneralOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ session: session, + exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(symbol), orderStore: orderStore, }, @@ -111,7 +114,7 @@ func (e *GeneralOrderExecutor) SetMaxRetries(maxRetries uint) { } func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) { - marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService) + marginService, ok := e.exchange.(types.MarginBorrowRepayService) if !ok { log.Warnf("session %s (%T) exchange does not support MarginBorrowRepayService", e.session.Name, e.session.Exchange) return diff --git a/pkg/bbgo/order_executor_simple.go b/pkg/bbgo/order_executor_simple.go index e7c5308d0..123cd03f1 100644 --- a/pkg/bbgo/order_executor_simple.go +++ b/pkg/bbgo/order_executor_simple.go @@ -22,6 +22,7 @@ func NewSimpleOrderExecutor(session *ExchangeSession) *SimpleOrderExecutor { return &SimpleOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ session: session, + exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(""), orderStore: core.NewOrderStore(""), }, diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 475237a20..6d7ae236a 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/cobra" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/twap" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/types" @@ -255,7 +256,7 @@ var executeOrderCmd = &cobra.Command{ executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution := &bbgo.TwapExecution{ + execution := &twap.StreamExecutor{ Session: session, Symbol: symbol, Side: side, diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 6bbe1cdac..90decc6bf 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -1006,13 +1006,15 @@ func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrde } // could be IOC or FOK - if len(order.TimeInForce) > 0 { - // TODO: check the TimeInForce value - req.TimeInForce(binance.TimeInForceType(order.TimeInForce)) - } else { - switch order.Type { - case types.OrderTypeLimit, types.OrderTypeStopLimit: - req.TimeInForce(binance.TimeInForceTypeGTC) + switch order.Type { + case types.OrderTypeLimit, types.OrderTypeStopLimit: + req.TimeInForce(binance.TimeInForceTypeGTC) + case types.OrderTypeLimitMaker: + // do not set TimeInForce for LimitMaker + default: + if len(order.TimeInForce) > 0 { + // TODO: check the TimeInForce value + req.TimeInForce(binance.TimeInForceType(order.TimeInForce)) } } diff --git a/pkg/risk/riskcontrol/position_test.go b/pkg/risk/riskcontrol/position_test.go index 554bac94e..7893b3445 100644 --- a/pkg/risk/riskcontrol/position_test.go +++ b/pkg/risk/riskcontrol/position_test.go @@ -23,7 +23,7 @@ func Test_ModifiedQuantity(t *testing.T) { BaseCurrency: "BTC", }, } - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, "BTCUSDT", "strategy", "strategy-1", pos) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, "BTCUSDT", "strategy", "strategy-1", pos) riskControl := NewPositionRiskControl(orderExecutor, fixedpoint.NewFromInt(10), fixedpoint.NewFromInt(2)) cases := []struct { diff --git a/pkg/strategy/autobuy/strategy.go b/pkg/strategy/autobuy/strategy.go index 13a143161..aee4f0f3b 100644 --- a/pkg/strategy/autobuy/strategy.go +++ b/pkg/strategy/autobuy/strategy.go @@ -5,13 +5,14 @@ import ( "fmt" "sync" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2" "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" - "github.com/robfig/cron/v3" - "github.com/sirupsen/logrus" ) const ID = "autobuy" @@ -128,7 +129,7 @@ func (s *Strategy) autobuy(ctx context.Context) { } side := types.SideTypeBuy - price := s.PriceType.Map(ticker, side) + price := s.PriceType.GetPrice(ticker, side) if price.Float64() > s.boll.UpBand.Last(0) { log.Infof("price %s is higher than upper band %f, skip", price.String(), s.boll.UpBand.Last(0)) diff --git a/pkg/strategy/dca2/recover_test.go b/pkg/strategy/dca2/recover_test.go index 472ecee77..c26205c2d 100644 --- a/pkg/strategy/dca2/recover_test.go +++ b/pkg/strategy/dca2/recover_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" ) func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt time.Time) types.Order { @@ -29,7 +30,7 @@ func Test_RecoverState(t *testing.T) { t.Run("new strategy", func(t *testing.T) { currentRound := Round{} position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) @@ -47,7 +48,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionReady, state) @@ -65,7 +66,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrderFilled, state) @@ -83,7 +84,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) @@ -101,7 +102,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) @@ -122,7 +123,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, TakeProfitReady, state) @@ -143,7 +144,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) diff --git a/pkg/strategy/rebalance/strategy.go b/pkg/strategy/rebalance/strategy.go index eab1de87c..6b4009063 100644 --- a/pkg/strategy/rebalance/strategy.go +++ b/pkg/strategy/rebalance/strategy.go @@ -262,7 +262,7 @@ func (s *Strategy) generateOrder(ctx context.Context) (*types.SubmitOrder, error } quantity = market.RoundDownQuantityByPrecision(quantity) - price := s.PriceType.Map(ticker, side) + price := s.PriceType.GetPrice(ticker, side) if s.MaxAmount.Float64() > 0 { quantity = bbgo.AdjustQuantityByMaxAmount(quantity, price, s.MaxAmount) diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/twap/stream_executor.go similarity index 77% rename from pkg/bbgo/twap_order_executor.go rename to pkg/twap/stream_executor.go index 05ad9e5e4..839d42670 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/twap/stream_executor.go @@ -1,4 +1,4 @@ -package bbgo +package twap import ( "context" @@ -7,16 +7,18 @@ import ( "time" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) -type TwapExecution struct { - Session *ExchangeSession +// StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream. +type StreamExecutor struct { + Session *bbgo.ExchangeSession Symbol string Side types.SideType TargetQuantity fixedpoint.Value @@ -37,7 +39,7 @@ type TwapExecution struct { currentPrice fixedpoint.Value activePosition fixedpoint.Value - activeMakerOrders *ActiveOrderBook + activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position @@ -46,26 +48,24 @@ type TwapExecution struct { stoppedC chan struct{} - state int - mu sync.Mutex } -func (e *TwapExecution) connectMarketData(ctx context.Context) { - log.Infof("connecting market data stream...") +func (e *StreamExecutor) connectMarketData(ctx context.Context) { + logrus.Infof("connecting market data stream...") if err := e.marketDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("market data stream connect error") + logrus.WithError(err).Errorf("market data stream connect error") } } -func (e *TwapExecution) connectUserData(ctx context.Context) { - log.Infof("connecting user data stream...") +func (e *StreamExecutor) connectUserData(ctx context.Context) { + logrus.Infof("connecting user data stream...") if err := e.userDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("user data stream connect error") + logrus.WithError(err).Errorf("user data stream connect error") } } -func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { +func (e *StreamExecutor) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -111,7 +111,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er switch e.Side { case types.SideTypeSell: if newPrice.Compare(e.StopPrice) < 0 { - log.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -121,7 +121,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: if newPrice.Compare(e.StopPrice) > 0 { - log.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -157,7 +157,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er } minNotional := e.market.MinNotional - orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) switch e.Side { case types.SideTypeSell: @@ -169,11 +169,11 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: // check base balance for sell, try to sell as more as possible if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok { - orderQuantity = AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) } } - if e.DeadlineTime != emptyTime { + if !e.DeadlineTime.IsZero() { now := time.Now() if now.After(e.DeadlineTime) { orderForm = types.SubmitOrder{ @@ -200,7 +200,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er return orderForm, err } -func (e *TwapExecution) updateOrder(ctx context.Context) error { +func (e *StreamExecutor) updateOrder(ctx context.Context) error { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -224,7 +224,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { - log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) + logrus.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) } // get the first order @@ -234,7 +234,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { - log.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) return nil } @@ -243,24 +243,24 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // DO NOT UPDATE IF: // tickSpread > 0 AND current order price == second price + tickSpread // current order price == first price - log.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) switch e.Side { case types.SideTypeBuy: if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best bid price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) return nil } case types.SideTypeSell: if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } } @@ -283,13 +283,13 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { return nil } -func (e *TwapExecution) cancelActiveOrders() { +func (e *StreamExecutor) cancelActiveOrders() { gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) defer gracefulCancel() e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) } -func (e *TwapExecution) orderUpdater(ctx context.Context) { +func (e *StreamExecutor) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() @@ -317,9 +317,9 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { return } - log.Infof("%s order book changed, checking order...", e.Symbol) + logrus.Infof("%s order book changed, checking order...", e.Symbol) if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } case <-ticker.C: @@ -332,25 +332,26 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } } } } -func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { +func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool { base := e.position.GetBase() if base.Abs().Compare(e.TargetQuantity) >= 0 { - log.Infof("filled target quantity, canceling the order execution context") + logrus.Infof("filled target quantity, canceling the order execution context") e.cancelExecution() return true } + return false } -func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { +func (e *StreamExecutor) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -360,21 +361,21 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { return } - log.Info(trade.String()) + logrus.Info(trade.String()) e.position.AddTrade(trade) - log.Infof("position updated: %+v", e.position) + logrus.Infof("position updated: %+v", e.position) } -func (e *TwapExecution) handleFilledOrder(order types.Order) { - log.Info(order.String()) +func (e *StreamExecutor) handleFilledOrder(order types.Order) { + logrus.Info(order.String()) // filled event triggers the order removal from the active order store // we need to ensure we received every order update event before the execution is done. e.cancelContextIfTargetQuantityFilled() } -func (e *TwapExecution) Run(parentCtx context.Context) error { +func (e *StreamExecutor) Run(parentCtx context.Context) error { e.mu.Lock() e.stoppedC = make(chan struct{}) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) @@ -397,10 +398,10 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook.BindStream(e.marketDataStream) - go e.connectMarketData(e.executionCtx) e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) + e.position = &types.Position{ Symbol: e.Symbol, BaseCurrency: e.market.BaseCurrency, @@ -409,16 +410,17 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { e.orderStore = core.NewOrderStore(e.Symbol) e.orderStore.BindStream(e.userDataStream) - e.activeMakerOrders = NewActiveOrderBook(e.Symbol) + e.activeMakerOrders = bbgo.NewActiveOrderBook(e.Symbol) e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) + go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx) go e.orderUpdater(e.executionCtx) return nil } -func (e *TwapExecution) emitDone() { +func (e *StreamExecutor) emitDone() { e.mu.Lock() if e.stoppedC == nil { e.stoppedC = make(chan struct{}) @@ -427,7 +429,7 @@ func (e *TwapExecution) emitDone() { e.mu.Unlock() } -func (e *TwapExecution) Done() (c <-chan struct{}) { +func (e *StreamExecutor) Done() (c <-chan struct{}) { e.mu.Lock() // if the channel is not allocated, it means it's not started yet, we need to return a closed channel if e.stoppedC == nil { @@ -447,7 +449,7 @@ func (e *TwapExecution) Done() (c <-chan struct{}) { // We need to: // 1. stop the order updater (by using the execution context) // 2. the order updater cancels all open orders and close the user data stream -func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { +func (e *StreamExecutor) Shutdown(shutdownCtx context.Context) { e.mu.Lock() if e.cancelExecution != nil { e.cancelExecution() diff --git a/pkg/twap/v2/bbomonitor.go b/pkg/twap/v2/bbomonitor.go new file mode 100644 index 000000000..5081fc9bb --- /dev/null +++ b/pkg/twap/v2/bbomonitor.go @@ -0,0 +1,53 @@ +package twap + +import ( + "time" + + "github.com/c9s/bbgo/pkg/types" +) + +// BboMonitor monitors the best bid and ask price and volume. +// +//go:generate callbackgen -type BboMonitor +type BboMonitor struct { + Bid types.PriceVolume + Ask types.PriceVolume + UpdatedTime time.Time + + updateCallbacks []func(bid, ask types.PriceVolume) +} + +func NewBboMonitor() *BboMonitor { + return &BboMonitor{} +} + +func (m *BboMonitor) OnUpdateFromBook(book *types.StreamOrderBook) bool { + bestBid, ok1 := book.BestBid() + bestAsk, ok2 := book.BestAsk() + if !ok1 || !ok2 { + return false + } + + return m.Update(bestBid, bestAsk, book.LastUpdateTime()) +} + +func (m *BboMonitor) Update(bid, ask types.PriceVolume, t time.Time) bool { + changed := false + if m.Bid.Price.Compare(bid.Price) != 0 || m.Bid.Volume.Compare(bid.Volume) != 0 { + changed = true + } + + if m.Ask.Price.Compare(ask.Price) != 0 || m.Ask.Volume.Compare(ask.Volume) != 0 { + changed = true + } + + m.Bid = bid + m.Ask = ask + m.UpdatedTime = t + + if changed { + m.EmitUpdate(bid, ask) + } + + return changed +} diff --git a/pkg/twap/v2/bbomonitor_callbacks.go b/pkg/twap/v2/bbomonitor_callbacks.go new file mode 100644 index 000000000..4e413eaab --- /dev/null +++ b/pkg/twap/v2/bbomonitor_callbacks.go @@ -0,0 +1,17 @@ +// Code generated by "callbackgen -type BboMonitor"; DO NOT EDIT. + +package twap + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (m *BboMonitor) OnUpdate(cb func(bid types.PriceVolume, ask types.PriceVolume)) { + m.updateCallbacks = append(m.updateCallbacks, cb) +} + +func (m *BboMonitor) EmitUpdate(bid types.PriceVolume, ask types.PriceVolume) { + for _, cb := range m.updateCallbacks { + cb(bid, ask) + } +} diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go new file mode 100644 index 000000000..caa43686d --- /dev/null +++ b/pkg/twap/v2/stream_executor.go @@ -0,0 +1,634 @@ +package twap + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/core" + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +var defaultUpdateInterval = time.Minute + +type DoneSignal struct { + doneC chan struct{} + mu sync.Mutex +} + +func NewDoneSignal() *DoneSignal { + return &DoneSignal{ + doneC: make(chan struct{}), + } +} + +func (e *DoneSignal) Emit() { + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + } + + close(e.doneC) + e.mu.Unlock() +} + +// Chan returns a channel that emits a signal when the execution is done. +func (e *DoneSignal) Chan() (c <-chan struct{}) { + // if the channel is not allocated, it means it's not started yet, we need to return a closed channel + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + c = e.doneC + } else { + c = e.doneC + } + e.mu.Unlock() + + return c +} + +// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API. +// It uses a fixed target quantity to place orders. +type FixedQuantityExecutor struct { + exchange types.Exchange + + // configuration fields + + symbol string + side types.SideType + targetQuantity, sliceQuantity fixedpoint.Value + + // updateInterval is a fixed update interval for placing new order + updateInterval time.Duration + + // delayInterval is the delay interval between each order placement + delayInterval time.Duration + + // numOfTicks is the number of price ticks behind the best bid to place the order + numOfTicks int + + // stopPrice is the price limit for the order + // for buy-orders, the price limit is the maximum price + // for sell-orders, the price limit is the minimum price + stopPrice fixedpoint.Value + + // deadlineTime is the deadline time for the order execution + deadlineTime *time.Time + + executionCtx context.Context + cancelExecution context.CancelFunc + + userDataStreamCtx context.Context + cancelUserDataStream context.CancelFunc + + market types.Market + marketDataStream types.Stream + + orderBook *types.StreamOrderBook + + userDataStream types.Stream + + activeMakerOrders *bbgo.ActiveOrderBook + orderStore *core.OrderStore + position *types.Position + tradeCollector *core.TradeCollector + + logger logrus.FieldLogger + + mu sync.Mutex + + userDataStreamConnectC chan struct{} + marketDataStreamConnectC chan struct{} + done *DoneSignal +} + +func NewStreamExecutor( + exchange types.Exchange, + symbol string, + market types.Market, + side types.SideType, + targetQuantity, sliceQuantity fixedpoint.Value, +) *FixedQuantityExecutor { + + marketDataStream := exchange.NewStream() + marketDataStream.SetPublicOnly() + marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + + orderBook := types.NewStreamBook(symbol) + orderBook.BindStream(marketDataStream) + + userDataStream := exchange.NewStream() + orderStore := core.NewOrderStore(symbol) + position := types.NewPositionFromMarket(market) + tradeCollector := core.NewTradeCollector(symbol, position, orderStore) + orderStore.BindStream(userDataStream) + + activeMakerOrders := bbgo.NewActiveOrderBook(symbol) + + e := &FixedQuantityExecutor{ + exchange: exchange, + symbol: symbol, + side: side, + market: market, + targetQuantity: targetQuantity, + sliceQuantity: sliceQuantity, + updateInterval: defaultUpdateInterval, + logger: logrus.WithFields(logrus.Fields{ + "executor": "twapStream", + "symbol": symbol, + }), + + marketDataStream: marketDataStream, + orderBook: orderBook, + + userDataStream: userDataStream, + + activeMakerOrders: activeMakerOrders, + orderStore: orderStore, + tradeCollector: tradeCollector, + position: position, + done: NewDoneSignal(), + + userDataStreamConnectC: make(chan struct{}), + marketDataStreamConnectC: make(chan struct{}), + } + + e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { + e.logger.Info(trade.String()) + }) + e.tradeCollector.BindStream(e.userDataStream) + + activeMakerOrders.OnFilled(e.handleFilledOrder) + activeMakerOrders.BindStream(e.userDataStream) + + e.marketDataStream.OnConnect(func() { + e.logger.Info("market data stream on connect") + close(e.marketDataStreamConnectC) + e.logger.Infof("marketDataStreamConnectC closed") + }) + + // private channels + e.userDataStream.OnAuth(func() { + e.logger.Info("user data stream on auth") + close(e.userDataStreamConnectC) + e.logger.Info("userDataStreamConnectC closed") + }) + + return e +} + +func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) { + e.deadlineTime = &t +} + +func (e *FixedQuantityExecutor) SetDelayInterval(delayInterval time.Duration) { + e.delayInterval = delayInterval +} + +func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration) { + e.updateInterval = updateInterval +} + +func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) { + e.logger.Infof("connecting market data stream...") + if err := e.marketDataStream.Connect(ctx); err != nil { + e.logger.WithError(err).Errorf("market data stream connect error") + } +} + +func (e *FixedQuantityExecutor) connectUserData(ctx context.Context) { + e.logger.Infof("connecting user data stream...") + if err := e.userDataStream.Connect(ctx); err != nil { + e.logger.WithError(err).Errorf("user data stream connect error") + } +} + +func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) { + e.logger.Info(order.String()) + + // filled event triggers the order removal from the active order store + // we need to ensure we received every order update event before the execution is done. + e.cancelContextIfTargetQuantityFilled() +} + +func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool { + // ensure that the trades are processed + e.tradeCollector.Process() + + // now get the base quantity from the position + base := e.position.GetBase() + + if base.Abs().Sub(e.targetQuantity).Compare(e.market.MinQuantity.Neg()) >= 0 { + e.logger.Infof("position is filled with target quantity, canceling the order execution context") + e.cancelExecution() + return true + } + return false +} + +func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { + gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second) + defer gracefulCancel() + return e.activeMakerOrders.GracefulCancel(gracefulCtx, e.exchange) +} + +func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { + // updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2) + + defer func() { + if err := e.cancelActiveOrders(ctx); err != nil { + e.logger.WithError(err).Error("cancel active orders error") + } + + e.cancelUserDataStream() + e.done.Emit() + }() + + ticker := time.NewTimer(e.updateInterval) + defer ticker.Stop() + + monitor := NewBboMonitor() + + for { + select { + case <-ctx.Done(): + return + + case <-e.orderBook.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + + // orderBook.C sends a signal when any price or quantity changes in the order book + /* + if !updateLimiter.Allow() { + break + } + */ + + if e.cancelContextIfTargetQuantityFilled() { + return + } + + e.logger.Infof("%s order book changed, checking order...", e.symbol) + + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } + + case <-ticker.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + + /* + if !updateLimiter.Allow() { + break + } + */ + + if e.cancelContextIfTargetQuantityFilled() { + return + } + + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } + } + } +} + +func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + // if there is no gap between the first price entry and the second price entry + second, ok := sideBook.Second() + if !ok { + return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.symbol, e.side) + } + + tickSize := e.market.TickSize + numOfTicks := fixedpoint.NewFromInt(int64(e.numOfTicks)) + tickSpread := tickSize.Mul(numOfTicks) + + // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { + orders := e.activeMakerOrders.Orders() + if len(orders) > 1 { + e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol) + } + + // get the first active order + order := orders[0] + orderPrice := order.Price + // quantity := fixedpoint.NewFromFloat(order.Quantity) + + remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) + if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + return nil + } + + // if the first bid price or first ask price is the same to the current active order + // we should skip updating the order + // DO NOT UPDATE IF: + // tickSpread > 0 AND current order price == second price + tickSpread + // current order price == first price + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + + switch e.side { + case types.SideTypeBuy: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) + return nil + } + + case types.SideTypeSell: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } + } + + if err := e.cancelActiveOrders(ctx); err != nil { + e.logger.Warnf("cancel active orders error: %v", err) + } + } + + e.tradeCollector.Process() + + orderForm, err := e.generateOrder() + if err != nil { + return err + } else if orderForm == nil { + return nil + } + + createdOrder, err := e.exchange.SubmitOrder(ctx, *orderForm) + if err != nil { + return err + } + + if createdOrder != nil { + e.orderStore.Add(*createdOrder) + e.activeMakerOrders.Add(*createdOrder) + e.tradeCollector.Process() + } + + return nil +} + +func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) { + newPrice := fixedpoint.Zero + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return newPrice, fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + newPrice = first.Price + spread, ok := book.Spread() + if !ok { + return newPrice, errors.New("can not calculate spread, neither bid price or ask price exists") + } + + tickSize := e.market.TickSize + tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks))) + if spread.Compare(tickSize) > 0 { + // there is a gap in the spread + tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize)) + switch e.side { + case types.SideTypeSell: + newPrice = newPrice.Sub(tickSpread) + case types.SideTypeBuy: + newPrice = newPrice.Add(tickSpread) + } + } + + if e.stopPrice.Sign() > 0 { + switch e.side { + case types.SideTypeSell: + if newPrice.Compare(e.stopPrice) < 0 { + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + + case types.SideTypeBuy: + if newPrice.Compare(e.stopPrice) > 0 { + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + } + } + + return newPrice, nil +} + +func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value { + base := e.position.GetBase() + return e.targetQuantity.Sub(base.Abs()) +} + +func (e *FixedQuantityExecutor) isDeadlineExceeded() bool { + if e.deadlineTime != nil && !e.deadlineTime.IsZero() { + return time.Since(*e.deadlineTime) > 0 + } + + return false +} + +func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) { + minQuantity := e.market.MinQuantity + remainingQuantity := e.getRemainingQuantity() + + if remainingQuantity.Sign() <= 0 { + e.cancelExecution() + return fixedpoint.Zero, nil + } + + if remainingQuantity.Compare(minQuantity) < 0 { + e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String()) + + e.cancelExecution() + return fixedpoint.Zero, nil + } + + // if deadline exceeded, we should return the remaining quantity + if e.isDeadlineExceeded() { + return remainingQuantity, nil + } + + // when slice = 1000, if we only have 998, we should adjust our quantity to 998 + orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity) + + // if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round + // if there are rest slices + nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity) + + if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) { + orderQuantity = remainingQuantity + } + + orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price) + return orderQuantity, nil +} + +func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) { + newPrice, err := e.getNewPrice() + if err != nil { + return nil, err + } + + orderQuantity, err := e.calculateNewOrderQuantity(newPrice) + if err != nil { + return nil, err + } + + balances, err := e.exchange.QueryAccountBalances(e.executionCtx) + if err != nil { + return nil, err + } + + switch e.side { + case types.SideTypeSell: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.BaseCurrency]; ok { + orderQuantity = fixedpoint.Min(b.Available, orderQuantity) + } + + case types.SideTypeBuy: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.QuoteCurrency]; ok { + orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + } + } + + if e.isDeadlineExceeded() { + return &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeMarket, + Quantity: orderQuantity, + Market: e.market, + }, nil + } + + return &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeLimitMaker, + Quantity: orderQuantity, + Price: newPrice, + Market: e.market, + TimeInForce: types.TimeInForceGTC, + }, nil +} + +func (e *FixedQuantityExecutor) Start(ctx context.Context) error { + if e.executionCtx != nil { + return errors.New("executionCtx is not nil, you can't start the executor twice") + } + + e.executionCtx, e.cancelExecution = context.WithCancel(ctx) + e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx) + + go e.connectMarketData(e.executionCtx) + go e.connectUserData(e.userDataStreamCtx) + + e.logger.Infof("waiting for connections ready...") + + if err := e.WaitForConnection(ctx); err != nil { + e.cancelExecution() + return err + } + + e.logger.Infof("connections ready, starting order updater...") + + go e.orderUpdater(e.executionCtx) + return nil +} + +func (e *FixedQuantityExecutor) WaitForConnection(ctx context.Context) error { + if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("market data stream connection timeout") + } + + if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("user data stream connection timeout") + } + + return nil +} + +// Done returns a channel that emits a signal when the execution is done. +func (e *FixedQuantityExecutor) Done() <-chan struct{} { + return e.done.Chan() +} + +// Shutdown stops the execution +// If we call this method, it means the execution is still running, +// We need it to: +// 1. Stop the order updater (by using the execution context) +// 2. The order updater cancels all open orders and closes the user data stream +func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) { + e.mu.Lock() + if e.cancelExecution != nil { + e.cancelExecution() + } + e.mu.Unlock() + + for { + select { + + case <-shutdownCtx.Done(): + return + + case <-e.done.Chan(): + return + + } + } +} + +func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return false + case <-c: + return true + } +} diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go new file mode 100644 index 000000000..9ed443e27 --- /dev/null +++ b/pkg/twap/v2/stream_executor_test.go @@ -0,0 +1,316 @@ +package twap + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/c9s/bbgo/pkg/fixedpoint" + . "github.com/c9s/bbgo/pkg/testing/testhelper" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/types/mocks" +) + +func getTestMarket() types.Market { + market := types.Market{ + Symbol: "BTCUSDT", + PricePrecision: 8, + VolumePrecision: 8, + QuoteCurrency: "USDT", + BaseCurrency: "BTC", + MinNotional: fixedpoint.MustNewFromString("0.001"), + MinAmount: fixedpoint.MustNewFromString("10.0"), + MinQuantity: fixedpoint.MustNewFromString("0.001"), + } + return market +} + +type OrderMatcher struct { + Order types.Order +} + +func MatchOrder(o types.Order) *OrderMatcher { + return &OrderMatcher{ + Order: o, + } +} + +func (m *OrderMatcher) Matches(x interface{}) bool { + order, ok := x.(types.Order) + if !ok { + return false + } + + return m.Order.OrderID == order.OrderID && m.Order.Price.Compare(m.Order.Price) == 0 +} + +func (m *OrderMatcher) String() string { + return fmt.Sprintf("OrderMatcher expects %+v", m.Order) +} + +type CatchMatcher struct { + f func(x any) +} + +func Catch(f func(x any)) *CatchMatcher { + return &CatchMatcher{ + f: f, + } +} + +func (m *CatchMatcher) Matches(x interface{}) bool { + m.f(x) + return true +} + +func (m *CatchMatcher) String() string { + return "CatchMatcher" +} + +func bindMockMarketDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnBookSnapshot(Catch(func(x any) { + stream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnBookUpdate(Catch(func(x any) { + stream.OnBookUpdate(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() +} + +func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnOrderUpdate(Catch(func(x any) { + stream.OnOrderUpdate(x.(func(order types.Order))) + })).AnyTimes() + mockStream.EXPECT().OnTradeUpdate(Catch(func(x any) { + stream.OnTradeUpdate(x.(func(order types.Trade))) + })).AnyTimes() + mockStream.EXPECT().OnBalanceUpdate(Catch(func(x any) { + stream.OnBalanceUpdate(x.(func(m types.BalanceMap))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() + mockStream.EXPECT().OnAuth(Catch(func(x any) { + stream.OnAuth(x.(func())) + })) +} + +func TestNewStreamExecutor(t *testing.T) { + exchangeName := types.ExchangeBinance + symbol := "BTCUSDT" + market := getTestMarket() + + targetQuantity := Number(100) + sliceQuantity := Number(1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockEx := mocks.NewMockExchange(mockCtrl) + + marketDataStream := &types.StandardStream{} + userDataStream := &types.StandardStream{} + + mockMarketDataStream := mocks.NewMockStream(mockCtrl) + mockMarketDataStream.EXPECT().SetPublicOnly() + mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + + bindMockMarketDataStream(mockMarketDataStream, marketDataStream) + + mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + mockUserDataStream := mocks.NewMockStream(mockCtrl) + bindMockUserDataStream(mockUserDataStream, userDataStream) + mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + initialBalances := types.BalanceMap{ + "BTC": types.Balance{ + Available: Number(2), + }, + "USDT": types.Balance{ + Available: Number(20_000), + }, + } + + mockEx.EXPECT().NewStream().Return(mockMarketDataStream) + mockEx.EXPECT().NewStream().Return(mockUserDataStream) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) + + // first order + firstSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19400), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + firstSubmitOrderTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + firstOrder := types.Order{ + SubmitOrder: firstSubmitOrder, + Exchange: exchangeName, + OrderID: 1, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(firstSubmitOrderTime), + UpdateTime: types.Time(firstSubmitOrderTime), + } + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil) + + executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity) + executor.SetUpdateInterval(200 * time.Millisecond) + + go func() { + err := executor.Start(ctx) + assert.NoError(t, err) + }() + + go func() { + time.Sleep(500 * time.Millisecond) + marketDataStream.EmitConnect() + userDataStream.EmitConnect() + userDataStream.EmitAuth() + }() + + err := executor.WaitForConnection(ctx) + assert.NoError(t, err) + + t.Logf("sending book snapshot...") + snapshotTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + marketDataStream.EmitBookSnapshot(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19400), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + time.Sleep(500 * time.Millisecond) + + t.Logf("sending book update...") + + // we expect the second order will be placed when the order update is received + secondSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19420), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + secondSubmitOrderTime := time.Date(2021, 1, 1, 0, 1, 0, 0, time.UTC) + secondOrder := types.Order{ + SubmitOrder: secondSubmitOrder, + Exchange: exchangeName, + OrderID: 2, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(secondSubmitOrderTime), + UpdateTime: types.Time(secondSubmitOrderTime), + } + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(firstOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := firstOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order update: %+v", orderUpdate) + return nil + }) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), secondSubmitOrder).Return(&secondOrder, nil) + + t.Logf("waiting for the order update...") + time.Sleep(500 * time.Millisecond) + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + marketDataStream.EmitBookUpdate(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19420), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + t.Logf("waiting for the next order update...") + time.Sleep(500 * time.Millisecond) + + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + t.Logf("emitting trade update...") + userDataStream.EmitTradeUpdate(types.Trade{ + ID: 1, + OrderID: 2, + Exchange: exchangeName, + Price: Number(19420.0), + Quantity: Number(100.0), + QuoteQuantity: Number(100.0 * 19420.0), + Symbol: symbol, + Side: types.SideTypeBuy, + IsBuyer: true, + IsMaker: true, + Time: types.Time(secondSubmitOrderTime), + }) + + t.Logf("waiting for the trade callbacks...") + time.Sleep(500 * time.Millisecond) + + executor.tradeCollector.Process() + assert.Equal(t, Number(100), executor.position.GetBase()) + + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(secondOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := secondOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order #2 update: %+v", orderUpdate) + return nil + }) + assert.True(t, executor.cancelContextIfTargetQuantityFilled(), "target quantity should be filled") + + // finalizing and stop the executor + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + case <-executor.Done(): + } + t.Logf("executor done") +} diff --git a/pkg/types/market.go b/pkg/types/market.go index c7135aff3..aae71bd1f 100644 --- a/pkg/types/market.go +++ b/pkg/types/market.go @@ -247,6 +247,19 @@ func (m Market) AdjustQuantityByMinNotional(quantity, currentPrice fixedpoint.Va return quantity } +// AdjustQuantityByMaxAmount adjusts the quantity to make the amount less than the given maxAmount +func (m Market) AdjustQuantityByMaxAmount(quantity, currentPrice, maxAmount fixedpoint.Value) fixedpoint.Value { + // modify quantity for the min amount + amount := currentPrice.Mul(quantity) + if amount.Compare(maxAmount) < 0 { + return quantity + } + + ratio := maxAmount.Div(amount) + quantity = quantity.Mul(ratio) + return m.TruncateQuantity(quantity) +} + type MarketMap map[string]Market func (m MarketMap) Add(market Market) { diff --git a/pkg/types/mocks/mock_stream.go b/pkg/types/mocks/mock_stream.go new file mode 100644 index 000000000..176484aea --- /dev/null +++ b/pkg/types/mocks/mock_stream.go @@ -0,0 +1,375 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/c9s/bbgo/pkg/types (interfaces: Stream) +// +// Generated by this command: +// +// mockgen -destination=mocks/mock_stream.go -package=mocks . Stream +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/c9s/bbgo/pkg/types" + gomock "go.uber.org/mock/gomock" +) + +// MockStream is a mock of Stream interface. +type MockStream struct { + ctrl *gomock.Controller + recorder *MockStreamMockRecorder +} + +// MockStreamMockRecorder is the mock recorder for MockStream. +type MockStreamMockRecorder struct { + mock *MockStream +} + +// NewMockStream creates a new mock instance. +func NewMockStream(ctrl *gomock.Controller) *MockStream { + mock := &MockStream{ctrl: ctrl} + mock.recorder = &MockStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStream) EXPECT() *MockStreamMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockStream) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStreamMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close)) +} + +// Connect mocks base method. +func (m *MockStream) Connect(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Connect", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Connect indicates an expected call of Connect. +func (mr *MockStreamMockRecorder) Connect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockStream)(nil).Connect), arg0) +} + +// GetPublicOnly mocks base method. +func (m *MockStream) GetPublicOnly() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPublicOnly") + ret0, _ := ret[0].(bool) + return ret0 +} + +// GetPublicOnly indicates an expected call of GetPublicOnly. +func (mr *MockStreamMockRecorder) GetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPublicOnly", reflect.TypeOf((*MockStream)(nil).GetPublicOnly)) +} + +// GetSubscriptions mocks base method. +func (m *MockStream) GetSubscriptions() []types.Subscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscriptions") + ret0, _ := ret[0].([]types.Subscription) + return ret0 +} + +// GetSubscriptions indicates an expected call of GetSubscriptions. +func (mr *MockStreamMockRecorder) GetSubscriptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockStream)(nil).GetSubscriptions)) +} + +// OnAggTrade mocks base method. +func (m *MockStream) OnAggTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAggTrade", arg0) +} + +// OnAggTrade indicates an expected call of OnAggTrade. +func (mr *MockStreamMockRecorder) OnAggTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAggTrade", reflect.TypeOf((*MockStream)(nil).OnAggTrade), arg0) +} + +// OnAuth mocks base method. +func (m *MockStream) OnAuth(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAuth", arg0) +} + +// OnAuth indicates an expected call of OnAuth. +func (mr *MockStreamMockRecorder) OnAuth(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAuth", reflect.TypeOf((*MockStream)(nil).OnAuth), arg0) +} + +// OnBalanceSnapshot mocks base method. +func (m *MockStream) OnBalanceSnapshot(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceSnapshot", arg0) +} + +// OnBalanceSnapshot indicates an expected call of OnBalanceSnapshot. +func (mr *MockStreamMockRecorder) OnBalanceSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceSnapshot", reflect.TypeOf((*MockStream)(nil).OnBalanceSnapshot), arg0) +} + +// OnBalanceUpdate mocks base method. +func (m *MockStream) OnBalanceUpdate(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceUpdate", arg0) +} + +// OnBalanceUpdate indicates an expected call of OnBalanceUpdate. +func (mr *MockStreamMockRecorder) OnBalanceUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceUpdate", reflect.TypeOf((*MockStream)(nil).OnBalanceUpdate), arg0) +} + +// OnBookSnapshot mocks base method. +func (m *MockStream) OnBookSnapshot(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookSnapshot", arg0) +} + +// OnBookSnapshot indicates an expected call of OnBookSnapshot. +func (mr *MockStreamMockRecorder) OnBookSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookSnapshot", reflect.TypeOf((*MockStream)(nil).OnBookSnapshot), arg0) +} + +// OnBookTickerUpdate mocks base method. +func (m *MockStream) OnBookTickerUpdate(arg0 func(types.BookTicker)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookTickerUpdate", arg0) +} + +// OnBookTickerUpdate indicates an expected call of OnBookTickerUpdate. +func (mr *MockStreamMockRecorder) OnBookTickerUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookTickerUpdate", reflect.TypeOf((*MockStream)(nil).OnBookTickerUpdate), arg0) +} + +// OnBookUpdate mocks base method. +func (m *MockStream) OnBookUpdate(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookUpdate", arg0) +} + +// OnBookUpdate indicates an expected call of OnBookUpdate. +func (mr *MockStreamMockRecorder) OnBookUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookUpdate", reflect.TypeOf((*MockStream)(nil).OnBookUpdate), arg0) +} + +// OnConnect mocks base method. +func (m *MockStream) OnConnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnConnect", arg0) +} + +// OnConnect indicates an expected call of OnConnect. +func (mr *MockStreamMockRecorder) OnConnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnConnect", reflect.TypeOf((*MockStream)(nil).OnConnect), arg0) +} + +// OnDisconnect mocks base method. +func (m *MockStream) OnDisconnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnDisconnect", arg0) +} + +// OnDisconnect indicates an expected call of OnDisconnect. +func (mr *MockStreamMockRecorder) OnDisconnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDisconnect", reflect.TypeOf((*MockStream)(nil).OnDisconnect), arg0) +} + +// OnForceOrder mocks base method. +func (m *MockStream) OnForceOrder(arg0 func(types.LiquidationInfo)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnForceOrder", arg0) +} + +// OnForceOrder indicates an expected call of OnForceOrder. +func (mr *MockStreamMockRecorder) OnForceOrder(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnForceOrder", reflect.TypeOf((*MockStream)(nil).OnForceOrder), arg0) +} + +// OnFuturesPositionSnapshot mocks base method. +func (m *MockStream) OnFuturesPositionSnapshot(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionSnapshot", arg0) +} + +// OnFuturesPositionSnapshot indicates an expected call of OnFuturesPositionSnapshot. +func (mr *MockStreamMockRecorder) OnFuturesPositionSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionSnapshot", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionSnapshot), arg0) +} + +// OnFuturesPositionUpdate mocks base method. +func (m *MockStream) OnFuturesPositionUpdate(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionUpdate", arg0) +} + +// OnFuturesPositionUpdate indicates an expected call of OnFuturesPositionUpdate. +func (mr *MockStreamMockRecorder) OnFuturesPositionUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionUpdate", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionUpdate), arg0) +} + +// OnKLine mocks base method. +func (m *MockStream) OnKLine(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLine", arg0) +} + +// OnKLine indicates an expected call of OnKLine. +func (mr *MockStreamMockRecorder) OnKLine(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLine", reflect.TypeOf((*MockStream)(nil).OnKLine), arg0) +} + +// OnKLineClosed mocks base method. +func (m *MockStream) OnKLineClosed(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLineClosed", arg0) +} + +// OnKLineClosed indicates an expected call of OnKLineClosed. +func (mr *MockStreamMockRecorder) OnKLineClosed(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLineClosed", reflect.TypeOf((*MockStream)(nil).OnKLineClosed), arg0) +} + +// OnMarketTrade mocks base method. +func (m *MockStream) OnMarketTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnMarketTrade", arg0) +} + +// OnMarketTrade indicates an expected call of OnMarketTrade. +func (mr *MockStreamMockRecorder) OnMarketTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMarketTrade", reflect.TypeOf((*MockStream)(nil).OnMarketTrade), arg0) +} + +// OnOrderUpdate mocks base method. +func (m *MockStream) OnOrderUpdate(arg0 func(types.Order)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnOrderUpdate", arg0) +} + +// OnOrderUpdate indicates an expected call of OnOrderUpdate. +func (mr *MockStreamMockRecorder) OnOrderUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnOrderUpdate", reflect.TypeOf((*MockStream)(nil).OnOrderUpdate), arg0) +} + +// OnRawMessage mocks base method. +func (m *MockStream) OnRawMessage(arg0 func([]byte)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnRawMessage", arg0) +} + +// OnRawMessage indicates an expected call of OnRawMessage. +func (mr *MockStreamMockRecorder) OnRawMessage(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRawMessage", reflect.TypeOf((*MockStream)(nil).OnRawMessage), arg0) +} + +// OnStart mocks base method. +func (m *MockStream) OnStart(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnStart", arg0) +} + +// OnStart indicates an expected call of OnStart. +func (mr *MockStreamMockRecorder) OnStart(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnStart", reflect.TypeOf((*MockStream)(nil).OnStart), arg0) +} + +// OnTradeUpdate mocks base method. +func (m *MockStream) OnTradeUpdate(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnTradeUpdate", arg0) +} + +// OnTradeUpdate indicates an expected call of OnTradeUpdate. +func (mr *MockStreamMockRecorder) OnTradeUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTradeUpdate", reflect.TypeOf((*MockStream)(nil).OnTradeUpdate), arg0) +} + +// Reconnect mocks base method. +func (m *MockStream) Reconnect() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reconnect") +} + +// Reconnect indicates an expected call of Reconnect. +func (mr *MockStreamMockRecorder) Reconnect() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconnect", reflect.TypeOf((*MockStream)(nil).Reconnect)) +} + +// Resubscribe mocks base method. +func (m *MockStream) Resubscribe(arg0 func([]types.Subscription) ([]types.Subscription, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Resubscribe", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Resubscribe indicates an expected call of Resubscribe. +func (mr *MockStreamMockRecorder) Resubscribe(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resubscribe", reflect.TypeOf((*MockStream)(nil).Resubscribe), arg0) +} + +// SetPublicOnly mocks base method. +func (m *MockStream) SetPublicOnly() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetPublicOnly") +} + +// SetPublicOnly indicates an expected call of SetPublicOnly. +func (mr *MockStreamMockRecorder) SetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPublicOnly", reflect.TypeOf((*MockStream)(nil).SetPublicOnly)) +} + +// Subscribe mocks base method. +func (m *MockStream) Subscribe(arg0 types.Channel, arg1 string, arg2 types.SubscribeOptions) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2) +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockStreamMockRecorder) Subscribe(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockStream)(nil).Subscribe), arg0, arg1, arg2) +} diff --git a/pkg/types/price_type.go b/pkg/types/price_type.go index 93bc269e6..cfde80668 100644 --- a/pkg/types/price_type.go +++ b/pkg/types/price_type.go @@ -4,19 +4,38 @@ import ( "encoding/json" "strings" - "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/fixedpoint" ) type PriceType string const ( - PriceTypeLast PriceType = "LAST" - PriceTypeBuy PriceType = "BUY" // BID - PriceTypeSell PriceType = "SELL" // ASK - PriceTypeMid PriceType = "MID" + // PriceTypeLast uses the last price from the given ticker + PriceTypeLast PriceType = "LAST" + + // PriceTypeBid uses the bid price from the given ticker + PriceTypeBid PriceType = "BID" + + // PriceTypeAsk uses the ask price from the given ticker + PriceTypeAsk PriceType = "ASK" + + // PriceTypeMid calculates the middle price from the given ticker + PriceTypeMid PriceType = "MID" + PriceTypeMaker PriceType = "MAKER" PriceTypeTaker PriceType = "TAKER" + + // See best bid offer types + // https://www.binance.com/en/support/faq/understanding-and-using-bbo-orders-on-binance-futures-7f93c89ef09042678cfa73e8a28612e8 + + PriceTypeBestBidOfferCounterParty1 PriceType = "COUNTERPARTY1" + PriceTypeBestBidOfferCounterParty5 PriceType = "COUNTERPARTY5" + + PriceTypeBestBidOfferQueue1 PriceType = "QUEUE1" + PriceTypeBestBidOfferQueue5 PriceType = "QUEUE5" ) var ErrInvalidPriceType = errors.New("invalid price type") @@ -24,7 +43,10 @@ var ErrInvalidPriceType = errors.New("invalid price type") func ParsePriceType(s string) (p PriceType, err error) { p = PriceType(strings.ToUpper(s)) switch p { - case PriceTypeLast, PriceTypeBuy, PriceTypeSell, PriceTypeMid, PriceTypeMaker, PriceTypeTaker: + case PriceTypeLast, PriceTypeBid, PriceTypeAsk, + PriceTypeMid, PriceTypeMaker, PriceTypeTaker, + PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5, + PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5: return p, err } return p, ErrInvalidPriceType @@ -47,25 +69,31 @@ func (p *PriceType) UnmarshalJSON(data []byte) error { return nil } -func (p PriceType) Map(ticker *Ticker, side SideType) fixedpoint.Value { +// GetPrice returns the price from the given ticker based on the price type +func (p PriceType) GetPrice(ticker *Ticker, side SideType) fixedpoint.Value { + switch p { + case PriceTypeBestBidOfferQueue5, PriceTypeBestBidOfferCounterParty5: + log.Warnf("price type %s is not supported with ticker", p) + } + price := ticker.Last switch p { case PriceTypeLast: price = ticker.Last - case PriceTypeBuy: + case PriceTypeBid: price = ticker.Buy - case PriceTypeSell: + case PriceTypeAsk: price = ticker.Sell case PriceTypeMid: price = ticker.Buy.Add(ticker.Sell).Div(fixedpoint.NewFromInt(2)) - case PriceTypeMaker: + case PriceTypeMaker, PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5: if side == SideTypeBuy { price = ticker.Buy } else if side == SideTypeSell { price = ticker.Sell } - case PriceTypeTaker: + case PriceTypeTaker, PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5: if side == SideTypeBuy { price = ticker.Sell } else if side == SideTypeSell { diff --git a/pkg/types/stream.go b/pkg/types/stream.go index a2246ed68..2da4df7dd 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -24,6 +24,7 @@ var defaultDialer = &websocket.Dialer{ ReadBufferSize: 4096, } +//go:generate mockgen -destination=mocks/mock_stream.go -package=mocks . Stream type Stream interface { StandardStreamEventHub