From 621a2b86cf9a5d8f2dabade2686d69a718a071a2 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 17 Aug 2024 13:29:27 +0800 Subject: [PATCH] twap: move twap execution to a single package --- pkg/cmd/orders.go | 3 +- .../stream_executor.go} | 85 ++++++++++--------- 2 files changed, 45 insertions(+), 43 deletions(-) rename pkg/{bbgo/twap_order_executor.go => twap/stream_executor.go} (78%) diff --git a/pkg/cmd/orders.go b/pkg/cmd/orders.go index 475237a20..1dd5d8c9d 100644 --- a/pkg/cmd/orders.go +++ b/pkg/cmd/orders.go @@ -14,6 +14,7 @@ import ( "github.com/spf13/cobra" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/twap" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/types" @@ -255,7 +256,7 @@ var executeOrderCmd = &cobra.Command{ executionCtx, cancelExecution := context.WithCancel(ctx) defer cancelExecution() - execution := &bbgo.TwapExecution{ + execution := &twap.Execution{ Session: session, Symbol: symbol, Side: side, diff --git a/pkg/bbgo/twap_order_executor.go b/pkg/twap/stream_executor.go similarity index 78% rename from pkg/bbgo/twap_order_executor.go rename to pkg/twap/stream_executor.go index 05ad9e5e4..6ac4a809b 100644 --- a/pkg/bbgo/twap_order_executor.go +++ b/pkg/twap/stream_executor.go @@ -1,4 +1,4 @@ -package bbgo +package twap import ( "context" @@ -7,16 +7,17 @@ import ( "time" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) -type TwapExecution struct { - Session *ExchangeSession +type Execution struct { + Session *bbgo.ExchangeSession Symbol string Side types.SideType TargetQuantity fixedpoint.Value @@ -37,7 +38,7 @@ type TwapExecution struct { currentPrice fixedpoint.Value activePosition fixedpoint.Value - activeMakerOrders *ActiveOrderBook + activeMakerOrders *bbgo.ActiveOrderBook orderStore *core.OrderStore position *types.Position @@ -51,21 +52,21 @@ type TwapExecution struct { mu sync.Mutex } -func (e *TwapExecution) connectMarketData(ctx context.Context) { - log.Infof("connecting market data stream...") +func (e *Execution) connectMarketData(ctx context.Context) { + logrus.Infof("connecting market data stream...") if err := e.marketDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("market data stream connect error") + logrus.WithError(err).Errorf("market data stream connect error") } } -func (e *TwapExecution) connectUserData(ctx context.Context) { - log.Infof("connecting user data stream...") +func (e *Execution) connectUserData(ctx context.Context) { + logrus.Infof("connecting user data stream...") if err := e.userDataStream.Connect(ctx); err != nil { - log.WithError(err).Errorf("user data stream connect error") + logrus.WithError(err).Errorf("user data stream connect error") } } -func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { +func (e *Execution) newBestPriceOrder() (orderForm types.SubmitOrder, err error) { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -111,7 +112,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er switch e.Side { case types.SideTypeSell: if newPrice.Compare(e.StopPrice) < 0 { - log.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", + logrus.Infof("%s order price %s is lower than the stop sell price %s, setting order price to the stop sell price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -121,7 +122,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: if newPrice.Compare(e.StopPrice) > 0 { - log.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", + logrus.Infof("%s order price %s is higher than the stop buy price %s, setting order price to the stop buy price %s", e.Symbol, newPrice.String(), e.StopPrice.String(), @@ -157,7 +158,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er } minNotional := e.market.MinNotional - orderQuantity = AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) + orderQuantity = bbgo.AdjustQuantityByMinAmount(orderQuantity, newPrice, minNotional) switch e.Side { case types.SideTypeSell: @@ -169,11 +170,11 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er case types.SideTypeBuy: // check base balance for sell, try to sell as more as possible if b, ok := e.Session.GetAccount().Balance(e.market.QuoteCurrency); ok { - orderQuantity = AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) + orderQuantity = bbgo.AdjustQuantityByMaxAmount(orderQuantity, newPrice, b.Available) } } - if e.DeadlineTime != emptyTime { + if !e.DeadlineTime.IsZero() { now := time.Now() if now.After(e.DeadlineTime) { orderForm = types.SubmitOrder{ @@ -200,7 +201,7 @@ func (e *TwapExecution) newBestPriceOrder() (orderForm types.SubmitOrder, err er return orderForm, err } -func (e *TwapExecution) updateOrder(ctx context.Context) error { +func (e *Execution) updateOrder(ctx context.Context) error { book := e.orderBook.Copy() sideBook := book.SideBook(e.Side) @@ -224,7 +225,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { orders := e.activeMakerOrders.Orders() if len(orders) > 1 { - log.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) + logrus.Warnf("more than 1 %s open orders in the strategy...", e.Symbol) } // get the first order @@ -234,7 +235,7 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { remainingQuantity := order.Quantity.Sub(order.ExecutedQuantity) if remainingQuantity.Compare(e.market.MinQuantity) <= 0 { - log.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) + logrus.Infof("order remaining quantity %s is less than the market minimal quantity %s, skip updating order", remainingQuantity.String(), e.market.MinQuantity.String()) return nil } @@ -243,24 +244,24 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { // DO NOT UPDATE IF: // tickSpread > 0 AND current order price == second price + tickSpread // current order price == first price - log.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) + logrus.Infof("orderPrice = %s first.Price = %s second.Price = %s tickSpread = %s", orderPrice.String(), first.Price.String(), second.Price.String(), tickSpread.String()) switch e.Side { case types.SideTypeBuy: if tickSpread.Sign() > 0 && orderPrice == second.Price.Add(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best bid price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best bid price %s", orderPrice.String()) return nil } case types.SideTypeSell: if tickSpread.Sign() > 0 && orderPrice == second.Price.Sub(tickSpread) { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } else if orderPrice == first.Price { - log.Infof("the current order is already on the best ask price %s", orderPrice.String()) + logrus.Infof("the current order is already on the best ask price %s", orderPrice.String()) return nil } } @@ -283,13 +284,13 @@ func (e *TwapExecution) updateOrder(ctx context.Context) error { return nil } -func (e *TwapExecution) cancelActiveOrders() { +func (e *Execution) cancelActiveOrders() { gracefulCtx, gracefulCancel := context.WithTimeout(context.TODO(), 30*time.Second) defer gracefulCancel() e.activeMakerOrders.GracefulCancel(gracefulCtx, e.Session.Exchange) } -func (e *TwapExecution) orderUpdater(ctx context.Context) { +func (e *Execution) orderUpdater(ctx context.Context) { updateLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) ticker := time.NewTimer(e.UpdateInterval) defer ticker.Stop() @@ -317,9 +318,9 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { return } - log.Infof("%s order book changed, checking order...", e.Symbol) + logrus.Infof("%s order book changed, checking order...", e.Symbol) if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } case <-ticker.C: @@ -332,25 +333,25 @@ func (e *TwapExecution) orderUpdater(ctx context.Context) { } if err := e.updateOrder(ctx); err != nil { - log.WithError(err).Errorf("order update failed") + logrus.WithError(err).Errorf("order update failed") } } } } -func (e *TwapExecution) cancelContextIfTargetQuantityFilled() bool { +func (e *Execution) cancelContextIfTargetQuantityFilled() bool { base := e.position.GetBase() if base.Abs().Compare(e.TargetQuantity) >= 0 { - log.Infof("filled target quantity, canceling the order execution context") + logrus.Infof("filled target quantity, canceling the order execution context") e.cancelExecution() return true } return false } -func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { +func (e *Execution) handleTradeUpdate(trade types.Trade) { // ignore trades that are not in the symbol we interested if trade.Symbol != e.Symbol { return @@ -360,21 +361,21 @@ func (e *TwapExecution) handleTradeUpdate(trade types.Trade) { return } - log.Info(trade.String()) + logrus.Info(trade.String()) e.position.AddTrade(trade) - log.Infof("position updated: %+v", e.position) + logrus.Infof("position updated: %+v", e.position) } -func (e *TwapExecution) handleFilledOrder(order types.Order) { - log.Info(order.String()) +func (e *Execution) handleFilledOrder(order types.Order) { + logrus.Info(order.String()) // filled event triggers the order removal from the active order store // we need to ensure we received every order update event before the execution is done. e.cancelContextIfTargetQuantityFilled() } -func (e *TwapExecution) Run(parentCtx context.Context) error { +func (e *Execution) Run(parentCtx context.Context) error { e.mu.Lock() e.stoppedC = make(chan struct{}) e.executionCtx, e.cancelExecution = context.WithCancel(parentCtx) @@ -409,7 +410,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { e.orderStore = core.NewOrderStore(e.Symbol) e.orderStore.BindStream(e.userDataStream) - e.activeMakerOrders = NewActiveOrderBook(e.Symbol) + e.activeMakerOrders = bbgo.NewActiveOrderBook(e.Symbol) e.activeMakerOrders.OnFilled(e.handleFilledOrder) e.activeMakerOrders.BindStream(e.userDataStream) @@ -418,7 +419,7 @@ func (e *TwapExecution) Run(parentCtx context.Context) error { return nil } -func (e *TwapExecution) emitDone() { +func (e *Execution) emitDone() { e.mu.Lock() if e.stoppedC == nil { e.stoppedC = make(chan struct{}) @@ -427,7 +428,7 @@ func (e *TwapExecution) emitDone() { e.mu.Unlock() } -func (e *TwapExecution) Done() (c <-chan struct{}) { +func (e *Execution) Done() (c <-chan struct{}) { e.mu.Lock() // if the channel is not allocated, it means it's not started yet, we need to return a closed channel if e.stoppedC == nil { @@ -447,7 +448,7 @@ func (e *TwapExecution) Done() (c <-chan struct{}) { // We need to: // 1. stop the order updater (by using the execution context) // 2. the order updater cancels all open orders and close the user data stream -func (e *TwapExecution) Shutdown(shutdownCtx context.Context) { +func (e *Execution) Shutdown(shutdownCtx context.Context) { e.mu.Lock() if e.cancelExecution != nil { e.cancelExecution()