From 37354997537a8fef66d4fdfb52b87061048899a0 Mon Sep 17 00:00:00 2001 From: kbearXD Date: Thu, 27 Jun 2024 16:11:19 +0800 Subject: [PATCH] FEATURE: merge recover logic and run periodically --- pkg/strategy/grid2/active_order_recover.go | 98 ------------- pkg/strategy/grid2/recover.go | 157 +++++++++++++++------ pkg/strategy/grid2/strategy.go | 6 +- pkg/strategy/grid2/twin_order.go | 17 +-- pkg/strategy/grid2/twin_order_test.go | 2 +- 5 files changed, 128 insertions(+), 152 deletions(-) delete mode 100644 pkg/strategy/grid2/active_order_recover.go diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go deleted file mode 100644 index e834bc343..000000000 --- a/pkg/strategy/grid2/active_order_recover.go +++ /dev/null @@ -1,98 +0,0 @@ -package grid2 - -import ( - "context" - "time" - - "github.com/c9s/bbgo/pkg/exchange/retry" - "github.com/c9s/bbgo/pkg/strategy/common" - "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" -) - -func (s *Strategy) initializeRecoverC() bool { - s.mu.Lock() - defer s.mu.Unlock() - - isInitialize := false - - if s.recoverC == nil { - s.logger.Info("initializing recover channel") - s.recoverC = make(chan struct{}, 1) - } else { - s.logger.Info("recover channel is already initialized, trigger active orders recover") - isInitialize = true - - select { - case s.recoverC <- struct{}{}: - s.logger.Info("trigger active orders recover") - default: - s.logger.Info("activeOrdersRecoverC is full") - } - } - - return isInitialize -} - -func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { - // every time we activeOrdersRecoverC receive signal, do active orders recover - if isInitialize := s.initializeRecoverC(); isInitialize { - return - } - - // make ticker's interval random in 25 min ~ 35 min - interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) - s.logger.Infof("[ActiveOrderRecover] interval: %s", interval) - - metricsLabel := s.newPrometheusLabels() - - orderQueryService, ok := s.session.Exchange.(types.ExchangeOrderQueryService) - if !ok { - s.logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService, please check it", s.session.ExchangeName) - return - } - - opts := common.SyncActiveOrdersOpts{ - Logger: s.logger, - Exchange: s.session.Exchange, - OrderQueryService: orderQueryService, - ActiveOrderBook: s.orderExecutor.ActiveMakerOrders(), - } - - ticker := time.NewTicker(interval) - defer ticker.Stop() - - var lastRecoverTime time.Time - - for { - select { - case <-ctx.Done(): - return - - case <-ticker.C: - s.recoverC <- struct{}{} - case <-s.recoverC: - if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) { - continue - } - - openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.session.Exchange, s.Symbol) - if err != nil { - s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") - continue - } - - if metricsNumOfOpenOrders != nil { - metricsNumOfOpenOrders.With(metricsLabel).Set(float64(len(openOrders))) - } - - opts.OpenOrders = openOrders - - if err := common.SyncActiveOrders(ctx, opts); err != nil { - log.WithError(err).Errorf("unable to sync active orders") - } else { - lastRecoverTime = time.Now() - } - } - } -} diff --git a/pkg/strategy/grid2/recover.go b/pkg/strategy/grid2/recover.go index 0dc185a37..e3a3a7fa1 100644 --- a/pkg/strategy/grid2/recover.go +++ b/pkg/strategy/grid2/recover.go @@ -14,26 +14,84 @@ import ( "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" ) var syncWindow = -3 * time.Minute +func (s *Strategy) initializeRecoverC() bool { + s.mu.Lock() + defer s.mu.Unlock() + + isInitialize := false + + if s.recoverC == nil { + s.logger.Info("[Recover] initializing recover channel") + s.recoverC = make(chan struct{}, 1) + } else { + s.logger.Info("[Recover] recover channel is already initialized, trigger active orders recover") + isInitialize = true + + select { + case s.recoverC <- struct{}{}: + s.logger.Info("[Recover] trigger active orders recover") + default: + s.logger.Info("[Recover] activeOrdersRecoverC is full") + } + } + + return isInitialize +} + +func (s *Strategy) recoverPeriodically(ctx context.Context) { + if isInitialize := s.initializeRecoverC(); isInitialize { + return + } + + interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) + s.logger.Infof("[Recover] interval: %s", interval) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + var lastRecoverTime time.Time + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.recoverC <- struct{}{} + case <-s.recoverC: + // if we already recovered in 10 min, we should skip to avoid recovering too frequently + if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) { + continue + } + + if err := s.recover(ctx); err != nil { + s.logger.WithError(err).Error("failed to recover") + } else { + lastRecoverTime = time.Now() + } + } + } +} + /* Background knowledge 1. active orderbook add orders only when receive new order event or call Add/Update method manually 2. active orderbook remove orders only when receive filled/cancelled event or call Remove/Update method manually As a result - 1. at the same twin-order-price, there is order in open orders but not in active orderbook - - not receive new order event - => add order into active orderbook - 2. at the same twin-order-price, there is order in active orderbook but not in open orders - - not receive filled event - => query the filled order and call Update method - 3. at the same twin-order-price, there is no order in open orders and no order in active orderbook + 1. at the same twin-order-price, there is no order in open orders and no order in active orderbook - failed to create the order => query the last order from trades to emit filled, and it will submit again - not receive new order event and the order filled before we find it. => query the untracked order (also is the last order) from trades to emit filled and it will submit the reversed order + 2. at the same twin-order-price, there is order in open orders but not in active orderbook + - not receive new order event + => add order into active orderbook + 3. at the same twin-order-price, there is order in active orderbook but not in open orders + - not receive filled event + => query the filled order and call Update method 4. at the same twin-order-price, there are different orders in open orders and active orderbook - should not happen !!! => log error @@ -49,10 +107,11 @@ var syncWindow = -3 * time.Minute */ func (s *Strategy) recover(ctx context.Context) error { + s.logger.Info("[Recover] try to recover") historyService, implemented := s.session.Exchange.(types.ExchangeTradeHistoryService) // if the exchange doesn't support ExchangeTradeHistoryService, do not run recover if !implemented { - s.logger.Warn("ExchangeTradeHistoryService is not implemented, can not recover grid") + s.logger.Warn("[Recover] ExchangeTradeHistoryService is not implemented, can not recover grid") return nil } @@ -76,28 +135,28 @@ func (s *Strategy) recover(ctx context.Context) error { }) if err != nil { - return errors.Wrapf(err, "unable to query trades when recovering") + return errors.Wrapf(err, "[Recover] unable to query trades when recovering") } if len(trades) == 0 { - s.logger.Info("no open order, no active order, no trade, it's a new strategy so no need to recover") + s.logger.Info("[Recover] no open order, no active order, no trade, it's a new strategy so no need to recover") return nil } } - s.logger.Info("start recovering") + s.logger.Info("[Recover] start recovering") if s.getGrid() == nil { s.setGrid(s.newGrid()) } - s.mu.Lock() - defer s.mu.Unlock() - pins := s.getGrid().Pins syncBefore := time.Now().Add(syncWindow) + s.mu.Lock() + defer s.mu.Unlock() + activeOrdersInTwinOrderBook, err := buildTwinOrderBook(pins, activeOrders) openOrdersInTwinOrderBook, err := buildTwinOrderBook(pins, openOrders) @@ -113,7 +172,7 @@ func (s *Strategy) recover(ctx context.Context) error { activeOrder := activeOrdersInTwinOrderBook.GetTwinOrder(v) openOrder := openOrdersInTwinOrderBook.GetTwinOrder(v) if activeOrder == nil || openOrder == nil { - return fmt.Errorf("there is no any twin order at this pin, can not recover") + return fmt.Errorf("this pin (%s) is invalid. Please check it.", v.String()) } var activeOrderID uint64 = 0 @@ -126,23 +185,28 @@ func (s *Strategy) recover(ctx context.Context) error { openOrderID = openOrder.GetOrder().OrderID } - // case 3 + // case 1 if activeOrderID == 0 && openOrderID == 0 { noTwinOrderPins = append(noTwinOrderPins, v) continue } - // case 1 + // case 2 if activeOrderID == 0 { order := openOrder.GetOrder() s.logger.Infof("[Recover] found open order #%d is not in the active orderbook, adding...", order.OrderID) - activeOrderBook.Add(order) - // also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid - activeOrdersInTwinOrderBook.AddTwinOrder(v, openOrder) + + if order.UpdateTime.Before(syncBefore) { + activeOrderBook.Add(order) + // also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid + activeOrdersInTwinOrderBook.AddOrder(order, true) + } else { + s.logger.Infof("[Recover] open order #%d is updated in 3 min, skip adding...", order.OrderID) + } continue } - // case 2 + // case 3 if openOrderID == 0 { order := activeOrder.GetOrder() s.logger.Infof("[Recover] found active order #%d is not in the open orders, updating...", order.OrderID) @@ -155,43 +219,50 @@ func (s *Strategy) recover(ctx context.Context) error { if !isActiveOrderBookUpdated { s.logger.Infof("[Recover] active order #%d is updated in 3 min, skip updating...", order.OrderID) } - continue } // case 4 if activeOrderID != openOrderID { - return fmt.Errorf("there are two different orders in the same pin, can not recover") + return fmt.Errorf("[Recover] there are two different orders in the same pin, can not recover") } // case 5 // do nothing } - s.logger.Infof("twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String()) + s.logger.Infof("[Recover] twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String()) + s.logger.Infof("[Recover] pins without twin orders: %+v", noTwinOrderPins) if len(noTwinOrderPins) != 0 { if err := s.recoverEmptyGridOnTwinOrderBook(ctx, activeOrdersInTwinOrderBook, historyService, s.orderQueryService); err != nil { - s.logger.WithError(err).Error("failed to recover empty grid") + s.logger.WithError(err).Error("[Recover] failed to recover empty grid") return err } - s.logger.Infof("twin orderbook after recovering no twin order on grid\n%s", activeOrdersInTwinOrderBook.String()) + s.logger.Infof("[Recover] twin orderbook after recovering no twin order on grid\n%s", activeOrdersInTwinOrderBook.String()) if activeOrdersInTwinOrderBook.EmptyTwinOrderSize() > 0 { - return fmt.Errorf("there is still empty grid in twin orderbook") + return fmt.Errorf("[Recover] there is still empty grid in twin orderbook") } for _, pin := range noTwinOrderPins { twinOrder := activeOrdersInTwinOrderBook.GetTwinOrder(pin) if twinOrder == nil { - return fmt.Errorf("should not get nil twin order after recovering empty grid, check it") + return fmt.Errorf("[Recover] should not get nil twin order after recovering empty grid, check it") } if !twinOrder.Exist() { - return fmt.Errorf("should not get empty twin order after recovering empty grid, check it") + return fmt.Errorf("[Recover] should not get empty twin order after recovering empty grid, check it") } + filledOrder := twinOrder.GetOrder() + s.logger.Infof("[Recover] find filled order #%d (status: %s)", filledOrder.OrderID, filledOrder.Status) + if filledOrder.Status != types.OrderStatusFilled { + return fmt.Errorf("[Recover] should not get non-filled status, check it") + } + + s.logger.Infof("[Recover] emit filled order %s", filledOrder) activeOrderBook.EmitFilled(twinOrder.GetOrder()) time.Sleep(100 * time.Millisecond) @@ -216,7 +287,7 @@ func (s *Strategy) recoverEmptyGridOnTwinOrderBook( queryOrderService types.ExchangeOrderQueryService, ) error { if twinOrderBook.EmptyTwinOrderSize() == 0 { - s.logger.Info("no empty grid") + s.logger.Info("[Recover] no empty grid") return nil } @@ -233,24 +304,24 @@ func (s *Strategy) recoverEmptyGridOnTwinOrderBook( for { if err := queryTradesToUpdateTwinOrderBook(ctx, s.Symbol, twinOrderBook, queryTradesService, queryOrderService, existedOrders, since, until, s.debugLog); err != nil { - return errors.Wrapf(err, "failed to query trades to update twin orderbook") + return errors.Wrapf(err, "[Recover] failed to query trades to update twin orderbook") } until = since since = until.Add(-6 * time.Hour) if twinOrderBook.EmptyTwinOrderSize() == 0 { - s.logger.Infof("stop querying trades because there is no empty twin order on twin orderbook") + s.logger.Infof("[Recover] stop querying trades because there is no empty twin order on twin orderbook") break } if s.GridProfitStats != nil && s.GridProfitStats.Since != nil && until.Before(*s.GridProfitStats.Since) { - s.logger.Infof("stop querying trades because the time range is out of the strategy's since (%s)", *s.GridProfitStats.Since) + s.logger.Infof("[Recover] stop querying trades because the time range is out of the strategy's since (%s)", *s.GridProfitStats.Since) break } if until.Before(recoverSinceLimit) { - s.logger.Infof("stop querying trades because the time range is out of the limit (%s)", recoverSinceLimit) + s.logger.Infof("[Recover] stop querying trades because the time range is out of the limit (%s)", recoverSinceLimit) break } } @@ -262,7 +333,7 @@ func buildTwinOrderBook(pins []Pin, orders []types.Order) (*TwinOrderBook, error book := newTwinOrderBook(pins) for _, order := range orders { - if err := book.AddOrder(order); err != nil { + if err := book.AddOrder(order, true); err != nil { return nil, err } } @@ -311,7 +382,7 @@ func queryTradesToUpdateTwinOrderBook( logger func(format string, args ...interface{}), ) error { if twinOrderBook == nil { - return fmt.Errorf("twin orderbook should not be nil, please check it") + return fmt.Errorf("[Recover] twin orderbook should not be nil, please check it") } var fromTradeID uint64 = 0 @@ -325,11 +396,11 @@ func queryTradesToUpdateTwinOrderBook( }) if err != nil { - return errors.Wrapf(err, "failed to query trades to recover the grid") + return errors.Wrapf(err, "[Recover] failed to query trades to recover the grid") } if logger != nil { - logger("QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades)) + logger("[Recover] QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades)) } for _, trade := range trades { @@ -338,7 +409,7 @@ func queryTradesToUpdateTwinOrderBook( } if logger != nil { - logger(trade.String()) + logger("[Recover] " + trade.String()) } if existedOrders.Exists(trade.OrderID) { @@ -351,19 +422,19 @@ func queryTradesToUpdateTwinOrderBook( }) if err != nil { - return errors.Wrapf(err, "failed to query order by trade (trade id: %d, order id: %d)", trade.ID, trade.OrderID) + return errors.Wrapf(err, "[Recover] failed to query order by trade (trade id: %d, order id: %d)", trade.ID, trade.OrderID) } if logger != nil { - logger(order.String()) + logger("[Recover] " + order.String()) } // avoid query this order again existedOrders.Add(*order) // add 1 to avoid duplicate fromTradeID = trade.ID + 1 - if err := twinOrderBook.AddOrder(*order); err != nil { - return errors.Wrapf(err, "failed to add queried order into twin orderbook") + if err := twinOrderBook.AddOrder(*order, true); err != nil { + return errors.Wrapf(err, "[Recover] failed to add queried order into twin orderbook: %s", order.String()) } } diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 37411f5f6..bc1ca2bb5 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1993,7 +1993,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. return } - s.recoverActiveOrdersPeriodically(ctx) + s.recoverPeriodically(ctx) }) } else { s.startProcess(ctx, session) @@ -2008,12 +2008,14 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi if s.RecoverOrdersWhenStart { // do recover only when triggerPrice is not set and not in the back-test mode s.logger.Infof("recoverWhenStart is set, trying to recover grid orders...") - if err := s.recoverGrid(ctx, session); err != nil { + if err := s.recover(ctx); err != nil { // if recover fail, return and do not open grid s.logger.WithError(err).Error("failed to start process, recover error") s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error")) return err } + + s.EmitGridReady() } // avoid using goroutine here for back-test diff --git a/pkg/strategy/grid2/twin_order.go b/pkg/strategy/grid2/twin_order.go index f3a093b95..0d322654e 100644 --- a/pkg/strategy/grid2/twin_order.go +++ b/pkg/strategy/grid2/twin_order.go @@ -175,6 +175,9 @@ func (b *TwinOrderBook) String() string { sb.WriteString("================== TWIN ORDERBOOK ==================\n") for _, pin := range b.pins { twin := b.m[fixedpoint.Value(pin)] + if twin == nil { + continue + } twinOrder := twin.GetOrder() sb.WriteString(fmt.Sprintf("-> %8s) %s\n", pin, twinOrder.String())) } @@ -217,7 +220,7 @@ func (b *TwinOrderBook) GetTwinOrderPin(order types.Order) (fixedpoint.Value, er return b.pins[idx], nil } -func (b *TwinOrderBook) AddOrder(order types.Order) error { +func (b *TwinOrderBook) AddOrder(order types.Order, checkUpdateTime bool) error { b.mu.Lock() defer b.mu.Unlock() @@ -238,7 +241,12 @@ func (b *TwinOrderBook) AddOrder(order types.Order) error { // Exist == false means there is no twin order on this pin if !twinOrder.Exist() { b.size++ + } else { + if checkUpdateTime && twinOrder.GetOrder().UpdateTime.After(order.UpdateTime.Time()) { + return nil + } } + if b.size >= len(b.pins) { return fmt.Errorf("the maximum size of twin orderbook is len(pins) - 1, need to check it") } @@ -251,13 +259,6 @@ func (b *TwinOrderBook) GetTwinOrder(pin fixedpoint.Value) *TwinOrder { return b.m[pin] } -func (b *TwinOrderBook) AddTwinOrder(pin fixedpoint.Value, order *TwinOrder) { - b.mu.Lock() - defer b.mu.Unlock() - - b.m[pin] = order -} - // Size is the valid twin order on grid. func (b *TwinOrderBook) Size() int { return b.size diff --git a/pkg/strategy/grid2/twin_order_test.go b/pkg/strategy/grid2/twin_order_test.go index 47395303f..671254f45 100644 --- a/pkg/strategy/grid2/twin_order_test.go +++ b/pkg/strategy/grid2/twin_order_test.go @@ -53,7 +53,7 @@ func TestTwinOrderBook(t *testing.T) { } for _, order := range orders { - assert.NoError(book.AddOrder(order)) + assert.NoError(book.AddOrder(order, false)) } assert.Equal(2, book.Size()) assert.Equal(2, book.EmptyTwinOrderSize())