From bb34b1002a6a3f1c1c60fefd3d2f276407b26d2f Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 14 May 2021 14:52:19 +0800 Subject: [PATCH] improve order execution graceful shutdown --- pkg/bbgo/order_execution.go | 10 +- pkg/bbgo/order_processor.go | 17 ++- pkg/bbgo/order_processor_test.go | 2 +- pkg/bbgo/twap_order_executor.go | 191 ++++++++++++++++++++++++++++--- 4 files changed, 194 insertions(+), 26 deletions(-) diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index fc169e7d2..b701272ff 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -162,10 +162,10 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... // Increase the quantity if the amount is not enough, // this is the only increase op, later we will decrease the quantity if it meets the criteria - quantity = AdjustQuantityByMinAmount(quantity, price, market.MinAmount*1.01) + quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinAmount*1.01) if c.MaxOrderAmount > 0 { - quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) } quoteAssetQuota := math.Max(0.0, quoteBalance.Available.Float64()-c.MinQuoteBalance.Float64()) @@ -178,7 +178,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... continue } - quantity = AdjustQuantityByMaxAmount(quantity, price, quoteAssetQuota) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, quoteAssetQuota) // if MaxBaseAssetBalance is enabled, we should check the current base asset balance if baseBalance, hasBaseAsset := balances[market.BaseCurrency]; hasBaseAsset && c.MaxBaseAssetBalance > 0 { @@ -226,7 +226,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... } // if the amount is too small, we should increase it. - quantity = AdjustQuantityByMinAmount(quantity, price, market.MinNotional*1.01) + quantity = AdjustFloatQuantityByMinAmount(quantity, price, market.MinNotional*1.01) // we should not SELL too much quantity = math.Min(quantity, baseAssetBalance.Available.Float64()) @@ -253,7 +253,7 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... } if c.MaxOrderAmount > 0 { - quantity = AdjustQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) + quantity = AdjustFloatQuantityByMaxAmount(quantity, price, c.MaxOrderAmount.Float64()) } notional := quantity * lastPrice diff --git a/pkg/bbgo/order_processor.go b/pkg/bbgo/order_processor.go index b891fd010..4d203fee7 100644 --- a/pkg/bbgo/order_processor.go +++ b/pkg/bbgo/order_processor.go @@ -1,6 +1,7 @@ package bbgo import ( + "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/pkg/errors" ) @@ -14,7 +15,19 @@ var ( ) // AdjustQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount -func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 { +func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount fixedpoint.Value) fixedpoint.Value { + // modify quantity for the min amount + amount := currentPrice.Mul(quantity) + if amount < minAmount { + ratio := minAmount.Div(amount) + quantity = quantity.Mul(ratio) + } + + return quantity +} + +// AdjustFloatQuantityByMinAmount adjusts the quantity to make the amount greater than the given minAmount +func AdjustFloatQuantityByMinAmount(quantity, currentPrice, minAmount float64) float64 { // modify quantity for the min amount amount := currentPrice * quantity if amount < minAmount { @@ -25,7 +38,7 @@ func AdjustQuantityByMinAmount(quantity, currentPrice, minAmount float64) float6 return quantity } -func AdjustQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 { +func AdjustFloatQuantityByMaxAmount(quantity float64, price float64, maxAmount float64) float64 { amount := price * quantity if amount > maxAmount { ratio := maxAmount / amount diff --git a/pkg/bbgo/order_processor_test.go b/pkg/bbgo/order_processor_test.go index df1687e96..3e522a9d9 100644 --- a/pkg/bbgo/order_processor_test.go +++ b/pkg/bbgo/order_processor_test.go @@ -36,7 +36,7 @@ func TestAdjustQuantityByMinAmount(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - q := AdjustQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount) + q := AdjustFloatQuantityByMinAmount(test.args.quantity, test.args.price, test.args.minAmount) assert.Equal(t, test.wanted, q) }) } diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/bbgo/twap_order_executor.go index f4dbfa555..15cd2d276 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/bbgo/twap_order_executor.go @@ -13,18 +13,20 @@ import ( "golang.org/x/time/rate" ) -const OrderExecutionReady = 1 - type TwapExecution struct { Session *ExchangeSession Symbol string Side types.SideType TargetQuantity fixedpoint.Value SliceQuantity fixedpoint.Value + StopPrice fixedpoint.Value market types.Market marketDataStream types.Stream - userDataStream types.Stream + + userDataStream types.Stream + userDataStreamCtx context.Context + cancelUserDataStream context.CancelFunc orderBook *types.StreamOrderBook currentPrice fixedpoint.Value @@ -34,6 +36,11 @@ type TwapExecution struct { orderStore *OrderStore position *Position + executionCtx context.Context + cancelExecution context.CancelFunc + + stoppedC chan struct{} + state int mu sync.Mutex @@ -84,7 +91,6 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } newPrice := first.Price - spread, ok := book.Spread() if !ok { return orderForm, errors.New("can not calculate spread, neither bid price or ask price exists") @@ -101,12 +107,55 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } } + if e.StopPrice > 0 { + switch e.Side { + case types.SideTypeSell: + if newPrice < e.StopPrice { + log.Infof("%s order price %f is lower than the stop sell price %f, setting order price to the stop sell price %f", + e.Symbol, + newPrice.Float64(), + e.StopPrice.Float64(), + e.StopPrice.Float64()) + newPrice = e.StopPrice + } + + case types.SideTypeBuy: + if newPrice > e.StopPrice { + log.Infof("%s order price %f is higher than the stop buy price %f, setting order price to the stop buy price %f", + e.Symbol, + newPrice.Float64(), + e.StopPrice.Float64(), + e.StopPrice.Float64()) + newPrice = e.StopPrice + } + } + } + + minQuantity := fixedpoint.NewFromFloat(e.market.MinQuantity) + restQuantity := e.TargetQuantity - fixedpoint.Abs(e.position.Base) + if restQuantity < minQuantity { + return orderForm, fmt.Errorf("can not continue placing orders, rest quantity %f is less than the min quantity %f", restQuantity.Float64(), minQuantity.Float64()) + } + + // if the rest quantity in the next round is not enough, we should merge the rest quantity into this round + orderQuantity := e.SliceQuantity + nextRestQuantity := restQuantity - e.SliceQuantity + if nextRestQuantity < minQuantity { + orderQuantity = restQuantity + } + + minNotional := fixedpoint.NewFromFloat(e.market.MinNotional) + orderAmount := newPrice.Mul(orderQuantity) + if orderAmount <= minNotional { + orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + } + orderForm = types.SubmitOrder{ // ClientOrderID: "", Symbol: e.Symbol, Side: e.Side, Type: types.OrderTypeLimitMaker, - Quantity: e.SliceQuantity.Float64(), + Quantity: orderQuantity.Float64(), Price: newPrice.Float64(), Market: e.market, TimeInForce: "GTC", @@ -115,6 +164,7 @@ func (e *TwapExecution) newBestPriceMakerOrder() (orderForm types.SubmitOrder, e } func (e *TwapExecution) updateOrder(ctx context.Context) error { + sideBook, err := e.getSideBook() if err != nil { return err @@ -132,7 +182,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { - log.Warnf("there are more than 1 open orders in the strategy...") + log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) } // get the first order @@ -140,6 +190,29 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { price := fixedpoint.NewFromFloat(order.Price) quantity := fixedpoint.NewFromFloat(order.Quantity) + remainingQuantity := order.Quantity - order.ExecutedQuantity + if remainingQuantity <= e.market.MinQuantity { + log.Infof("order remaining quantity %f is less than the market minimal quantity %f, skip updating order", remainingQuantity, e.market.MinQuantity) + return nil + } + + if e.StopPrice > 0 { + switch e.Side { + case types.SideTypeBuy: + if first.Price > e.StopPrice { + log.Infof("%s first bid price %f is higher than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64()) + return nil + } + + case types.SideTypeSell: + if first.Price < e.StopPrice { + log.Infof("%s first ask price %f is lower than the stop price %f, skip updating order", e.Symbol, first.Price.Float64(), e.StopPrice.Float64()) + return nil + } + } + + } + // if the first bid price or first ask price is the same to the current active order // we should skip updating the order if first.Price == price { @@ -151,12 +224,12 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // if there is no gap between the first price entry and the second price entry second, ok := sideBook.Second() if !ok { - return fmt.Errorf("there is no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) + return fmt.Errorf("no secoond price on the %s order book %s, can not update", e.Symbol, e.Side) } // if there is no gap if fixedpoint.Abs(first.Price-second.Price) == tickSize { - log.Infof("there is no gap between the second price %f and the first price %f (tick size = %f), skip updating", + log.Infof("no gap between the second price %f and the first price %f (tick size = %f), skip updating", first.Price.Float64(), second.Price.Float64(), tickSize.Float64()) @@ -187,8 +260,10 @@ func (e *TwapExecution) cancelActiveOrders(ctx context.Context) { for e.activeMakerOrders.NumOfOrders() > 0 { didCancel = true - log.Infof("canceling open orders...") orders := e.activeMakerOrders.Orders() + log.Infof("canceling %d open orders:", len(orders)) + e.activeMakerOrders.Print() + if err := e.Session.Exchange.CancelOrders(ctx, orders...); err != nil { log.WithError(err).Errorf("can not cancel %s orders", e.Symbol) } @@ -205,8 +280,13 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { ticker := time.NewTimer(5 * time.Second) defer ticker.Stop() + // we should stop updater and clean up our open orders, if + // 1. the given context is canceled. + // 2. the base quantity equals to or greater than the target quantity defer func() { e.cancelActiveOrders(context.Background()) + e.cancelUserDataStream() + e.emitDone() }() for { @@ -219,6 +299,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { break } + if e.cancelContextIfTargetQuantityFilled() { + return + } + if err := e.updateOrder(ctx); err != nil { log.WithError(err).Errorf("order update failed") } @@ -228,6 +312,10 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { break } + if e.cancelContextIfTargetQuantityFilled() { + return + } + if err := e.updateOrder(ctx); err != nil { log.WithError(err).Errorf("order update failed") } @@ -236,6 +324,15 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } } +func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { + if fixedpoint.Abs(e.position.Base) >= e.TargetQuantity { + log.Infof("filled target quantity, canceling the order execution context") + e.cancelExecution() + return true + } + return false +} + func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { @@ -246,19 +343,27 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { return } - q := fixedpoint.NewFromFloat(trade.Quantity) - _ = q + log.Info(trade.String()) e.position.AddTrade(trade) - log.Infof("position updated: %+v", e.position) } func (e *TwapExecution) handleFilledOrder(order types.Order) { - log.Infof("order is filled: %s", order.String()) + log.Info(order.String()) + + // filled event triggers the order removal from the active order store + // we need to ensure we received every order update event before the execution is done. + e.cancelContextIfTargetQuantityFilled() } -func (e *TwapExecution) Run(ctx context.Context) error { +func (e *TwapExecution) Run(parentCtx context.Context) error { + e.mu.Lock() + e.stoppedC = make(chan struct{}) + e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) + e.userDataStreamCtx, e.cancelUserDataStream = context.WithCancel(context.Background()) + e.mu.Unlock() + var ok bool e.market, ok = e.Session.Market(e.Symbol) if !ok { @@ -271,7 +376,7 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.orderBook = types.NewStreamBook(e.Symbol) e.orderBook.BindStream(e.marketDataStream) - go e.connectMarketData(ctx) + go e.connectMarketData(e.executionCtx) e.userDataStream = e.Session.Exchange.NewStream() e.userDataStream.OnTradeUpdate(e.handleTradeUpdate) @@ -287,11 +392,60 @@ func (e *TwapExecution) Run(ctx context.Context) error { e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) - go e.connectUserData(ctx) - go e.orderUpdater(ctx) + go e.connectUserData(e.userDataStreamCtx) + go e.orderUpdater(e.executionCtx) return nil } +func (e *TwapExecution) emitDone() { + e.mu.Lock() + if e.stoppedC == nil { + e.stoppedC = make(chan struct{}) + } + close(e.stoppedC) + e.mu.Unlock() +} + +func (e *TwapExecution) Done() (c <-chan struct{}) { + e.mu.Lock() + // if the channel is not allocated, it means it's not started yet, we need to return a closed channel + if e.stoppedC == nil { + e.stoppedC = make(chan struct{}) + close(e.stoppedC) + c = e.stoppedC + } else { + c = e.stoppedC + } + + e.mu.Unlock() + return c +} + +// Shutdown stops the execution +// If we call this method, it means the execution is still running, +// We need to: +// 1. stop the order updater (by using the execution context) +// 2. the order updater cancels all open orders and close the user data stream +func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { + e.mu.Lock() + if e.cancelExecution != nil { + e.cancelExecution() + } + e.mu.Unlock() + + for { + select { + + case <-shutdownCtx.Done(): + return + + case <-e.Done(): + return + + } + } +} + type TwapOrderExecutor struct { Session *ExchangeSession @@ -300,13 +454,14 @@ type TwapOrderExecutor struct { DelayTime types.Duration } -func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity fixedpoint.Value) (*TwapExecution, error) { +func (e *TwapOrderExecutor) Execute(ctx context.Context, symbol string, side types.SideType, targetQuantity, sliceQuantity, stopPrice fixedpoint.Value) (*TwapExecution, error) { execution := &TwapExecution{ Session: e.Session, Symbol: symbol, Side: side, TargetQuantity: targetQuantity, SliceQuantity: sliceQuantity, + StopPrice: stopPrice, } err := execution.Run(ctx) return execution, err