From 8e224739dedc0f41540120f99e6cf46860078240 Mon Sep 17 00:00:00 2001 From: kbearXD Date: Mon, 4 Mar 2024 20:52:01 +0800 Subject: [PATCH] sync active orders and send metrics of order nums --- pkg/exchange/util.go | 10 +- pkg/strategy/common/sync.go | 100 +++++++++++++++ .../sync_test.go} | 86 ++++++------- pkg/strategy/dca2/active_order_recover.go | 57 +++++++++ pkg/strategy/dca2/open_position.go | 2 +- pkg/strategy/dca2/recover.go | 9 +- pkg/strategy/dca2/recover_test.go | 16 +-- pkg/strategy/dca2/strategy.go | 26 ++-- pkg/strategy/grid2/active_order_recover.go | 121 +++++------------- pkg/strategy/grid2/grid_recover.go | 3 +- pkg/strategy/grid2/recover.go | 3 +- 11 files changed, 264 insertions(+), 169 deletions(-) create mode 100644 pkg/strategy/common/sync.go rename pkg/strategy/{grid2/active_order_recover_test.go => common/sync_test.go} (73%) create mode 100644 pkg/strategy/dca2/active_order_recover.go diff --git a/pkg/exchange/util.go b/pkg/exchange/util.go index 639103722..56d3395e5 100644 --- a/pkg/exchange/util.go +++ b/pkg/exchange/util.go @@ -1,6 +1,9 @@ package exchange -import "github.com/c9s/bbgo/pkg/types" +import ( + "github.com/c9s/bbgo/pkg/exchange/max" + "github.com/c9s/bbgo/pkg/types" +) func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) { if marginExchange, ok := exchange.(types.MarginExchange); ok { @@ -27,3 +30,8 @@ func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsola return isMargin, isFutures, isIsolated, isolatedSymbol } + +func IsMaxExchange(exchange interface{}) bool { + _, res := exchange.(*max.Exchange) + return res +} diff --git a/pkg/strategy/common/sync.go b/pkg/strategy/common/sync.go new file mode 100644 index 000000000..1fd9f8ff6 --- /dev/null +++ b/pkg/strategy/common/sync.go @@ -0,0 +1,100 @@ +package common + +import ( + "context" + "strconv" + "time" + + "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/exchange" + maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" + "github.com/c9s/bbgo/pkg/exchange/retry" + "github.com/c9s/bbgo/pkg/types" + "github.com/sirupsen/logrus" + "go.uber.org/multierr" +) + +func SyncActiveOrder(ctx context.Context, ex types.Exchange, orderQueryService types.ExchangeOrderQueryService, activeOrderBook *bbgo.ActiveOrderBook, orderID uint64, syncBefore time.Time) (isOrderUpdated bool, err error) { + isMax := exchange.IsMaxExchange(ex) + + updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{ + Symbol: activeOrderBook.Symbol, + OrderID: strconv.FormatUint(orderID, 10), + }) + + if err != nil { + return isOrderUpdated, err + } + + // maxapi.OrderStateFinalizing does not mean the fee is calculated + // we should only consider order state done for MAX + if isMax && updatedOrder.OriginalStatus != string(maxapi.OrderStateDone) { + return isOrderUpdated, nil + } + + // should only trigger order update when the updated time is old enough + isOrderUpdated = updatedOrder.UpdateTime.Before(syncBefore) + if isOrderUpdated { + activeOrderBook.Update(*updatedOrder) + } + + return isOrderUpdated, nil +} + +type SyncActiveOrdersOpts struct { + Logger *logrus.Entry + Exchange types.Exchange + OrderQueryService types.ExchangeOrderQueryService + ActiveOrderBook *bbgo.ActiveOrderBook + OpenOrders []types.Order +} + +func SyncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { + opts.Logger.Infof("[ActiveOrderRecover] syncActiveOrders") + + // only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice + syncBefore := time.Now().Add(-3 * time.Minute) + + activeOrders := opts.ActiveOrderBook.Orders() + + openOrdersMap := make(map[uint64]types.Order) + for _, openOrder := range opts.OpenOrders { + openOrdersMap[openOrder.OrderID] = openOrder + } + + var errs error + // update active orders not in open orders + for _, activeOrder := range activeOrders { + if _, exist := openOrdersMap[activeOrder.OrderID]; exist { + // no need to sync active order already in active orderbook, because we only need to know if it filled or not. + delete(openOrdersMap, activeOrder.OrderID) + } else { + opts.Logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID) + + isActiveOrderBookUpdated, err := SyncActiveOrder(ctx, opts.Exchange, opts.OrderQueryService, opts.ActiveOrderBook, activeOrder.OrderID, syncBefore) + if err != nil { + opts.Logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID) + errs = multierr.Append(errs, err) + continue + } + + if !isActiveOrderBookUpdated { + opts.Logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID) + } + } + } + + // update open orders not in active orders + for _, openOrder := range openOrdersMap { + opts.Logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID) + // we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice. + if openOrder.UpdateTime.After(syncBefore) { + opts.Logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID) + continue + } + + opts.ActiveOrderBook.Add(openOrder) + } + + return errs +} diff --git a/pkg/strategy/grid2/active_order_recover_test.go b/pkg/strategy/common/sync_test.go similarity index 73% rename from pkg/strategy/grid2/active_order_recover_test.go rename to pkg/strategy/common/sync_test.go index dffdccc38..95bb76b16 100644 --- a/pkg/strategy/grid2/active_order_recover_test.go +++ b/pkg/strategy/common/sync_test.go @@ -1,4 +1,4 @@ -package grid2 +package common import ( "context" @@ -10,7 +10,7 @@ import ( "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types/mocks" "github.com/golang/mock/gomock" - "github.com/prometheus/client_golang/prometheus" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) @@ -23,34 +23,30 @@ func TestSyncActiveOrders(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + log := logrus.WithField("strategy", "test") symbol := "ETHUSDT" - labels := prometheus.Labels{ - "exchange": "default", - "symbol": symbol, - } t.Run("all open orders are match with active orderbook", func(t *testing.T) { mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) mockExchange := mocks.NewMockExchange(mockCtrl) activeOrderbook := bbgo.NewActiveOrderBook(symbol) - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - order := types.Order{ OrderID: 1, Status: types.OrderStatusNew, } order.Symbol = symbol - activeOrderbook.Add(order) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil) + opts := SyncActiveOrdersOpts{ + Logger: log, + Exchange: mockExchange, + OrderQueryService: mockOrderQueryService, + ActiveOrderBook: activeOrderbook, + OpenOrders: []types.Order{order}, + } - assert.NoError(syncActiveOrders(ctx, opts)) + activeOrderbook.Add(order) + + assert.NoError(SyncActiveOrders(ctx, opts)) // verify active orderbook activeOrders := activeOrderbook.Orders() @@ -64,14 +60,6 @@ func TestSyncActiveOrders(t *testing.T) { mockExchange := mocks.NewMockExchange(mockCtrl) activeOrderbook := bbgo.NewActiveOrderBook(symbol) - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - order := types.Order{ OrderID: 1, Status: types.OrderStatusNew, @@ -82,14 +70,21 @@ func TestSyncActiveOrders(t *testing.T) { updatedOrder := order updatedOrder.Status = types.OrderStatusFilled + opts := SyncActiveOrdersOpts{ + Logger: log, + ActiveOrderBook: activeOrderbook, + OrderQueryService: mockOrderQueryService, + Exchange: mockExchange, + OpenOrders: nil, + } + activeOrderbook.Add(order) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil) mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{ Symbol: symbol, OrderID: strconv.FormatUint(order.OrderID, 10), }).Return(&updatedOrder, nil) - assert.NoError(syncActiveOrders(ctx, opts)) + assert.NoError(SyncActiveOrders(ctx, opts)) // verify active orderbook activeOrders := activeOrderbook.Orders() @@ -101,14 +96,6 @@ func TestSyncActiveOrders(t *testing.T) { mockExchange := mocks.NewMockExchange(mockCtrl) activeOrderbook := bbgo.NewActiveOrderBook(symbol) - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - order := types.Order{ OrderID: 1, Status: types.OrderStatusNew, @@ -118,8 +105,14 @@ func TestSyncActiveOrders(t *testing.T) { CreationTime: types.Time(time.Now()), } - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil) - assert.NoError(syncActiveOrders(ctx, opts)) + opts := SyncActiveOrdersOpts{ + Logger: log, + ActiveOrderBook: activeOrderbook, + OrderQueryService: mockOrderQueryService, + Exchange: mockExchange, + OpenOrders: []types.Order{order}, + } + assert.NoError(SyncActiveOrders(ctx, opts)) // verify active orderbook activeOrders := activeOrderbook.Orders() @@ -133,14 +126,6 @@ func TestSyncActiveOrders(t *testing.T) { mockExchange := mocks.NewMockExchange(mockCtrl) activeOrderbook := bbgo.NewActiveOrderBook(symbol) - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - order1 := types.Order{ OrderID: 1, Status: types.OrderStatusNew, @@ -158,14 +143,21 @@ func TestSyncActiveOrders(t *testing.T) { }, } + opts := SyncActiveOrdersOpts{ + Logger: log, + ActiveOrderBook: activeOrderbook, + OrderQueryService: mockOrderQueryService, + Exchange: mockExchange, + OpenOrders: []types.Order{order2}, + } + activeOrderbook.Add(order1) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil) mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{ Symbol: symbol, OrderID: strconv.FormatUint(order1.OrderID, 10), }).Return(&updatedOrder1, nil) - assert.NoError(syncActiveOrders(ctx, opts)) + assert.NoError(SyncActiveOrders(ctx, opts)) // verify active orderbook activeOrders := activeOrderbook.Orders() diff --git a/pkg/strategy/dca2/active_order_recover.go b/pkg/strategy/dca2/active_order_recover.go new file mode 100644 index 000000000..08894fd23 --- /dev/null +++ b/pkg/strategy/dca2/active_order_recover.go @@ -0,0 +1,57 @@ +package dca2 + +import ( + "context" + "time" + + "github.com/c9s/bbgo/pkg/exchange/retry" + "github.com/c9s/bbgo/pkg/strategy/common" + "github.com/c9s/bbgo/pkg/util" +) + +func (s *Strategy) recoverPeriodically(ctx context.Context) { + s.logger.Info("[DCA] monitor and recover periodically") + interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := s.recoverActiveOrders(ctx); err != nil { + s.logger.WithError(err).Warn(err, "failed to recover active orders") + } + } + } +} + +func (s *Strategy) recoverActiveOrders(ctx context.Context) error { + openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.ExchangeSession.Exchange, s.Symbol) + if err != nil { + s.logger.WithError(err).Warn("failed to query open orders") + return err + } + + activeOrders := s.OrderExecutor.ActiveMakerOrders().Orders() + + // update num of open orders metrics + if metricsNumOfOpenOrders != nil { + metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders))) + } + + // update num of active orders metrics + if metricsNumOfActiveOrders != nil { + metricsNumOfActiveOrders.With(baseLabels).Set(float64(len(activeOrders))) + } + + opts := common.SyncActiveOrdersOpts{ + Logger: s.logger, + Exchange: s.ExchangeSession.Exchange, + ActiveOrderBook: s.OrderExecutor.ActiveMakerOrders(), + OpenOrders: openOrders, + } + + return common.SyncActiveOrders(ctx, opts) +} diff --git a/pkg/strategy/dca2/open_position.go b/pkg/strategy/dca2/open_position.go index 1b9abfcc4..db6f0b23c 100644 --- a/pkg/strategy/dca2/open_position.go +++ b/pkg/strategy/dca2/open_position.go @@ -16,7 +16,7 @@ type cancelOrdersByGroupIDApi interface { func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error { s.logger.Infof("[DCA] start placing open position orders") - price, err := getBestPriceUntilSuccess(ctx, s.Session.Exchange, s.Symbol) + price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol) if err != nil { return err } diff --git a/pkg/strategy/dca2/recover.go b/pkg/strategy/dca2/recover.go index ae192d9f8..c875eed46 100644 --- a/pkg/strategy/dca2/recover.go +++ b/pkg/strategy/dca2/recover.go @@ -7,7 +7,6 @@ import ( "time" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) @@ -25,9 +24,9 @@ type RecoverApiQueryService interface { func (s *Strategy) recover(ctx context.Context) error { s.logger.Info("[DCA] recover") - queryService, ok := s.Session.Exchange.(RecoverApiQueryService) + queryService, ok := s.ExchangeSession.Exchange.(RecoverApiQueryService) if !ok { - return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.Session.ExchangeName) + return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.ExchangeSession.ExchangeName) } openOrders, err := queryService.QueryOpenOrders(ctx, s.Symbol) @@ -63,7 +62,7 @@ func (s *Strategy) recover(ctx context.Context) error { s.startTimeOfNextRound = startTimeOfNextRound // recover state - state, err := recoverState(ctx, s.ProfitStats.QuoteInvestment, int(s.MaxOrderCount), currentRound, s.OrderExecutor) + state, err := recoverState(ctx, int(s.MaxOrderCount), currentRound, s.OrderExecutor) if err != nil { return err } @@ -74,7 +73,7 @@ func (s *Strategy) recover(ctx context.Context) error { } // recover state -func recoverState(ctx context.Context, quoteInvestment fixedpoint.Value, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) { +func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) { activeOrderBook := orderExecutor.ActiveMakerOrders() orderStore := orderExecutor.OrderStore() diff --git a/pkg/strategy/dca2/recover_test.go b/pkg/strategy/dca2/recover_test.go index 62d793359..ae3a41fa8 100644 --- a/pkg/strategy/dca2/recover_test.go +++ b/pkg/strategy/dca2/recover_test.go @@ -7,7 +7,6 @@ import ( "time" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" "github.com/stretchr/testify/assert" ) @@ -95,13 +94,12 @@ func (m *MockQueryOrders) QueryClosedOrdersDesc(ctx context.Context, symbol stri func Test_RecoverState(t *testing.T) { strategy := newTestStrategy() - quoteInvestment := fixedpoint.MustNewFromString("1000") t.Run("new strategy", func(t *testing.T) { currentRound := Round{} position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) }) @@ -119,7 +117,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionReady, state) }) @@ -137,7 +135,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrderFilled, state) }) @@ -155,7 +153,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) }) @@ -173,7 +171,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, OpenPositionOrdersCancelling, state) }) @@ -192,7 +190,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, TakeProfitReady, state) }) @@ -211,7 +209,7 @@ func Test_RecoverState(t *testing.T) { } position := types.NewPositionFromMarket(strategy.Market) orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position) - state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor) + state, err := recoverState(context.Background(), 5, currentRound, orderExecutor) assert.NoError(t, err) assert.Equal(t, WaitToOpenPosition, state) }) diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index dbbdc77b8..3c3462c17 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -46,10 +46,10 @@ type Strategy struct { Position *types.Position `json:"position,omitempty" persistence:"position"` ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` - Environment *bbgo.Environment - Session *bbgo.ExchangeSession - OrderExecutor *bbgo.GeneralOrderExecutor - Market types.Market + Environment *bbgo.Environment + ExchangeSession *bbgo.ExchangeSession + OrderExecutor *bbgo.GeneralOrderExecutor + Market types.Market Symbol string `json:"symbol"` @@ -146,8 +146,8 @@ func (s *Strategy) newPrometheusLabels() prometheus.Labels { "symbol": s.Symbol, } - if s.Session != nil { - labels["exchange"] = s.Session.Name + if s.ExchangeSession != nil { + labels["exchange"] = s.ExchangeSession.Name } if s.PrometheusLabels == nil { @@ -159,7 +159,7 @@ func (s *Strategy) newPrometheusLabels() prometheus.Labels { func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { instanceID := s.InstanceID() - s.Session = session + s.ExchangeSession = session if s.ProfitStats == nil { s.ProfitStats = newProfitStats(s.Market, s.QuoteInvestment) } @@ -320,7 +320,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error { _ = s.Initialize() defer s.EmitClosed() - session := s.Session + session := s.ExchangeSession if session == nil { return fmt.Errorf("Session is nil, please check it") } @@ -358,14 +358,14 @@ func (s *Strategy) CleanUp(ctx context.Context) error { } func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { - historyService, ok := s.Session.Exchange.(types.ExchangeTradeHistoryService) + historyService, ok := s.ExchangeSession.Exchange.(types.ExchangeTradeHistoryService) if !ok { - return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.Session.Exchange.Name()) + return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.ExchangeSession.Exchange.Name()) } - queryService, ok := s.Session.Exchange.(types.ExchangeOrderQueryService) + queryService, ok := s.ExchangeSession.Exchange.(types.ExchangeOrderQueryService) if !ok { - return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.Session.Exchange.Name()) + return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.ExchangeSession.Exchange.Name()) } // TODO: pagination for it @@ -444,7 +444,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) { // update open orders metrics - openOrders, err := s.Session.Exchange.QueryOpenOrders(ctx, s.Symbol) + openOrders, err := s.ExchangeSession.Exchange.QueryOpenOrders(ctx, s.Symbol) if err != nil { s.logger.WithError(err).Warn("failed to query open orders to update num of the orders metrics") } else { diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index d06f316fc..e834bc343 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -4,26 +4,12 @@ import ( "context" "time" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - "go.uber.org/multierr" - - "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/exchange/max" "github.com/c9s/bbgo/pkg/exchange/retry" + "github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" ) -type SyncActiveOrdersOpts struct { - logger *logrus.Entry - metricsLabels prometheus.Labels - activeOrderBook *bbgo.ActiveOrderBook - orderQueryService types.ExchangeOrderQueryService - exchange types.Exchange -} - func (s *Strategy) initializeRecoverC() bool { s.mu.Lock() defer s.mu.Unlock() @@ -57,17 +43,25 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { // make ticker's interval random in 25 min ~ 35 min interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) s.logger.Infof("[ActiveOrderRecover] interval: %s", interval) + + metricsLabel := s.newPrometheusLabels() + + orderQueryService, ok := s.session.Exchange.(types.ExchangeOrderQueryService) + if !ok { + s.logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService, please check it", s.session.ExchangeName) + return + } + + opts := common.SyncActiveOrdersOpts{ + Logger: s.logger, + Exchange: s.session.Exchange, + OrderQueryService: orderQueryService, + ActiveOrderBook: s.orderExecutor.ActiveMakerOrders(), + } + ticker := time.NewTicker(interval) defer ticker.Stop() - opts := SyncActiveOrdersOpts{ - logger: s.logger, - metricsLabels: s.newPrometheusLabels(), - activeOrderBook: s.orderExecutor.ActiveMakerOrders(), - orderQueryService: s.orderQueryService, - exchange: s.session.Exchange, - } - var lastRecoverTime time.Time for { @@ -82,7 +76,19 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { continue } - if err := syncActiveOrders(ctx, opts); err != nil { + openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.session.Exchange, s.Symbol) + if err != nil { + s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") + continue + } + + if metricsNumOfOpenOrders != nil { + metricsNumOfOpenOrders.With(metricsLabel).Set(float64(len(openOrders))) + } + + opts.OpenOrders = openOrders + + if err := common.SyncActiveOrders(ctx, opts); err != nil { log.WithError(err).Errorf("unable to sync active orders") } else { lastRecoverTime = time.Now() @@ -90,70 +96,3 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { } } } - -func isMaxExchange(ex interface{}) bool { - _, yes := ex.(*max.Exchange) - return yes -} - -func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { - opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders") - - // only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice - syncBefore := time.Now().Add(-3 * time.Minute) - - openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, opts.exchange, opts.activeOrderBook.Symbol) - if err != nil { - opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") - return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time") - } - - if metricsNumOfOpenOrders != nil { - metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders))) - } - - activeOrders := opts.activeOrderBook.Orders() - - openOrdersMap := make(map[uint64]types.Order) - for _, openOrder := range openOrders { - openOrdersMap[openOrder.OrderID] = openOrder - } - - var errs error - // update active orders not in open orders - for _, activeOrder := range activeOrders { - - if _, exist := openOrdersMap[activeOrder.OrderID]; exist { - // no need to sync active order already in active orderbook, because we only need to know if it filled or not. - delete(openOrdersMap, activeOrder.OrderID) - } else { - opts.logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID) - - isActiveOrderBookUpdated, err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID, syncBefore) - if err != nil { - opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID) - errs = multierr.Append(errs, err) - continue - } - - if !isActiveOrderBookUpdated { - opts.logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID) - } - } - } - - // update open orders not in active orders - for _, openOrder := range openOrdersMap { - opts.logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID) - // we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice. - if openOrder.UpdateTime.After(syncBefore) { - opts.logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID) - continue - } - - opts.activeOrderBook.Add(openOrder) - // opts.activeOrderBook.Update(openOrder) - } - - return errs -} diff --git a/pkg/strategy/grid2/grid_recover.go b/pkg/strategy/grid2/grid_recover.go index 70ae575dd..efde00390 100644 --- a/pkg/strategy/grid2/grid_recover.go +++ b/pkg/strategy/grid2/grid_recover.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/exchange" maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -19,7 +20,7 @@ func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.Ex defer func() { s.updateGridNumOfOrdersMetricsWithLock() }() - isMax := isMaxExchange(session.Exchange) + isMax := exchange.IsMaxExchange(session.Exchange) s.logger.Infof("isMax: %t", isMax) historyService, implemented := session.Exchange.(types.ExchangeTradeHistoryService) diff --git a/pkg/strategy/grid2/recover.go b/pkg/strategy/grid2/recover.go index cae47b303..4b657c608 100644 --- a/pkg/strategy/grid2/recover.go +++ b/pkg/strategy/grid2/recover.go @@ -9,6 +9,7 @@ import ( "github.com/pkg/errors" "github.com/c9s/bbgo/pkg/bbgo" + "github.com/c9s/bbgo/pkg/exchange" maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" @@ -273,7 +274,7 @@ func syncActiveOrder( ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64, syncBefore time.Time, ) (isOrderUpdated bool, err error) { - isMax := isMaxExchange(orderQueryService) + isMax := exchange.IsMaxExchange(orderQueryService) updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{ Symbol: activeOrderBook.Symbol,