Merge pull request #1617 from c9s/kbearXD/dca2/refactor-and-limitlog

MINOR: [dca2] refactor and make open-position interval longer
This commit is contained in:
kbearXD 2024-04-16 17:00:47 +08:00 committed by GitHub
commit fe53319817
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 33 additions and 50 deletions

View File

@ -54,7 +54,7 @@ func (s *Strategy) recoverActiveOrders(ctx context.Context) error {
opts := common.SyncActiveOrdersOpts{
Logger: s.logger,
Exchange: s.ExchangeSession.Exchange,
OrderQueryService: s.roundCollector.queryService,
OrderQueryService: s.collector.queryService,
ActiveOrderBook: activeOrders,
OpenOrders: openOrders,
}

View File

@ -6,18 +6,15 @@ import (
"strconv"
"time"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)
type RoundCollector struct {
type Collector struct {
logger *logrus.Entry
symbol string
groupID uint32
isMax bool
// service
ex types.Exchange
@ -27,8 +24,7 @@ type RoundCollector struct {
queryClosedOrderDesc descendingClosedOrderQueryService
}
func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *RoundCollector {
isMax := exchange.IsMaxExchange(ex)
func NewCollector(logger *logrus.Entry, symbol string, groupID uint32, ex types.Exchange) *Collector {
historyService, ok := ex.(types.ExchangeTradeHistoryService)
if !ok {
logger.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", ex.Name())
@ -53,11 +49,10 @@ func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex t
return nil
}
return &RoundCollector{
return &Collector{
logger: logger,
symbol: symbol,
groupID: groupID,
isMax: isMax,
ex: ex,
historyService: historyService,
queryService: queryService,
@ -66,7 +61,7 @@ func NewRoundCollector(logger *logrus.Entry, symbol string, groupID uint32, ex t
}
}
func (rc RoundCollector) CollectCurrentRound(ctx context.Context) (Round, error) {
func (rc Collector) CollectCurrentRound(ctx context.Context) (Round, error) {
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, rc.ex, rc.symbol)
if err != nil {
return Round{}, err
@ -119,7 +114,7 @@ func (rc RoundCollector) CollectCurrentRound(ctx context.Context) (Round, error)
return currentRound, nil
}
func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID uint64) ([]Round, error) {
func (rc *Collector) CollectFinishRounds(ctx context.Context, fromOrderID uint64) ([]Round, error) {
// TODO: pagination for it
// query the orders
rc.logger.Infof("query %s closed orders from order id #%d", rc.symbol, fromOrderID)
@ -141,16 +136,9 @@ func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID u
case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if !rc.isMax {
if order.Status != types.OrderStatusFilled {
rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
} else {
if !maxapi.IsFilledOrderState(maxapi.OrderState(order.OriginalStatus)) {
rc.logger.Infof("isMax and take-profit order is %s not done or finalizing, so this round is not finished. Skip it", order.OriginalStatus)
continue
}
if order.Status != types.OrderStatusFilled {
rc.logger.Infof("take-profit order is %s not filled, so this round is not finished. Skip it", order.Status)
continue
}
round.TakeProfitOrder = order
@ -164,7 +152,7 @@ func (rc *RoundCollector) CollectFinishRounds(ctx context.Context, fromOrderID u
return rounds, nil
}
func (rc *RoundCollector) CollectRoundTrades(ctx context.Context, round Round) ([]types.Trade, error) {
func (rc *Collector) CollectRoundTrades(ctx context.Context, round Round) ([]types.Trade, error) {
debugRoundOrders(rc.logger, "collect round trades", round)
var roundTrades []types.Trade
@ -176,7 +164,6 @@ func (rc *RoundCollector) CollectRoundTrades(ctx context.Context, round Round) (
}
for _, order := range roundOrders {
rc.logger.Infof("collect trades from order: %s", order.String())
if order.ExecutedQuantity.IsZero() {
rc.logger.Info("collect trads from order but no executed quantity ", order.String())
continue

View File

@ -16,15 +16,9 @@ type descendingClosedOrderQueryService interface {
QueryClosedOrdersDesc(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) ([]types.Order, error)
}
type RecoverApiQueryService interface {
types.ExchangeOrderQueryService
types.ExchangeTradeService
descendingClosedOrderQueryService
}
func (s *Strategy) recover(ctx context.Context) error {
s.logger.Info("[DCA] recover")
currentRound, err := s.roundCollector.CollectCurrentRound(ctx)
currentRound, err := s.collector.CollectCurrentRound(ctx)
debugRoundOrders(s.logger, "current", currentRound)
// recover profit stats
@ -41,12 +35,7 @@ func (s *Strategy) recover(ctx context.Context) error {
if s.DisablePositionRecover {
s.logger.Info("disablePositionRecover is set, skip position recovery")
} else {
queryService, ok := s.ExchangeSession.Exchange.(RecoverApiQueryService)
if !ok {
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.ExchangeSession.ExchangeName)
}
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
if err := recoverPosition(ctx, s.Position, currentRound, s.collector.queryService); err != nil {
return err
}
s.logger.Info("recover position DONE")
@ -145,7 +134,7 @@ func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, or
return None, fmt.Errorf("unexpected order status combination (opened, filled, cancelled) = (%d, %d, %d)", openedCnt, filledCnt, cancelledCnt)
}
func recoverPosition(ctx context.Context, position *types.Position, queryService RecoverApiQueryService, currentRound Round) error {
func recoverPosition(ctx context.Context, position *types.Position, currentRound Round, queryService types.ExchangeOrderQueryService) error {
if position == nil {
return fmt.Errorf("position is nil, please check it")
}

View File

@ -7,6 +7,10 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
)
const (
openPositionRetryInterval = 10 * time.Minute
)
type State int64
const (
@ -142,7 +146,6 @@ func (s *Strategy) runWaitToOpenPositionState(ctx context.Context, next State) {
}
if time.Now().Before(s.startTimeOfNextRound) {
s.logger.Infof("[State] WaitToOpenPosition - before the startTimeOfNextRound %s", s.startTimeOfNextRound.String())
return
}
@ -152,11 +155,13 @@ func (s *Strategy) runWaitToOpenPositionState(ctx context.Context, next State) {
func (s *Strategy) runPositionOpening(ctx context.Context, next State) {
s.logger.Info("[State] PositionOpening - start placing open-position orders")
if err := s.placeOpenPositionOrders(ctx); err != nil {
s.logger.WithError(err).Error("failed to place dca orders, please check it.")
s.logger.WithError(err).Error("failed to place open-position orders, please check it.")
// try after 1 minute when failed to placing orders
s.startTimeOfNextRound = s.startTimeOfNextRound.Add(1 * time.Minute)
s.startTimeOfNextRound = s.startTimeOfNextRound.Add(openPositionRetryInterval)
s.logger.Infof("reset startTimeOfNextRound to %s", s.startTimeOfNextRound.String())
s.updateState(WaitToOpenPosition)
return
}
@ -224,5 +229,5 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) {
// set the start time of the next round
s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration())
s.updateState(WaitToOpenPosition)
s.logger.Info("[State] TakeProfitReady -> WaitToOpenPosition")
s.logger.Infof("[State] TakeProfitReady -> WaitToOpenPosition (startTimeOfNextRound: %s)", s.startTimeOfNextRound.String())
}

View File

@ -93,7 +93,7 @@ type Strategy struct {
mu sync.Mutex
nextStateC chan State
state State
roundCollector *RoundCollector
collector *Collector
takeProfitPrice fixedpoint.Value
startTimeOfNextRound time.Time
nextRoundPaused bool
@ -192,10 +192,10 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.OrderGroupID = util.FNV32(instanceID) % math.MaxInt32
}
// round collector
s.roundCollector = NewRoundCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange)
if s.roundCollector == nil {
return fmt.Errorf("failed to initialize round collector")
// collector
s.collector = NewCollector(s.logger, s.Symbol, s.OrderGroupID, s.ExchangeSession.Exchange)
if s.collector == nil {
return fmt.Errorf("failed to initialize collector")
}
// prometheus
@ -218,6 +218,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}
s.OrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.OrderExecutor.SetMaxRetries(10)
s.OrderExecutor.BindEnvironment(s.Environment)
s.OrderExecutor.Bind()
@ -448,14 +449,14 @@ func (s *Strategy) UpdateProfitStatsUntilSuccessful(ctx context.Context) error {
// return false, nil -> there is no finished round!
// return true, error -> At least one round update profit stats successfully but there is error when collecting other rounds
func (s *Strategy) UpdateProfitStats(ctx context.Context) (bool, error) {
rounds, err := s.roundCollector.CollectFinishRounds(ctx, s.ProfitStats.FromOrderID)
rounds, err := s.collector.CollectFinishRounds(ctx, s.ProfitStats.FromOrderID)
if err != nil {
return false, errors.Wrapf(err, "failed to collect finish rounds from #%d", s.ProfitStats.FromOrderID)
}
var updated bool = false
for _, round := range rounds {
trades, err := s.roundCollector.CollectRoundTrades(ctx, round)
trades, err := s.collector.CollectRoundTrades(ctx, round)
if err != nil {
return updated, errors.Wrapf(err, "failed to collect the trades of round")
}

View File

@ -11,12 +11,12 @@ import (
func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
s.logger.Info("start placing take profit orders")
currentRound, err := s.roundCollector.CollectCurrentRound(ctx)
currentRound, err := s.collector.CollectCurrentRound(ctx)
if currentRound.TakeProfitOrder.OrderID != 0 {
return fmt.Errorf("there is a take-profit order before placing the take-profit order, please check it")
}
trades, err := s.roundCollector.CollectRoundTrades(ctx, currentRound)
trades, err := s.collector.CollectRoundTrades(ctx, currentRound)
if err != nil {
return errors.Wrap(err, "failed to place the take-profit order when collecting round trades")
}
@ -24,6 +24,7 @@ func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
roundPosition := types.NewPositionFromMarket(s.Market)
for _, trade := range trades {
s.logger.Infof("add trade into the position of this round %s", trade.String())
if trade.FeeProcessing {
return fmt.Errorf("failed to place the take-profit order because there is a trade's fee not ready")
}