Merge pull request #1313 from c9s/fix/grid2-active-orders

FIX: [grid2] only do active order update when grid is recovered
This commit is contained in:
c9s 2023-09-25 17:53:07 +08:00 committed by GitHub
commit 8f40478c74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -9,6 +9,7 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/google/uuid" "github.com/google/uuid"
@ -205,6 +206,8 @@ type Strategy struct {
tradingCtx, writeCtx context.Context tradingCtx, writeCtx context.Context
cancelWrite context.CancelFunc cancelWrite context.CancelFunc
recovered int32
// this ensures that bbgo.Sync to lock the object // this ensures that bbgo.Sync to lock the object
sync.Mutex sync.Mutex
} }
@ -1982,11 +1985,16 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}) })
} }
session.UserDataStream.OnConnect(func() { session.UserDataStream.OnAuth(func() {
if !bbgo.IsBackTesting { if !bbgo.IsBackTesting {
// callback may block the stream execution, so we spawn the recover function to the background // callback may block the stream execution, so we spawn the recover function to the background
// add (5 seconds + random <10 seconds jitter) delay // add (5 seconds + random <10 seconds jitter) delay
go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() {
recovered := atomic.LoadInt32(&s.recovered)
if recovered == 0 {
return
}
s.recoverActiveOrders(ctx, session) s.recoverActiveOrders(ctx, session)
}) })
} }
@ -2016,6 +2024,10 @@ func (s *Strategy) startProcess(ctx context.Context, session *bbgo.ExchangeSessi
} }
func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) recoverGrid(ctx context.Context, session *bbgo.ExchangeSession) error {
defer func() {
atomic.AddInt32(&s.recovered, 1)
}()
if s.RecoverGridByScanningTrades { if s.RecoverGridByScanningTrades {
s.debugLog("recovering grid by scanning trades") s.debugLog("recovering grid by scanning trades")
return s.recoverByScanningTrades(ctx, session) return s.recoverByScanningTrades(ctx, session)
@ -2156,8 +2168,8 @@ func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.Exchan
} }
s.logger.Infof("found %d active orders to update...", len(activeOrders)) s.logger.Infof("found %d active orders to update...", len(activeOrders))
for _, o := range activeOrders { for i, o := range activeOrders {
s.logger.Infof("updating %d order...", o.OrderID) s.logger.Infof("updating %d/%d order #%d...", i+1, len(activeOrders), o.OrderID)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{ updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, s.orderQueryService, types.OrderQuery{
Symbol: o.Symbol, Symbol: o.Symbol,
@ -2169,6 +2181,7 @@ func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.Exchan
return return
} }
s.logger.Infof("triggering updated order #%d: %s", o.OrderID, o.String())
activeOrderBook.Update(*updatedOrder) activeOrderBook.Update(*updatedOrder)
} }
} }