From 621a2b86cf9a5d8f2dabade2686d69a718a071a2 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 13:29:27 +0800 Subject: [PATCH 01/12] twap: move twap execution to a single package --- pkg/cmd/orders.go | 3 +- .../stream_executor.go} | 85 ++++++++++--------- 2 files changed, 45 insertions(+), 43 deletions(-) rename pkg/{bbgo/twap_order_executor.go => twap/stream_executor.go} (78%) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 475237a20..1dd5d8c9d 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/cobra" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/twap" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/types" @@ -255,7 +256,7 @@ var executeOrderCmd = &cobra.Command{ executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution := &bbgo.TwapExecution{ + execution := &twap.Execution{ Session: session, Symbol: symbol, Side: side, diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/twap/stream_executor.go similarity index 78% rename from pkg/bbgo/twap_order_executor.go rename to pkg/twap/stream_executor.go index 05ad9e5e4..6ac4a809b 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/twap/stream_executor.go @@ -1,4 +1,4 @@ -package bbgo +package twap import ( "context" @@ -7,16 +7,17 @@ import ( "time" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) -type TwapExecution struct { - Session *ExchangeSession +type Execution struct { + Session *bbgo.ExchangeSession Symbol string Side types.SideType TargetQuantity fixedpoint.Value @@ -37,7 +38,7 @@ type TwapExecution struct { currentPrice fixedpoint.Value activePosition fixedpoint.Value - activeMakerOrders *ActiveOrderBook + activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position @@ -51,21 +52,21 @@ type TwapExecution struct { mu sync.Mutex } -func (e *TwapExecution) connectMarketData(ctx context.Context) { - log.Infof("connecting market data stream...") +func (e *Execution) connectMarketData(ctx context.Context) { + logrus.Infof("connecting market data stream...") if err := e.marketDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("market data stream connect error") + logrus.WithError(err).Errorf("market data stream connect error") } } -func (e *TwapExecution) connectUserData(ctx context.Context) { - log.Infof("connecting user data stream...") +func (e *Execution) connectUserData(ctx context.Context) { + logrus.Infof("connecting user data stream...") if err := e.userDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("user data stream connect error") + logrus.WithError(err).Errorf("user data stream connect error") } } -func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { +func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -111,7 +112,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er switch e.Side { case types.SideTypeSell: if newPrice.Compare(e.StopPrice) < 0 { - log.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -121,7 +122,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: if newPrice.Compare(e.StopPrice) > 0 { - log.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -157,7 +158,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er } minNotional := e.market.MinNotional - orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) switch e.Side { case types.SideTypeSell: @@ -169,11 +170,11 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: // check base balance for sell, try to sell as more as possible if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok { - orderQuantity = AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) } } - if e.DeadlineTime != emptyTime { + if !e.DeadlineTime.IsZero() { now := time.Now() if now.After(e.DeadlineTime) { orderForm = types.SubmitOrder{ @@ -200,7 +201,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er return orderForm, err } -func (e *TwapExecution) updateOrder(ctx context.Context) error { +func (e *Execution) updateOrder(ctx context.Context) error { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -224,7 +225,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { - log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) + logrus.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) } // get the first order @@ -234,7 +235,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { - log.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) return nil } @@ -243,24 +244,24 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // DO NOT UPDATE IF: // tickSpread > 0 AND current order price == second price + tickSpread // current order price == first price - log.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) switch e.Side { case types.SideTypeBuy: if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best bid price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) return nil } case types.SideTypeSell: if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } } @@ -283,13 +284,13 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { return nil } -func (e *TwapExecution) cancelActiveOrders() { +func (e *Execution) cancelActiveOrders() { gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) defer gracefulCancel() e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) } -func (e *TwapExecution) orderUpdater(ctx context.Context) { +func (e *Execution) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() @@ -317,9 +318,9 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { return } - log.Infof("%s order book changed, checking order...", e.Symbol) + logrus.Infof("%s order book changed, checking order...", e.Symbol) if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } case <-ticker.C: @@ -332,25 +333,25 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } } } } -func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { +func (e *Execution) cancelContextIfTargetQuantityFilled() bool { base := e.position.GetBase() if base.Abs().Compare(e.TargetQuantity) >= 0 { - log.Infof("filled target quantity, canceling the order execution context") + logrus.Infof("filled target quantity, canceling the order execution context") e.cancelExecution() return true } return false } -func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { +func (e *Execution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -360,21 +361,21 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { return } - log.Info(trade.String()) + logrus.Info(trade.String()) e.position.AddTrade(trade) - log.Infof("position updated: %+v", e.position) + logrus.Infof("position updated: %+v", e.position) } -func (e *TwapExecution) handleFilledOrder(order types.Order) { - log.Info(order.String()) +func (e *Execution) handleFilledOrder(order types.Order) { + logrus.Info(order.String()) // filled event triggers the order removal from the active order store // we need to ensure we received every order update event before the execution is done. e.cancelContextIfTargetQuantityFilled() } -func (e *TwapExecution) Run(parentCtx context.Context) error { +func (e *Execution) Run(parentCtx context.Context) error { e.mu.Lock() e.stoppedC = make(chan struct{}) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) @@ -409,7 +410,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { e.orderStore = core.NewOrderStore(e.Symbol) e.orderStore.BindStream(e.userDataStream) - e.activeMakerOrders = NewActiveOrderBook(e.Symbol) + e.activeMakerOrders = bbgo.NewActiveOrderBook(e.Symbol) e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) @@ -418,7 +419,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { return nil } -func (e *TwapExecution) emitDone() { +func (e *Execution) emitDone() { e.mu.Lock() if e.stoppedC == nil { e.stoppedC = make(chan struct{}) @@ -427,7 +428,7 @@ func (e *TwapExecution) emitDone() { e.mu.Unlock() } -func (e *TwapExecution) Done() (c <-chan struct{}) { +func (e *Execution) Done() (c <-chan struct{}) { e.mu.Lock() // if the channel is not allocated, it means it's not started yet, we need to return a closed channel if e.stoppedC == nil { @@ -447,7 +448,7 @@ func (e *TwapExecution) Done() (c <-chan struct{}) { // We need to: // 1. stop the order updater (by using the execution context) // 2. the order updater cancels all open orders and close the user data stream -func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { +func (e *Execution) Shutdown(shutdownCtx context.Context) { e.mu.Lock() if e.cancelExecution != nil { e.cancelExecution() From 9dd85623b99f3b27502da87de94df8b3c143093f Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 14:05:29 +0800 Subject: [PATCH 02/12] types,strategy: refactor price type and add more bbo (best bid offer) --- pkg/strategy/autobuy/strategy.go | 7 +++-- pkg/strategy/rebalance/strategy.go | 2 +- pkg/types/price_type.go | 44 ++++++++++++++++++++++-------- 3 files changed, 38 insertions(+), 15 deletions(-) diff --git a/pkg/strategy/autobuy/strategy.go b/pkg/strategy/autobuy/strategy.go index 13a143161..aee4f0f3b 100644 --- a/pkg/strategy/autobuy/strategy.go +++ b/pkg/strategy/autobuy/strategy.go @@ -5,13 +5,14 @@ import ( "fmt" "sync" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2" "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" - "github.com/robfig/cron/v3" - "github.com/sirupsen/logrus" ) const ID = "autobuy" @@ -128,7 +129,7 @@ func (s *Strategy) autobuy(ctx context.Context) { } side := types.SideTypeBuy - price := s.PriceType.Map(ticker, side) + price := s.PriceType.GetPrice(ticker, side) if price.Float64() > s.boll.UpBand.Last(0) { log.Infof("price %s is higher than upper band %f, skip", price.String(), s.boll.UpBand.Last(0)) diff --git a/pkg/strategy/rebalance/strategy.go b/pkg/strategy/rebalance/strategy.go index eab1de87c..6b4009063 100644 --- a/pkg/strategy/rebalance/strategy.go +++ b/pkg/strategy/rebalance/strategy.go @@ -262,7 +262,7 @@ func (s *Strategy) generateOrder(ctx context.Context) (*types.SubmitOrder, error } quantity = market.RoundDownQuantityByPrecision(quantity) - price := s.PriceType.Map(ticker, side) + price := s.PriceType.GetPrice(ticker, side) if s.MaxAmount.Float64() > 0 { quantity = bbgo.AdjustQuantityByMaxAmount(quantity, price, s.MaxAmount) diff --git a/pkg/types/price_type.go b/pkg/types/price_type.go index 93bc269e6..55db880a8 100644 --- a/pkg/types/price_type.go +++ b/pkg/types/price_type.go @@ -4,19 +4,37 @@ import ( "encoding/json" "strings" - "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/pkg/errors" + + "github.com/c9s/bbgo/pkg/fixedpoint" ) type PriceType string const ( - PriceTypeLast PriceType = "LAST" - PriceTypeBuy PriceType = "BUY" // BID - PriceTypeSell PriceType = "SELL" // ASK - PriceTypeMid PriceType = "MID" + // PriceTypeLast uses the last price from the given ticker + PriceTypeLast PriceType = "LAST" + + // PriceTypeBid uses the bid price from the given ticker + PriceTypeBid PriceType = "BID" + + // PriceTypeAsk uses the ask price from the given ticker + PriceTypeAsk PriceType = "ASK" + + // PriceTypeMid calculates the middle price from the given ticker + PriceTypeMid PriceType = "MID" + PriceTypeMaker PriceType = "MAKER" PriceTypeTaker PriceType = "TAKER" + + // See best bid offer types + // https://www.binance.com/en/support/faq/understanding-and-using-bbo-orders-on-binance-futures-7f93c89ef09042678cfa73e8a28612e8 + + PriceTypeBestBidOfferCounterParty1 PriceType = "COUNTERPARTY1" + PriceTypeBestBidOfferCounterParty5 PriceType = "COUNTERPARTY5" + + PriceTypeBestBidOfferQueue1 PriceType = "QUEUE1" + PriceTypeBestBidOfferQueue5 PriceType = "QUEUE5" ) var ErrInvalidPriceType = errors.New("invalid price type") @@ -24,7 +42,10 @@ var ErrInvalidPriceType = errors.New("invalid price type") func ParsePriceType(s string) (p PriceType, err error) { p = PriceType(strings.ToUpper(s)) switch p { - case PriceTypeLast, PriceTypeBuy, PriceTypeSell, PriceTypeMid, PriceTypeMaker, PriceTypeTaker: + case PriceTypeLast, PriceTypeBid, PriceTypeAsk, + PriceTypeMid, PriceTypeMaker, PriceTypeTaker, + PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5, + PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5: return p, err } return p, ErrInvalidPriceType @@ -47,25 +68,26 @@ func (p *PriceType) UnmarshalJSON(data []byte) error { return nil } -func (p PriceType) Map(ticker *Ticker, side SideType) fixedpoint.Value { +// GetPrice returns the price from the given ticker based on the price type +func (p PriceType) GetPrice(ticker *Ticker, side SideType) fixedpoint.Value { price := ticker.Last switch p { case PriceTypeLast: price = ticker.Last - case PriceTypeBuy: + case PriceTypeBid: price = ticker.Buy - case PriceTypeSell: + case PriceTypeAsk: price = ticker.Sell case PriceTypeMid: price = ticker.Buy.Add(ticker.Sell).Div(fixedpoint.NewFromInt(2)) - case PriceTypeMaker: + case PriceTypeMaker, PriceTypeBestBidOfferQueue1: if side == SideTypeBuy { price = ticker.Buy } else if side == SideTypeSell { price = ticker.Sell } - case PriceTypeTaker: + case PriceTypeTaker, PriceTypeBestBidOfferCounterParty1: if side == SideTypeBuy { price = ticker.Sell } else if side == SideTypeSell { From 1294cd95be0249d4b9e93f36a5645a5ab83c3d4a Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 14:09:25 +0800 Subject: [PATCH 03/12] rename twap.Execution to twap.StreamExecutor --- pkg/cmd/orders.go | 2 +- pkg/twap/stream_executor.go | 29 +++++++++++++++-------------- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 1dd5d8c9d..6d7ae236a 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -256,7 +256,7 @@ var executeOrderCmd = &cobra.Command{ executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution := &twap.Execution{ + execution := &twap.StreamExecutor{ Session: session, Symbol: symbol, Side: side, diff --git a/pkg/twap/stream_executor.go b/pkg/twap/stream_executor.go index 6ac4a809b..6d0e32b70 100644 --- a/pkg/twap/stream_executor.go +++ b/pkg/twap/stream_executor.go @@ -16,7 +16,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type Execution struct { +// StreamExecutor is a TWAP execution that places orders on the best price by connecting to the real time market data stream. +type StreamExecutor struct { Session *bbgo.ExchangeSession Symbol string Side types.SideType @@ -52,21 +53,21 @@ type Execution struct { mu sync.Mutex } -func (e *Execution) connectMarketData(ctx context.Context) { +func (e *StreamExecutor) connectMarketData(ctx context.Context) { logrus.Infof("connecting market data stream...") if err := e.marketDataStream.Connect(ctx); err != nil { logrus.WithError(err).Errorf("market data stream connect error") } } -func (e *Execution) connectUserData(ctx context.Context) { +func (e *StreamExecutor) connectUserData(ctx context.Context) { logrus.Infof("connecting user data stream...") if err := e.userDataStream.Connect(ctx); err != nil { logrus.WithError(err).Errorf("user data stream connect error") } } -func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { +func (e *StreamExecutor) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -201,7 +202,7 @@ func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) return orderForm, err } -func (e *Execution) updateOrder(ctx context.Context) error { +func (e *StreamExecutor) updateOrder(ctx context.Context) error { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -284,13 +285,13 @@ func (e *Execution) updateOrder(ctx context.Context) error { return nil } -func (e *Execution) cancelActiveOrders() { +func (e *StreamExecutor) cancelActiveOrders() { gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) defer gracefulCancel() e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) } -func (e *Execution) orderUpdater(ctx context.Context) { +func (e *StreamExecutor) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() @@ -340,7 +341,7 @@ func (e *Execution) orderUpdater(ctx context.Context) { } } -func (e *Execution) cancelContextIfTargetQuantityFilled() bool { +func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool { base := e.position.GetBase() if base.Abs().Compare(e.TargetQuantity) >= 0 { @@ -351,7 +352,7 @@ func (e *Execution) cancelContextIfTargetQuantityFilled() bool { return false } -func (e *Execution) handleTradeUpdate(trade types.Trade) { +func (e *StreamExecutor) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -367,7 +368,7 @@ func (e *Execution) handleTradeUpdate(trade types.Trade) { logrus.Infof("position updated: %+v", e.position) } -func (e *Execution) handleFilledOrder(order types.Order) { +func (e *StreamExecutor) handleFilledOrder(order types.Order) { logrus.Info(order.String()) // filled event triggers the order removal from the active order store @@ -375,7 +376,7 @@ func (e *Execution) handleFilledOrder(order types.Order) { e.cancelContextIfTargetQuantityFilled() } -func (e *Execution) Run(parentCtx context.Context) error { +func (e *StreamExecutor) Run(parentCtx context.Context) error { e.mu.Lock() e.stoppedC = make(chan struct{}) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) @@ -419,7 +420,7 @@ func (e *Execution) Run(parentCtx context.Context) error { return nil } -func (e *Execution) emitDone() { +func (e *StreamExecutor) emitDone() { e.mu.Lock() if e.stoppedC == nil { e.stoppedC = make(chan struct{}) @@ -428,7 +429,7 @@ func (e *Execution) emitDone() { e.mu.Unlock() } -func (e *Execution) Done() (c <-chan struct{}) { +func (e *StreamExecutor) Done() (c <-chan struct{}) { e.mu.Lock() // if the channel is not allocated, it means it's not started yet, we need to return a closed channel if e.stoppedC == nil { @@ -448,7 +449,7 @@ func (e *Execution) Done() (c <-chan struct{}) { // We need to: // 1. stop the order updater (by using the execution context) // 2. the order updater cancels all open orders and close the user data stream -func (e *Execution) Shutdown(shutdownCtx context.Context) { +func (e *StreamExecutor) Shutdown(shutdownCtx context.Context) { e.mu.Lock() if e.cancelExecution != nil { e.cancelExecution() From 0a83c26fd5643860e039ceb9ce756aa01946ffb9 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 14:15:43 +0800 Subject: [PATCH 04/12] types: add warning to the price type --- pkg/types/price_type.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pkg/types/price_type.go b/pkg/types/price_type.go index 55db880a8..cfde80668 100644 --- a/pkg/types/price_type.go +++ b/pkg/types/price_type.go @@ -5,6 +5,7 @@ import ( "strings" "github.com/pkg/errors" + log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/fixedpoint" ) @@ -70,6 +71,11 @@ func (p *PriceType) UnmarshalJSON(data []byte) error { // GetPrice returns the price from the given ticker based on the price type func (p PriceType) GetPrice(ticker *Ticker, side SideType) fixedpoint.Value { + switch p { + case PriceTypeBestBidOfferQueue5, PriceTypeBestBidOfferCounterParty5: + log.Warnf("price type %s is not supported with ticker", p) + } + price := ticker.Last switch p { @@ -81,13 +87,13 @@ func (p PriceType) GetPrice(ticker *Ticker, side SideType) fixedpoint.Value { price = ticker.Sell case PriceTypeMid: price = ticker.Buy.Add(ticker.Sell).Div(fixedpoint.NewFromInt(2)) - case PriceTypeMaker, PriceTypeBestBidOfferQueue1: + case PriceTypeMaker, PriceTypeBestBidOfferQueue1, PriceTypeBestBidOfferQueue5: if side == SideTypeBuy { price = ticker.Buy } else if side == SideTypeSell { price = ticker.Sell } - case PriceTypeTaker, PriceTypeBestBidOfferCounterParty1: + case PriceTypeTaker, PriceTypeBestBidOfferCounterParty1, PriceTypeBestBidOfferCounterParty5: if side == SideTypeBuy { price = ticker.Sell } else if side == SideTypeSell { From 51c1b995c231f932af08988184b792991f8850d1 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 18:00:49 +0800 Subject: [PATCH 05/12] twap: add v2 fixed quantity executor --- pkg/bbgo/order_executor_general.go | 1 + pkg/bbgo/order_executor_simple.go | 1 + pkg/strategy/dca2/recover_test.go | 17 +- pkg/twap/stream_executor.go | 6 +- pkg/twap/v2/stream_executor.go | 291 ++++++++++++++++++++++++++++ pkg/twap/v2/stream_executor_test.go | 9 + 6 files changed, 314 insertions(+), 11 deletions(-) create mode 100644 pkg/twap/v2/stream_executor.go create mode 100644 pkg/twap/v2/stream_executor_test.go diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 8de07c9e3..59ee81639 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -84,6 +84,7 @@ func NewGeneralOrderExecutor( executor := &GeneralOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ session: session, + exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(symbol), orderStore: orderStore, }, diff --git a/pkg/bbgo/order_executor_simple.go b/pkg/bbgo/order_executor_simple.go index e7c5308d0..123cd03f1 100644 --- a/pkg/bbgo/order_executor_simple.go +++ b/pkg/bbgo/order_executor_simple.go @@ -22,6 +22,7 @@ func NewSimpleOrderExecutor(session *ExchangeSession) *SimpleOrderExecutor { return &SimpleOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ session: session, + exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(""), orderStore: core.NewOrderStore(""), }, diff --git a/pkg/strategy/dca2/recover_test.go b/pkg/strategy/dca2/recover_test.go index 472ecee77..c26205c2d 100644 --- a/pkg/strategy/dca2/recover_test.go +++ b/pkg/strategy/dca2/recover_test.go @@ -6,9 +6,10 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" ) func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt time.Time) types.Order { @@ -29,7 +30,7 @@ func Test_RecoverState(t *testing.T) { t.Run("new strategy", func(t *testing.T) { currentRound := Round{} position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) @@ -47,7 +48,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionReady, state) @@ -65,7 +66,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrderFilled, state) @@ -83,7 +84,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) @@ -101,7 +102,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) @@ -122,7 +123,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, TakeProfitReady, state) @@ -143,7 +144,7 @@ func Test_RecoverState(t *testing.T) { }, } position := types.NewPositionFromMarket(strategy.Market) - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, strategy.Symbol, ID, "", position) state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) diff --git a/pkg/twap/stream_executor.go b/pkg/twap/stream_executor.go index 6d0e32b70..839d42670 100644 --- a/pkg/twap/stream_executor.go +++ b/pkg/twap/stream_executor.go @@ -48,8 +48,6 @@ type StreamExecutor struct { stoppedC chan struct{} - state int - mu sync.Mutex } @@ -349,6 +347,7 @@ func (e *StreamExecutor) cancelContextIfTargetQuantityFilled() bool { e.cancelExecution() return true } + return false } @@ -399,10 +398,10 @@ func (e *StreamExecutor) Run(parentCtx context.Context) error { e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook.BindStream(e.marketDataStream) - go e.connectMarketData(e.executionCtx) e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) + e.position = &types.Position{ Symbol: e.Symbol, BaseCurrency: e.market.BaseCurrency, @@ -415,6 +414,7 @@ func (e *StreamExecutor) Run(parentCtx context.Context) error { e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) + go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx) go e.orderUpdater(e.executionCtx) return nil diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go new file mode 100644 index 000000000..1f76176bc --- /dev/null +++ b/pkg/twap/v2/stream_executor.go @@ -0,0 +1,291 @@ +package twap + +import ( + "context" + "errors" + "sync" + "time" + + "github.com/sirupsen/logrus" + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/core" + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +var defaultUpdateInterval = time.Minute + +type DoneSignal struct { + doneC chan struct{} + mu sync.Mutex +} + +func NewDoneSignal() *DoneSignal { + return &DoneSignal{ + doneC: make(chan struct{}), + } +} + +func (e *DoneSignal) Emit() { + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + } + + close(e.doneC) + e.mu.Unlock() +} + +// Chan returns a channel that emits a signal when the execution is done. +func (e *DoneSignal) Chan() (c <-chan struct{}) { + // if the channel is not allocated, it means it's not started yet, we need to return a closed channel + e.mu.Lock() + if e.doneC == nil { + e.doneC = make(chan struct{}) + c = e.doneC + } else { + c = e.doneC + } + e.mu.Unlock() + + return c +} + +// FixedQuantityExecutor is a TWAP executor that places orders on the exchange using the exchange's stream API. +// It uses a fixed target quantity to place orders. +type FixedQuantityExecutor struct { + exchange types.Exchange + + // configuration fields + + symbol string + side types.SideType + targetQuantity fixedpoint.Value + + // updateInterval is a fixed update interval for placing new order + updateInterval time.Duration + + // delayInterval is the delay interval between each order placement + delayInterval time.Duration + + // priceLimit is the price limit for the order + // for buy-orders, the price limit is the maximum price + // for sell-orders, the price limit is the minimum price + priceLimit fixedpoint.Value + + // deadlineTime is the deadline time for the order execution + deadlineTime *time.Time + + executionCtx context.Context + cancelExecution context.CancelFunc + + userDataStreamCtx context.Context + cancelUserDataStream context.CancelFunc + + market types.Market + marketDataStream types.Stream + orderBook *types.StreamOrderBook + + userDataStream types.Stream + activeMakerOrders *bbgo.ActiveOrderBook + orderStore *core.OrderStore + position *types.Position + + logger logrus.FieldLogger + + mu sync.Mutex + + done *DoneSignal +} + +func NewStreamExecutor( + exchange types.Exchange, + symbol string, + market types.Market, + side types.SideType, + targetQuantity fixedpoint.Value, +) *FixedQuantityExecutor { + return &FixedQuantityExecutor{ + exchange: exchange, + symbol: symbol, + side: side, + market: market, + position: types.NewPositionFromMarket(market), + targetQuantity: targetQuantity, + updateInterval: defaultUpdateInterval, + logger: logrus.WithFields(logrus.Fields{ + "executor": "twapStream", + "symbol": symbol, + }), + + done: NewDoneSignal(), + } +} + +func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) { + e.deadlineTime = &t +} + +func (e *FixedQuantityExecutor) SetDelayInterval(delayInterval time.Duration) { + e.delayInterval = delayInterval +} + +func (e *FixedQuantityExecutor) SetUpdateInterval(updateInterval time.Duration) { + e.updateInterval = updateInterval +} + +func (e *FixedQuantityExecutor) connectMarketData(ctx context.Context) { + e.logger.Infof("connecting market data stream...") + if err := e.marketDataStream.Connect(ctx); err != nil { + e.logger.WithError(err).Errorf("market data stream connect error") + } +} + +func (e *FixedQuantityExecutor) connectUserData(ctx context.Context) { + e.logger.Infof("connecting user data stream...") + if err := e.userDataStream.Connect(ctx); err != nil { + e.logger.WithError(err).Errorf("user data stream connect error") + } +} + +func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) { + e.logger.Info(order.String()) + + // filled event triggers the order removal from the active order store + // we need to ensure we received every order update event before the execution is done. + e.cancelContextIfTargetQuantityFilled() +} + +func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool { + base := e.position.GetBase() + + if base.Abs().Compare(e.targetQuantity) >= 0 { + e.logger.Infof("filled target quantity, canceling the order execution context") + e.cancelExecution() + return true + } + return false +} + +func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { + gracefulCtx, gracefulCancel := context.WithTimeout(ctx, 30*time.Second) + defer gracefulCancel() + return e.activeMakerOrders.GracefulCancel(gracefulCtx, e.exchange) +} + +func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { + updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) + _ = updateLimiter + + defer func() { + if err := e.cancelActiveOrders(ctx); err != nil { + e.logger.WithError(err).Error("cancel active orders error") + } + + e.cancelUserDataStream() + e.done.Emit() + }() + + ticker := time.NewTimer(e.updateInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-e.orderBook.C: + if !updateLimiter.Allow() { + break + } + + if e.cancelContextIfTargetQuantityFilled() { + return + } + + e.logger.Infof("%s order book changed, checking order...", e.symbol) + + /* + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } + */ + + case <-ticker.C: + if !updateLimiter.Allow() { + break + } + + if e.cancelContextIfTargetQuantityFilled() { + return + } + + /* + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } + */ + } + } +} + +func (e *FixedQuantityExecutor) Start(ctx context.Context) error { + if e.marketDataStream != nil { + return errors.New("market data stream is not nil, you can't start the executor twice") + } + + e.executionCtx, e.cancelExecution = context.WithCancel(ctx) + e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx) + + e.marketDataStream = e.exchange.NewStream() + e.marketDataStream.SetPublicOnly() + e.marketDataStream.Subscribe(types.BookChannel, e.symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + + e.orderBook = types.NewStreamBook(e.symbol) + e.orderBook.BindStream(e.marketDataStream) + + e.orderStore = core.NewOrderStore(e.symbol) + e.orderStore.BindStream(e.userDataStream) + e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol) + e.activeMakerOrders.OnFilled(e.handleFilledOrder) + e.activeMakerOrders.BindStream(e.userDataStream) + + go e.connectMarketData(e.executionCtx) + go e.connectUserData(e.userDataStreamCtx) + go e.orderUpdater(e.executionCtx) + return nil +} + +// Done returns a channel that emits a signal when the execution is done. +func (e *FixedQuantityExecutor) Done() <-chan struct{} { + return e.done.Chan() +} + +// Shutdown stops the execution +// If we call this method, it means the execution is still running, +// We need it to: +// 1. Stop the order updater (by using the execution context) +// 2. The order updater cancels all open orders and closes the user data stream +func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) { + e.mu.Lock() + if e.cancelExecution != nil { + e.cancelExecution() + } + e.mu.Unlock() + + for { + select { + + case <-shutdownCtx.Done(): + return + + case <-e.done.Chan(): + return + + } + } +} diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go new file mode 100644 index 000000000..4b6c2eaf8 --- /dev/null +++ b/pkg/twap/v2/stream_executor_test.go @@ -0,0 +1,9 @@ +package twap + +import ( + "testing" +) + +func TestNewStreamExecutorV2(t *testing.T) { + +} From b7d18e687e21248aeeb656f6648b63a35e283746 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 18 Aug 2024 13:16:55 +0800 Subject: [PATCH 06/12] twap: implement orderUpdater --- pkg/bbgo/order_executor_general.go | 9 +- pkg/twap/v2/bbomonitor.go | 53 +++++ pkg/twap/v2/bbomonitor_callbacks.go | 17 ++ pkg/twap/v2/stream_executor.go | 291 ++++++++++++++++++++++++++-- 4 files changed, 347 insertions(+), 23 deletions(-) create mode 100644 pkg/twap/v2/bbomonitor.go create mode 100644 pkg/twap/v2/bbomonitor_callbacks.go diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 59ee81639..64df59c00 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -27,7 +27,9 @@ var quantityReduceDelta = fixedpoint.NewFromFloat(0.005) // This is for the maximum retries const submitOrderRetryLimit = 5 +// BaseOrderExecutor provides the common accessors for order executor type BaseOrderExecutor struct { + exchange types.Exchange session *ExchangeSession activeMakerOrders *ActiveOrderBook orderStore *core.OrderStore @@ -43,8 +45,8 @@ func (e *BaseOrderExecutor) ActiveMakerOrders() *ActiveOrderBook { // GracefulCancel cancels all active maker orders if orders are not given, otherwise cancel all the given orders func (e *BaseOrderExecutor) GracefulCancel(ctx context.Context, orders ...types.Order) error { - if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange, orders...); err != nil { - return errors.Wrap(err, "graceful cancel error") + if err := e.activeMakerOrders.GracefulCancel(ctx, e.exchange, orders...); err != nil { + return errors.Wrap(err, "graceful cancel order error") } return nil @@ -83,6 +85,7 @@ func NewGeneralOrderExecutor( executor := &GeneralOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ + exchange: session.Exchange, session: session, exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(symbol), @@ -112,7 +115,7 @@ func (e *GeneralOrderExecutor) SetMaxRetries(maxRetries uint) { } func (e *GeneralOrderExecutor) startMarginAssetUpdater(ctx context.Context) { - marginService, ok := e.session.Exchange.(types.MarginBorrowRepayService) + marginService, ok := e.exchange.(types.MarginBorrowRepayService) if !ok { log.Warnf("session %s (%T) exchange does not support MarginBorrowRepayService", e.session.Name, e.session.Exchange) return diff --git a/pkg/twap/v2/bbomonitor.go b/pkg/twap/v2/bbomonitor.go new file mode 100644 index 000000000..5081fc9bb --- /dev/null +++ b/pkg/twap/v2/bbomonitor.go @@ -0,0 +1,53 @@ +package twap + +import ( + "time" + + "github.com/c9s/bbgo/pkg/types" +) + +// BboMonitor monitors the best bid and ask price and volume. +// +//go:generate callbackgen -type BboMonitor +type BboMonitor struct { + Bid types.PriceVolume + Ask types.PriceVolume + UpdatedTime time.Time + + updateCallbacks []func(bid, ask types.PriceVolume) +} + +func NewBboMonitor() *BboMonitor { + return &BboMonitor{} +} + +func (m *BboMonitor) OnUpdateFromBook(book *types.StreamOrderBook) bool { + bestBid, ok1 := book.BestBid() + bestAsk, ok2 := book.BestAsk() + if !ok1 || !ok2 { + return false + } + + return m.Update(bestBid, bestAsk, book.LastUpdateTime()) +} + +func (m *BboMonitor) Update(bid, ask types.PriceVolume, t time.Time) bool { + changed := false + if m.Bid.Price.Compare(bid.Price) != 0 || m.Bid.Volume.Compare(bid.Volume) != 0 { + changed = true + } + + if m.Ask.Price.Compare(ask.Price) != 0 || m.Ask.Volume.Compare(ask.Volume) != 0 { + changed = true + } + + m.Bid = bid + m.Ask = ask + m.UpdatedTime = t + + if changed { + m.EmitUpdate(bid, ask) + } + + return changed +} diff --git a/pkg/twap/v2/bbomonitor_callbacks.go b/pkg/twap/v2/bbomonitor_callbacks.go new file mode 100644 index 000000000..4e413eaab --- /dev/null +++ b/pkg/twap/v2/bbomonitor_callbacks.go @@ -0,0 +1,17 @@ +// Code generated by "callbackgen -type BboMonitor"; DO NOT EDIT. + +package twap + +import ( + "github.com/c9s/bbgo/pkg/types" +) + +func (m *BboMonitor) OnUpdate(cb func(bid types.PriceVolume, ask types.PriceVolume)) { + m.updateCallbacks = append(m.updateCallbacks, cb) +} + +func (m *BboMonitor) EmitUpdate(bid types.PriceVolume, ask types.PriceVolume) { + for _, cb := range m.updateCallbacks { + cb(bid, ask) + } +} diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 1f76176bc..b05d7e410 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -3,6 +3,7 @@ package twap import ( "context" "errors" + "fmt" "sync" "time" @@ -60,9 +61,9 @@ type FixedQuantityExecutor struct { // configuration fields - symbol string - side types.SideType - targetQuantity fixedpoint.Value + symbol string + side types.SideType + targetQuantity, sliceQuantity fixedpoint.Value // updateInterval is a fixed update interval for placing new order updateInterval time.Duration @@ -70,10 +71,13 @@ type FixedQuantityExecutor struct { // delayInterval is the delay interval between each order placement delayInterval time.Duration - // priceLimit is the price limit for the order + // numOfTicks is the number of price ticks behind the best bid to place the order + numOfTicks int + + // stopPrice is the price limit for the order // for buy-orders, the price limit is the maximum price // for sell-orders, the price limit is the minimum price - priceLimit fixedpoint.Value + stopPrice fixedpoint.Value // deadlineTime is the deadline time for the order execution deadlineTime *time.Time @@ -92,6 +96,7 @@ type FixedQuantityExecutor struct { activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position + tradeCollector *core.TradeCollector logger logrus.FieldLogger @@ -105,22 +110,28 @@ func NewStreamExecutor( symbol string, market types.Market, side types.SideType, - targetQuantity fixedpoint.Value, + targetQuantity, sliceQuantity fixedpoint.Value, ) *FixedQuantityExecutor { + orderStore := core.NewOrderStore(symbol) + position := types.NewPositionFromMarket(market) + tradeCollector := core.NewTradeCollector(symbol, position, orderStore) return &FixedQuantityExecutor{ exchange: exchange, symbol: symbol, side: side, market: market, - position: types.NewPositionFromMarket(market), targetQuantity: targetQuantity, + sliceQuantity: sliceQuantity, updateInterval: defaultUpdateInterval, logger: logrus.WithFields(logrus.Fields{ "executor": "twapStream", "symbol": symbol, }), - done: NewDoneSignal(), + orderStore: orderStore, + tradeCollector: tradeCollector, + position: position, + done: NewDoneSignal(), } } @@ -177,7 +188,6 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) - _ = updateLimiter defer func() { if err := e.cancelActiveOrders(ctx); err != nil { @@ -191,12 +201,20 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { ticker := time.NewTimer(e.updateInterval) defer ticker.Stop() + monitor := NewBboMonitor() + for { select { case <-ctx.Done(): return case <-e.orderBook.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + + // orderBook.C sends a signal when any price or quantity changes in the order book if !updateLimiter.Allow() { break } @@ -207,13 +225,16 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { e.logger.Infof("%s order book changed, checking order...", e.symbol) - /* - if err := e.updateOrder(ctx); err != nil { - e.logger.WithError(err).Errorf("order update failed") - } - */ + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } case <-ticker.C: + changed := monitor.OnUpdateFromBook(e.orderBook) + if !changed { + continue + } + if !updateLimiter.Allow() { break } @@ -222,15 +243,240 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { return } - /* - if err := e.updateOrder(ctx); err != nil { - e.logger.WithError(err).Errorf("order update failed") - } - */ + if err := e.updateOrder(ctx); err != nil { + e.logger.WithError(err).Errorf("order update failed") + } } } } +func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + // if there is no gap between the first price entry and the second price entry + second, ok := sideBook.Second() + if !ok { + return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.symbol, e.side) + } + + tickSize := e.market.TickSize + numOfTicks := fixedpoint.NewFromInt(int64(e.numOfTicks)) + tickSpread := tickSize.Mul(numOfTicks) + + // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { + orders := e.activeMakerOrders.Orders() + + if len(orders) > 1 { + logrus.Warnf("more than 1 %s open orders in the strategy...", e.symbol) + } + + // get the first active order + order := orders[0] + orderPrice := order.Price + // quantity := fixedpoint.NewFromFloat(order.Quantity) + + remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) + if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + return nil + } + + // if the first bid price or first ask price is the same to the current active order + // we should skip updating the order + // DO NOT UPDATE IF: + // tickSpread > 0 AND current order price == second price + tickSpread + // current order price == first price + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + + switch e.side { + case types.SideTypeBuy: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) + return nil + } + + case types.SideTypeSell: + if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } else if orderPrice == first.Price { + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) + return nil + } + } + + if err := e.cancelActiveOrders(ctx); err != nil { + e.logger.Warnf("cancel active orders error: %v", err) + } + } + + orderForm, err := e.generateOrder() + if err != nil { + return err + } else if orderForm == nil { + return nil + } + + createdOrder, err := e.exchange.SubmitOrder(ctx, *orderForm) + if err != nil { + return err + } + + if createdOrder != nil { + e.orderStore.Add(*createdOrder) + e.activeMakerOrders.Add(*createdOrder) + e.tradeCollector.Process() + } + + return nil +} + +func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) { + newPrice := fixedpoint.Zero + book := e.orderBook.Copy() + sideBook := book.SideBook(e.side) + + first, ok := sideBook.First() + if !ok { + return newPrice, fmt.Errorf("empty %s %s side book", e.symbol, e.side) + } + + newPrice = first.Price + spread, ok := book.Spread() + if !ok { + return newPrice, errors.New("can not calculate spread, neither bid price or ask price exists") + } + + tickSize := e.market.TickSize + tickSpread := tickSize.Mul(fixedpoint.NewFromInt(int64(e.numOfTicks))) + if spread.Compare(tickSize) > 0 { + // there is a gap in the spread + tickSpread = fixedpoint.Min(tickSpread, spread.Sub(tickSize)) + switch e.side { + case types.SideTypeSell: + newPrice = newPrice.Sub(tickSpread) + case types.SideTypeBuy: + newPrice = newPrice.Add(tickSpread) + } + } + + if e.stopPrice.Sign() > 0 { + switch e.side { + case types.SideTypeSell: + if newPrice.Compare(e.stopPrice) < 0 { + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + + case types.SideTypeBuy: + if newPrice.Compare(e.stopPrice) > 0 { + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + e.symbol, + newPrice.String(), + e.stopPrice.String(), + e.stopPrice.String()) + newPrice = e.stopPrice + } + } + } + + return newPrice, nil +} + +func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, err error) { + newPrice, err := e.getNewPrice() + if err != nil { + return nil, err + } + + minQuantity := e.market.MinQuantity + base := e.position.GetBase() + + restQuantity := e.targetQuantity.Sub(base.Abs()) + + if restQuantity.Sign() <= 0 { + if e.cancelContextIfTargetQuantityFilled() { + return nil, nil + } + } + + if restQuantity.Compare(minQuantity) < 0 { + return nil, fmt.Errorf("can not continue placing orders, rest quantity %s is less than the min quantity %s", restQuantity.String(), minQuantity.String()) + } + + // when slice = 1000, if we only have 998, we should adjust our quantity to 998 + orderQuantity := fixedpoint.Min(e.sliceQuantity, restQuantity) + + // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round + // if there are rest slices + nextRestQuantity := restQuantity.Sub(e.sliceQuantity) + if nextRestQuantity.Sign() > 0 && nextRestQuantity.Compare(minQuantity) < 0 { + orderQuantity = restQuantity + } + + minNotional := e.market.MinNotional + orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + + balances, err := e.exchange.QueryAccountBalances(e.executionCtx) + if err != nil { + return nil, err + } + + switch e.side { + case types.SideTypeSell: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.BaseCurrency]; ok { + orderQuantity = fixedpoint.Min(b.Available, orderQuantity) + } + + case types.SideTypeBuy: + // check base balance for sell, try to sell as more as possible + if b, ok := balances[e.market.QuoteCurrency]; ok { + orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + } + } + + if e.deadlineTime != nil && !e.deadlineTime.IsZero() { + now := time.Now() + if now.After(*e.deadlineTime) { + orderForm = &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeMarket, + Quantity: restQuantity, + Market: e.market, + } + return orderForm, nil + } + } + + orderForm = &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeLimitMaker, + Quantity: orderQuantity, + Price: newPrice, + Market: e.market, + TimeInForce: "GTC", + } + + return orderForm, err +} + func (e *FixedQuantityExecutor) Start(ctx context.Context) error { if e.marketDataStream != nil { return errors.New("market data stream is not nil, you can't start the executor twice") @@ -248,11 +494,16 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { e.orderBook = types.NewStreamBook(e.symbol) e.orderBook.BindStream(e.marketDataStream) - e.orderStore = core.NewOrderStore(e.symbol) + // private channels + e.userDataStream = e.exchange.NewStream() e.orderStore.BindStream(e.userDataStream) e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol) e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) + e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { + e.logger.Info(trade.String()) + }) + e.tradeCollector.BindStream(e.userDataStream) go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx) From 648e10fd7ca3774e048dc7259a25c34c339c603c Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 19 Aug 2024 15:37:38 +0800 Subject: [PATCH 07/12] binance: fix time in force setting for limit maker --- pkg/exchange/binance/exchange.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index bc3e1001b..e757f0454 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -995,13 +995,15 @@ func (e *Exchange) submitMarginOrder(ctx context.Context, order types.SubmitOrde } // could be IOC or FOK - if len(order.TimeInForce) > 0 { - // TODO: check the TimeInForce value - req.TimeInForce(binance.TimeInForceType(order.TimeInForce)) - } else { - switch order.Type { - case types.OrderTypeLimit, types.OrderTypeStopLimit: - req.TimeInForce(binance.TimeInForceTypeGTC) + switch order.Type { + case types.OrderTypeLimit, types.OrderTypeStopLimit: + req.TimeInForce(binance.TimeInForceTypeGTC) + case types.OrderTypeLimitMaker: + // do not set TimeInForce for LimitMaker + default: + if len(order.TimeInForce) > 0 { + // TODO: check the TimeInForce value + req.TimeInForce(binance.TimeInForceType(order.TimeInForce)) } } From cec078f4bf9cc938f0b756815ba0e934c188365d Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 19 Aug 2024 15:37:56 +0800 Subject: [PATCH 08/12] twap: add stream executor test --- pkg/twap/v2/stream_executor.go | 230 +++++++++++------ pkg/twap/v2/stream_executor_test.go | 115 ++++++++- pkg/types/market.go | 13 + pkg/types/mocks/mock_stream.go | 375 ++++++++++++++++++++++++++++ pkg/types/stream.go | 1 + 5 files changed, 655 insertions(+), 79 deletions(-) create mode 100644 pkg/types/mocks/mock_stream.go diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index b05d7e410..bf5fa7ec9 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -90,9 +90,11 @@ type FixedQuantityExecutor struct { market types.Market marketDataStream types.Stream - orderBook *types.StreamOrderBook - userDataStream types.Stream + orderBook *types.StreamOrderBook + + userDataStream types.Stream + activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position @@ -102,7 +104,9 @@ type FixedQuantityExecutor struct { mu sync.Mutex - done *DoneSignal + userDataStreamConnectC chan struct{} + marketDataStreamConnectC chan struct{} + done *DoneSignal } func NewStreamExecutor( @@ -112,10 +116,25 @@ func NewStreamExecutor( side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value, ) *FixedQuantityExecutor { + + marketDataStream := exchange.NewStream() + marketDataStream.SetPublicOnly() + marketDataStream.Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + + orderBook := types.NewStreamBook(symbol) + orderBook.BindStream(marketDataStream) + + userDataStream := exchange.NewStream() orderStore := core.NewOrderStore(symbol) position := types.NewPositionFromMarket(market) tradeCollector := core.NewTradeCollector(symbol, position, orderStore) - return &FixedQuantityExecutor{ + orderStore.BindStream(userDataStream) + + activeMakerOrders := bbgo.NewActiveOrderBook(symbol) + + e := &FixedQuantityExecutor{ exchange: exchange, symbol: symbol, side: side, @@ -128,11 +147,43 @@ func NewStreamExecutor( "symbol": symbol, }), - orderStore: orderStore, - tradeCollector: tradeCollector, - position: position, - done: NewDoneSignal(), + marketDataStream: marketDataStream, + orderBook: orderBook, + + userDataStream: userDataStream, + + activeMakerOrders: activeMakerOrders, + orderStore: orderStore, + tradeCollector: tradeCollector, + position: position, + done: NewDoneSignal(), + + userDataStreamConnectC: make(chan struct{}), + marketDataStreamConnectC: make(chan struct{}), } + + e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { + e.logger.Info(trade.String()) + }) + e.tradeCollector.BindStream(e.userDataStream) + + activeMakerOrders.OnFilled(e.handleFilledOrder) + activeMakerOrders.BindStream(e.userDataStream) + + e.marketDataStream.OnConnect(func() { + e.logger.Info("market data stream on connect") + close(e.marketDataStreamConnectC) + e.logger.Infof("marketDataStreamConnectC closed") + }) + + // private channels + e.userDataStream.OnAuth(func() { + e.logger.Info("user data stream on auth") + close(e.userDataStreamConnectC) + e.logger.Info("userDataStreamConnectC closed") + }) + + return e } func (e *FixedQuantityExecutor) SetDeadlineTime(t time.Time) { @@ -270,11 +321,11 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { tickSpread := tickSize.Mul(numOfTicks) // check and see if we need to cancel the existing active orders + for e.activeMakerOrders.NumOfOrders() > 0 { orders := e.activeMakerOrders.Orders() - if len(orders) > 1 { - logrus.Warnf("more than 1 %s open orders in the strategy...", e.symbol) + e.logger.Warnf("found more than 1 %s open orders on the orderbook", e.symbol) } // get the first active order @@ -397,40 +448,66 @@ func (e *FixedQuantityExecutor) getNewPrice() (fixedpoint.Value, error) { return newPrice, nil } -func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, err error) { +func (e *FixedQuantityExecutor) getRemainingQuantity() fixedpoint.Value { + base := e.position.GetBase() + return e.targetQuantity.Sub(base.Abs()) +} + +func (e *FixedQuantityExecutor) isDeadlineExceeded() bool { + if e.deadlineTime != nil && !e.deadlineTime.IsZero() { + return time.Since(*e.deadlineTime) > 0 + } + + return false +} + +func (e *FixedQuantityExecutor) calculateNewOrderQuantity(price fixedpoint.Value) (fixedpoint.Value, error) { + minQuantity := e.market.MinQuantity + remainingQuantity := e.getRemainingQuantity() + + if remainingQuantity.Sign() <= 0 { + e.cancelExecution() + return fixedpoint.Zero, nil + } + + if remainingQuantity.Compare(minQuantity) < 0 { + e.logger.Warnf("can not continue placing orders, the remaining quantity %s is less than the min quantity %s", remainingQuantity.String(), minQuantity.String()) + + e.cancelExecution() + return fixedpoint.Zero, nil + } + + // if deadline exceeded, we should return the remaining quantity + if e.isDeadlineExceeded() { + return remainingQuantity, nil + } + + // when slice = 1000, if we only have 998, we should adjust our quantity to 998 + orderQuantity := fixedpoint.Min(e.sliceQuantity, remainingQuantity) + + // if the remaining quantity in the next round is not enough, we should merge the remaining quantity into this round + // if there are rest slices + nextRemainingQuantity := remainingQuantity.Sub(e.sliceQuantity) + + if nextRemainingQuantity.Sign() > 0 && e.market.IsDustQuantity(nextRemainingQuantity, price) { + orderQuantity = remainingQuantity + } + + orderQuantity = e.market.AdjustQuantityByMinNotional(orderQuantity, price) + return orderQuantity, nil +} + +func (e *FixedQuantityExecutor) generateOrder() (*types.SubmitOrder, error) { newPrice, err := e.getNewPrice() if err != nil { return nil, err } - minQuantity := e.market.MinQuantity - base := e.position.GetBase() - - restQuantity := e.targetQuantity.Sub(base.Abs()) - - if restQuantity.Sign() <= 0 { - if e.cancelContextIfTargetQuantityFilled() { - return nil, nil - } + orderQuantity, err := e.calculateNewOrderQuantity(newPrice) + if err != nil { + return nil, err } - if restQuantity.Compare(minQuantity) < 0 { - return nil, fmt.Errorf("can not continue placing orders, rest quantity %s is less than the min quantity %s", restQuantity.String(), minQuantity.String()) - } - - // when slice = 1000, if we only have 998, we should adjust our quantity to 998 - orderQuantity := fixedpoint.Min(e.sliceQuantity, restQuantity) - - // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round - // if there are rest slices - nextRestQuantity := restQuantity.Sub(e.sliceQuantity) - if nextRestQuantity.Sign() > 0 && nextRestQuantity.Compare(minQuantity) < 0 { - orderQuantity = restQuantity - } - - minNotional := e.market.MinNotional - orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) - balances, err := e.exchange.QueryAccountBalances(e.executionCtx) if err != nil { return nil, err @@ -446,67 +523,55 @@ func (e *FixedQuantityExecutor) generateOrder() (orderForm *types.SubmitOrder, e case types.SideTypeBuy: // check base balance for sell, try to sell as more as possible if b, ok := balances[e.market.QuoteCurrency]; ok { - orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + orderQuantity = e.market.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) } } - if e.deadlineTime != nil && !e.deadlineTime.IsZero() { - now := time.Now() - if now.After(*e.deadlineTime) { - orderForm = &types.SubmitOrder{ - Symbol: e.symbol, - Side: e.side, - Type: types.OrderTypeMarket, - Quantity: restQuantity, - Market: e.market, - } - return orderForm, nil - } + if e.isDeadlineExceeded() { + return &types.SubmitOrder{ + Symbol: e.symbol, + Side: e.side, + Type: types.OrderTypeMarket, + Quantity: orderQuantity, + Market: e.market, + }, nil } - orderForm = &types.SubmitOrder{ + return &types.SubmitOrder{ Symbol: e.symbol, Side: e.side, Type: types.OrderTypeLimitMaker, Quantity: orderQuantity, Price: newPrice, Market: e.market, - TimeInForce: "GTC", - } - - return orderForm, err + TimeInForce: types.TimeInForceGTC, + }, nil } func (e *FixedQuantityExecutor) Start(ctx context.Context) error { - if e.marketDataStream != nil { - return errors.New("market data stream is not nil, you can't start the executor twice") + if e.executionCtx != nil { + return errors.New("executionCtx is not nil, you can't start the executor twice") } e.executionCtx, e.cancelExecution = context.WithCancel(ctx) e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(ctx) - e.marketDataStream = e.exchange.NewStream() - e.marketDataStream.SetPublicOnly() - e.marketDataStream.Subscribe(types.BookChannel, e.symbol, types.SubscribeOptions{ - Depth: types.DepthLevelMedium, - }) - - e.orderBook = types.NewStreamBook(e.symbol) - e.orderBook.BindStream(e.marketDataStream) - - // private channels - e.userDataStream = e.exchange.NewStream() - e.orderStore.BindStream(e.userDataStream) - e.activeMakerOrders = bbgo.NewActiveOrderBook(e.symbol) - e.activeMakerOrders.OnFilled(e.handleFilledOrder) - e.activeMakerOrders.BindStream(e.userDataStream) - e.tradeCollector.OnTrade(func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { - e.logger.Info(trade.String()) - }) - e.tradeCollector.BindStream(e.userDataStream) - go e.connectMarketData(e.executionCtx) go e.connectUserData(e.userDataStreamCtx) + + e.logger.Infof("waiting for connections ready...") + if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { + e.cancelExecution() + return fmt.Errorf("market data stream connection timeout") + } + + if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + e.cancelExecution() + return fmt.Errorf("user data stream connection timeout") + } + + e.logger.Infof("connections ready, starting order updater...") + go e.orderUpdater(e.executionCtx) return nil } @@ -540,3 +605,14 @@ func (e *FixedQuantityExecutor) Shutdown(shutdownCtx context.Context) { } } } + +func selectSignalOrTimeout(ctx context.Context, c chan struct{}, timeout time.Duration) bool { + select { + case <-ctx.Done(): + return false + case <-time.After(timeout): + return false + case <-c: + return true + } +} diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index 4b6c2eaf8..842ef21c3 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -1,9 +1,120 @@ package twap import ( + "context" "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" + + "github.com/c9s/bbgo/pkg/fixedpoint" + . "github.com/c9s/bbgo/pkg/testing/testhelper" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/types/mocks" ) -func TestNewStreamExecutorV2(t *testing.T) { - +func getTestMarket() types.Market { + market := types.Market{ + Symbol: "BTCUSDT", + PricePrecision: 8, + VolumePrecision: 8, + QuoteCurrency: "USDT", + BaseCurrency: "BTC", + MinNotional: fixedpoint.MustNewFromString("0.001"), + MinAmount: fixedpoint.MustNewFromString("10.0"), + MinQuantity: fixedpoint.MustNewFromString("0.001"), + } + return market +} + +type CatchMatcher struct { + f func(x any) +} + +func Catch(f func(x any)) *CatchMatcher { + return &CatchMatcher{ + f: f, + } +} + +func (m *CatchMatcher) Matches(x interface{}) bool { + m.f(x) + return true +} + +func (m *CatchMatcher) String() string { + return "CatchMatcher" +} + +func TestNewStreamExecutor(t *testing.T) { + symbol := "BTCUSDT" + market := getTestMarket() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mockEx := mocks.NewMockExchange(mockCtrl) + + marketDataStream := &types.StandardStream{} + userDataStream := &types.StandardStream{} + + mockMarketDataStream := mocks.NewMockStream(mockCtrl) + mockMarketDataStream.EXPECT().SetPublicOnly() + mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ + Depth: types.DepthLevelMedium, + }) + mockMarketDataStream.EXPECT().OnBookSnapshot(Catch(func(x any) { + marketDataStream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockMarketDataStream.EXPECT().OnBookUpdate(Catch(func(x any) { + marketDataStream.OnBookUpdate(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockMarketDataStream.EXPECT().OnConnect(Catch(func(x any) { + marketDataStream.OnConnect(x.(func())) + })).AnyTimes() + mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + mockUserDataStream := mocks.NewMockStream(mockCtrl) + mockUserDataStream.EXPECT().OnOrderUpdate(Catch(func(x any) { + userDataStream.OnOrderUpdate(x.(func(order types.Order))) + })).AnyTimes() + mockUserDataStream.EXPECT().OnTradeUpdate(Catch(func(x any) { + userDataStream.OnTradeUpdate(x.(func(order types.Trade))) + })).AnyTimes() + mockUserDataStream.EXPECT().OnConnect(Catch(func(x any) { + userDataStream.OnConnect(x.(func())) + })).AnyTimes() + mockUserDataStream.EXPECT().OnAuth(Catch(func(x any) { + userDataStream.OnAuth(x.(func())) + })) + mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + + mockEx.EXPECT().NewStream().Return(mockMarketDataStream) + mockEx.EXPECT().NewStream().Return(mockUserDataStream) + + executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, Number(100), Number(1)) + executor.SetUpdateInterval(time.Second) + + go func() { + err := executor.Start(ctx) + assert.NoError(t, err) + }() + + go func() { + time.Sleep(500 * time.Millisecond) + marketDataStream.EmitConnect() + userDataStream.EmitConnect() + userDataStream.EmitAuth() + }() + + select { + case <-ctx.Done(): + case <-time.After(10 * time.Second): + case <-executor.Done(): + } + t.Logf("executor done") } diff --git a/pkg/types/market.go b/pkg/types/market.go index c7135aff3..aae71bd1f 100644 --- a/pkg/types/market.go +++ b/pkg/types/market.go @@ -247,6 +247,19 @@ func (m Market) AdjustQuantityByMinNotional(quantity, currentPrice fixedpoint.Va return quantity } +// AdjustQuantityByMaxAmount adjusts the quantity to make the amount less than the given maxAmount +func (m Market) AdjustQuantityByMaxAmount(quantity, currentPrice, maxAmount fixedpoint.Value) fixedpoint.Value { + // modify quantity for the min amount + amount := currentPrice.Mul(quantity) + if amount.Compare(maxAmount) < 0 { + return quantity + } + + ratio := maxAmount.Div(amount) + quantity = quantity.Mul(ratio) + return m.TruncateQuantity(quantity) +} + type MarketMap map[string]Market func (m MarketMap) Add(market Market) { diff --git a/pkg/types/mocks/mock_stream.go b/pkg/types/mocks/mock_stream.go new file mode 100644 index 000000000..176484aea --- /dev/null +++ b/pkg/types/mocks/mock_stream.go @@ -0,0 +1,375 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/c9s/bbgo/pkg/types (interfaces: Stream) +// +// Generated by this command: +// +// mockgen -destination=mocks/mock_stream.go -package=mocks . Stream +// + +// Package mocks is a generated GoMock package. +package mocks + +import ( + context "context" + reflect "reflect" + + types "github.com/c9s/bbgo/pkg/types" + gomock "go.uber.org/mock/gomock" +) + +// MockStream is a mock of Stream interface. +type MockStream struct { + ctrl *gomock.Controller + recorder *MockStreamMockRecorder +} + +// MockStreamMockRecorder is the mock recorder for MockStream. +type MockStreamMockRecorder struct { + mock *MockStream +} + +// NewMockStream creates a new mock instance. +func NewMockStream(ctrl *gomock.Controller) *MockStream { + mock := &MockStream{ctrl: ctrl} + mock.recorder = &MockStreamMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockStream) EXPECT() *MockStreamMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockStream) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockStreamMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockStream)(nil).Close)) +} + +// Connect mocks base method. +func (m *MockStream) Connect(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Connect", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Connect indicates an expected call of Connect. +func (mr *MockStreamMockRecorder) Connect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Connect", reflect.TypeOf((*MockStream)(nil).Connect), arg0) +} + +// GetPublicOnly mocks base method. +func (m *MockStream) GetPublicOnly() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetPublicOnly") + ret0, _ := ret[0].(bool) + return ret0 +} + +// GetPublicOnly indicates an expected call of GetPublicOnly. +func (mr *MockStreamMockRecorder) GetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPublicOnly", reflect.TypeOf((*MockStream)(nil).GetPublicOnly)) +} + +// GetSubscriptions mocks base method. +func (m *MockStream) GetSubscriptions() []types.Subscription { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetSubscriptions") + ret0, _ := ret[0].([]types.Subscription) + return ret0 +} + +// GetSubscriptions indicates an expected call of GetSubscriptions. +func (mr *MockStreamMockRecorder) GetSubscriptions() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSubscriptions", reflect.TypeOf((*MockStream)(nil).GetSubscriptions)) +} + +// OnAggTrade mocks base method. +func (m *MockStream) OnAggTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAggTrade", arg0) +} + +// OnAggTrade indicates an expected call of OnAggTrade. +func (mr *MockStreamMockRecorder) OnAggTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAggTrade", reflect.TypeOf((*MockStream)(nil).OnAggTrade), arg0) +} + +// OnAuth mocks base method. +func (m *MockStream) OnAuth(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnAuth", arg0) +} + +// OnAuth indicates an expected call of OnAuth. +func (mr *MockStreamMockRecorder) OnAuth(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnAuth", reflect.TypeOf((*MockStream)(nil).OnAuth), arg0) +} + +// OnBalanceSnapshot mocks base method. +func (m *MockStream) OnBalanceSnapshot(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceSnapshot", arg0) +} + +// OnBalanceSnapshot indicates an expected call of OnBalanceSnapshot. +func (mr *MockStreamMockRecorder) OnBalanceSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceSnapshot", reflect.TypeOf((*MockStream)(nil).OnBalanceSnapshot), arg0) +} + +// OnBalanceUpdate mocks base method. +func (m *MockStream) OnBalanceUpdate(arg0 func(types.BalanceMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBalanceUpdate", arg0) +} + +// OnBalanceUpdate indicates an expected call of OnBalanceUpdate. +func (mr *MockStreamMockRecorder) OnBalanceUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBalanceUpdate", reflect.TypeOf((*MockStream)(nil).OnBalanceUpdate), arg0) +} + +// OnBookSnapshot mocks base method. +func (m *MockStream) OnBookSnapshot(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookSnapshot", arg0) +} + +// OnBookSnapshot indicates an expected call of OnBookSnapshot. +func (mr *MockStreamMockRecorder) OnBookSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookSnapshot", reflect.TypeOf((*MockStream)(nil).OnBookSnapshot), arg0) +} + +// OnBookTickerUpdate mocks base method. +func (m *MockStream) OnBookTickerUpdate(arg0 func(types.BookTicker)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookTickerUpdate", arg0) +} + +// OnBookTickerUpdate indicates an expected call of OnBookTickerUpdate. +func (mr *MockStreamMockRecorder) OnBookTickerUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookTickerUpdate", reflect.TypeOf((*MockStream)(nil).OnBookTickerUpdate), arg0) +} + +// OnBookUpdate mocks base method. +func (m *MockStream) OnBookUpdate(arg0 func(types.SliceOrderBook)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnBookUpdate", arg0) +} + +// OnBookUpdate indicates an expected call of OnBookUpdate. +func (mr *MockStreamMockRecorder) OnBookUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnBookUpdate", reflect.TypeOf((*MockStream)(nil).OnBookUpdate), arg0) +} + +// OnConnect mocks base method. +func (m *MockStream) OnConnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnConnect", arg0) +} + +// OnConnect indicates an expected call of OnConnect. +func (mr *MockStreamMockRecorder) OnConnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnConnect", reflect.TypeOf((*MockStream)(nil).OnConnect), arg0) +} + +// OnDisconnect mocks base method. +func (m *MockStream) OnDisconnect(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnDisconnect", arg0) +} + +// OnDisconnect indicates an expected call of OnDisconnect. +func (mr *MockStreamMockRecorder) OnDisconnect(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnDisconnect", reflect.TypeOf((*MockStream)(nil).OnDisconnect), arg0) +} + +// OnForceOrder mocks base method. +func (m *MockStream) OnForceOrder(arg0 func(types.LiquidationInfo)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnForceOrder", arg0) +} + +// OnForceOrder indicates an expected call of OnForceOrder. +func (mr *MockStreamMockRecorder) OnForceOrder(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnForceOrder", reflect.TypeOf((*MockStream)(nil).OnForceOrder), arg0) +} + +// OnFuturesPositionSnapshot mocks base method. +func (m *MockStream) OnFuturesPositionSnapshot(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionSnapshot", arg0) +} + +// OnFuturesPositionSnapshot indicates an expected call of OnFuturesPositionSnapshot. +func (mr *MockStreamMockRecorder) OnFuturesPositionSnapshot(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionSnapshot", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionSnapshot), arg0) +} + +// OnFuturesPositionUpdate mocks base method. +func (m *MockStream) OnFuturesPositionUpdate(arg0 func(types.FuturesPositionMap)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnFuturesPositionUpdate", arg0) +} + +// OnFuturesPositionUpdate indicates an expected call of OnFuturesPositionUpdate. +func (mr *MockStreamMockRecorder) OnFuturesPositionUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnFuturesPositionUpdate", reflect.TypeOf((*MockStream)(nil).OnFuturesPositionUpdate), arg0) +} + +// OnKLine mocks base method. +func (m *MockStream) OnKLine(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLine", arg0) +} + +// OnKLine indicates an expected call of OnKLine. +func (mr *MockStreamMockRecorder) OnKLine(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLine", reflect.TypeOf((*MockStream)(nil).OnKLine), arg0) +} + +// OnKLineClosed mocks base method. +func (m *MockStream) OnKLineClosed(arg0 func(types.KLine)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnKLineClosed", arg0) +} + +// OnKLineClosed indicates an expected call of OnKLineClosed. +func (mr *MockStreamMockRecorder) OnKLineClosed(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnKLineClosed", reflect.TypeOf((*MockStream)(nil).OnKLineClosed), arg0) +} + +// OnMarketTrade mocks base method. +func (m *MockStream) OnMarketTrade(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnMarketTrade", arg0) +} + +// OnMarketTrade indicates an expected call of OnMarketTrade. +func (mr *MockStreamMockRecorder) OnMarketTrade(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnMarketTrade", reflect.TypeOf((*MockStream)(nil).OnMarketTrade), arg0) +} + +// OnOrderUpdate mocks base method. +func (m *MockStream) OnOrderUpdate(arg0 func(types.Order)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnOrderUpdate", arg0) +} + +// OnOrderUpdate indicates an expected call of OnOrderUpdate. +func (mr *MockStreamMockRecorder) OnOrderUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnOrderUpdate", reflect.TypeOf((*MockStream)(nil).OnOrderUpdate), arg0) +} + +// OnRawMessage mocks base method. +func (m *MockStream) OnRawMessage(arg0 func([]byte)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnRawMessage", arg0) +} + +// OnRawMessage indicates an expected call of OnRawMessage. +func (mr *MockStreamMockRecorder) OnRawMessage(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnRawMessage", reflect.TypeOf((*MockStream)(nil).OnRawMessage), arg0) +} + +// OnStart mocks base method. +func (m *MockStream) OnStart(arg0 func()) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnStart", arg0) +} + +// OnStart indicates an expected call of OnStart. +func (mr *MockStreamMockRecorder) OnStart(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnStart", reflect.TypeOf((*MockStream)(nil).OnStart), arg0) +} + +// OnTradeUpdate mocks base method. +func (m *MockStream) OnTradeUpdate(arg0 func(types.Trade)) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "OnTradeUpdate", arg0) +} + +// OnTradeUpdate indicates an expected call of OnTradeUpdate. +func (mr *MockStreamMockRecorder) OnTradeUpdate(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnTradeUpdate", reflect.TypeOf((*MockStream)(nil).OnTradeUpdate), arg0) +} + +// Reconnect mocks base method. +func (m *MockStream) Reconnect() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Reconnect") +} + +// Reconnect indicates an expected call of Reconnect. +func (mr *MockStreamMockRecorder) Reconnect() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Reconnect", reflect.TypeOf((*MockStream)(nil).Reconnect)) +} + +// Resubscribe mocks base method. +func (m *MockStream) Resubscribe(arg0 func([]types.Subscription) ([]types.Subscription, error)) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Resubscribe", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// Resubscribe indicates an expected call of Resubscribe. +func (mr *MockStreamMockRecorder) Resubscribe(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Resubscribe", reflect.TypeOf((*MockStream)(nil).Resubscribe), arg0) +} + +// SetPublicOnly mocks base method. +func (m *MockStream) SetPublicOnly() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "SetPublicOnly") +} + +// SetPublicOnly indicates an expected call of SetPublicOnly. +func (mr *MockStreamMockRecorder) SetPublicOnly() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SetPublicOnly", reflect.TypeOf((*MockStream)(nil).SetPublicOnly)) +} + +// Subscribe mocks base method. +func (m *MockStream) Subscribe(arg0 types.Channel, arg1 string, arg2 types.SubscribeOptions) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Subscribe", arg0, arg1, arg2) +} + +// Subscribe indicates an expected call of Subscribe. +func (mr *MockStreamMockRecorder) Subscribe(arg0, arg1, arg2 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockStream)(nil).Subscribe), arg0, arg1, arg2) +} diff --git a/pkg/types/stream.go b/pkg/types/stream.go index a2246ed68..2da4df7dd 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -24,6 +24,7 @@ var defaultDialer = &websocket.Dialer{ ReadBufferSize: 4096, } +//go:generate mockgen -destination=mocks/mock_stream.go -package=mocks . Stream type Stream interface { StandardStreamEventHub From c8aea81505aec3a14a94c4089fb89a43655e3186 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 19 Aug 2024 17:58:25 +0800 Subject: [PATCH 09/12] twap: implement twap mock testing --- pkg/twap/v2/stream_executor.go | 42 +++-- pkg/twap/v2/stream_executor_test.go | 242 +++++++++++++++++++++++++--- 2 files changed, 246 insertions(+), 38 deletions(-) diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index bf5fa7ec9..042a1fa94 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -8,7 +8,6 @@ import ( "time" "github.com/sirupsen/logrus" - "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" @@ -238,7 +237,7 @@ func (e *FixedQuantityExecutor) cancelActiveOrders(ctx context.Context) error { } func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { - updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) + // updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 2) defer func() { if err := e.cancelActiveOrders(ctx); err != nil { @@ -266,9 +265,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { } // orderBook.C sends a signal when any price or quantity changes in the order book - if !updateLimiter.Allow() { - break - } + /* + if !updateLimiter.Allow() { + break + } + */ if e.cancelContextIfTargetQuantityFilled() { return @@ -286,9 +287,11 @@ func (e *FixedQuantityExecutor) orderUpdater(ctx context.Context) { continue } - if !updateLimiter.Allow() { - break - } + /* + if !updateLimiter.Allow() { + break + } + */ if e.cancelContextIfTargetQuantityFilled() { return @@ -321,7 +324,6 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { tickSpread := tickSize.Mul(numOfTicks) // check and see if we need to cancel the existing active orders - for e.activeMakerOrders.NumOfOrders() > 0 { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { @@ -371,6 +373,8 @@ func (e *FixedQuantityExecutor) updateOrder(ctx context.Context) error { } } + e.tradeCollector.Process() + orderForm, err := e.generateOrder() if err != nil { return err @@ -560,14 +564,10 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { go e.connectUserData(e.userDataStreamCtx) e.logger.Infof("waiting for connections ready...") - if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { - e.cancelExecution() - return fmt.Errorf("market data stream connection timeout") - } - if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + if err := e.WaitForConnection(ctx); err != nil { e.cancelExecution() - return fmt.Errorf("user data stream connection timeout") + return err } e.logger.Infof("connections ready, starting order updater...") @@ -576,6 +576,18 @@ func (e *FixedQuantityExecutor) Start(ctx context.Context) error { return nil } +func (e *FixedQuantityExecutor) WaitForConnection(ctx context.Context) error { + if !selectSignalOrTimeout(ctx, e.marketDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("market data stream connection timeout") + } + + if !selectSignalOrTimeout(ctx, e.userDataStreamConnectC, 10*time.Second) { + return fmt.Errorf("user data stream connection timeout") + } + + return nil +} + // Done returns a channel that emits a signal when the execution is done. func (e *FixedQuantityExecutor) Done() <-chan struct{} { return e.done.Chan() diff --git a/pkg/twap/v2/stream_executor_test.go b/pkg/twap/v2/stream_executor_test.go index 842ef21c3..9ed443e27 100644 --- a/pkg/twap/v2/stream_executor_test.go +++ b/pkg/twap/v2/stream_executor_test.go @@ -2,6 +2,7 @@ package twap import ( "context" + "fmt" "testing" "time" @@ -28,6 +29,29 @@ func getTestMarket() types.Market { return market } +type OrderMatcher struct { + Order types.Order +} + +func MatchOrder(o types.Order) *OrderMatcher { + return &OrderMatcher{ + Order: o, + } +} + +func (m *OrderMatcher) Matches(x interface{}) bool { + order, ok := x.(types.Order) + if !ok { + return false + } + + return m.Order.OrderID == order.OrderID && m.Order.Price.Compare(m.Order.Price) == 0 +} + +func (m *OrderMatcher) String() string { + return fmt.Sprintf("OrderMatcher expects %+v", m.Order) +} + type CatchMatcher struct { f func(x any) } @@ -47,10 +71,44 @@ func (m *CatchMatcher) String() string { return "CatchMatcher" } +func bindMockMarketDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnBookSnapshot(Catch(func(x any) { + stream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnBookUpdate(Catch(func(x any) { + stream.OnBookUpdate(x.(func(book types.SliceOrderBook))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() +} + +func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { + mockStream.EXPECT().OnOrderUpdate(Catch(func(x any) { + stream.OnOrderUpdate(x.(func(order types.Order))) + })).AnyTimes() + mockStream.EXPECT().OnTradeUpdate(Catch(func(x any) { + stream.OnTradeUpdate(x.(func(order types.Trade))) + })).AnyTimes() + mockStream.EXPECT().OnBalanceUpdate(Catch(func(x any) { + stream.OnBalanceUpdate(x.(func(m types.BalanceMap))) + })).AnyTimes() + mockStream.EXPECT().OnConnect(Catch(func(x any) { + stream.OnConnect(x.(func())) + })).AnyTimes() + mockStream.EXPECT().OnAuth(Catch(func(x any) { + stream.OnAuth(x.(func())) + })) +} + func TestNewStreamExecutor(t *testing.T) { + exchangeName := types.ExchangeBinance symbol := "BTCUSDT" market := getTestMarket() + targetQuantity := Number(100) + sliceQuantity := Number(1) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -67,37 +125,53 @@ func TestNewStreamExecutor(t *testing.T) { mockMarketDataStream.EXPECT().Subscribe(types.BookChannel, symbol, types.SubscribeOptions{ Depth: types.DepthLevelMedium, }) - mockMarketDataStream.EXPECT().OnBookSnapshot(Catch(func(x any) { - marketDataStream.OnBookSnapshot(x.(func(book types.SliceOrderBook))) - })).AnyTimes() - mockMarketDataStream.EXPECT().OnBookUpdate(Catch(func(x any) { - marketDataStream.OnBookUpdate(x.(func(book types.SliceOrderBook))) - })).AnyTimes() - mockMarketDataStream.EXPECT().OnConnect(Catch(func(x any) { - marketDataStream.OnConnect(x.(func())) - })).AnyTimes() + + bindMockMarketDataStream(mockMarketDataStream, marketDataStream) + mockMarketDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) mockUserDataStream := mocks.NewMockStream(mockCtrl) - mockUserDataStream.EXPECT().OnOrderUpdate(Catch(func(x any) { - userDataStream.OnOrderUpdate(x.(func(order types.Order))) - })).AnyTimes() - mockUserDataStream.EXPECT().OnTradeUpdate(Catch(func(x any) { - userDataStream.OnTradeUpdate(x.(func(order types.Trade))) - })).AnyTimes() - mockUserDataStream.EXPECT().OnConnect(Catch(func(x any) { - userDataStream.OnConnect(x.(func())) - })).AnyTimes() - mockUserDataStream.EXPECT().OnAuth(Catch(func(x any) { - userDataStream.OnAuth(x.(func())) - })) + bindMockUserDataStream(mockUserDataStream, userDataStream) mockUserDataStream.EXPECT().Connect(gomock.AssignableToTypeOf(ctx)) + initialBalances := types.BalanceMap{ + "BTC": types.Balance{ + Available: Number(2), + }, + "USDT": types.Balance{ + Available: Number(20_000), + }, + } + mockEx.EXPECT().NewStream().Return(mockMarketDataStream) mockEx.EXPECT().NewStream().Return(mockUserDataStream) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) - executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, Number(100), Number(1)) - executor.SetUpdateInterval(time.Second) + // first order + firstSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19400), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + firstSubmitOrderTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + firstOrder := types.Order{ + SubmitOrder: firstSubmitOrder, + Exchange: exchangeName, + OrderID: 1, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(firstSubmitOrderTime), + UpdateTime: types.Time(firstSubmitOrderTime), + } + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), firstSubmitOrder).Return(&firstOrder, nil) + + executor := NewStreamExecutor(mockEx, symbol, market, types.SideTypeBuy, targetQuantity, sliceQuantity) + executor.SetUpdateInterval(200 * time.Millisecond) go func() { err := executor.Start(ctx) @@ -111,6 +185,128 @@ func TestNewStreamExecutor(t *testing.T) { userDataStream.EmitAuth() }() + err := executor.WaitForConnection(ctx) + assert.NoError(t, err) + + t.Logf("sending book snapshot...") + snapshotTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + marketDataStream.EmitBookSnapshot(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19400), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + time.Sleep(500 * time.Millisecond) + + t.Logf("sending book update...") + + // we expect the second order will be placed when the order update is received + secondSubmitOrder := types.SubmitOrder{ + Symbol: symbol, + Side: types.SideTypeBuy, + Type: types.OrderTypeLimitMaker, + Quantity: Number(1), + Price: Number(19420), + Market: market, + TimeInForce: types.TimeInForceGTC, + } + secondSubmitOrderTime := time.Date(2021, 1, 1, 0, 1, 0, 0, time.UTC) + secondOrder := types.Order{ + SubmitOrder: secondSubmitOrder, + Exchange: exchangeName, + OrderID: 2, + Status: types.OrderStatusNew, + ExecutedQuantity: Number(0.0), + IsWorking: true, + CreationTime: types.Time(secondSubmitOrderTime), + UpdateTime: types.Time(secondSubmitOrderTime), + } + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(firstOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := firstOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order update: %+v", orderUpdate) + return nil + }) + mockEx.EXPECT().QueryAccountBalances(gomock.AssignableToTypeOf(ctx)).Return(initialBalances, nil) + mockEx.EXPECT().SubmitOrder(gomock.AssignableToTypeOf(ctx), secondSubmitOrder).Return(&secondOrder, nil) + + t.Logf("waiting for the order update...") + time.Sleep(500 * time.Millisecond) + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + marketDataStream.EmitBookUpdate(types.SliceOrderBook{ + Symbol: symbol, + Bids: types.PriceVolumeSlice{ + {Price: Number(19420), Volume: Number(1)}, + {Price: Number(19300), Volume: Number(2)}, + {Price: Number(19200), Volume: Number(3)}, + }, + Asks: types.PriceVolumeSlice{ + {Price: Number(19450), Volume: Number(1)}, + {Price: Number(19550), Volume: Number(2)}, + {Price: Number(19650), Volume: Number(3)}, + }, + Time: snapshotTime, + LastUpdateId: 101, + }) + + t.Logf("waiting for the next order update...") + time.Sleep(500 * time.Millisecond) + + { + orders := executor.orderStore.Orders() + assert.Len(t, orders, 1, "should have 1 order in the order store") + } + + t.Logf("emitting trade update...") + userDataStream.EmitTradeUpdate(types.Trade{ + ID: 1, + OrderID: 2, + Exchange: exchangeName, + Price: Number(19420.0), + Quantity: Number(100.0), + QuoteQuantity: Number(100.0 * 19420.0), + Symbol: symbol, + Side: types.SideTypeBuy, + IsBuyer: true, + IsMaker: true, + Time: types.Time(secondSubmitOrderTime), + }) + + t.Logf("waiting for the trade callbacks...") + time.Sleep(500 * time.Millisecond) + + executor.tradeCollector.Process() + assert.Equal(t, Number(100), executor.position.GetBase()) + + mockEx.EXPECT().CancelOrders(context.Background(), MatchOrder(secondOrder)).DoAndReturn(func( + ctx context.Context, orders ...types.Order, + ) error { + orderUpdate := secondOrder + orderUpdate.Status = types.OrderStatusCanceled + userDataStream.EmitOrderUpdate(orderUpdate) + t.Logf("emit order #2 update: %+v", orderUpdate) + return nil + }) + assert.True(t, executor.cancelContextIfTargetQuantityFilled(), "target quantity should be filled") + + // finalizing and stop the executor select { case <-ctx.Done(): case <-time.After(10 * time.Second): From d8fad8250c2ab09df9cc9d2d482973df080f780f Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 20 Aug 2024 14:01:19 +0800 Subject: [PATCH 10/12] fix duplicated field --- pkg/bbgo/order_executor_general.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/bbgo/order_executor_general.go b/pkg/bbgo/order_executor_general.go index 64df59c00..2046b2ffa 100644 --- a/pkg/bbgo/order_executor_general.go +++ b/pkg/bbgo/order_executor_general.go @@ -85,7 +85,6 @@ func NewGeneralOrderExecutor( executor := &GeneralOrderExecutor{ BaseOrderExecutor: BaseOrderExecutor{ - exchange: session.Exchange, session: session, exchange: session.Exchange, activeMakerOrders: NewActiveOrderBook(symbol), From 05308098346f10d55edfc9d2598723b866796d7c Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 20 Aug 2024 14:10:22 +0800 Subject: [PATCH 11/12] fix position test --- pkg/risk/riskcontrol/position_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/risk/riskcontrol/position_test.go b/pkg/risk/riskcontrol/position_test.go index 554bac94e..7893b3445 100644 --- a/pkg/risk/riskcontrol/position_test.go +++ b/pkg/risk/riskcontrol/position_test.go @@ -23,7 +23,7 @@ func Test_ModifiedQuantity(t *testing.T) { BaseCurrency: "BTC", }, } - orderExecutor := bbgo.NewGeneralOrderExecutor(nil, "BTCUSDT", "strategy", "strategy-1", pos) + orderExecutor := bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, "BTCUSDT", "strategy", "strategy-1", pos) riskControl := NewPositionRiskControl(orderExecutor, fixedpoint.NewFromInt(10), fixedpoint.NewFromInt(2)) cases := []struct { From b9c41b7ad7cb01535bef91c4b72a726119de9602 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 20 Aug 2024 14:14:19 +0800 Subject: [PATCH 12/12] twap: improve cancelContextIfTargetQuantityFilled check method --- pkg/twap/v2/stream_executor.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/twap/v2/stream_executor.go b/pkg/twap/v2/stream_executor.go index 042a1fa94..caa43686d 100644 --- a/pkg/twap/v2/stream_executor.go +++ b/pkg/twap/v2/stream_executor.go @@ -220,10 +220,14 @@ func (e *FixedQuantityExecutor) handleFilledOrder(order types.Order) { } func (e *FixedQuantityExecutor) cancelContextIfTargetQuantityFilled() bool { + // ensure that the trades are processed + e.tradeCollector.Process() + + // now get the base quantity from the position base := e.position.GetBase() - if base.Abs().Compare(e.targetQuantity) >= 0 { - e.logger.Infof("filled target quantity, canceling the order execution context") + if base.Abs().Sub(e.targetQuantity).Compare(e.market.MinQuantity.Neg()) >= 0 { + e.logger.Infof("position is filled with target quantity, canceling the order execution context") e.cancelExecution() return true }