From 553976449d4e286106e0e19be7af64a94e1d0072 Mon Sep 17 00:00:00 2001 From: kbearXD Date: Thu, 21 Mar 2024 16:18:48 +0800 Subject: [PATCH] FEATURE: [dca2] when all open-position orders are filled, place the take-profit order --- pkg/strategy/dca2/active_order_recover.go | 11 ++++-- pkg/strategy/dca2/background_runner.go | 35 +++++++++++++++++++ pkg/strategy/dca2/recover.go | 41 ++++++++--------------- pkg/strategy/dca2/state.go | 5 --- pkg/strategy/dca2/strategy.go | 39 +++++++++++---------- 5 files changed, 78 insertions(+), 53 deletions(-) create mode 100644 pkg/strategy/dca2/background_runner.go diff --git a/pkg/strategy/dca2/active_order_recover.go b/pkg/strategy/dca2/active_order_recover.go index cbde64e63..9c27b6acb 100644 --- a/pkg/strategy/dca2/active_order_recover.go +++ b/pkg/strategy/dca2/active_order_recover.go @@ -28,13 +28,14 @@ func (s *Strategy) recoverPeriodically(ctx context.Context) { } func (s *Strategy) recoverActiveOrders(ctx context.Context) error { + s.logger.Info("recover active orders...") openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.ExchangeSession.Exchange, s.Symbol) if err != nil { s.logger.WithError(err).Warn("failed to query open orders") return err } - activeOrders := s.OrderExecutor.ActiveMakerOrders().Orders() + activeOrders := s.OrderExecutor.ActiveMakerOrders() // update num of open orders metrics if metricsNumOfOpenOrders != nil { @@ -43,13 +44,17 @@ func (s *Strategy) recoverActiveOrders(ctx context.Context) error { // update num of active orders metrics if metricsNumOfActiveOrders != nil { - metricsNumOfActiveOrders.With(baseLabels).Set(float64(len(activeOrders))) + metricsNumOfActiveOrders.With(baseLabels).Set(float64(activeOrders.NumOfOrders())) + } + + if len(openOrders) != activeOrders.NumOfOrders() { + s.logger.Warnf("num of open orders (%d) and active orders (%d) is different before active orders recovery, please check it.", len(openOrders), activeOrders.NumOfOrders()) } opts := common.SyncActiveOrdersOpts{ Logger: s.logger, Exchange: s.ExchangeSession.Exchange, - ActiveOrderBook: s.OrderExecutor.ActiveMakerOrders(), + ActiveOrderBook: activeOrders, OpenOrders: openOrders, } diff --git a/pkg/strategy/dca2/background_runner.go b/pkg/strategy/dca2/background_runner.go new file mode 100644 index 000000000..f687cf773 --- /dev/null +++ b/pkg/strategy/dca2/background_runner.go @@ -0,0 +1,35 @@ +package dca2 + +import ( + "context" + "time" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/util" +) + +func (s *Strategy) runBackgrounTask(ctx context.Context) { + s.logger.Info("run background task") + + // recover active orders + recoverActiveOrdersInterval := util.MillisecondsJitter(10*time.Minute, 5*60*1000) + recoverActiveOrdersTicker := time.NewTicker(recoverActiveOrdersInterval) + defer recoverActiveOrdersTicker.Stop() + + // sync strategy + syncPersistenceTicker := time.NewTicker(1 * time.Hour) + defer syncPersistenceTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-syncPersistenceTicker.C: + bbgo.Sync(ctx, s) + case <-recoverActiveOrdersTicker.C: + if err := s.recoverActiveOrders(ctx); err != nil { + s.logger.WithError(err).Warn(err, "failed to recover active orders") + } + } + } +} diff --git a/pkg/strategy/dca2/recover.go b/pkg/strategy/dca2/recover.go index 3dae11c6d..197ed4dae 100644 --- a/pkg/strategy/dca2/recover.go +++ b/pkg/strategy/dca2/recover.go @@ -135,40 +135,27 @@ func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, or } } - // the number of open-position orders is the same as maxOrderCount -> place open-position orders successfully - if numOpenPositionOrders == maxOrderCount { - // all open-position orders are still not filled -> OpenPositionReady - if filledCnt == 0 && cancelledCnt == 0 { - return OpenPositionReady, nil - } - - // there are at least one open-position orders filled -> OpenPositionOrderFilled - if filledCnt > 0 && cancelledCnt == 0 { - return OpenPositionOrderFilled, nil - } - - // there are at last one open-position orders cancelled -> - if cancelledCnt > 0 { - return OpenPositionOrdersCancelling, nil - } - - return None, fmt.Errorf("unexpected order status combination when numOpenPositionOrders(%d) == maxOrderCount(%d) (opened, filled, cancelled) = (%d, %d, %d)", numOpenPositionOrders, maxOrderCount, openedCnt, filledCnt, cancelledCnt) - } - - // the number of open-position orders is less than maxOrderCount -> failed to place open-position orders - // 1. This strategy is at position opening, so it may not place all orders we want successfully - // 2. There are some errors when placing open-position orders. e.g. cannot lock fund..... + // all open-position orders are still not filled -> OpenPositionReady if filledCnt == 0 && cancelledCnt == 0 { - // TODO: place the remaining open-position orders return OpenPositionReady, nil } + // there are at least one open-position orders filled if filledCnt > 0 && cancelledCnt == 0 { - // TODO: place the remaing open-position orders and change state to OpenPositionOrderFilled - return OpenPositionOrderFilled, nil + if openedCnt > 0 { + return OpenPositionOrderFilled, nil + } else { + // all open-position orders filled, change to cancelling and place the take-profit order + return OpenPositionOrdersCancelling, nil + } } - return None, fmt.Errorf("unexpected order status combination when numOpenPositionOrders(%d) < maxOrderCount(%d) (opened, filled, cancelled) = (%d, %d, %d)", numOpenPositionOrders, maxOrderCount, openedCnt, filledCnt, cancelledCnt) + // there are at last one open-position orders cancelled -> + if cancelledCnt > 0 { + return OpenPositionOrdersCancelling, nil + } + + return None, fmt.Errorf("unexpected order status combination (opened, filled, cancelled) = (%d, %d, %d)", openedCnt, filledCnt, cancelledCnt) } func recoverPosition(ctx context.Context, position *types.Position, queryService RecoverApiQueryService, currentRound Round) error { diff --git a/pkg/strategy/dca2/state.go b/pkg/strategy/dca2/state.go index 4400e2f5c..22f593437 100644 --- a/pkg/strategy/dca2/state.go +++ b/pkg/strategy/dca2/state.go @@ -73,9 +73,6 @@ func (s *Strategy) runState(ctx context.Context) { stateTriggerTicker := time.NewTicker(5 * time.Second) defer stateTriggerTicker.Stop() - monitorTicker := time.NewTicker(10 * time.Minute) - defer monitorTicker.Stop() - for { select { case <-ctx.Done(): @@ -84,8 +81,6 @@ func (s *Strategy) runState(ctx context.Context) { case <-stateTriggerTicker.C: // s.logger.Infof("[DCA] triggerNextState current state: %d", s.state) s.triggerNextState() - case <-monitorTicker.C: - s.updateNumOfOrdersMetrics(ctx) case nextState := <-s.nextStateC: // s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState) // check the next state is valid diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index 63d16faff..8ca7defc2 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -242,8 +242,25 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.logger.Infof("unsupported side (%s) of order: %s", o.Side, o) } - // update metrics when filled - s.updateNumOfOrdersMetrics(ctx) + openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.ExchangeSession.Exchange, s.Symbol) + if err != nil { + s.logger.WithError(err).Warn("failed to query open orders when order filled") + } else { + // update open orders metrics + metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders))) + } + + // update active orders metrics + numActiveMakerOrders := s.OrderExecutor.ActiveMakerOrders().NumOfOrders() + metricsNumOfActiveOrders.With(baseLabels).Set(float64(numActiveMakerOrders)) + + if len(openOrders) != numActiveMakerOrders { + s.logger.Warnf("num of open orders (%d) and active orders (%d) is different when order filled, please check it.", len(openOrders), numActiveMakerOrders) + } + + if err == nil && o.Side == openPositionSide && numActiveMakerOrders == 0 && len(openOrders) == 0 { + s.emitNextState(OpenPositionOrdersCancelling) + } }) session.MarketDataStream.OnKLine(func(kline types.KLine) { @@ -311,6 +328,8 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) }) + go s.runBackgrounTask(ctx) + bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() @@ -461,19 +480,3 @@ func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) { return updated, nil } - -func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) { - // update open orders metrics - openOrders, err := s.ExchangeSession.Exchange.QueryOpenOrders(ctx, s.Symbol) - if err != nil { - s.logger.WithError(err).Warn("failed to query open orders to update num of the orders metrics") - } else { - metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders))) - } - - // update active orders metrics - metricsNumOfActiveOrders.With(baseLabels).Set(float64(s.OrderExecutor.ActiveMakerOrders().NumOfOrders())) - - // set persistence - bbgo.Sync(ctx, s) -}