diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go deleted file mode 100644 index e93e722f0..000000000 --- a/pkg/strategy/grid2/active_order_recover.go +++ /dev/null @@ -1,159 +0,0 @@ -package grid2 - -import ( - "context" - "strconv" - "time" - - "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/exchange/retry" - "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" - "github.com/pkg/errors" - "github.com/prometheus/client_golang/prometheus" - "github.com/sirupsen/logrus" - "go.uber.org/multierr" -) - -type SyncActiveOrdersOpts struct { - logger *logrus.Entry - metricsLabels prometheus.Labels - activeOrderBook *bbgo.ActiveOrderBook - orderQueryService types.ExchangeOrderQueryService - exchange types.Exchange -} - -func (s *Strategy) initializeRecoverCh() bool { - s.mu.Lock() - defer s.mu.Unlock() - - isInitialize := false - - if s.activeOrdersRecoverC == nil { - s.logger.Info("initializing recover channel") - s.activeOrdersRecoverC = make(chan struct{}, 1) - } else { - s.logger.Info("recover channel is already initialized, trigger active orders recover") - isInitialize = true - - select { - case s.activeOrdersRecoverC <- struct{}{}: - s.logger.Info("trigger active orders recover") - default: - s.logger.Info("activeOrdersRecoverC is full") - } - } - - return isInitialize -} - -func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { - // every time we activeOrdersRecoverC receive signal, do active orders recover - if isInitialize := s.initializeRecoverCh(); isInitialize { - return - } - - // make ticker's interval random in 25 min ~ 35 min - interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) - s.logger.Infof("[ActiveOrderRecover] interval: %s", interval) - ticker := time.NewTicker(interval) - defer ticker.Stop() - - opts := SyncActiveOrdersOpts{ - logger: s.logger, - metricsLabels: s.newPrometheusLabels(), - activeOrderBook: s.orderExecutor.ActiveMakerOrders(), - orderQueryService: s.orderQueryService, - exchange: s.session.Exchange, - } - - for { - select { - - case <-ctx.Done(): - return - - case <-ticker.C: - if err := syncActiveOrders(ctx, opts); err != nil { - log.WithError(err).Errorf("unable to sync active orders") - } - - case <-s.activeOrdersRecoverC: - if err := syncActiveOrders(ctx, opts); err != nil { - log.WithError(err).Errorf("unable to sync active orders") - } - - } - } -} - -func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { - opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders") - - notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute) - - openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, opts.exchange, opts.activeOrderBook.Symbol) - if err != nil { - opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") - return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time") - } - - if metricsNumOfOpenOrders != nil { - metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders))) - } - - activeOrders := opts.activeOrderBook.Orders() - - openOrdersMap := make(map[uint64]types.Order) - for _, openOrder := range openOrders { - openOrdersMap[openOrder.OrderID] = openOrder - } - - var errs error - // update active orders not in open orders - for _, activeOrder := range activeOrders { - if _, exist := openOrdersMap[activeOrder.OrderID]; exist { - // no need to sync active order already in active orderbook, because we only need to know if it filled or not. - delete(openOrdersMap, activeOrder.OrderID) - } else { - opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID) - - // sleep 100ms to avoid DDOS - time.Sleep(100 * time.Millisecond) - - if err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID); err != nil { - opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID) - errs = multierr.Append(errs, err) - continue - } - } - } - - // update open orders not in active orders - for _, openOrder := range openOrdersMap { - // we don't add open orders into active orderbook if updated in 5 min - if openOrder.UpdateTime.After(notAddNonExistingOpenOrdersAfter) { - continue - } - - opts.activeOrderBook.Add(openOrder) - // opts.activeOrderBook.Update(openOrder) - } - - return errs -} - -func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error { - updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{ - Symbol: activeOrderBook.Symbol, - OrderID: strconv.FormatUint(orderID, 10), - }) - - if err != nil { - return err - } - - activeOrderBook.Update(*updatedOrder) - - return nil -} diff --git a/pkg/strategy/grid2/active_order_recover_test.go b/pkg/strategy/grid2/active_order_recover_test.go deleted file mode 100644 index dffdccc38..000000000 --- a/pkg/strategy/grid2/active_order_recover_test.go +++ /dev/null @@ -1,176 +0,0 @@ -package grid2 - -import ( - "context" - "strconv" - "testing" - "time" - - "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/types/mocks" - "github.com/golang/mock/gomock" - "github.com/prometheus/client_golang/prometheus" - "github.com/stretchr/testify/assert" -) - -func TestSyncActiveOrders(t *testing.T) { - assert := assert.New(t) - - mockCtrl := gomock.NewController(t) - defer mockCtrl.Finish() - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - symbol := "ETHUSDT" - labels := prometheus.Labels{ - "exchange": "default", - "symbol": symbol, - } - t.Run("all open orders are match with active orderbook", func(t *testing.T) { - mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) - mockExchange := mocks.NewMockExchange(mockCtrl) - activeOrderbook := bbgo.NewActiveOrderBook(symbol) - - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - - order := types.Order{ - OrderID: 1, - Status: types.OrderStatusNew, - } - order.Symbol = symbol - - activeOrderbook.Add(order) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil) - - assert.NoError(syncActiveOrders(ctx, opts)) - - // verify active orderbook - activeOrders := activeOrderbook.Orders() - assert.Equal(1, len(activeOrders)) - assert.Equal(uint64(1), activeOrders[0].OrderID) - assert.Equal(types.OrderStatusNew, activeOrders[0].Status) - }) - - t.Run("there is order in active orderbook but not in open orders", func(t *testing.T) { - mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) - mockExchange := mocks.NewMockExchange(mockCtrl) - activeOrderbook := bbgo.NewActiveOrderBook(symbol) - - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - - order := types.Order{ - OrderID: 1, - Status: types.OrderStatusNew, - SubmitOrder: types.SubmitOrder{ - Symbol: symbol, - }, - } - updatedOrder := order - updatedOrder.Status = types.OrderStatusFilled - - activeOrderbook.Add(order) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil) - mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{ - Symbol: symbol, - OrderID: strconv.FormatUint(order.OrderID, 10), - }).Return(&updatedOrder, nil) - - assert.NoError(syncActiveOrders(ctx, opts)) - - // verify active orderbook - activeOrders := activeOrderbook.Orders() - assert.Equal(0, len(activeOrders)) - }) - - t.Run("there is order on open orders but not in active orderbook", func(t *testing.T) { - mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) - mockExchange := mocks.NewMockExchange(mockCtrl) - activeOrderbook := bbgo.NewActiveOrderBook(symbol) - - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - - order := types.Order{ - OrderID: 1, - Status: types.OrderStatusNew, - SubmitOrder: types.SubmitOrder{ - Symbol: symbol, - }, - CreationTime: types.Time(time.Now()), - } - - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil) - assert.NoError(syncActiveOrders(ctx, opts)) - - // verify active orderbook - activeOrders := activeOrderbook.Orders() - assert.Equal(1, len(activeOrders)) - assert.Equal(uint64(1), activeOrders[0].OrderID) - assert.Equal(types.OrderStatusNew, activeOrders[0].Status) - }) - - t.Run("there is order on open order but not in active orderbook also order in active orderbook but not on open orders", func(t *testing.T) { - mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) - mockExchange := mocks.NewMockExchange(mockCtrl) - activeOrderbook := bbgo.NewActiveOrderBook(symbol) - - opts := SyncActiveOrdersOpts{ - logger: log, - metricsLabels: labels, - activeOrderBook: activeOrderbook, - orderQueryService: mockOrderQueryService, - exchange: mockExchange, - } - - order1 := types.Order{ - OrderID: 1, - Status: types.OrderStatusNew, - SubmitOrder: types.SubmitOrder{ - Symbol: symbol, - }, - } - updatedOrder1 := order1 - updatedOrder1.Status = types.OrderStatusFilled - order2 := types.Order{ - OrderID: 2, - Status: types.OrderStatusNew, - SubmitOrder: types.SubmitOrder{ - Symbol: symbol, - }, - } - - activeOrderbook.Add(order1) - mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil) - mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{ - Symbol: symbol, - OrderID: strconv.FormatUint(order1.OrderID, 10), - }).Return(&updatedOrder1, nil) - - assert.NoError(syncActiveOrders(ctx, opts)) - - // verify active orderbook - activeOrders := activeOrderbook.Orders() - assert.Equal(1, len(activeOrders)) - assert.Equal(uint64(2), activeOrders[0].OrderID) - assert.Equal(types.OrderStatusNew, activeOrders[0].Status) - }) -} diff --git a/pkg/strategy/grid2/profit_stats.go b/pkg/strategy/grid2/profit_stats.go index 5f47effc6..cd8367c23 100644 --- a/pkg/strategy/grid2/profit_stats.go +++ b/pkg/strategy/grid2/profit_stats.go @@ -22,7 +22,6 @@ type GridProfitStats struct { TotalFee map[string]fixedpoint.Value `json:"totalFee,omitempty"` Volume fixedpoint.Value `json:"volume,omitempty"` Market types.Market `json:"market,omitempty"` - ProfitEntries []*GridProfit `json:"profitEntries,omitempty"` Since *time.Time `json:"since,omitempty"` InitialOrderID uint64 `json:"initialOrderID"` } @@ -38,7 +37,6 @@ func newGridProfitStats(market types.Market) *GridProfitStats { TotalFee: make(map[string]fixedpoint.Value), Volume: fixedpoint.Zero, Market: market, - ProfitEntries: nil, } } @@ -69,8 +67,6 @@ func (s *GridProfitStats) AddProfit(profit *GridProfit) { case s.Market.BaseCurrency: s.TotalBaseProfit = s.TotalBaseProfit.Add(profit.Profit) } - - s.ProfitEntries = append(s.ProfitEntries, profit) } func (s *GridProfitStats) SlackAttachment() slack.Attachment { diff --git a/pkg/strategy/grid2/recover.go b/pkg/strategy/grid2/recover.go index 945f2c038..03fec35af 100644 --- a/pkg/strategy/grid2/recover.go +++ b/pkg/strategy/grid2/recover.go @@ -6,41 +6,121 @@ import ( "strconv" "time" - "github.com/pkg/errors" - "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" + "github.com/pkg/errors" ) -func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.ExchangeSession) error { - defer func() { - s.updateGridNumOfOrdersMetricsWithLock() - }() +/* + Background knowledge + 1. active orderbook add orders only when receive new order event or call Add/Update method manually + 2. active orderbook remove orders only when receive filled/cancelled event or call Remove/Update method manually - historyService, implemented := session.Exchange.(types.ExchangeTradeHistoryService) + As a result + 1. at the same twin-order-price, there is order in open orders but not in active orderbook + - not receive new order event + => add order into active orderbook + 2. at the same twin-order-price, there is order in active orderbook but not in open orders + - not receive filled event + => query the filled order and call Update method + 3. at the same twin-order-price, there is no order in open orders and no order in active orderbook + - failed to create the order + => query the last order from trades to emit filled, and it will submit again + - not receive new order event and the order filled before we find it. + => query the untracked order (also is the last order) from trades to emit filled and it will submit the reversed order + 4. at the same twin-order-price, there are different orders in open orders and active orderbook + - should not happen !!! + => log error + 5. at the same twin-order-price, there is the same order in open orders and active orderbook + - normal case + => no need to do anything + + After killing pod, active orderbook must be empty. we can think it is the same as not receive new event. + + Process + 1. build twin orderbook with pins and open orders. + 2. build twin orderbook with pins and active orders. + 3. compare above twin orderbooks to add open orders into active orderbook and update active orders. + 4. run grid recover to make sure all the twin price has its order. +*/ + +func (s *Strategy) initializeRecoverC() bool { + s.mu.Lock() + defer s.mu.Unlock() + + isInitialize := false + + if s.recoverC == nil { + s.logger.Info("initializing recover channel") + s.recoverC = make(chan struct{}, 1) + } else { + s.logger.Info("recover channel is already initialized, trigger active orders recover") + isInitialize = true + + select { + case s.recoverC <- struct{}{}: + s.logger.Info("trigger active orders recover") + default: + s.logger.Info("activeOrdersRecoverC is full") + } + } + + return isInitialize +} + +func (s *Strategy) recoverPeriodically(ctx context.Context) { + // every time we activeOrdersRecoverC receive signal, do active orders recover + if isInitialize := s.initializeRecoverC(); isInitialize { + return + } + + // make ticker's interval random in 25 min ~ 35 min + interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) + s.logger.Infof("[Recover] interval: %s", interval) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + if err := s.recover(ctx); err != nil { + s.logger.WithError(err).Error("failed to recover") + } + + case <-s.recoverC: + if err := s.recover(ctx); err != nil { + s.logger.WithError(err).Error("failed to recover") + } + } + } +} + +func (s *Strategy) recover(ctx context.Context) error { + historyService, implemented := s.session.Exchange.(types.ExchangeTradeHistoryService) // if the exchange doesn't support ExchangeTradeHistoryService, do not run recover if !implemented { s.logger.Warn("ExchangeTradeHistoryService is not implemented, can not recover grid") return nil } - openOrders, err := session.Exchange.QueryOpenOrders(ctx, s.Symbol) + activeOrderBook := s.orderExecutor.ActiveMakerOrders() + activeOrders := activeOrderBook.Orders() + + openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol) if err != nil { - return errors.Wrapf(err, "unable to query open orders when recovering") + return err } - s.logger.Infof("found %d open orders left on the %s order book", len(openOrders), s.Symbol) - - if s.GridProfitStats.InitialOrderID != 0 { - s.logger.Info("InitialOrderID is already there, need to recover") - } else if len(openOrders) != 0 { - s.logger.Info("even though InitialOrderID is 0, there are open orders so need to recover") - } else { - s.logger.Info("InitialOrderID is 0 and there is no open orders, query trades to check it") - // initial order id may be new strategy or lost data in redis, so we need to check trades + open orders - // if there are open orders or trades, we need to recover + // check if it's new strategy or need to recover + if len(activeOrders) == 0 && len(openOrders) == 0 && s.GridProfitStats.InitialOrderID == 0 { + // even though there is no open orders and initial orderID is 0 + // we still need to query trades to make sure if we need to recover or not trades, err := historyService.QueryTrades(ctx, s.Symbol, &types.TradeQueryOptions{ // from 1, because some API will ignore 0 last trade id LastTradeID: 1, @@ -53,181 +133,153 @@ func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.Ex } if len(trades) == 0 { - s.logger.Info("0 trades found, it's a new strategy so no need to recover") + s.logger.Info("no open order, no active order, no trade, it's a new strategy so no need to recover") return nil } } - s.logger.Infof("start recovering") - filledOrders, err := s.getFilledOrdersByScanningTrades(ctx, historyService, s.orderQueryService, openOrders) - if err != nil { - return errors.Wrap(err, "grid recover error") - } - s.debugOrders("emit filled orders", filledOrders) + s.logger.Info("start recovering") - // add open orders into avtive maker orders - s.addOrdersToActiveOrderBook(openOrders) - - // emit the filled orders - activeOrderBook := s.orderExecutor.ActiveMakerOrders() - for _, filledOrder := range filledOrders { - activeOrderBook.EmitFilled(filledOrder) + if s.getGrid() == nil { + s.setGrid(s.newGrid()) + } + + s.mu.Lock() + defer s.mu.Unlock() + + pins := s.getGrid().Pins + + activeOrdersInTwinOrderBook, err := s.buildTwinOrderBook(pins, activeOrders) + openOrdersInTwinOrderBook, err := s.buildTwinOrderBook(pins, openOrders) + + s.logger.Infof("active orders' twin orderbook\n%s", activeOrdersInTwinOrderBook.String()) + s.logger.Infof("open orders in twin orderbook\n%s", openOrdersInTwinOrderBook.String()) + + // remove index 0, because twin orderbook's price is from the second one + pins = pins[1:] + var noTwinOrderPins []fixedpoint.Value + + for _, pin := range pins { + v := fixedpoint.Value(pin) + activeOrder := activeOrdersInTwinOrderBook.GetTwinOrder(v) + openOrder := openOrdersInTwinOrderBook.GetTwinOrder(v) + if activeOrder == nil || openOrder == nil { + return fmt.Errorf("there is no any twin order at this pin, can not recover") + } + + var activeOrderID uint64 = 0 + if activeOrder.Exist() { + activeOrderID = activeOrder.GetOrder().OrderID + } + + var openOrderID uint64 = 0 + if openOrder.Exist() { + openOrderID = openOrder.GetOrder().OrderID + } + + // case 3 + if activeOrderID == 0 && openOrderID == 0 { + noTwinOrderPins = append(noTwinOrderPins, v) + continue + } + + // case 1 + if activeOrderID == 0 { + activeOrderBook.Add(openOrder.GetOrder()) + // also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid + activeOrdersInTwinOrderBook.AddTwinOrder(v, openOrder) + continue + } + + // case 2 + if openOrderID == 0 { + syncActiveOrder(ctx, activeOrderBook, s.orderQueryService, activeOrder.GetOrder().OrderID) + continue + } + + // case 4 + if activeOrderID != openOrderID { + return fmt.Errorf("there are two different orders in the same pin, can not recover") + } + + // case 5 + // do nothing + } + + s.logger.Infof("twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String()) + + if err := s.recoverEmptyGrid(ctx, activeOrdersInTwinOrderBook, historyService, s.orderQueryService); err != nil { + s.logger.WithError(err).Error("failed to recover empty grid") + return err + } + + s.logger.Infof("twin orderbook after recovering\n%s", activeOrdersInTwinOrderBook.String()) + + if activeOrdersInTwinOrderBook.EmptyTwinOrderSize() > 0 { + return fmt.Errorf("there is still empty grid in twin orderbook") + } + + for _, pin := range noTwinOrderPins { + twinOrder := activeOrdersInTwinOrderBook.GetTwinOrder(pin) + if twinOrder == nil { + return fmt.Errorf("should not get nil twin order after recovering empty grid, check it") + } + + if !twinOrder.Exist() { + return fmt.Errorf("should not get empty twin order after recovering empty grid, check it") + } + + activeOrderBook.EmitFilled(twinOrder.GetOrder()) + + time.Sleep(100 * time.Millisecond) } - // emit ready after recover s.EmitGridReady() - // debug and send metrics - // wait for the reverse order to be placed time.Sleep(2 * time.Second) debugGrid(s.logger, s.grid, s.orderExecutor.ActiveMakerOrders()) - defer bbgo.Sync(ctx, s) - - if s.EnableProfitFixer { - until := time.Now() - since := until.Add(-7 * 24 * time.Hour) - if s.FixProfitSince != nil { - since = s.FixProfitSince.Time() - } - - fixer := newProfitFixer(s.grid, s.Symbol, historyService) - fixer.SetLogger(s.logger) - - // set initial order ID = 0 instead of s.GridProfitStats.InitialOrderID because the order ID could be incorrect - if err := fixer.Fix(ctx, since, until, 0, s.GridProfitStats); err != nil { - return err - } - - s.logger.Infof("fixed profitStats: %#v", s.GridProfitStats) - - s.EmitGridProfit(s.GridProfitStats, nil) - } + bbgo.Sync(ctx, s) return nil } -func (s *Strategy) getFilledOrdersByScanningTrades(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, openOrdersOnGrid []types.Order) ([]types.Order, error) { - // set grid - grid := s.newGrid() - s.setGrid(grid) +func (s *Strategy) buildTwinOrderBook(pins []Pin, orders []types.Order) (*TwinOrderBook, error) { + book := NewTwinOrderBook(pins) - expectedNumOfOrders := s.GridNum - 1 - numGridOpenOrders := int64(len(openOrdersOnGrid)) - s.debugLog("open orders nums: %d, expected nums: %d", numGridOpenOrders, expectedNumOfOrders) - if expectedNumOfOrders == numGridOpenOrders { - // no need to recover, only need to add open orders back to active order book - return nil, nil - } else if expectedNumOfOrders < numGridOpenOrders { - return nil, fmt.Errorf("amount of grid's open orders should not > amount of expected grid's orders") + for _, order := range orders { + if err := book.AddOrder(order); err != nil { + return nil, err + } } - // 1. build twin-order map - twinOrdersOpen, err := s.buildTwinOrderMap(grid.Pins, openOrdersOnGrid) - if err != nil { - return nil, errors.Wrapf(err, "failed to build pin order map with open orders") - } - - // 2. build the filled twin-order map by querying trades - expectedFilledNum := int(expectedNumOfOrders - numGridOpenOrders) - twinOrdersFilled, err := s.buildFilledTwinOrderMapFromTrades(ctx, queryTradesService, queryOrderService, twinOrdersOpen, expectedFilledNum) - if err != nil { - return nil, errors.Wrapf(err, "failed to build filled pin order map") - } - - // 3. get the filled orders from twin-order map - filledOrders := twinOrdersFilled.AscendingOrders() - - // 4. verify the grid - if err := s.verifyFilledTwinGrid(s.grid.Pins, twinOrdersOpen, filledOrders); err != nil { - return nil, errors.Wrapf(err, "verify grid with error") - } - - return filledOrders, nil + return book, nil } -func (s *Strategy) verifyFilledTwinGrid(pins []Pin, twinOrders TwinOrderMap, filledOrders []types.Order) error { - s.debugLog("verifying filled grid - pins: %+v", pins) - s.debugOrders("verifying filled grid - filled orders", filledOrders) - s.debugLog("verifying filled grid - open twin orders:\n%s", twinOrders.String()) +func syncActiveOrder(ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService, orderID uint64) error { + updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{ + Symbol: activeOrderBook.Symbol, + OrderID: strconv.FormatUint(orderID, 10), + }) - if err := s.addOrdersIntoTwinOrderMap(twinOrders, filledOrders); err != nil { - return errors.Wrapf(err, "verifying filled grid error when add orders into twin order map") + if err != nil { + return err } - s.debugLog("verifying filled grid - filled twin orders:\n%+v", twinOrders.String()) - - for i, pin := range pins { - // we use twinOrderMap to make sure there are no duplicated order at one grid, and we use the sell price as key so we skip the pins[0] which is only for buy price - if i == 0 { - continue - } - - twin, exist := twinOrders[fixedpoint.Value(pin)] - if !exist { - return fmt.Errorf("there is no order at price (%+v)", pin) - } - - if !twin.Exist() { - return fmt.Errorf("all the price need a twin") - } - - if !twin.IsValid() { - return fmt.Errorf("all the twins need to be valid") - } - } + activeOrderBook.Update(*updatedOrder) return nil } -// buildTwinOrderMap build the pin-order map with grid and open orders. -// The keys of this map contains all required pins of this grid. -// If the Order of the pin is empty types.Order (OrderID == 0), it means there is no open orders at this pin. -func (s *Strategy) buildTwinOrderMap(pins []Pin, openOrders []types.Order) (TwinOrderMap, error) { - twinOrderMap := make(TwinOrderMap) - - for i, pin := range pins { - // twin order map only use sell price as key, so skip 0 - if i == 0 { - continue - } - - twinOrderMap[fixedpoint.Value(pin)] = TwinOrder{} +func (s *Strategy) recoverEmptyGrid(ctx context.Context, twinOrderBook *TwinOrderBook, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService) error { + if twinOrderBook.EmptyTwinOrderSize() == 0 { + s.logger.Info("no empty grid") + return nil } - for _, openOrder := range openOrders { - twinKey, err := findTwinOrderMapKey(s.grid, openOrder) - if err != nil { - return nil, errors.Wrapf(err, "failed to build twin order map") - } + existedOrders := twinOrderBook.SyncOrderMap() - twinOrder, exist := twinOrderMap[twinKey] - if !exist { - return nil, fmt.Errorf("the price of the openOrder (id: %d) is not in pins", openOrder.OrderID) - } - - if twinOrder.Exist() { - return nil, fmt.Errorf("there are multiple order in a twin") - } - - twinOrder.SetOrder(openOrder) - twinOrderMap[twinKey] = twinOrder - } - - return twinOrderMap, nil -} - -// buildFilledTwinOrderMapFromTrades will query the trades from last 24 hour and use them to build a pin order map -// It will skip the orders on pins at which open orders are already -func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, twinOrdersOpen TwinOrderMap, expectedFillNum int) (TwinOrderMap, error) { - twinOrdersFilled := make(TwinOrderMap) - - // existedOrders is used to avoid re-query the same orders - existedOrders := twinOrdersOpen.SyncOrderMap() - - // get the filled orders when bbgo is down in order from trades until := time.Now() - // the first query only query the last 1 hour, because mostly shutdown and recovery happens within 1 hour since := until.Add(-1 * time.Hour) // hard limit for recover recoverSinceLimit := time.Date(2023, time.March, 10, 0, 0, 0, 0, time.UTC) @@ -237,15 +289,15 @@ func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryT } for { - if err := s.queryTradesToUpdateTwinOrdersMap(ctx, queryTradesService, queryOrderService, twinOrdersOpen, twinOrdersFilled, existedOrders, since, until); err != nil { - return nil, errors.Wrapf(err, "failed to query trades to update twin orders map") + if err := s.queryTradesToUpdateTwinOrderBook(ctx, twinOrderBook, queryTradesService, queryOrderService, existedOrders, since, until); err != nil { + return errors.Wrapf(err, "failed to query trades to update twin orderbook") } until = since since = until.Add(-6 * time.Hour) - if len(twinOrdersFilled) >= expectedFillNum { - s.logger.Infof("stop querying trades because twin orders filled (%d) >= expected filled nums (%d)", len(twinOrdersFilled), expectedFillNum) + if twinOrderBook.EmptyTwinOrderSize() == 0 { + s.logger.Infof("stop querying trades because there is no empty twin order on twin orderbook") break } @@ -260,10 +312,10 @@ func (s *Strategy) buildFilledTwinOrderMapFromTrades(ctx context.Context, queryT } } - return twinOrdersFilled, nil + return nil } -func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, twinOrdersOpen, twinOrdersFilled TwinOrderMap, existedOrders *types.SyncOrderMap, since, until time.Time) error { +func (s *Strategy) queryTradesToUpdateTwinOrderBook(ctx context.Context, twinOrderBook *TwinOrderBook, queryTradesService types.ExchangeTradeHistoryService, queryOrderService types.ExchangeOrderQueryService, existedOrders *types.SyncOrderMap, since, until time.Time) error { var fromTradeID uint64 = 0 var limit int64 = 1000 for { @@ -275,9 +327,8 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr }) if err != nil { - return errors.Wrapf(err, "failed to query trades to recover the grid with open orders") + return errors.Wrapf(err, "failed to query trades to recover the grid") } - s.debugLog("QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades)) for _, trade := range trades { @@ -306,31 +357,9 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr // add 1 to avoid duplicate fromTradeID = trade.ID + 1 - twinOrderKey, err := findTwinOrderMapKey(s.grid, *order) - if err != nil { - return errors.Wrapf(err, "failed to find grid order map's key when recover") + if err := twinOrderBook.AddOrder(*order); err != nil { + return errors.Wrapf(err, "failed to add queried order into twin orderbook") } - - twinOrderOpen, exist := twinOrdersOpen[twinOrderKey] - if !exist { - return fmt.Errorf("the price of the order with the same GroupID is not in pins") - } - - if twinOrderOpen.Exist() { - continue - } - - if twinOrder, exist := twinOrdersFilled[twinOrderKey]; exist { - to := twinOrder.GetOrder() - if to.UpdateTime.Time().After(order.UpdateTime.Time()) { - s.logger.Infof("twinOrder's update time (%s) should not be after order's update time (%s)", to.UpdateTime, order.UpdateTime) - continue - } - } - - twinOrder := TwinOrder{} - twinOrder.SetOrder(*order) - twinOrdersFilled[twinOrderKey] = twinOrder } // stop condition @@ -339,24 +368,3 @@ func (s *Strategy) queryTradesToUpdateTwinOrdersMap(ctx context.Context, queryTr } } } - -func (s *Strategy) addOrdersIntoTwinOrderMap(twinOrders TwinOrderMap, orders []types.Order) error { - for _, order := range orders { - k, err := findTwinOrderMapKey(s.grid, order) - if err != nil { - return errors.Wrap(err, "failed to add orders into twin order map") - } - - if v, exist := twinOrders[k]; !exist { - return fmt.Errorf("the price (%+v) is not in pins", k) - } else if v.Exist() { - return fmt.Errorf("there is already a twin order at this price (%+v)", k) - } else { - twin := TwinOrder{} - twin.SetOrder(order) - twinOrders[k] = twin - } - } - - return nil -} diff --git a/pkg/strategy/grid2/recover_test.go b/pkg/strategy/grid2/recover_test.go deleted file mode 100644 index 997cac9e5..000000000 --- a/pkg/strategy/grid2/recover_test.go +++ /dev/null @@ -1,295 +0,0 @@ -package grid2 - -import ( - "context" - "encoding/csv" - "encoding/json" - "fmt" - "io" - "io/ioutil" - "os" - "sort" - "strconv" - "testing" - "time" - - "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" -) - -type TestData struct { - Market types.Market `json:"market" yaml:"market"` - Strategy Strategy `json:"strategy" yaml:"strategy"` - OpenOrders []types.Order `json:"openOrders" yaml:"openOrders"` - ClosedOrders []types.Order `json:"closedOrders" yaml:"closedOrders"` - Trades []types.Trade `json:"trades" yaml:"trades"` -} - -type TestDataService struct { - Orders map[string]types.Order - Trades []types.Trade -} - -func (t *TestDataService) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) { - var i int = 0 - if options.LastTradeID != 0 { - for idx, trade := range t.Trades { - if trade.ID < options.LastTradeID { - continue - } - - i = idx - break - } - } - - var trades []types.Trade - l := len(t.Trades) - for ; i < l && len(trades) < int(options.Limit); i++ { - trades = append(trades, t.Trades[i]) - } - - return trades, nil -} - -func (t *TestDataService) QueryOrder(ctx context.Context, q types.OrderQuery) (*types.Order, error) { - if len(q.OrderID) == 0 { - return nil, fmt.Errorf("order id should not be empty") - } - - order, exist := t.Orders[q.OrderID] - if !exist { - return nil, fmt.Errorf("order not found") - } - - return &order, nil -} - -// dummy method for interface -func (t *TestDataService) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) { - return nil, nil -} - -// dummy method for interface -func (t *TestDataService) QueryOrderTrades(ctx context.Context, q types.OrderQuery) ([]types.Trade, error) { - return nil, nil -} - -func NewStrategy(t *TestData) *Strategy { - s := t.Strategy - s.Debug = true - s.Initialize() - s.Market = t.Market - s.Position = types.NewPositionFromMarket(t.Market) - s.orderExecutor = bbgo.NewGeneralOrderExecutor(&bbgo.ExchangeSession{}, t.Market.Symbol, ID, s.InstanceID(), s.Position) - return &s -} - -func NewTestDataService(t *TestData) *TestDataService { - var orders map[string]types.Order = make(map[string]types.Order) - for _, order := range t.OpenOrders { - orders[strconv.FormatUint(order.OrderID, 10)] = order - } - - for _, order := range t.ClosedOrders { - orders[strconv.FormatUint(order.OrderID, 10)] = order - } - - trades := t.Trades - sort.Slice(t.Trades, func(i, j int) bool { - return trades[i].ID < trades[j].ID - }) - - return &TestDataService{ - Orders: orders, - Trades: trades, - } -} - -func readSpec(fileName string) (*TestData, error) { - content, err := ioutil.ReadFile(fileName) - if err != nil { - return nil, err - } - - market := types.Market{} - if err := json.Unmarshal(content, &market); err != nil { - return nil, err - } - - strategy := Strategy{} - if err := json.Unmarshal(content, &strategy); err != nil { - return nil, err - } - - data := TestData{ - Market: market, - Strategy: strategy, - } - return &data, nil -} - -func readOrdersFromCSV(fileName string) ([]types.Order, error) { - csvFile, err := os.Open(fileName) - if err != nil { - return nil, err - } - defer csvFile.Close() - csvReader := csv.NewReader(csvFile) - - keys, err := csvReader.Read() - if err != nil { - return nil, err - } - - var orders []types.Order - for { - row, err := csvReader.Read() - if err == io.EOF { - break - } - - if err != nil { - return nil, err - } - - if len(row) != len(keys) { - return nil, fmt.Errorf("length of row should be equal to length of keys") - } - - var m map[string]interface{} = make(map[string]interface{}) - for i, key := range keys { - if key == "orderID" { - x, err := strconv.ParseUint(row[i], 10, 64) - if err != nil { - return nil, err - } - m[key] = x - } else { - m[key] = row[i] - } - } - - b, err := json.Marshal(m) - if err != nil { - return nil, err - } - - order := types.Order{} - if err = json.Unmarshal(b, &order); err != nil { - return nil, err - } - - orders = append(orders, order) - } - - return orders, nil -} - -func readTradesFromCSV(fileName string) ([]types.Trade, error) { - csvFile, err := os.Open(fileName) - if err != nil { - return nil, err - } - defer csvFile.Close() - csvReader := csv.NewReader(csvFile) - - keys, err := csvReader.Read() - if err != nil { - return nil, err - } - - var trades []types.Trade - for { - row, err := csvReader.Read() - if err == io.EOF { - break - } - - if err != nil { - return nil, err - } - - if len(row) != len(keys) { - return nil, fmt.Errorf("length of row should be equal to length of keys") - } - - var m map[string]interface{} = make(map[string]interface{}) - for i, key := range keys { - switch key { - case "id", "orderID": - x, err := strconv.ParseUint(row[i], 10, 64) - if err != nil { - return nil, err - } - m[key] = x - default: - m[key] = row[i] - } - } - - b, err := json.Marshal(m) - if err != nil { - return nil, err - } - - trade := types.Trade{} - if err = json.Unmarshal(b, &trade); err != nil { - return nil, err - } - - trades = append(trades, trade) - } - - return trades, nil -} - -func readTestDataFrom(fileDir string) (*TestData, error) { - data, err := readSpec(fmt.Sprintf("%s/spec", fileDir)) - if err != nil { - return nil, err - } - - openOrders, err := readOrdersFromCSV(fmt.Sprintf("%s/open_orders.csv", fileDir)) - if err != nil { - return nil, err - } - - closedOrders, err := readOrdersFromCSV(fmt.Sprintf("%s/closed_orders.csv", fileDir)) - if err != nil { - return nil, err - } - - trades, err := readTradesFromCSV(fmt.Sprintf("%s/trades.csv", fileDir)) - if err != nil { - return nil, err - } - - data.OpenOrders = openOrders - data.ClosedOrders = closedOrders - data.Trades = trades - return data, nil -} - -func TestRecoverByScanningTrades(t *testing.T) { - assert := assert.New(t) - - t.Run("test case 1", func(t *testing.T) { - fileDir := "recovery_testcase/testcase1/" - - data, err := readTestDataFrom(fileDir) - if !assert.NoError(err) { - return - } - - testService := NewTestDataService(data) - strategy := NewStrategy(data) - filledOrders, err := strategy.getFilledOrdersByScanningTrades(context.Background(), testService, testService, data.OpenOrders) - if !assert.NoError(err) { - return - } - - assert.Len(filledOrders, 0) - }) -} diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 736b1ad67..8b6bac86d 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -204,7 +204,7 @@ type Strategy struct { tradingCtx, writeCtx context.Context cancelWrite context.CancelFunc - activeOrdersRecoverC chan struct{} + recoverC chan struct{} // this ensures that bbgo.Sync to lock the object sync.Mutex @@ -1953,7 +1953,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. return } - s.recoverActiveOrdersPeriodically(ctx) + s.recoverPeriodically(ctx) }) } else { s.startProcess(ctx, session) @@ -1968,7 +1968,7 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi if s.RecoverOrdersWhenStart { // do recover only when triggerPrice is not set and not in the back-test mode s.logger.Infof("recoverWhenStart is set, trying to recover grid orders...") - if err := s.recoverGrid(ctx, session); err != nil { + if err := s.recover(ctx); err != nil { // if recover fail, return and do not open grid s.logger.WithError(err).Error("failed to start process, recover error") s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error")) @@ -1985,6 +1985,7 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi return nil } +/* func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error { if s.RecoverGridByScanningTrades { s.debugLog("recovering grid by scanning trades") @@ -1994,6 +1995,7 @@ func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSessio s.debugLog("recovering grid by scanning orders") return s.recoverByScanningOrders(ctx, session) } +*/ func (s *Strategy) recoverByScanningOrders(ctx context.Context, session *bbgo.ExchangeSession) error { openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol) diff --git a/pkg/strategy/grid2/twin_order.go b/pkg/strategy/grid2/twin_order.go index adeeb5263..88f9df50c 100644 --- a/pkg/strategy/grid2/twin_order.go +++ b/pkg/strategy/grid2/twin_order.go @@ -111,3 +111,128 @@ func (m TwinOrderMap) String() string { sb.WriteString("================== END OF PIN ORDER MAP ==================\n") return sb.String() } + +type TwinOrderBook struct { + // sort in asc order + pins []fixedpoint.Value + + // pin index, use to find the next or last pin in desc order + pinIdx map[fixedpoint.Value]int + + // orderbook + m map[fixedpoint.Value]*TwinOrder + + size int +} + +func NewTwinOrderBook(pins []Pin) *TwinOrderBook { + var v []fixedpoint.Value + for _, pin := range pins { + v = append(v, fixedpoint.Value(pin)) + } + + // sort it in asc order + sort.Slice(v, func(i, j int) bool { + return v[j].Compare(v[i]) > 0 + }) + + pinIdx := make(map[fixedpoint.Value]int) + m := make(map[fixedpoint.Value]*TwinOrder) + for i, pin := range v { + m[pin] = &TwinOrder{} + pinIdx[pin] = i + } + + ob := TwinOrderBook{ + pins: v, + pinIdx: pinIdx, + m: m, + size: 0, + } + + return &ob +} + +func (book *TwinOrderBook) String() string { + var sb strings.Builder + + sb.WriteString("================== TWIN ORDER MAP ==================\n") + for _, pin := range book.pins { + twin := book.m[fixedpoint.Value(pin)] + twinOrder := twin.GetOrder() + sb.WriteString(fmt.Sprintf("-> %8s) %s\n", pin, twinOrder.String())) + } + sb.WriteString("================== END OF PIN ORDER MAP ==================\n") + return sb.String() +} + +func (book *TwinOrderBook) GetTwinOrderPin(order types.Order) (fixedpoint.Value, error) { + idx, exist := book.pinIdx[order.Price] + if !exist { + return fixedpoint.Zero, fmt.Errorf("the order's (%d) price (%s) is not in pins", order.OrderID, order.Price) + } + + if order.Side == types.SideTypeBuy { + idx++ + if idx >= len(book.pins) { + return fixedpoint.Zero, fmt.Errorf("this order's twin order price is not in pins, %+v", order) + } + } else if order.Side == types.SideTypeSell { + if idx == 0 { + return fixedpoint.Zero, fmt.Errorf("this order's twin order price is at zero index, %+v", order) + } + // do nothing + } else { + // should not happen + return fixedpoint.Zero, fmt.Errorf("the order's (%d) side (%s) is not supported", order.OrderID, order.Side) + } + + return book.pins[idx], nil +} + +func (book *TwinOrderBook) AddOrder(order types.Order) error { + pin, err := book.GetTwinOrderPin(order) + if err != nil { + return err + } + + twinOrder, exist := book.m[pin] + if !exist { + // should not happen + return fmt.Errorf("no any empty twin order at pins, should not happen, check it") + } + + if !twinOrder.Exist() { + book.size++ + } + twinOrder.SetOrder(order) + + return nil +} + +func (book *TwinOrderBook) GetTwinOrder(pin fixedpoint.Value) *TwinOrder { + return book.m[pin] +} + +func (book *TwinOrderBook) AddTwinOrder(pin fixedpoint.Value, order *TwinOrder) { + book.m[pin] = order +} + +func (book *TwinOrderBook) Size() int { + return book.size +} + +func (book *TwinOrderBook) EmptyTwinOrderSize() int { + return len(book.pins) - 1 - book.size +} + +func (book *TwinOrderBook) SyncOrderMap() *types.SyncOrderMap { + orderMap := types.NewSyncOrderMap() + for _, twin := range book.m { + if twin.Exist() { + orderMap.Add(twin.GetOrder()) + } + } + + return orderMap +}