remove checker

This commit is contained in:
chiahung 2023-10-02 11:56:15 +08:00
parent ca80bdb282
commit 4c9b1e78fe
3 changed files with 95 additions and 110 deletions

View File

@ -0,0 +1,94 @@
package grid2
import (
"context"
"strconv"
"sync/atomic"
"time"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
func (s *Strategy) recoverActiveOrdersWithOpenOrdersPeriodically(ctx context.Context) {
// sleep for a while to wait for recovered
time.Sleep(util.MillisecondsJitter(5*time.Second, 1000*10))
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
} else {
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
}
ticker := time.NewTicker(util.MillisecondsJitter(40*time.Minute, 30*60*1000))
defer ticker.Stop()
for {
select {
case <-ticker.C:
if openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, s.session.Exchange, s.Symbol); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
} else {
if err := s.recoverActiveOrdersWithOpenOrders(ctx, openOrders); err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
}
case <-ctx.Done():
return
}
}
}
func (s *Strategy) recoverActiveOrdersWithOpenOrders(ctx context.Context, openOrders []types.Order) error {
recovered := atomic.LoadInt32(&s.recovered)
if recovered == 0 {
s.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready")
return nil
}
s.logger.Infof("[ActiveOrderRecover] recovering active orders with open orders")
if s.getGrid() == nil {
return nil
}
s.mu.Lock()
defer s.mu.Unlock()
activeOrderBook := s.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrders[openOrder.OrderID] = openOrder
}
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; !exist {
s.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID)
delete(openOrdersMap, activeOrder.OrderID)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{
Symbol: activeOrder.Symbol,
OrderID: strconv.FormatUint(activeOrder.OrderID, 10),
})
if err != nil {
s.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order (%d)", activeOrder.OrderID)
continue
}
activeOrderBook.Update(*updatedOrder)
}
}
// update open orders not in active orders
for _, openOrders := range openOrdersMap {
activeOrderBook.Update(openOrders)
}
return nil
}

View File

@ -1,105 +0,0 @@
package grid2
import (
"context"
"strconv"
"sync/atomic"
"time"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
)
type ActiveOrderRecover struct {
strategy *Strategy
interval time.Duration
}
func NewActiveOrderRecover(strategy *Strategy, interval time.Duration) *ActiveOrderRecover {
return &ActiveOrderRecover{
strategy: strategy,
interval: interval,
}
}
func (c *ActiveOrderRecover) Run(ctx context.Context) {
// sleep for a while to wait for recovered
time.Sleep(util.MillisecondsJitter(5*time.Second, 1000*10))
if err := c.recover(ctx); err != nil {
c.strategy.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
ticker := time.NewTicker(util.MillisecondsJitter(c.interval, 30*60*1000))
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.recover(ctx); err != nil {
c.strategy.logger.WithError(err).Error("[ActiveOrderRecover] failed to recover avtive orderbook")
}
case <-ctx.Done():
return
}
}
}
func (c *ActiveOrderRecover) recover(ctx context.Context) error {
recovered := atomic.LoadInt32(&c.strategy.recovered)
if recovered == 0 {
c.strategy.logger.Infof("[ActiveOrderRecover] skip recovering active orders because recover not ready")
return nil
}
c.strategy.logger.Infof("[ActiveOrderRecover] recovering active orders with open orders")
if c.strategy.getGrid() == nil {
return nil
}
c.strategy.mu.Lock()
defer c.strategy.mu.Unlock()
openOrders, err := c.strategy.session.Exchange.QueryOpenOrders(ctx, c.strategy.Symbol)
if err != nil {
return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders")
}
activeOrderBook := c.strategy.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrders[openOrder.OrderID] = openOrder
}
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; !exist {
c.strategy.logger.Infof("find active order (%d) not in open orders, updating...", activeOrder.OrderID)
delete(openOrdersMap, activeOrder.OrderID)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, c.strategy.orderQueryService, types.OrderQuery{
Symbol: activeOrder.Symbol,
OrderID: strconv.FormatUint(activeOrder.OrderID, 10),
})
if err != nil {
c.strategy.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order (%d)", activeOrder.OrderID)
continue
}
activeOrderBook.Update(*updatedOrder)
}
}
// update open orders not in active orders
for _, openOrders := range openOrdersMap {
activeOrderBook.Update(openOrders)
}
return nil
}

View File

@ -171,9 +171,6 @@ type Strategy struct {
RecoverGridByScanningTrades bool `json:"recoverGridByScanningTrades"` RecoverGridByScanningTrades bool `json:"recoverGridByScanningTrades"`
RecoverGridWithin time.Duration `json:"recoverGridWithin"` RecoverGridWithin time.Duration `json:"recoverGridWithin"`
// activeOrderRecover periodically check the open orders is the same as active orderbook and recover it
activeOrderRecover *ActiveOrderRecover
EnableProfitFixer bool `json:"enableProfitFixer"` EnableProfitFixer bool `json:"enableProfitFixer"`
FixProfitSince *types.Time `json:"fixProfitSince"` FixProfitSince *types.Time `json:"fixProfitSince"`
@ -1989,10 +1986,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}) })
} }
s.activeOrderRecover = NewActiveOrderRecover(s, 40*time.Minute)
session.UserDataStream.OnAuth(func() { session.UserDataStream.OnAuth(func() {
if !bbgo.IsBackTesting { if !bbgo.IsBackTesting {
go s.activeOrderRecover.Run(ctx) go s.recoverActiveOrdersWithOpenOrdersPeriodically(ctx)
/* /*
// callback may block the stream execution, so we spawn the recover function to the background // callback may block the stream execution, so we spawn the recover function to the background
// add (5 seconds + random <10 seconds jitter) delay // add (5 seconds + random <10 seconds jitter) delay