From 5ff3828ec1ac57fa99b51cd41eaab279bb090abc Mon Sep 17 00:00:00 2001 From: chiahung Date: Mon, 16 Oct 2023 16:02:43 +0800 Subject: [PATCH] move to onAuth --- pkg/strategy/grid2/active_order_recover.go | 22 +++++++++++++++++++++- pkg/strategy/grid2/strategy.go | 21 ++------------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/pkg/strategy/grid2/active_order_recover.go b/pkg/strategy/grid2/active_order_recover.go index 2d8dab19c..60774d229 100644 --- a/pkg/strategy/grid2/active_order_recover.go +++ b/pkg/strategy/grid2/active_order_recover.go @@ -23,9 +23,29 @@ type SyncActiveOrdersOpts struct { exchange types.Exchange } +func (s *Strategy) initializeRecoverCh() bool { + s.mu.Lock() + defer s.mu.Unlock() + + alreadyInitialize := false + + if s.activeOrdersRecoverCh == nil { + s.logger.Info("initialize recover channel") + s.activeOrdersRecoverCh = make(chan struct{}, 1) + } else { + s.logger.Info("already initialize recover channel, trigger active orders recover") + alreadyInitialize = true + s.activeOrdersRecoverCh <- struct{}{} + } + + return alreadyInitialize +} + func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) { // every time we activeOrdersRecoverCh receive signal, do active orders recover - s.activeOrdersRecoverCh = make(chan struct{}, 1) + if alreadyInitialize := s.initializeRecoverCh(); alreadyInitialize { + return + } // make ticker's interval random in 25 min ~ 35 min interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index cb4ef25a3..8c389cd5b 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1967,14 +1967,8 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. // if TriggerPrice is zero, that means we need to open the grid when start up if s.TriggerPrice.IsZero() { - // must call the openGrid method inside the OnStart callback because - // it needs to receive the trades from the user data stream - // - // should try to avoid blocking the user data stream - // callbacks are blocking operation - session.UserDataStream.OnStart(func() { - s.logger.Infof("user data stream started, initializing grid...") - + session.UserDataStream.OnAuth(func() { + s.logger.Infof("user data stream authenticated, start the process") if !bbgo.IsBackTesting { time.AfterFunc(3*time.Second, func() { if err := s.startProcess(ctx, session); err != nil { @@ -1989,17 +1983,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) } - session.UserDataStream.OnAuth(func() { - time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { - select { - case s.activeOrdersRecoverCh <- struct{}{}: - s.logger.Info("trigger active orders recover when on auth") - default: - s.logger.Warn("failed to trigger active orders recover when on auth") - } - }) - }) - return nil }