From 35e0b1d14688e4bd1a03b858e82167f676cb8a3e Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 30 Dec 2021 16:47:39 +0800 Subject: [PATCH] binance: fix binance stream graceful shutdown --- pkg/exchange/binance/stream.go | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index a6a256c66..cfc5084f5 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -50,20 +50,17 @@ var defaultDialer = &websocket.Dialer{ // - the re-connector calls connect() to create the new connection object, go to the 1 step. // When stream.Close() is called, a close message must be written to the websocket connection. -const readTimeout = 3 * time.Minute +const readTimeout = 1 * time.Minute const writeTimeout = 10 * time.Second +const pingInterval = readTimeout / 3 const listenKeyKeepAliveInterval = 10 * time.Minute +const reconnectCoolDownPeriod = 15 * time.Second func init() { debugBinanceDepth, _ = strconv.ParseBool(os.Getenv("DEBUG_BINANCE_DEPTH")) if debugBinanceDepth { log.Info("binance depth debugging is enabled") } - - debugBinanceStream, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM")) - if debugBinanceStream { - log.Level = logrus.DebugLevel - } } type StreamRequest struct { @@ -371,11 +368,13 @@ func (s *Stream) reconnector(ctx context.Context) { case <-s.ReconnectC: log.Warnf("received reconnect signal") - time.Sleep(3 * time.Second) + time.Sleep(reconnectCoolDownPeriod) log.Warnf("re-connecting...") if err := s.connect(ctx); err != nil { - log.WithError(err).Errorf("connect error, try to reconnect again...") + log.WithError(err).Errorf("re-connect error, try to reconnect after %s...", reconnectCoolDownPeriod) + + // re-emit the re-connect signal if error s.Reconnect() } } @@ -424,7 +423,7 @@ func (s *Stream) connect(ctx context.Context) error { } go s.read(s.connCtx, conn) - go s.ping(s.connCtx, conn, readTimeout/3) + go s.ping(s.connCtx, conn, pingInterval) return nil } @@ -453,7 +452,6 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, interval time.D } } - func (s *Stream) read(ctx context.Context, conn *websocket.Conn) { defer func() { // if we failed to read, we need to cancel the context @@ -588,8 +586,9 @@ func (s *Stream) Close() error { return errors.Wrap(err, "websocket write close message error") } - err = conn.Close() - return errors.Wrap(err, "websocket connection close error") + // let the reader close the connection + <-time.After(time.Second) + return nil } func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { @@ -632,7 +631,7 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) } -func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) { +func (s *Stream) closeListenKey(ctx context.Context, listenKey string) (err error) { // should use background context to invalidate the user stream log.Debugf("closing listen key: %s", util.MaskKey(listenKey)) @@ -672,7 +671,7 @@ func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) { // if we exit, we should invalidate the existing listen key defer func() { log.Debugf("keepalive worker stopped") - if err := s.invalidateListenKey(context.Background(), listenKey); err != nil { + if err := s.closeListenKey(context.Background(), listenKey); err != nil { log.WithError(err).Errorf("invalidate listen key error: %v key: %s", err, util.MaskKey(listenKey)) } }()