FEATURE: merge recover logic and run periodically

This commit is contained in:
kbearXD 2024-06-27 16:11:19 +08:00
parent ad5674d9cb
commit 3735499753
5 changed files with 128 additions and 152 deletions

View File

@ -1,98 +0,0 @@
package grid2
import (
"context"
"time"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
func (s *Strategy) initializeRecoverC() bool {
s.mu.Lock()
defer s.mu.Unlock()
isInitialize := false
if s.recoverC == nil {
s.logger.Info("initializing recover channel")
s.recoverC = make(chan struct{}, 1)
} else {
s.logger.Info("recover channel is already initialized, trigger active orders recover")
isInitialize = true
select {
case s.recoverC <- struct{}{}:
s.logger.Info("trigger active orders recover")
default:
s.logger.Info("activeOrdersRecoverC is full")
}
}
return isInitialize
}
func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
// every time we activeOrdersRecoverC receive signal, do active orders recover
if isInitialize := s.initializeRecoverC(); isInitialize {
return
}
// make ticker's interval random in 25 min ~ 35 min
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[ActiveOrderRecover] interval: %s", interval)
metricsLabel := s.newPrometheusLabels()
orderQueryService, ok := s.session.Exchange.(types.ExchangeOrderQueryService)
if !ok {
s.logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService, please check it", s.session.ExchangeName)
return
}
opts := common.SyncActiveOrdersOpts{
Logger: s.logger,
Exchange: s.session.Exchange,
OrderQueryService: orderQueryService,
ActiveOrderBook: s.orderExecutor.ActiveMakerOrders(),
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastRecoverTime time.Time
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.recoverC <- struct{}{}
case <-s.recoverC:
if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) {
continue
}
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.session.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
continue
}
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(metricsLabel).Set(float64(len(openOrders)))
}
opts.OpenOrders = openOrders
if err := common.SyncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
} else {
lastRecoverTime = time.Now()
}
}
}
}

View File

