diff --git a/pkg/strategy/xdepthmaker/strategy.go b/pkg/strategy/xdepthmaker/strategy.go index b075b2edb..e18f4ab0b 100644 --- a/pkg/strategy/xdepthmaker/strategy.go +++ b/pkg/strategy/xdepthmaker/strategy.go @@ -620,7 +620,9 @@ func (s *Strategy) CrossRun( select { case <-ctx.Done(): return - case <-connGroup.AllAuthedC(ctx, time.Minute): + case <-time.After(3 * time.Minute): + log.Panicf("authentication timeout, exiting...") + case <-connGroup.AllAuthedC(ctx): } log.Infof("user data stream authenticated, start placing orders...") diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 9a874ee79..a30aba6e4 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -1844,7 +1844,11 @@ func (s *Strategy) CrossRun( s.logger.Infof("waiting for authentication connections to be ready...") select { case <-ctx.Done(): - case <-s.connectivityGroup.AllAuthedC(ctx, 3*time.Minute): + + case <-time.After(3 * time.Minute): + s.logger.Panicf("authentication timeout, exiting...") + + case <-s.connectivityGroup.AllAuthedC(ctx): } s.logger.Infof("all user data streams are connected, starting workers...") diff --git a/pkg/types/connectivitygroup.go b/pkg/types/connectivitygroup.go index c2acd1604..7cc177381 100644 --- a/pkg/types/connectivitygroup.go +++ b/pkg/types/connectivitygroup.go @@ -194,16 +194,12 @@ func (g *ConnectivityGroup) AnyDisconnected(ctx context.Context) bool { return false } -func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}, allTimeoutDuration time.Duration) { - allTimeout := time.After(allTimeoutDuration) +func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}) { for { select { case <-ctx.Done(): return - case <-allTimeout: - return - default: state := g.GetState() if state == ConnectivityStateAuthed { @@ -219,8 +215,8 @@ func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}, // AllAuthedC returns a channel that will be closed when all connections are authenticated // the returned channel will be closed when all connections are authenticated // and the channel can only be used once (because we can't close a channel twice) -func (g *ConnectivityGroup) AllAuthedC(ctx context.Context, timeout time.Duration) <-chan struct{} { +func (g *ConnectivityGroup) AllAuthedC(ctx context.Context) <-chan struct{} { c := make(chan struct{}) - go g.waitAllAuthed(ctx, c, timeout) + go g.waitAllAuthed(ctx, c) return c } diff --git a/pkg/types/connectivitygroup_test.go b/pkg/types/connectivitygroup_test.go index 3400dfad9..ff6005296 100644 --- a/pkg/types/connectivitygroup_test.go +++ b/pkg/types/connectivitygroup_test.go @@ -16,7 +16,7 @@ func TestConnectivityGroupAuthC(t *testing.T) { conn1 := NewConnectivity() conn2 := NewConnectivity() group := NewConnectivityGroup(conn1, conn2) - allAuthedC := group.AllAuthedC(ctx, time.Second) + allAuthedC := group.AllAuthedC(ctx) time.Sleep(delay) conn1.setConnect() @@ -124,10 +124,10 @@ func TestConnectivityGroup(t *testing.T) { }() authed1 := false - authedC1 := group.AllAuthedC(ctx, 3*time.Second) + authedC1 := group.AllAuthedC(ctx) authed2 := false - authedC2 := group.AllAuthedC(ctx, 3*time.Second) + authedC2 := group.AllAuthedC(ctx) select { case <-authedC1: @@ -185,10 +185,10 @@ func TestConnectivityGroup(t *testing.T) { }() authed1 := false - authedC1 := group1.AllAuthedC(ctx, 3*time.Second) + authedC1 := group1.AllAuthedC(ctx) authed2 := false - authedC2 := group2.AllAuthedC(ctx, 3*time.Second) + authedC2 := group2.AllAuthedC(ctx) select { case <-authedC1: