diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index df1c12d0e..8f5364350 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -326,6 +326,9 @@ func (s *Stream) connect(ctx context.Context) error { log.Infof("listen key is created: %s", MaskKey(listenKey)) } + // should only start one connection one time, so we lock the mutex + s.ConnLock.Lock() + // when in public mode, the listen key is an empty string conn, err := s.dial(listenKey) if err != nil { @@ -334,8 +337,6 @@ func (s *Stream) connect(ctx context.Context) error { log.Infof("websocket connected") - // should only start one connection one time, so we lock the mutex - s.ConnLock.Lock() // ensure the previous context is cancelled if s.connCancel != nil { diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 39329fd2c..221ce9d21 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -241,6 +241,9 @@ func (s *Stream) connect(ctx context.Context) error { url = okexapi.PrivateWebSocketURL } + // should only start one connection one time, so we lock the mutex + s.connLock.Lock() + conn, err := s.StandardStream.Dial(url) if err != nil { return err @@ -248,8 +251,6 @@ func (s *Stream) connect(ctx context.Context) error { log.Infof("websocket connected: %s", url) - // should only start one connection one time, so we lock the mutex - s.connLock.Lock() // ensure the previous context is cancelled if s.connCancel != nil {