@ -14,26 +14,84 @@ import (
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
var syncWindow = -3 * time.Minute
func (s *Strategy) initializeRecoverC() bool {
s.mu.Lock()
defer s.mu.Unlock()
isInitialize := false
if s.recoverC == nil {
s.logger.Info("[Recover] initializing recover channel")
s.recoverC = make(chan struct{}, 1)
} else {
s.logger.Info("[Recover] recover channel is already initialized, trigger active orders recover")
isInitialize = true
select {
case s.recoverC <- struct{}{}:
s.logger.Info("[Recover] trigger active orders recover")
default:
s.logger.Info("[Recover] activeOrdersRecoverC is full")
}
}
return isInitialize
}
func (s *Strategy) recoverPeriodically(ctx context.Context) {
if isInitialize := s.initializeRecoverC(); isInitialize {
return
}
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[Recover] interval: %s", interval)
ticker := time.NewTicker(interval)
defer ticker.Stop()
var lastRecoverTime time.Time
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.recoverC <- struct{}{}
case <-s.recoverC:
// if we already recovered in 10 min, we should skip to avoid recovering too frequently
if !time.Now().After(lastRecoverTime.Add(10 * time.Minute)) {
continue
}
if err := s.recover(ctx); err != nil {
s.logger.WithError(err).Error("failed to recover")
} else {
lastRecoverTime = time.Now()
}
}
}
}
/*
Background knowledge
1. active orderbook add orders only when receive new order event or call Add/Update method manually
2. active orderbook remove orders only when receive filled/cancelled event or call Remove/Update method manually
As a result
1. at the same twin-order-price, there is order in open orders but not in active orderbook
- not receive new order event
=> add order into active orderbook
2. at the same twin-order-price, there is order in active orderbook but not in open orders
- not receive filled event
=> query the filled order and call Update method
3. at the same twin-order-price, there is no order in open orders and no order in active orderbook
1. at the same twin-order-price, there is no order in open orders and no order in active orderbook
- failed to create the order
=> query the last order from trades to emit filled, and it will submit again
- not receive new order event and the order filled before we find it.
=> query the untracked order (also is the last order) from trades to emit filled and it will submit the reversed order
2. at the same twin-order-price, there is order in open orders but not in active orderbook
- not receive new order event
=> add order into active orderbook
3. at the same twin-order-price, there is order in active orderbook but not in open orders
- not receive filled event
=> query the filled order and call Update method
4. at the same twin-order-price, there are different orders in open orders and active orderbook
- should not happen !!!
=> log error
@ -49,10 +107,11 @@ var syncWindow = -3 * time.Minute
*/
func (s *Strategy) recover(ctx context.Context) error {
s.logger.Info("[Recover] try to recover")
historyService, implemented := s.session.Exchange.(types.ExchangeTradeHistoryService)
// if the exchange doesn't support ExchangeTradeHistoryService, do not run recover
if !implemented {
s.logger.Warn("ExchangeTradeHistoryService is not implemented, can not recover grid")
s.logger.Warn("[Recover] ExchangeTradeHistoryService is not implemented, can not recover grid")
return nil
}
@ -76,28 +135,28 @@ func (s *Strategy) recover(ctx context.Context) error {
})
if err != nil {
return errors.Wrapf(err, "unable to query trades when recovering")
return errors.Wrapf(err, "[Recover] unable to query trades when recovering")
}
if len(trades) == 0 {
s.logger.Info("no open order, no active order, no trade, it's a new strategy so no need to recover")
s.logger.Info("[Recover] no open order, no active order, no trade, it's a new strategy so no need to recover")
return nil
}
}
s.logger.Info("start recovering")
s.logger.Info("[Recover] start recovering")
if s.getGrid() == nil {
s.setGrid(s.newGrid())
}
s.mu.Lock()
defer s.mu.Unlock()
pins := s.getGrid().Pins
syncBefore := time.Now().Add(syncWindow)
s.mu.Lock()
defer s.mu.Unlock()
activeOrdersInTwinOrderBook, err := buildTwinOrderBook(pins, activeOrders)
openOrdersInTwinOrderBook, err := buildTwinOrderBook(pins, openOrders)
@ -113,7 +172,7 @@ func (s *Strategy) recover(ctx context.Context) error {
activeOrder := activeOrdersInTwinOrderBook.GetTwinOrder(v)
openOrder := openOrdersInTwinOrderBook.GetTwinOrder(v)
if activeOrder == nil || openOrder == nil {
return fmt.Errorf("there is no any twin order at this pin, can not recover")
return fmt.Errorf("this pin (%s) is invalid. Please check it.", v.String())
}
var activeOrderID uint64 = 0
@ -126,23 +185,28 @@ func (s *Strategy) recover(ctx context.Context) error {
openOrderID = openOrder.GetOrder().OrderID
}
// case 3
// case 1
if activeOrderID == 0 && openOrderID == 0 {
noTwinOrderPins = append(noTwinOrderPins, v)
continue
}
// case 1
// case 2
if activeOrderID == 0 {
order := openOrder.GetOrder()
s.logger.Infof("[Recover] found open order #%d is not in the active orderbook, adding...", order.OrderID)
activeOrderBook.Add(order)
// also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid
activeOrdersInTwinOrderBook.AddTwinOrder(v, openOrder)
if order.UpdateTime.Before(syncBefore) {
activeOrderBook.Add(order)
// also add open orders into active order's twin orderbook, we will use this active orderbook to recover empty price grid
activeOrdersInTwinOrderBook.AddOrder(order, true)
} else {
s.logger.Infof("[Recover] open order #%d is updated in 3 min, skip adding...", order.OrderID)
}
continue
}
// case 2
// case 3
if openOrderID == 0 {
order := activeOrder.GetOrder()
s.logger.Infof("[Recover] found active order #%d is not in the open orders, updating...", order.OrderID)
@ -155,43 +219,50 @@ func (s *Strategy) recover(ctx context.Context) error {
if !isActiveOrderBookUpdated {
s.logger.Infof("[Recover] active order #%d is updated in 3 min, skip updating...", order.OrderID)
}
continue
}
// case 4
if activeOrderID != openOrderID {
return fmt.Errorf("there are two different orders in the same pin, can not recover")
return fmt.Errorf("[Recover] there are two different orders in the same pin, can not recover")
}
// case 5
// do nothing
}
s.logger.Infof("twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String())
s.logger.Infof("[Recover] twin orderbook after adding open orders\n%s", activeOrdersInTwinOrderBook.String())
s.logger.Infof("[Recover] pins without twin orders: %+v", noTwinOrderPins)
if len(noTwinOrderPins) != 0 {
if err := s.recoverEmptyGridOnTwinOrderBook(ctx, activeOrdersInTwinOrderBook, historyService, s.orderQueryService); err != nil {
s.logger.WithError(err).Error("failed to recover empty grid")
s.logger.WithError(err).Error("[Recover] failed to recover empty grid")
return err
}
s.logger.Infof("twin orderbook after recovering no twin order on grid\n%s", activeOrdersInTwinOrderBook.String())
s.logger.Infof("[Recover] twin orderbook after recovering no twin order on grid\n%s", activeOrdersInTwinOrderBook.String())
if activeOrdersInTwinOrderBook.EmptyTwinOrderSize() > 0 {
return fmt.Errorf("there is still empty grid in twin orderbook")
return fmt.Errorf("[Recover] there is still empty grid in twin orderbook")
}
for _, pin := range noTwinOrderPins {
twinOrder := activeOrdersInTwinOrderBook.GetTwinOrder(pin)
if twinOrder == nil {
return fmt.Errorf("should not get nil twin order after recovering empty grid, check it")
return fmt.Errorf("[Recover] should not get nil twin order after recovering empty grid, check it")
}
if !twinOrder.Exist() {
return fmt.Errorf("should not get empty twin order after recovering empty grid, check it")
return fmt.Errorf("[Recover] should not get empty twin order after recovering empty grid, check it")
}
filledOrder := twinOrder.GetOrder()
s.logger.Infof("[Recover] find filled order #%d (status: %s)", filledOrder.OrderID, filledOrder.Status)
if filledOrder.Status != types.OrderStatusFilled {
return fmt.Errorf("[Recover] should not get non-filled status, check it")
}
s.logger.Infof("[Recover] emit filled order %s", filledOrder)
activeOrderBook.EmitFilled(twinOrder.GetOrder())
time.Sleep(100 * time.Millisecond)
@ -216,7 +287,7 @@ func (s *Strategy) recoverEmptyGridOnTwinOrderBook(
queryOrderService types.ExchangeOrderQueryService,
) error {
if twinOrderBook.EmptyTwinOrderSize() == 0 {
s.logger.Info("no empty grid")
s.logger.Info("[Recover] no empty grid")
return nil
}
@ -233,24 +304,24 @@ func (s *Strategy) recoverEmptyGridOnTwinOrderBook(
for {
if err := queryTradesToUpdateTwinOrderBook(ctx, s.Symbol, twinOrderBook, queryTradesService, queryOrderService, existedOrders, since, until, s.debugLog); err != nil {
return errors.Wrapf(err, "failed to query trades to update twin orderbook")
return errors.Wrapf(err, "[Recover] failed to query trades to update twin orderbook")
}
until = since
since = until.Add(-6 * time.Hour)
if twinOrderBook.EmptyTwinOrderSize() == 0 {
s.logger.Infof("stop querying trades because there is no empty twin order on twin orderbook")
s.logger.Infof("[Recover] stop querying trades because there is no empty twin order on twin orderbook")
break
}
if s.GridProfitStats != nil && s.GridProfitStats.Since != nil && until.Before(*s.GridProfitStats.Since) {
s.logger.Infof("stop querying trades because the time range is out of the strategy's since (%s)", *s.GridProfitStats.Since)
s.logger.Infof("[Recover] stop querying trades because the time range is out of the strategy's since (%s)", *s.GridProfitStats.Since)
break
}
if until.Before(recoverSinceLimit) {
s.logger.Infof("stop querying trades because the time range is out of the limit (%s)", recoverSinceLimit)
s.logger.Infof("[Recover] stop querying trades because the time range is out of the limit (%s)", recoverSinceLimit)
break
}
}
@ -262,7 +333,7 @@ func buildTwinOrderBook(pins []Pin, orders []types.Order) (*TwinOrderBook, error
book := newTwinOrderBook(pins)
for _, order := range orders {
if err := book.AddOrder(order); err != nil {
if err := book.AddOrder(order, true); err != nil {
return nil, err
}
}
@ -311,7 +382,7 @@ func queryTradesToUpdateTwinOrderBook(
logger func(format string, args ...interface{}),
) error {
if twinOrderBook == nil {
return fmt.Errorf("twin orderbook should not be nil, please check it")
return fmt.Errorf("[Recover] twin orderbook should not be nil, please check it")
}
var fromTradeID uint64 = 0
@ -325,11 +396,11 @@ func queryTradesToUpdateTwinOrderBook(
})
if err != nil {
return errors.Wrapf(err, "failed to query trades to recover the grid")
return errors.Wrapf(err, "[Recover] failed to query trades to recover the grid")
}
if logger != nil {
logger("QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades))
logger("[Recover] QueryTrades from %s <-> %s (from: %d) return %d trades", since, until, fromTradeID, len(trades))
}
for _, trade := range trades {
@ -338,7 +409,7 @@ func queryTradesToUpdateTwinOrderBook(
}
if logger != nil {
logger(trade.String())
logger("[Recover] " + trade.String())
}
if existedOrders.Exists(trade.OrderID) {
@ -351,19 +422,19 @@ func queryTradesToUpdateTwinOrderBook(
})
if err != nil {
return errors.Wrapf(err, "failed to query order by trade (trade id: %d, order id: %d)", trade.ID, trade.OrderID)
return errors.Wrapf(err, "[Recover] failed to query order by trade (trade id: %d, order id: %d)", trade.ID, trade.OrderID)
}
if logger != nil {
logger(order.String())
logger("[Recover] " + order.String())
}
// avoid query this order again
existedOrders.Add(*order)
// add 1 to avoid duplicate
fromTradeID = trade.ID + 1
if err := twinOrderBook.AddOrder(*order); err != nil {
return errors.Wrapf(err, "failed to add queried order into twin orderbook")
if err := twinOrderBook.AddOrder(*order, true); err != nil {
return errors.Wrapf(err, "[Recover] failed to add queried order into twin orderbook: %s", order.String())
}
}

View File

@ -1993,7 +1993,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
return
}
s.recoverActiveOrdersPeriodically(ctx)
s.recoverPeriodically(ctx)
})
} else {
s.startProcess(ctx, session)
@ -2008,12 +2008,14 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
if s.RecoverOrdersWhenStart {
// do recover only when triggerPrice is not set and not in the back-test mode
s.logger.Infof("recoverWhenStart is set, trying to recover grid orders...")
if err := s.recoverGrid(ctx, session); err != nil {
if err := s.recover(ctx); err != nil {
// if recover fail, return and do not open grid
s.logger.WithError(err).Error("failed to start process, recover error")
s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error"))
return err
}
s.EmitGridReady()
}
// avoid using goroutine here for back-test

View File

@ -175,6 +175,9 @@ func (b *TwinOrderBook) String() string {
sb.WriteString("================== TWIN ORDERBOOK ==================\n")
for _, pin := range b.pins {
twin := b.m[fixedpoint.Value(pin)]
if twin == nil {
continue
}
twinOrder := twin.GetOrder()
sb.WriteString(fmt.Sprintf("-> %8s) %s\n", pin, twinOrder.String()))
}
@ -217,7 +220,7 @@ func (b *TwinOrderBook) GetTwinOrderPin(order types.Order) (fixedpoint.Value, er
return b.pins[idx], nil
}
func (b *TwinOrderBook) AddOrder(order types.Order) error {
func (b *TwinOrderBook) AddOrder(order types.Order, checkUpdateTime bool) error {
b.mu.Lock()
defer b.mu.Unlock()
@ -238,7 +241,12 @@ func (b *TwinOrderBook) AddOrder(order types.Order) error {
// Exist == false means there is no twin order on this pin
if !twinOrder.Exist() {
b.size++
} else {
if checkUpdateTime && twinOrder.GetOrder().UpdateTime.After(order.UpdateTime.Time()) {
return nil
}
}
if b.size >= len(b.pins) {
return fmt.Errorf("the maximum size of twin orderbook is len(pins) - 1, need to check it")
}
@ -251,13 +259,6 @@ func (b *TwinOrderBook) GetTwinOrder(pin fixedpoint.Value) *TwinOrder {
return b.m[pin]
}
func (b *TwinOrderBook) AddTwinOrder(pin fixedpoint.Value, order *TwinOrder) {
b.mu.Lock()
defer b.mu.Unlock()
b.m[pin] = order
}
// Size is the valid twin order on grid.
func (b *TwinOrderBook) Size() int {
return b.size

View File

@ -53,7 +53,7 @@ func TestTwinOrderBook(t *testing.T) {
}
for _, order := range orders {
assert.NoError(book.AddOrder(order))
assert.NoError(book.AddOrder(order, false))
}
assert.Equal(2, book.Size())
assert.Equal(2, book.EmptyTwinOrderSize())