diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go new file mode 100644 index 000000000..dfbf3e7b5 --- /dev/null +++ b/pkg/strategy/grid2/active_order_recover.go @@ -0,0 +1,94 @@ +package grid2 + +import ( + "context" + "strconv" + "sync/atomic" + "time" + + "github.com/c9s/bbgo/pkg/exchange/retry" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" +) + +func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Context) { + // sleep for a while to wait for recovered + time.Sleep(util.MillisecondsJitter(5*time.Second, 1000*10)) + + if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil { + s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") + } else { + if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil { + s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook") + } + } + + ticker := time.NewTicker(util.MillisecondsJitter(40*time.Minute, 30*60*1000)) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil { + s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") + } else { + if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil { + s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook") + } + } + case <-ctx.Done(): + return + } + } +} + +func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOrders []types.Order) error { + recovered := atomic.LoadInt32(&s.recovered) + if recovered == 0 { + s.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready") + return nil + } + + s.logger.Infof("[ActiveOrderRecover] recovering active orders with open orders") + + if s.getGrid() == nil { + return nil + } + + s.mu.Lock() + defer s.mu.Unlock() + + activeOrderBook := s.orderExecutor.ActiveMakerOrders() + activeOrders := activeOrderBook.Orders() + + openOrdersMap := make(map[uint64]types.Order) + for _, openOrder := range openOrders { + openOrders[openOrder.OrderID] = openOrder + } + + // update active orders not in open orders + for _, activeOrder := range activeOrders { + if _, exist := openOrdersMap[activeOrder.OrderID]; !exist { + s.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID) + delete(openOrdersMap, activeOrder.OrderID) + updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{ + Symbol: activeOrder.Symbol, + OrderID: strconv.FormatUint(activeOrder.OrderID, 10), + }) + + if err != nil { + s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order (%d)", activeOrder.OrderID) + continue + } + + activeOrderBook.Update(*updatedOrder) + } + } + + // update open orders not in active orders + for _, openOrders := range openOrdersMap { + activeOrderBook.Update(openOrders) + } + + return nil +} diff --git a/pkg/strategy/grid2/checker.go b/pkg/strategy/grid2/checker.go deleted file mode 100644 index 2a91b34d6..000000000 --- a/pkg/strategy/grid2/checker.go +++ /dev/null @@ -1,105 +0,0 @@ -package grid2 - -import ( - "context" - "strconv" - "sync/atomic" - "time" - - "github.com/c9s/bbgo/pkg/exchange/retry" - "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" - "github.com/pkg/errors" -) - -type ActiveOrderRecover struct { - strategy *Strategy - - interval time.Duration -} - -func NewActiveOrderRecover(strategy *Strategy, interval time.Duration) *ActiveOrderRecover { - return &ActiveOrderRecover{ - strategy: strategy, - interval: interval, - } -} - -func (c *ActiveOrderRecover) Run(ctx context.Context) { - // sleep for a while to wait for recovered - time.Sleep(util.MillisecondsJitter(5*time.Second, 1000*10)) - - if err := c.recover(ctx); err != nil { - c.strategy.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook") - } - - ticker := time.NewTicker(util.MillisecondsJitter(c.interval, 30*60*1000)) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - if err := c.recover(ctx); err != nil { - c.strategy.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook") - } - case <-ctx.Done(): - return - } - } -} - -func (c *ActiveOrderRecover) recover(ctx context.Context) error { - recovered := atomic.LoadInt32(&c.strategy.recovered) - if recovered == 0 { - c.strategy.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready") - return nil - } - - c.strategy.logger.Infof("[ActiveOrderRecover] recovering active orders with open orders") - - if c.strategy.getGrid() == nil { - return nil - } - - c.strategy.mu.Lock() - defer c.strategy.mu.Unlock() - - openOrders, err := c.strategy.session.Exchange.QueryOpenOrders(ctx, c.strategy.Symbol) - if err != nil { - return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders") - } - - activeOrderBook := c.strategy.orderExecutor.ActiveMakerOrders() - activeOrders := activeOrderBook.Orders() - - openOrdersMap := make(map[uint64]types.Order) - for _, openOrder := range openOrders { - openOrders[openOrder.OrderID] = openOrder - } - - // update active orders not in open orders - for _, activeOrder := range activeOrders { - if _, exist := openOrdersMap[activeOrder.OrderID]; !exist { - c.strategy.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID) - delete(openOrdersMap, activeOrder.OrderID) - updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, c.strategy.orderQueryService, types.OrderQuery{ - Symbol: activeOrder.Symbol, - OrderID: strconv.FormatUint(activeOrder.OrderID, 10), - }) - - if err != nil { - c.strategy.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order (%d)", activeOrder.OrderID) - continue - } - - activeOrderBook.Update(*updatedOrder) - } - } - - // update open orders not in active orders - for _, openOrders := range openOrdersMap { - activeOrderBook.Update(openOrders) - } - - return nil -} diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 71ca97e10..6a8f6574b 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -171,9 +171,6 @@ type Strategy struct { RecoverGridByScanningTrades bool `json:"recoverGridByScanningTrades"` RecoverGridWithin time.Duration `json:"recoverGridWithin"` - // activeOrderRecover periodically check the open orders is the same as active orderbook and recover it - activeOrderRecover *ActiveOrderRecover - EnableProfitFixer bool `json:"enableProfitFixer"` FixProfitSince *types.Time `json:"fixProfitSince"` @@ -1989,10 +1986,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) } - s.activeOrderRecover = NewActiveOrderRecover(s, 40*time.Minute) session.UserDataStream.OnAuth(func() { if !bbgo.IsBackTesting { - go s.activeOrderRecover.Run(ctx) + go s.recoverActiveOrdersWithOpenOrdersPeriodically(ctx) /* // callback may block the stream execution, so we spawn the recover function to the background // add (5 seconds + random <10 seconds jitter) delay