From 671772a76775f9321e7e098d00031fecbfdcfb09 Mon Sep 17 00:00:00 2001 From: chiahung Date: Mon, 30 Oct 2023 16:28:34 +0800 Subject: [PATCH 1/2] FIX: retry to get open orders only for 5 times and do not sync orders updated in 3 min --- pkg/exchange/retry/order.go | 19 +++++++++++++++++++ pkg/strategy/grid2/active_order_recover.go | 15 +++++++++++---- pkg/strategy/grid2/recover.go | 2 +- 3 files changed, 31 insertions(+), 5 deletions(-) diff --git a/pkg/exchange/retry/order.go b/pkg/exchange/retry/order.go index 15b59c6af..5a8c8855b 100644 --- a/pkg/exchange/retry/order.go +++ b/pkg/exchange/retry/order.go @@ -47,6 +47,15 @@ func GeneralBackoff(ctx context.Context, op backoff2.Operation) (err error) { return err } +func GeneralBackoffLite(ctx context.Context, op backoff2.Operation) (err error) { + err = backoff2.Retry(op, backoff2.WithContext( + backoff2.WithMaxRetries( + backoff2.NewExponentialBackOff(), + 5), + ctx)) + return err +} + func QueryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) { var op = func() (err2 error) { openOrders, err2 = ex.QueryOpenOrders(ctx, symbol) @@ -57,6 +66,16 @@ func QueryOpenOrdersUntilSuccessful(ctx context.Context, ex types.Exchange, symb return openOrders, err } +func QueryOpenOrdersUntilSuccessfulLite(ctx context.Context, ex types.Exchange, symbol string) (openOrders []types.Order, err error) { + var op = func() (err2 error) { + openOrders, err2 = ex.QueryOpenOrders(ctx, symbol) + return err2 + } + + err = GeneralBackoffLite(ctx, op) + return openOrders, err +} + func QueryOrderUntilSuccessful(ctx context.Context, query types.ExchangeOrderQueryService, opts types.OrderQuery) (order *types.Order, err error) { var op = func() (err2 error) { order, err2 = query.QueryOrder(ctx, opts) diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index 5042084fc..82269832e 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -89,9 +89,10 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders") - notAddNonExistingOpenOrdersAfter := time.Now().Add(-5 * time.Minute) + // do not sync orders which is updated in 3 min, because we may receive from websocket and handle it twice + doNotSyncAfter := time.Now().Add(-3 * time.Minute) - openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, opts.exchange, opts.activeOrderBook.Symbol) + openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, opts.exchange, opts.activeOrderBook.Symbol) if err != nil { opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time") @@ -116,6 +117,10 @@ func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { delete(openOrdersMap, activeOrder.OrderID) } else { opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID) + if activeOrder.UpdateTime.After(doNotSyncAfter) { + opts.logger.Infof("active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID) + continue + } // sleep 100ms to avoid DDOS time.Sleep(100 * time.Millisecond) @@ -130,8 +135,10 @@ func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { // update open orders not in active orders for _, openOrder := range openOrdersMap { - // we don't add open orders into active orderbook if updated in 5 min - if openOrder.UpdateTime.After(notAddNonExistingOpenOrdersAfter) { + opts.logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID) + // we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice. + if openOrder.UpdateTime.After(doNotSyncAfter) { + opts.logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID) continue } diff --git a/pkg/strategy/grid2/recover.go b/pkg/strategy/grid2/recover.go index 80a19008a..0634b72d4 100644 --- a/pkg/strategy/grid2/recover.go +++ b/pkg/strategy/grid2/recover.go @@ -54,7 +54,7 @@ func (s *Strategy) recover(ctx context.Context) error { activeOrderBook := s.orderExecutor.ActiveMakerOrders() activeOrders := activeOrderBook.Orders() - openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol) + openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.session.Exchange, s.Symbol) if err != nil { return err } From d33240ec83781ed92010941b7fc3104e38e3d3fb Mon Sep 17 00:00:00 2001 From: chiahung Date: Mon, 30 Oct 2023 17:17:36 +0800 Subject: [PATCH 2/2] rename and simplify import --- pkg/exchange/retry/order.go | 26 +++++++++++----------- pkg/strategy/grid2/active_order_recover.go | 8 +++---- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/pkg/exchange/retry/order.go b/pkg/exchange/retry/order.go index 5a8c8855b..df6ef6b59 100644 --- a/pkg/exchange/retry/order.go +++ b/pkg/exchange/retry/order.go @@ -5,10 +5,9 @@ import ( "errors" "strconv" - backoff2 "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v4" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util/backoff" ) type advancedOrderCancelService interface { @@ -18,7 +17,7 @@ type advancedOrderCancelService interface { } func QueryOrderUntilFilled(ctx context.Context, queryOrderService types.ExchangeOrderQueryService, symbol string, orderId uint64) (o *types.Order, err error) { - err = backoff.RetryGeneral(ctx, func() (err2 error) { + var op = func() (err2 error) { o, err2 = queryOrderService.QueryOrder(ctx, types.OrderQuery{ Symbol: symbol, OrderID: strconv.FormatUint(orderId, 10), @@ -33,24 +32,25 @@ func QueryOrderUntilFilled(ctx context.Context, queryOrderService types.Exchange } return err2 - }) + } + err = GeneralBackoff(ctx, op) return o, err } -func GeneralBackoff(ctx context.Context, op backoff2.Operation) (err error) { - err = backoff2.Retry(op, backoff2.WithContext( - backoff2.WithMaxRetries( - backoff2.NewExponentialBackOff(), +func GeneralBackoff(ctx context.Context, op backoff.Operation) (err error) { + err = backoff.Retry(op, backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewExponentialBackOff(), 101), ctx)) return err } -func GeneralBackoffLite(ctx context.Context, op backoff2.Operation) (err error) { - err = backoff2.Retry(op, backoff2.WithContext( - backoff2.WithMaxRetries( - backoff2.NewExponentialBackOff(), +func GeneralLiteBackoff(ctx context.Context, op backoff.Operation) (err error) { + err = backoff.Retry(op, backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewExponentialBackOff(), 5), ctx)) return err @@ -72,7 +72,7 @@ func QueryOpenOrdersUntilSuccessfulLite(ctx context.Context, ex types.Exchange, return err2 } - err = GeneralBackoffLite(ctx, op) + err = GeneralLiteBackoff(ctx, op) return openOrders, err } diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index 82269832e..cfdaeac80 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -89,8 +89,8 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders") - // do not sync orders which is updated in 3 min, because we may receive from websocket and handle it twice - doNotSyncAfter := time.Now().Add(-3 * time.Minute) + // only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice + syncBefore := time.Now().Add(-3 * time.Minute) openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, opts.exchange, opts.activeOrderBook.Symbol) if err != nil { @@ -117,7 +117,7 @@ func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { delete(openOrdersMap, activeOrder.OrderID) } else { opts.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID) - if activeOrder.UpdateTime.After(doNotSyncAfter) { + if activeOrder.UpdateTime.After(syncBefore) { opts.logger.Infof("active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID) continue } @@ -137,7 +137,7 @@ func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error { for _, openOrder := range openOrdersMap { opts.logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID) // we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice. - if openOrder.UpdateTime.After(doNotSyncAfter) { + if openOrder.UpdateTime.After(syncBefore) { opts.logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID) continue }