FIX: fix some error and use chan to trigger active orders recover when on auth

This commit is contained in:
chiahung 2023-10-06 18:04:57 +08:00
parent 4c9b1e78fe
commit 27294ac9b6
2 changed files with 46 additions and 43 deletions

View File

@ -12,36 +12,37 @@ import (
) )
func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Context) { func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Context) {
// sleep for a while to wait for recovered // every time we activeOrdersRecoverCh receive signal, do active orders recover
time.Sleep(util.MillisecondsJitter(5*time.Second, 1000*10)) s.activeOrdersRecoverCh = make(chan struct{}, 1)
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil { // make ticker's interval random in 40 min ~ 70 min
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") interval := util.MillisecondsJitter(40*time.Minute, 30*60*1000)
} else { s.logger.Infof("[ActiveOrderRecover] interval: %s", interval)
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil { ticker := time.NewTicker(interval)
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
}
ticker := time.NewTicker(util.MillisecondsJitter(40*time.Minute, 30*60*1000))
defer ticker.Stop() defer ticker.Stop()
for { for {
select { select {
case <-ticker.C: case <-ticker.C:
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil { s.queryOpenOrdersThenRecoverActiveOrders(ctx)
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time") case <-s.activeOrdersRecoverCh:
} else { s.queryOpenOrdersThenRecoverActiveOrders(ctx)
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
}
case <-ctx.Done(): case <-ctx.Done():
return return
} }
} }
} }
func (s *Strategy) queryOpenOrdersThenRecoverActiveOrders(ctx context.Context) {
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
} else {
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
}
}
func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOrders []types.Order) error { func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOrders []types.Order) error {
recovered := atomic.LoadInt32(&s.recovered) recovered := atomic.LoadInt32(&s.recovered)
if recovered == 0 { if recovered == 0 {
@ -63,14 +64,13 @@ func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOr
openOrdersMap := make(map[uint64]types.Order) openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders { for _, openOrder := range openOrders {
openOrders[openOrder.OrderID] = openOrder openOrdersMap[openOrder.OrderID] = openOrder
} }
// update active orders not in open orders // update active orders not in open orders
for _, activeOrder := range activeOrders { for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; !exist { if _, exist := openOrdersMap[activeOrder.OrderID]; !exist {
s.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID) s.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID)
delete(openOrdersMap, activeOrder.OrderID)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{ updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{
Symbol: activeOrder.Symbol, Symbol: activeOrder.Symbol,
OrderID: strconv.FormatUint(activeOrder.OrderID, 10), OrderID: strconv.FormatUint(activeOrder.OrderID, 10),
@ -82,12 +82,15 @@ func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOr
} }
activeOrderBook.Update(*updatedOrder) activeOrderBook.Update(*updatedOrder)
} else {
delete(openOrdersMap, activeOrder.OrderID)
} }
} }
// TODO: should we add open orders back into active orderbook ?
// update open orders not in active orders // update open orders not in active orders
for _, openOrders := range openOrdersMap { for _, openOrder := range openOrdersMap {
activeOrderBook.Update(openOrders) activeOrderBook.Update(openOrder)
} }
return nil return nil

View File

@ -206,7 +206,8 @@ type Strategy struct {
tradingCtx, writeCtx context.Context tradingCtx, writeCtx context.Context
cancelWrite context.CancelFunc cancelWrite context.CancelFunc
recovered int32 recovered int32
activeOrdersRecoverCh chan struct{}
// this ensures that bbgo.Sync to lock the object // this ensures that bbgo.Sync to lock the object
sync.Mutex sync.Mutex
@ -1977,8 +1978,12 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.logger.Infof("user data stream started, initializing grid...") s.logger.Infof("user data stream started, initializing grid...")
if !bbgo.IsBackTesting { if !bbgo.IsBackTesting {
go time.AfterFunc(3*time.Second, func() { time.AfterFunc(3*time.Second, func() {
s.startProcess(ctx, session) if err := s.startProcess(ctx, session); err != nil {
return
}
s.recoverActiveOrdersWithOpenOrdersPeriodically(ctx)
}) })
} else { } else {
s.startProcess(ctx, session) s.startProcess(ctx, session)
@ -1987,27 +1992,20 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
} }
session.UserDataStream.OnAuth(func() { session.UserDataStream.OnAuth(func() {
if !bbgo.IsBackTesting { time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() {
go s.recoverActiveOrdersWithOpenOrdersPeriodically(ctx) select {
/* case s.activeOrdersRecoverCh <- struct{}{}:
// callback may block the stream execution, so we spawn the recover function to the background s.logger.Info("trigger active orders recover when on auth")
// add (5 seconds + random <10 seconds jitter) delay default:
go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { s.logger.Warn("failed to trigger active orders recover when on auth")
recovered := atomic.LoadInt32(&s.recovered) }
if recovered == 0 { })
return
}
s.recoverActiveOrders(ctx, session)
})
*/
}
}) })
return nil return nil
} }
func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSession) { func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSession) error {
s.debugGridProfitStats("startProcess") s.debugGridProfitStats("startProcess")
if s.RecoverOrdersWhenStart { if s.RecoverOrdersWhenStart {
// do recover only when triggerPrice is not set and not in the back-test mode // do recover only when triggerPrice is not set and not in the back-test mode
@ -2016,15 +2014,17 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
// if recover fail, return and do not open grid // if recover fail, return and do not open grid
s.logger.WithError(err).Error("failed to start process, recover error") s.logger.WithError(err).Error("failed to start process, recover error")
s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error")) s.EmitGridError(errors.Wrapf(err, "failed to start process, recover error"))
return return err
} }
} }
// avoid using goroutine here for back-test // avoid using goroutine here for back-test
if err := s.openGrid(ctx, session); err != nil { if err := s.openGrid(ctx, session); err != nil {
s.EmitGridError(errors.Wrapf(err, "failed to start process, setup grid orders error")) s.EmitGridError(errors.Wrapf(err, "failed to start process, setup grid orders error"))
return return err
} }
return nil
} }
func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error {