mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-22 06:53:52 +00:00
grid2: refactor recoverActiveOrdersPeriodically
This commit is contained in:
parent
27294ac9b6
commit
1347c8ef87
|
@ -11,7 +11,7 @@ import (
|
|||
"github.com/c9s/bbgo/pkg/util"
|
||||
)
|
||||
|
||||
func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Context) {
|
||||
func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
|
||||
// every time we activeOrdersRecoverCh receive signal, do active orders recover
|
||||
s.activeOrdersRecoverCh = make(chan struct{}, 1)
|
||||
|
||||
|
@ -23,27 +23,25 @@ func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Con
|
|||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
s.queryOpenOrdersThenRecoverActiveOrders(ctx)
|
||||
case <-s.activeOrdersRecoverCh:
|
||||
s.queryOpenOrdersThenRecoverActiveOrders(ctx)
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
if err := s.syncActiveOrders(ctx); err != nil {
|
||||
log.WithError(err).Errorf("unable to sync active orders")
|
||||
}
|
||||
|
||||
case <-s.activeOrdersRecoverCh:
|
||||
if err := s.syncActiveOrders(ctx); err != nil {
|
||||
log.WithError(err).Errorf("unable to sync active orders")
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Strategy) queryOpenOrdersThenRecoverActiveOrders(ctx context.Context) {
|
||||
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil {
|
||||
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
|
||||
} else {
|
||||
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil {
|
||||
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOrders []types.Order) error {
|
||||
func (s *Strategy) syncActiveOrders(ctx context.Context) error {
|
||||
recovered := atomic.LoadInt32(&s.recovered)
|
||||
if recovered == 0 {
|
||||
s.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready")
|
||||
|
@ -56,6 +54,13 @@ func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOr
|
|||
return nil
|
||||
}
|
||||
|
||||
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol)
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
|
||||
return err
|
||||
}
|
||||
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
|
@ -70,14 +75,16 @@ func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOr
|
|||
// update active orders not in open orders
|
||||
for _, activeOrder := range activeOrders {
|
||||
if _, exist := openOrdersMap[activeOrder.OrderID]; !exist {
|
||||
s.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID)
|
||||
|
||||
s.logger.Infof("found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
|
||||
|
||||
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{
|
||||
Symbol: activeOrder.Symbol,
|
||||
OrderID: strconv.FormatUint(activeOrder.OrderID, 10),
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order (%d)", activeOrder.OrderID)
|
||||
s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
|
||||
continue
|
||||
}
|
||||
|
||||
|
|
|
@ -1983,7 +1983,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
|
|||
return
|
||||
}
|
||||
|
||||
s.recoverActiveOrdersWithOpenOrdersPeriodically(ctx)
|
||||
s.recoverActiveOrdersPeriodically(ctx)
|
||||
})
|
||||
} else {
|
||||
s.startProcess(ctx, session)
|
||||
|
|
Loading…
Reference in New Issue
Block a user