Merge pull request #1576 from c9s/kbearXD/dca2/retry-and-add-log

dca2: add more log and retry
This commit is contained in:
kbearXD 2024-03-12 15:44:58 +08:00 committed by GitHub
commit 2bcf6ef290
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 88 additions and 22 deletions

View File

@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strconv"
"time"
"github.com/cenkalti/backoff/v4"
@ -119,6 +120,54 @@ func QueryOpenOrdersUntilSuccessfulLite(
return openOrders, err
}
func QueryClosedOrdersUntilSuccessful(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}
err = GeneralBackoff(ctx, op)
return closedOrders, err
}
func QueryClosedOrdersUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, since, until time.Time, lastOrderID uint64,
) (closedOrders []types.Order, err error) {
var op = func() (err2 error) {
closedOrders, err2 = ex.QueryClosedOrders(ctx, symbol, since, until, lastOrderID)
return err2
}
err = GeneralLiteBackoff(ctx, op)
return closedOrders, err
}
func QueryOrderTradesUntilSuccessful(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}
err = GeneralBackoff(ctx, op)
return trades, err
}
func QueryOrderTradesUntilSuccessfulLite(
ctx context.Context, ex types.ExchangeOrderQueryService, q types.OrderQuery,
) (trades []types.Trade, err error) {
var op = func() (err2 error) {
trades, err2 = ex.QueryOrderTrades(ctx, q)
return err2
}
err = GeneralLiteBackoff(ctx, op)
return trades, err
}
func QueryAccountUntilSuccessful(
ctx context.Context, ex types.ExchangeAccountService,
) (account *types.Account, err error) {

View File

@ -10,7 +10,7 @@ import (
)
func (s *Strategy) recoverPeriodically(ctx context.Context) {
s.logger.Info("[DCA] monitor and recover periodically")
s.logger.Info("monitor and recover periodically")
interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()

View File

@ -15,7 +15,7 @@ type cancelOrdersByGroupIDApi interface {
}
func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error {
s.logger.Infof("[DCA] start placing open position orders")
s.logger.Infof("start placing open position orders")
price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
return err

View File

@ -203,7 +203,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
// order executor
s.OrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
s.logger.Infof("[DCA] POSITION UPDATE: %s", s.Position.String())
s.logger.Infof("POSITION UPDATE: %s", s.Position.String())
bbgo.Sync(ctx, s)
// update take profit price here
@ -211,7 +211,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
})
s.OrderExecutor.ActiveMakerOrders().OnFilled(func(o types.Order) {
s.logger.Infof("[DCA] FILLED ORDER: %s", o.String())
s.logger.Infof("FILLED ORDER: %s", o.String())
openPositionSide := types.SideTypeBuy
takeProfitSide := types.SideTypeSell
@ -221,7 +221,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
case takeProfitSide:
s.emitNextState(WaitToOpenPosition)
default:
s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o)
s.logger.Infof("unsupported side (%s) of order: %s", o.Side, o)
}
// update metrics when filled
@ -244,7 +244,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
})
session.UserDataStream.OnAuth(func() {
s.logger.Info("[DCA] user data stream authenticated")
s.logger.Info("user data stream authenticated")
time.AfterFunc(3*time.Second, func() {
if isInitialize := s.initializeNextStateC(); !isInitialize {
@ -255,16 +255,29 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.updateState(WaitToOpenPosition)
} else {
// recover
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("[DCA] something wrong when state recovering")
return
maxTry := 3
for try := 1; try <= maxTry; try++ {
s.logger.Infof("try #%d recover", try)
err := s.recover(ctx)
if err == nil {
s.logger.Infof("recover successfully at #%d", try)
break
}
s.logger.WithError(err).Warnf("failed to recover at #%d", try)
if try == 3 {
s.logger.Errorf("failed to recover after %d trying, please check it", maxTry)
return
}
}
}
s.logger.Infof("[DCA] state: %d", s.state)
s.logger.Infof("[DCA] position %s", s.Position.String())
s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String())
s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound)
s.logger.Infof("state: %d", s.state)
s.logger.Infof("position %s", s.Position.String())
s.logger.Infof("profit stats %s", s.ProfitStats.String())
s.logger.Infof("startTimeOfNextRound %s", s.startTimeOfNextRound)
s.updateTakeProfitPrice()
@ -299,17 +312,17 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
func (s *Strategy) updateTakeProfitPrice() {
takeProfitRatio := s.TakeProfitRatio
s.takeProfitPrice = s.Market.TruncatePrice(s.Position.AverageCost.Mul(fixedpoint.One.Add(takeProfitRatio)))
s.logger.Infof("[DCA] cost: %s, ratio: %s, price: %s", s.Position.AverageCost, takeProfitRatio, s.takeProfitPrice)
s.logger.Infof("cost: %s, ratio: %s, price: %s", s.Position.AverageCost.String(), takeProfitRatio.String(), s.takeProfitPrice.String())
}
func (s *Strategy) Close(ctx context.Context) error {
s.logger.Infof("[DCA] closing %s dca2", s.Symbol)
s.logger.Infof("closing %s dca2", s.Symbol)
defer s.EmitClosed()
err := s.OrderExecutor.GracefulCancel(ctx)
if err != nil {
s.logger.WithError(err).Errorf("[DCA] there are errors when cancelling orders at close")
s.logger.WithError(err).Errorf("there are errors when cancelling orders at close")
}
bbgo.Sync(ctx, s)
@ -370,10 +383,12 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
// TODO: pagination for it
// query the orders
orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
s.logger.Infof("query %s closed orders from order id #%d", s.Symbol, s.ProfitStats.FromOrderID)
orders, err := retry.QueryClosedOrdersUntilSuccessfulLite(ctx, historyService, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
if err != nil {
return err
}
s.logger.Infof("there are %d closed orders from order id #%d", len(orders), s.ProfitStats.FromOrderID)
var rounds []Round
var round Round
@ -398,18 +413,20 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}
}
s.logger.Infof("there are %d rounds from order id #%d", len(rounds), s.ProfitStats.FromOrderID)
for _, round := range rounds {
debugRoundOrders(s.logger, "calculate", round)
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String())
s.logger.Infof("calculate profit stats from order: %s", order.String())
// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}
trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{
trades, err := retry.QueryOrderTradesUntilSuccessfulLite(ctx, queryService, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
})
@ -419,7 +436,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
}
for _, trade := range trades {
s.logger.Infof("[DCA] calculate profit stats from trade: %s", trade.String())
s.logger.Infof("calculate profit stats from trade: %s", trade.String())
s.ProfitStats.AddTrade(trade)
}
}
@ -430,7 +447,7 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
// store into persistence
bbgo.Sync(ctx, s)
s.logger.Infof("[DCA] profit stats:\n%s", s.ProfitStats.String())
s.logger.Infof("profit stats:\n%s", s.ProfitStats.String())
// emit profit
s.EmitProfit(s.ProfitStats)

View File

@ -8,7 +8,7 @@ import (
)
func (s *Strategy) placeTakeProfitOrders(ctx context.Context) error {
s.logger.Info("[DCA] start placing take profit orders")
s.logger.Info("start placing take profit orders")
order := generateTakeProfitOrder(s.Market, s.TakeProfitRatio, s.Position, s.OrderGroupID)
createdOrders, err := s.OrderExecutor.SubmitOrders(ctx, order)
if err != nil {