bbgo_origin/pkg/strategy/grid2/active_order_recover.go

160 lines
4.7 KiB
Go
Raw Normal View History

2023-10-02 03:56:15 +00:00
package grid2
import (
"context"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
2023-10-02 03:56:15 +00:00
)
type SyncActiveOrdersOpts struct {
logger *logrus.Entry
metricsLabels prometheus.Labels
activeOrderBook *bbgo.ActiveOrderBook
orderQueryService types.ExchangeOrderQueryService
exchange types.Exchange
}
2023-10-23 05:17:20 +00:00
func (s *Strategy) initializeRecoverC() bool {
2023-10-16 08:02:43 +00:00
s.mu.Lock()
defer s.mu.Unlock()
2023-10-17 08:13:05 +00:00
isInitialize := false
2023-10-16 08:02:43 +00:00
2023-10-23 05:17:20 +00:00
if s.recoverC == nil {
2023-10-17 08:13:05 +00:00
s.logger.Info("initializing recover channel")
2023-10-23 05:17:20 +00:00
s.recoverC = make(chan struct{}, 1)
2023-10-16 08:02:43 +00:00
} else {
2023-10-17 08:13:05 +00:00
s.logger.Info("recover channel is already initialized, trigger active orders recover")
isInitialize = true
select {
2023-10-23 05:17:20 +00:00
case s.recoverC <- struct{}{}:
2023-10-17 08:13:05 +00:00
s.logger.Info("trigger active orders recover")
default:
s.logger.Info("activeOrdersRecoverC is full")
}
2023-10-16 08:02:43 +00:00
}
2023-10-17 08:13:05 +00:00
return isInitialize
2023-10-16 08:02:43 +00:00
}
func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
2023-10-17 08:13:05 +00:00
// every time we activeOrdersRecoverC receive signal, do active orders recover
2023-10-23 05:17:20 +00:00
if isInitialize := s.initializeRecoverC(); isInitialize {
2023-10-16 08:02:43 +00:00
return
}
2023-10-02 03:56:15 +00:00
// make ticker's interval random in 25 min ~ 35 min
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[ActiveOrderRecover] interval: %s", interval)
ticker := time.NewTicker(interval)
2023-10-02 03:56:15 +00:00
defer ticker.Stop()
opts := SyncActiveOrdersOpts{
logger: s.logger,
metricsLabels: s.newPrometheusLabels(),
activeOrderBook: s.orderExecutor.ActiveMakerOrders(),
orderQueryService: s.orderQueryService,
exchange: s.session.Exchange,
}
2023-11-07 02:56:19 +00:00
var lastRecoverTime time.Time
2023-10-02 03:56:15 +00:00
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
2023-11-07 02:56:19 +00:00
s.recoverC <- struct{}{}
case <-s.recoverC:
if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) {
continue
}
if err := syncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
2023-11-07 02:56:19 +00:00
} else {
lastRecoverTime = time.Now()
}
}
}
}
func isMaxExchange(ex interface{}) bool {
_, yes := ex.(*max.Exchange)
return yes
}
func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders")
2023-10-09 09:09:41 +00:00
2023-10-30 09:17:36 +00:00
// only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice
syncBefore := time.Now().Add(-3 * time.Minute)
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, opts.exchange, opts.activeOrderBook.Symbol)
if err != nil {
opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time")
}
2023-10-17 07:20:28 +00:00
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders)))
}
2023-10-11 09:36:18 +00:00
activeOrders := opts.activeOrderBook.Orders()
2023-10-02 03:56:15 +00:00
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrdersMap[openOrder.OrderID] = openOrder
2023-10-02 03:56:15 +00:00
}
var errs error
2023-10-02 03:56:15 +00:00
// update active orders not in open orders
for _, activeOrder := range activeOrders {
2023-10-09 09:10:11 +00:00
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
2023-10-09 09:10:11 +00:00
delete(openOrdersMap, activeOrder.OrderID)
} else {
2023-11-07 09:00:29 +00:00
opts.logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
2023-11-07 09:00:29 +00:00
isActiveOrderBookUpdated, err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID, syncBefore)
if err != nil {
opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
2023-11-07 09:00:29 +00:00
continue
}
if !isActiveOrderBookUpdated {
opts.logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID)
2023-10-02 03:56:15 +00:00
}
}
}
// update open orders not in active orders
for _, openOrder := range openOrdersMap {
opts.logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID)
// we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice.
2023-10-30 09:17:36 +00:00
if openOrder.UpdateTime.After(syncBefore) {
opts.logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID)
continue
}
opts.activeOrderBook.Add(openOrder)
// opts.activeOrderBook.Update(openOrder)
2023-10-02 03:56:15 +00:00
}
return errs
2023-10-02 03:56:15 +00:00
}