Merge pull request #1645 from c9s/kbearXD/dca2/flexible-recovery

FEATURE: [dca2] make the take-profit order of round from order to orders
This commit is contained in:
kbearXD 2024-06-13 18:19:44 +08:00 committed by GitHub
commit 5098c3ac35
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 201 additions and 83 deletions

View File

@ -2,7 +2,6 @@ package dca2
import (
"context"
"fmt"
"strconv"
"time"
@ -11,15 +10,22 @@ import (
"github.com/sirupsen/logrus"
)
// Round contains the open-position orders and the take-profit orders
// 1. len(OpenPositionOrders) == 0 -> not open position
// 2. len(TakeProfitOrders) == 0 -> not in the take-profit stage
// 3. There are take-profit orders only when open-position orders are cancelled
// 4. We need to make sure the order: open-position (BUY) -> take-profit (SELL) -> open-position (BUY) -> take-profit (SELL) -> ...
// 5. When there is one filled take-profit order, this round must be finished. We need to verify all take-profit orders are not active
type Round struct {
OpenPositionOrders []types.Order
TakeProfitOrder types.Order
TakeProfitOrders []types.Order
}
type Collector struct {
logger *logrus.Entry
symbol string
groupID uint32
filterGroupID bool
// service
ex types.Exchange
@ -29,7 +35,7 @@ type Collector struct {
queryClosedOrderDesc descendingClosedOrderQueryService
}
func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *Collector {
func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, filterGroupID bool, ex types.Exchange) *Collector {
historyService, ok := ex.(types.ExchangeTradeHistoryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name())
@ -58,6 +64,7 @@ func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.
logger: logger,
symbol: symbol,
groupID: groupID,
filterGroupID: filterGroupID,
ex: ex,
historyService: historyService,
queryService: queryService,
@ -94,7 +101,7 @@ func (rc Collector) CollectCurrentRound(ctx context.Context) (Round, error) {
lastSide := takeProfitSide
for _, order := range allOrders {
// group id filter is used for debug when local running
if order.GroupID != rc.groupID {
if rc.filterGroupID && order.GroupID != rc.groupID {
continue
}
@ -106,10 +113,7 @@ func (rc Collector) CollectCurrentRound(ctx context.Context) (Round, error) {
case openPositionSide:
currentRound.OpenPositionOrders = append(currentRound.OpenPositionOrders, order)
case takeProfitSide:
if currentRound.TakeProfitOrder.OrderID != 0 {
return currentRound, fmt.Errorf("there are two take-profit orders in one round, please check it")
}
currentRound.TakeProfitOrder = order
currentRound.TakeProfitOrders = append(currentRound.TakeProfitOrders, order)
default:
}
@ -133,7 +137,7 @@ func (rc *Collector) CollectFinishRounds(ctx context.Context, fromOrderID uint64
var round Round
for _, order := range orders {
// skip not this strategy order
if order.GroupID != rc.groupID {
if rc.filterGroupID && order.GroupID != rc.groupID {
continue
}
@ -141,12 +145,20 @@ func (rc *Collector) CollectFinishRounds(ctx context.Context, fromOrderID uint64
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
round.TakeProfitOrders = append(round.TakeProfitOrders, order)
if order.Status != types.OrderStatusFilled {
rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
rc.logger.Infof("take-profit order is not filled (%s), so this round is not finished. Keep collecting", order.Status)
continue
}
round.TakeProfitOrder = order
for _, o := range round.TakeProfitOrders {
if types.IsActiveOrder(o) {
// Should not happen ! but we only log it
rc.logger.Errorf("unexpected error, there is at least one take-profit order #%d is still active, please check it. %s", o.OrderID, o.String())
}
}
rounds = append(rounds, round)
round = Round{}
default:
@ -164,10 +176,7 @@ func (rc *Collector) CollectRoundTrades(ctx context.Context, round Round) ([]typ
var roundTrades []types.Trade
var roundOrders []types.Order = round.OpenPositionOrders
// if the take-profit order's OrderID == 0 -> no take-profit order.
if round.TakeProfitOrder.OrderID != 0 {
roundOrders = append(roundOrders, round.TakeProfitOrder)
}
roundOrders = append(roundOrders, round.TakeProfitOrders...)
for _, order := range roundOrders {
if order.ExecutedQuantity.IsZero() {

View File

@ -0,0 +1,72 @@
package dca2
import (
"testing"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/stretchr/testify/assert"
"go.uber.org/mock/gomock"
)
func Test_NewCollector(t *testing.T) {
symbol := "ETHUSDT"
logger := log.WithField("strategy", ID)
t.Run("return nil if the exchange doesn't support ExchangeTradeHistoryService", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockEx := mocks.NewMockExchange(mockCtrl)
mockEx.EXPECT().Name().Return(types.ExchangeMax)
collector := NewCollector(logger, symbol, 0, false, mockEx)
assert.Nil(t, collector)
})
t.Run("return nil if the exchange doesn't support ExchangeOrderQueryService", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockEx := mocks.NewMockExchange(mockCtrl)
mockEx.EXPECT().Name().Return(types.ExchangeMax)
mockTradeHistoryService := mocks.NewMockExchangeTradeHistoryService(mockCtrl)
type TestEx struct {
types.Exchange
types.ExchangeTradeHistoryService
}
ex := TestEx{
Exchange: mockEx,
ExchangeTradeHistoryService: mockTradeHistoryService,
}
collector := NewCollector(logger, symbol, 0, false, ex)
assert.Nil(t, collector)
})
t.Run("return nil if the exchange doesn't support descendingClosedOrderQueryService", func(t *testing.T) {
mockCtrl := gomock.NewController(t)
mockEx := mocks.NewMockExchange(mockCtrl)
mockEx.EXPECT().Name().Return(types.ExchangeMax)
mockTradeHistoryService := mocks.NewMockExchangeTradeHistoryService(mockCtrl)
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
type TestEx struct {
types.Exchange
types.ExchangeTradeHistoryService
types.ExchangeOrderQueryService
}
ex := TestEx{
Exchange: mockEx,
ExchangeTradeHistoryService: mockTradeHistoryService,
ExchangeOrderQueryService: mockOrderQueryService,
}
collector := NewCollector(logger, symbol, 0, false, ex)
assert.Nil(t, collector)
})
}

View File

@ -22,7 +22,9 @@ func (s *Strategy) debugOrders(submitOrders []types.Order) {
func debugRoundOrders(logger *logrus.Entry, roundName string, round Round) {
var sb strings.Builder
sb.WriteString("ROUND " + roundName + " [\n")
sb.WriteString(round.TakeProfitOrder.String() + "\n")
for i, order := range round.TakeProfitOrders {
sb.WriteString(fmt.Sprintf("%3d) ", i+1) + order.String() + "\n")
}
sb.WriteString("------------------------------------------------\n")
for i, order := range round.OpenPositionOrders {
sb.WriteString(fmt.Sprintf("%3d) ", i+1) + order.String() + "\n")

View File

@ -64,63 +64,55 @@ func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, or
orderStore := orderExecutor.OrderStore()
// dca stop at take-profit order stage
if currentRound.TakeProfitOrder.OrderID != 0 {
// the number of open-positions orders may not be equal to maxOrderCount, because the notional may not enough to open maxOrderCount orders
if len(currentRound.OpenPositionOrders) > maxOrderCount {
return None, fmt.Errorf("there is take-profit order but the number of open-position orders (%d) is greater than maxOrderCount(%d). Please check it", len(currentRound.OpenPositionOrders), maxOrderCount)
if len(currentRound.TakeProfitOrders) > 0 {
openedOrders, cancelledOrders, filledOrders, unexpectedOrders := classifyOrders(currentRound.TakeProfitOrders)
if len(unexpectedOrders) > 0 {
return None, fmt.Errorf("there is unexpected status in orders %+v", unexpectedOrders)
}
takeProfitOrder := currentRound.TakeProfitOrder
if takeProfitOrder.Status == types.OrderStatusFilled {
if len(filledOrders) > 0 && len(openedOrders) == 0 {
return WaitToOpenPosition, nil
} else if types.IsActiveOrder(takeProfitOrder) {
activeOrderBook.Add(takeProfitOrder)
orderStore.Add(takeProfitOrder)
return TakeProfitReady, nil
} else {
return None, fmt.Errorf("the status of take-profit order is %s. Please check it", takeProfitOrder.Status)
}
if len(filledOrders) == 0 && len(openedOrders) > 0 {
// add opened order into order store
for _, order := range openedOrders {
activeOrderBook.Add(order)
orderStore.Add(order)
}
return TakeProfitReady, nil
}
return None, fmt.Errorf("the classify orders count is not expected (opened: %d, cancelled: %d, filled: %d)", len(openedOrders), len(cancelledOrders), len(filledOrders))
}
// dca stop at no take-profit order stage
openPositionOrders := currentRound.OpenPositionOrders
numOpenPositionOrders := len(openPositionOrders)
// new strategy
if len(openPositionOrders) == 0 {
return WaitToOpenPosition, nil
}
// should not happen
if numOpenPositionOrders > maxOrderCount {
return None, fmt.Errorf("the number of open-position orders (%d) is > max order number", numOpenPositionOrders)
}
// collect open-position orders' status
var openedCnt, filledCnt, cancelledCnt int64
for _, order := range currentRound.OpenPositionOrders {
switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled:
openedOrders, cancelledOrders, filledOrders, unexpectedOrders := classifyOrders(currentRound.OpenPositionOrders)
if len(unexpectedOrders) > 0 {
return None, fmt.Errorf("there is unexpected status of orders %+v", unexpectedOrders)
}
for _, order := range openedOrders {
activeOrderBook.Add(order)
orderStore.Add(order)
openedCnt++
case types.OrderStatusFilled:
filledCnt++
case types.OrderStatusCanceled:
cancelledCnt++
default:
return None, fmt.Errorf("there is unexpected status %s of order %s", order.Status, order)
}
}
// no order is filled -> OpenPositionReady
if filledCnt == 0 {
if len(filledOrders) == 0 {
return OpenPositionReady, nil
}
// there are at least one open-position orders filled
if cancelledCnt == 0 {
if openedCnt > 0 {
if len(cancelledOrders) == 0 {
if len(openedOrders) > 0 {
return OpenPositionOrderFilled, nil
} else {
// all open-position orders filled, change to cancelling and place the take-profit order
@ -141,13 +133,18 @@ func recoverPosition(ctx context.Context, position *types.Position, currentRound
position.Reset()
var positionOrders []types.Order
if currentRound.TakeProfitOrder.OrderID != 0 {
// if the take-profit order is already filled, the position is 0
if !types.IsActiveOrder(currentRound.TakeProfitOrder) {
return nil
var filledCnt int64
for _, order := range currentRound.TakeProfitOrders {
if !types.IsActiveOrder(order) {
filledCnt++
}
positionOrders = append(positionOrders, order)
}
positionOrders = append(positionOrders, currentRound.TakeProfitOrder)
// all take-profit orders are filled
if len(currentRound.TakeProfitOrders) > 0 && filledCnt == int64(len(currentRound.TakeProfitOrders)) {
return nil
}
for _, order := range currentRound.OpenPositionOrders {
@ -184,9 +181,30 @@ func recoverProfitStats(ctx context.Context, strategy *Strategy) error {
}
func recoverStartTimeOfNextRound(ctx context.Context, currentRound Round, coolDownInterval types.Duration) time.Time {
if currentRound.TakeProfitOrder.OrderID != 0 && currentRound.TakeProfitOrder.Status == types.OrderStatusFilled {
return currentRound.TakeProfitOrder.UpdateTime.Time().Add(coolDownInterval.Duration())
var startTimeOfNextRound time.Time
for _, order := range currentRound.TakeProfitOrders {
if t := order.UpdateTime.Time().Add(coolDownInterval.Duration()); t.After(startTimeOfNextRound) {
startTimeOfNextRound = t
}
}
return time.Time{}
return startTimeOfNextRound
}
func classifyOrders(orders []types.Order) (opened, cancelled, filled, unexpected []types.Order) {
for _, order := range orders {
switch order.Status {
case types.OrderStatusNew, types.OrderStatusPartiallyFilled:
opened = append(opened, order)
case types.OrderStatusFilled:
filled = append(filled, order)
case types.OrderStatusCanceled:
cancelled = append(cancelled, order)
default:
unexpected = append(unexpected, order)
}
}
return opened, cancelled, filled, unexpected
}

View File

@ -23,19 +23,6 @@ func generateTestOrder(side types.SideType, status types.OrderStatus, createdAt
}
type MockQueryOrders struct {
OpenOrders []types.Order
ClosedOrders []types.Order
}
func (m *MockQueryOrders) QueryOpenOrders(ctx context.Context, symbol string) ([]types.Order, error) {
return m.OpenOrders, nil
}
func (m *MockQueryOrders) QueryClosedOrdersDesc(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) ([]types.Order, error) {
return m.ClosedOrders, nil
}
func Test_RecoverState(t *testing.T) {
strategy := newTestStrategy()
@ -123,7 +110,9 @@ func Test_RecoverState(t *testing.T) {
t.Run("at take profit stage, and not filled yet", func(t *testing.T) {
now := time.Now()
currentRound := Round{
TakeProfitOrder: generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now),
TakeProfitOrders: []types.Order{
generateTestOrder(types.SideTypeSell, types.OrderStatusNew, now),
},
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
@ -142,7 +131,9 @@ func Test_RecoverState(t *testing.T) {
t.Run("at take profit stage, take-profit order filled", func(t *testing.T) {
now := time.Now()
currentRound := Round{
TakeProfitOrder: generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now),
TakeProfitOrders: []types.Order{
generateTestOrder(types.SideTypeSell, types.OrderStatusFilled, now),
},
OpenPositionOrders: []types.Order{
generateTestOrder(types.SideTypeBuy, types.OrderStatusFilled, now.Add(-1*time.Second)),
generateTestOrder(types.SideTypeBuy, types.OrderStatusCanceled, now.Add(-2*time.Second)),
@ -158,3 +149,24 @@ func Test_RecoverState(t *testing.T) {
assert.Equal(t, WaitToOpenPosition, state)
})
}
func Test_classifyOrders(t *testing.T) {
orders := []types.Order{
types.Order{Status: types.OrderStatusCanceled},
types.Order{Status: types.OrderStatusFilled},
types.Order{Status: types.OrderStatusCanceled},
types.Order{Status: types.OrderStatusFilled},
types.Order{Status: types.OrderStatusPartiallyFilled},
types.Order{Status: types.OrderStatusCanceled},
types.Order{Status: types.OrderStatusPartiallyFilled},
types.Order{Status: types.OrderStatusNew},
types.Order{Status: types.OrderStatusRejected},
types.Order{Status: types.OrderStatusCanceled},
}
opened, cancelled, filled, unexpected := classifyOrders(orders)
assert.Equal(t, 3, len(opened))
assert.Equal(t, 4, len(cancelled))
assert.Equal(t, 2, len(filled))
assert.Equal(t, 1, len(unexpected))
}

View File

@ -64,6 +64,7 @@ type Strategy struct {
// OrderGroupID is the group ID used for the strategy instance for canceling orders
OrderGroupID uint32 `json:"orderGroupID"`
DisableOrderGroupIDFilter bool `json:"disableOrderGroupIDFilter"`
// RecoverWhenStart option is used for recovering dca states
RecoverWhenStart bool `json:"recoverWhenStart"`
@ -185,7 +186,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}
// collector
s.collector = NewCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange)
s.collector = NewCollector(s.logger, s.Symbol, s.OrderGroupID, !s.DisableOrderGroupIDFilter, s.ExchangeSession.Exchange)
if s.collector == nil {
return fmt.Errorf("failed to initialize collector")
}
@ -481,7 +482,11 @@ func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) {
}
// update profit stats FromOrderID to make sure we will not collect duplicated rounds
s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1
for _, order := range round.TakeProfitOrders {
if order.OrderID >= s.ProfitStats.FromOrderID {
s.ProfitStats.FromOrderID = order.OrderID + 1
}
}
// update quote investment
s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit)

View File

@ -17,8 +17,8 @@ func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
return errors.Wrap(err, "failed to place the take-profit order when collecting current round")
}
if currentRound.TakeProfitOrder.OrderID != 0 {
return fmt.Errorf("there is a take-profit order before placing the take-profit order, please check it")
if len(currentRound.TakeProfitOrders) > 0 {
return fmt.Errorf("there is a take-profit order before placing the take-profit order, please check it and manually fix it")
}
trades, err := s.collector.CollectRoundTrades(ctx, currentRound)