diff --git a/pkg/strategy/dca2/collector.go b/pkg/strategy/dca2/collector.go index add007c93..9d4201d9f 100644 --- a/pkg/strategy/dca2/collector.go +++ b/pkg/strategy/dca2/collector.go @@ -2,7 +2,6 @@ package dca2 import ( "context" - "fmt" "strconv" "time" @@ -11,15 +10,22 @@ import ( "github.com/sirupsen/logrus" ) +// Round contains the open-position orders and the take-profit orders +// 1. len(OpenPositionOrders) == 0 -> not open position +// 2. len(TakeProfitOrders) == 0 -> not in the take-profit stage +// 3. There are take-profit orders only when open-position orders are cancelled +// 4. We need to make sure the order: open-position (BUY) -> take-profit (SELL) -> open-position (BUY) -> take-profit (SELL) -> ... +// 5. When there is one filled take-profit order, this round must be finished. We need to verify all take-profit orders are not active type Round struct { OpenPositionOrders []types.Order - TakeProfitOrder types.Order + TakeProfitOrders []types.Order } type Collector struct { - logger *logrus.Entry - symbol string - groupID uint32 + logger *logrus.Entry + symbol string + groupID uint32 + filterGroupID bool // service ex types.Exchange @@ -29,7 +35,7 @@ type Collector struct { queryClosedOrderDesc descendingClosedOrderQueryService } -func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *Collector { +func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, filterGroupID bool, ex types.Exchange) *Collector { historyService, ok := ex.(types.ExchangeTradeHistoryService) if !ok { logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name()) @@ -58,6 +64,7 @@ func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types. logger: logger, symbol: symbol, groupID: groupID, + filterGroupID: filterGroupID, ex: ex, historyService: historyService, queryService: queryService, @@ -94,7 +101,7 @@ func (rc Collector) CollectCurrentRound(ctx context.Context) (Round, error) { lastSide := takeProfitSide for _, order := range allOrders { // group id filter is used for debug when local running - if order.GroupID != rc.groupID { + if rc.filterGroupID && order.GroupID != rc.groupID { continue } @@ -106,10 +113,7 @@ func (rc Collector) CollectCurrentRound(ctx context.Context) (Round, error) { case openPositionSide: currentRound.OpenPositionOrders = append(currentRound.OpenPositionOrders, order) case takeProfitSide: - if currentRound.TakeProfitOrder.OrderID != 0 { - return currentRound, fmt.Errorf("there are two take-profit orders in one round, please check it") - } - currentRound.TakeProfitOrder = order + currentRound.TakeProfitOrders = append(currentRound.TakeProfitOrders, order) default: } @@ -133,7 +137,7 @@ func (rc *Collector) CollectFinishRounds(ctx context.Context, fromOrderID uint64 var round Round for _, order := range orders { // skip not this strategy order - if order.GroupID != rc.groupID { + if rc.filterGroupID && order.GroupID != rc.groupID { continue } @@ -141,12 +145,20 @@ func (rc *Collector) CollectFinishRounds(ctx context.Context, fromOrderID uint64 case types.SideTypeBuy: round.OpenPositionOrders = append(round.OpenPositionOrders, order) case types.SideTypeSell: + round.TakeProfitOrders = append(round.TakeProfitOrders, order) + if order.Status != types.OrderStatusFilled { - rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status) + rc.logger.Infof("take-profit order is not filled (%s), so this round is not finished. Keep collecting", order.Status) continue } - round.TakeProfitOrder = order + for _, o := range round.TakeProfitOrders { + if types.IsActiveOrder(o) { + // Should not happen ! but we only log it + rc.logger.Errorf("unexpected error, there is at least one take-profit order #%d is still active, please check it. %s", o.OrderID, o.String()) + } + } + rounds = append(rounds, round) round = Round{} default: @@ -164,10 +176,7 @@ func (rc *Collector) CollectRoundTrades(ctx context.Context, round Round) ([]typ var roundTrades []types.Trade var roundOrders []types.Order = round.OpenPositionOrders - // if the take-profit order's OrderID == 0 -> no take-profit order. - if round.TakeProfitOrder.OrderID != 0 { - roundOrders = append(roundOrders, round.TakeProfitOrder) - } + roundOrders = append(roundOrders, round.TakeProfitOrders...) for _, order := range roundOrders { if order.ExecutedQuantity.IsZero() { diff --git a/pkg/strategy/dca2/collector_test.go b/pkg/strategy/dca2/collector_test.go new file mode 100644 index 000000000..6818428a5 --- /dev/null +++ b/pkg/strategy/dca2/collector_test.go @@ -0,0 +1,72 @@ +package dca2 + +import ( + "testing" + + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/types/mocks" + "github.com/stretchr/testify/assert" + "go.uber.org/mock/gomock" +) + +func Test_NewCollector(t *testing.T) { + symbol := "ETHUSDT" + logger := log.WithField("strategy", ID) + + t.Run("return nil if the exchange doesn't support ExchangeTradeHistoryService", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockEx := mocks.NewMockExchange(mockCtrl) + mockEx.EXPECT().Name().Return(types.ExchangeMax) + + collector := NewCollector(logger, symbol, 0, false, mockEx) + + assert.Nil(t, collector) + }) + + t.Run("return nil if the exchange doesn't support ExchangeOrderQueryService", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockEx := mocks.NewMockExchange(mockCtrl) + mockEx.EXPECT().Name().Return(types.ExchangeMax) + + mockTradeHistoryService := mocks.NewMockExchangeTradeHistoryService(mockCtrl) + + type TestEx struct { + types.Exchange + types.ExchangeTradeHistoryService + } + + ex := TestEx{ + Exchange: mockEx, + ExchangeTradeHistoryService: mockTradeHistoryService, + } + + collector := NewCollector(logger, symbol, 0, false, ex) + + assert.Nil(t, collector) + }) + + t.Run("return nil if the exchange doesn't support descendingClosedOrderQueryService", func(t *testing.T) { + mockCtrl := gomock.NewController(t) + mockEx := mocks.NewMockExchange(mockCtrl) + mockEx.EXPECT().Name().Return(types.ExchangeMax) + + mockTradeHistoryService := mocks.NewMockExchangeTradeHistoryService(mockCtrl) + mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl) + + type TestEx struct { + types.Exchange + types.ExchangeTradeHistoryService + types.ExchangeOrderQueryService + } + + ex := TestEx{ + Exchange: mockEx, + ExchangeTradeHistoryService: mockTradeHistoryService, + ExchangeOrderQueryService: mockOrderQueryService, + } + + collector := NewCollector(logger, symbol, 0, false, ex) + + assert.Nil(t, collector) + }) +} diff --git a/pkg/strategy/dca2/debug.go b/pkg/strategy/dca2/debug.go index cf474080c..de383826c 100644 --- a/pkg/strategy/dca2/debug.go +++ b/pkg/strategy/dca2/debug.go @@ -22,7 +22,9 @@ func (s *Strategy) debugOrders(submitOrders []types.Order) { func debugRoundOrders(logger *logrus.Entry, roundName string, round Round) { var sb strings.Builder sb.WriteString("ROUND " + roundName + " [\n") - sb.WriteString(round.TakeProfitOrder.String() + "\n") + for i, order := range round.TakeProfitOrders { + sb.WriteString(fmt.Sprintf("%3d) ", i+1) + order.String() + "\n") + } sb.WriteString("------------------------------------------------\n") for i, order := range round.OpenPositionOrders { sb.WriteString(fmt.Sprintf("%3d) ", i+1) + order.String() + "\n") diff --git a/pkg/strategy/dca2/recover.go b/pkg/strategy/dca2/recover.go index f28abb154..b3c0c2093 100644 --- a/pkg/strategy/dca2/recover.go +++ b/pkg/strategy/dca2/recover.go @@ -64,63 +64,55 @@ func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, or orderStore := orderExecutor.OrderStore() // dca stop at take-profit order stage - if currentRound.TakeProfitOrder.OrderID != 0 { - // the number of open-positions orders may not be equal to maxOrderCount, because the notional may not enough to open maxOrderCount orders - if len(currentRound.OpenPositionOrders) > maxOrderCount { - return None, fmt.Errorf("there is take-profit order but the number of open-position orders (%d) is greater than maxOrderCount(%d). Please check it", len(currentRound.OpenPositionOrders), maxOrderCount) + if len(currentRound.TakeProfitOrders) > 0 { + openedOrders, cancelledOrders, filledOrders, unexpectedOrders := classifyOrders(currentRound.TakeProfitOrders) + + if len(unexpectedOrders) > 0 { + return None, fmt.Errorf("there is unexpected status in orders %+v", unexpectedOrders) } - takeProfitOrder := currentRound.TakeProfitOrder - if takeProfitOrder.Status == types.OrderStatusFilled { + if len(filledOrders) > 0 && len(openedOrders) == 0 { return WaitToOpenPosition, nil - } else if types.IsActiveOrder(takeProfitOrder) { - activeOrderBook.Add(takeProfitOrder) - orderStore.Add(takeProfitOrder) - return TakeProfitReady, nil - } else { - return None, fmt.Errorf("the status of take-profit order is %s. Please check it", takeProfitOrder.Status) } + + if len(filledOrders) == 0 && len(openedOrders) > 0 { + // add opened order into order store + for _, order := range openedOrders { + activeOrderBook.Add(order) + orderStore.Add(order) + } + return TakeProfitReady, nil + } + + return None, fmt.Errorf("the classify orders count is not expected (opened: %d, cancelled: %d, filled: %d)", len(openedOrders), len(cancelledOrders), len(filledOrders)) } // dca stop at no take-profit order stage openPositionOrders := currentRound.OpenPositionOrders - numOpenPositionOrders := len(openPositionOrders) // new strategy if len(openPositionOrders) == 0 { return WaitToOpenPosition, nil } - // should not happen - if numOpenPositionOrders > maxOrderCount { - return None, fmt.Errorf("the number of open-position orders (%d) is > max order number", numOpenPositionOrders) - } - // collect open-position orders' status - var openedCnt, filledCnt, cancelledCnt int64 - for _, order := range currentRound.OpenPositionOrders { - switch order.Status { - case types.OrderStatusNew, types.OrderStatusPartiallyFilled: - activeOrderBook.Add(order) - orderStore.Add(order) - openedCnt++ - case types.OrderStatusFilled: - filledCnt++ - case types.OrderStatusCanceled: - cancelledCnt++ - default: - return None, fmt.Errorf("there is unexpected status %s of order %s", order.Status, order) - } + openedOrders, cancelledOrders, filledOrders, unexpectedOrders := classifyOrders(currentRound.OpenPositionOrders) + if len(unexpectedOrders) > 0 { + return None, fmt.Errorf("there is unexpected status of orders %+v", unexpectedOrders) + } + for _, order := range openedOrders { + activeOrderBook.Add(order) + orderStore.Add(order) } // no order is filled -> OpenPositionReady - if filledCnt == 0 { + if len(filledOrders) == 0 { return OpenPositionReady, nil } // there are at least one open-position orders filled - if cancelledCnt == 0 { - if openedCnt > 0 { + if len(cancelledOrders) == 0 { + if len(openedOrders) > 0 { return OpenPositionOrderFilled, nil } else { // all open-position orders filled, change to cancelling and place the take-profit order @@ -141,13 +133,18 @@ func recoverPosition(ctx context.Context, position *types.Position, currentRound position.Reset() var positionOrders []types.Order - if currentRound.TakeProfitOrder.OrderID != 0 { - // if the take-profit order is already filled, the position is 0 - if !types.IsActiveOrder(currentRound.TakeProfitOrder) { - return nil - } - positionOrders = append(positionOrders, currentRound.TakeProfitOrder) + var filledCnt int64 + for _, order := range currentRound.TakeProfitOrders { + if !types.IsActiveOrder(order) { + filledCnt++ + } + positionOrders = append(positionOrders, order) + } + + // all take-profit orders are filled + if len(currentRound.TakeProfitOrders) > 0 && filledCnt == int64(len(currentRound.TakeProfitOrders)) { + return nil } for _, order := range currentRound.OpenPositionOrders { @@ -184,9 +181,30 @@ func recoverProfitStats(ctx context.Context, strategy *Strategy) error { } func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time { - if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled { - return currentRound.TakeProfitOrder.UpdateTime.Time().Add(coolDownInterval.Duration()) + var startTimeOfNextRound time.Time + + for _, order := range currentRound.TakeProfitOrders { + if t := order.UpdateTime.Time().Add(coolDownInterval.Duration()); t.After(startTimeOfNextRound) { + startTimeOfNextRound = t + } } - return time.Time{} + return startTimeOfNextRound +} + +func classifyOrders(orders []types.Order) (opened, cancelled, filled, unexpected []types.Order) { + for _, order := range orders { + switch order.Status { + case types.OrderStatusNew, types.OrderStatusPartiallyFilled: + opened = append(opened, order) + case types.OrderStatusFilled: + filled = append(filled, order) + case types.OrderStatusCanceled: + cancelled = append(cancelled, order) + default: + unexpected = append(unexpected, order) + } + } + + return opened, cancelled, filled, unexpected } diff --git a/pkg/strategy/dca2/recover_test.go b/pkg/strategy/dca2/recover_test.go index db9f9ea78..472ecee77 100644 --- a/pkg/strategy/dca2/recover_test.go +++ b/pkg/strategy/dca2/recover_test.go @@ -23,19 +23,6 @@ func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt } -type MockQueryOrders struct { - OpenOrders []types.Order - ClosedOrders []types.Order -} - -func (m *MockQueryOrders) QueryOpenOrders(ctx context.Context, symbol string) ([]types.Order, error) { - return m.OpenOrders, nil -} - -func (m *MockQueryOrders) QueryClosedOrdersDesc(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) ([]types.Order, error) { - return m.ClosedOrders, nil -} - func Test_RecoverState(t *testing.T) { strategy := newTestStrategy() @@ -123,7 +110,9 @@ func Test_RecoverState(t *testing.T) { t.Run("at take profit stage, and not filled yet", func(t *testing.T) { now := time.Now() currentRound := Round{ - TakeProfitOrder: generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now), + TakeProfitOrders: []types.Order{ + generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now), + }, OpenPositionOrders: []types.Order{ generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)), generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)), @@ -142,7 +131,9 @@ func Test_RecoverState(t *testing.T) { t.Run("at take profit stage, take-profit order filled", func(t *testing.T) { now := time.Now() currentRound := Round{ - TakeProfitOrder: generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now), + TakeProfitOrders: []types.Order{ + generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now), + }, OpenPositionOrders: []types.Order{ generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)), generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)), @@ -158,3 +149,24 @@ func Test_RecoverState(t *testing.T) { assert.Equal(t, WaitToOpenPosition, state) }) } + +func Test_classifyOrders(t *testing.T) { + orders := []types.Order{ + types.Order{Status: types.OrderStatusCanceled}, + types.Order{Status: types.OrderStatusFilled}, + types.Order{Status: types.OrderStatusCanceled}, + types.Order{Status: types.OrderStatusFilled}, + types.Order{Status: types.OrderStatusPartiallyFilled}, + types.Order{Status: types.OrderStatusCanceled}, + types.Order{Status: types.OrderStatusPartiallyFilled}, + types.Order{Status: types.OrderStatusNew}, + types.Order{Status: types.OrderStatusRejected}, + types.Order{Status: types.OrderStatusCanceled}, + } + + opened, cancelled, filled, unexpected := classifyOrders(orders) + assert.Equal(t, 3, len(opened)) + assert.Equal(t, 4, len(cancelled)) + assert.Equal(t, 2, len(filled)) + assert.Equal(t, 1, len(unexpected)) +} diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index 02a029887..a43830700 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -63,7 +63,8 @@ type Strategy struct { CoolDownInterval types.Duration `json:"coolDownInterval"` // OrderGroupID is the group ID used for the strategy instance for canceling orders - OrderGroupID uint32 `json:"orderGroupID"` + OrderGroupID uint32 `json:"orderGroupID"` + DisableOrderGroupIDFilter bool `json:"disableOrderGroupIDFilter"` // RecoverWhenStart option is used for recovering dca states RecoverWhenStart bool `json:"recoverWhenStart"` @@ -185,7 +186,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. } // collector - s.collector = NewCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange) + s.collector = NewCollector(s.logger, s.Symbol, s.OrderGroupID, !s.DisableOrderGroupIDFilter, s.ExchangeSession.Exchange) if s.collector == nil { return fmt.Errorf("failed to initialize collector") } @@ -481,7 +482,11 @@ func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) { } // update profit stats FromOrderID to make sure we will not collect duplicated rounds - s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1 + for _, order := range round.TakeProfitOrders { + if order.OrderID >= s.ProfitStats.FromOrderID { + s.ProfitStats.FromOrderID = order.OrderID + 1 + } + } // update quote investment s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit) diff --git a/pkg/strategy/dca2/take_profit.go b/pkg/strategy/dca2/take_profit.go index a5255bb45..543a251ce 100644 --- a/pkg/strategy/dca2/take_profit.go +++ b/pkg/strategy/dca2/take_profit.go @@ -17,8 +17,8 @@ func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error { return errors.Wrap(err, "failed to place the take-profit order when collecting current round") } - if currentRound.TakeProfitOrder.OrderID != 0 { - return fmt.Errorf("there is a take-profit order before placing the take-profit order, please check it") + if len(currentRound.TakeProfitOrders) > 0 { + return fmt.Errorf("there is a take-profit order before placing the take-profit order, please check it and manually fix it") } trades, err := s.collector.CollectRoundTrades(ctx, currentRound)