all: handle AllAuthedC timeout from the caller
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

This commit is contained in:
c9s 2024-11-15 02:03:22 +08:00
parent e5c57bf8ba
commit e379e4c97d
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
4 changed files with 16 additions and 14 deletions

View File

@ -620,7 +620,9 @@ func (s *Strategy) CrossRun(
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-connGroup.AllAuthedC(ctx, time.Minute): case <-time.After(3 * time.Minute):
log.Panicf("authentication timeout, exiting...")
case <-connGroup.AllAuthedC(ctx):
} }
log.Infof("user data stream authenticated, start placing orders...") log.Infof("user data stream authenticated, start placing orders...")

View File

@ -1844,7 +1844,11 @@ func (s *Strategy) CrossRun(
s.logger.Infof("waiting for authentication connections to be ready...") s.logger.Infof("waiting for authentication connections to be ready...")
select { select {
case <-ctx.Done(): case <-ctx.Done():
case <-s.connectivityGroup.AllAuthedC(ctx, 3*time.Minute):
case <-time.After(3 * time.Minute):
s.logger.Panicf("authentication timeout, exiting...")
case <-s.connectivityGroup.AllAuthedC(ctx):
} }
s.logger.Infof("all user data streams are connected, starting workers...") s.logger.Infof("all user data streams are connected, starting workers...")

View File

@ -194,16 +194,12 @@ func (g *ConnectivityGroup) AnyDisconnected(ctx context.Context) bool {
return false return false
} }
func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}, allTimeoutDuration time.Duration) { func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{}) {
allTimeout := time.After(allTimeoutDuration)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-allTimeout:
return
default: default:
state := g.GetState() state := g.GetState()
if state == ConnectivityStateAuthed { if state == ConnectivityStateAuthed {
@ -219,8 +215,8 @@ func (g *ConnectivityGroup) waitAllAuthed(ctx context.Context, c chan struct{},
// AllAuthedC returns a channel that will be closed when all connections are authenticated // AllAuthedC returns a channel that will be closed when all connections are authenticated
// the returned channel will be closed when all connections are authenticated // the returned channel will be closed when all connections are authenticated
// and the channel can only be used once (because we can't close a channel twice) // and the channel can only be used once (because we can't close a channel twice)
func (g *ConnectivityGroup) AllAuthedC(ctx context.Context, timeout time.Duration) <-chan struct{} { func (g *ConnectivityGroup) AllAuthedC(ctx context.Context) <-chan struct{} {
c := make(chan struct{}) c := make(chan struct{})
go g.waitAllAuthed(ctx, c, timeout) go g.waitAllAuthed(ctx, c)
return c return c
} }

View File

@ -16,7 +16,7 @@ func TestConnectivityGroupAuthC(t *testing.T) {
conn1 := NewConnectivity() conn1 := NewConnectivity()
conn2 := NewConnectivity() conn2 := NewConnectivity()
group := NewConnectivityGroup(conn1, conn2) group := NewConnectivityGroup(conn1, conn2)
allAuthedC := group.AllAuthedC(ctx, time.Second) allAuthedC := group.AllAuthedC(ctx)
time.Sleep(delay) time.Sleep(delay)
conn1.setConnect() conn1.setConnect()
@ -124,10 +124,10 @@ func TestConnectivityGroup(t *testing.T) {
}() }()
authed1 := false authed1 := false
authedC1 := group.AllAuthedC(ctx, 3*time.Second) authedC1 := group.AllAuthedC(ctx)
authed2 := false authed2 := false
authedC2 := group.AllAuthedC(ctx, 3*time.Second) authedC2 := group.AllAuthedC(ctx)
select { select {
case <-authedC1: case <-authedC1:
@ -185,10 +185,10 @@ func TestConnectivityGroup(t *testing.T) {
}() }()
authed1 := false authed1 := false
authedC1 := group1.AllAuthedC(ctx, 3*time.Second) authedC1 := group1.AllAuthedC(ctx)
authed2 := false authed2 := false
authedC2 := group2.AllAuthedC(ctx, 3*time.Second) authedC2 := group2.AllAuthedC(ctx)
select { select {
case <-authedC1: case <-authedC1: