binance: fix binance stream graceful shutdown

This commit is contained in:
c9s 2021-12-30 16:47:39 +08:00
parent ff87fb007e
commit 35e0b1d146

View File

@ -50,20 +50,17 @@ var defaultDialer = &websocket.Dialer{
// - the re-connector calls connect() to create the new connection object, go to the 1 step. // - the re-connector calls connect() to create the new connection object, go to the 1 step.
// When stream.Close() is called, a close message must be written to the websocket connection. // When stream.Close() is called, a close message must be written to the websocket connection.
const readTimeout = 3 * time.Minute const readTimeout = 1 * time.Minute
const writeTimeout = 10 * time.Second const writeTimeout = 10 * time.Second
const pingInterval = readTimeout / 3
const listenKeyKeepAliveInterval = 10 * time.Minute const listenKeyKeepAliveInterval = 10 * time.Minute
const reconnectCoolDownPeriod = 15 * time.Second
func init() { func init() {
debugBinanceDepth, _ = strconv.ParseBool(os.Getenv("DEBUG_BINANCE_DEPTH")) debugBinanceDepth, _ = strconv.ParseBool(os.Getenv("DEBUG_BINANCE_DEPTH"))
if debugBinanceDepth { if debugBinanceDepth {
log.Info("binance depth debugging is enabled") log.Info("binance depth debugging is enabled")
} }
debugBinanceStream, _ := strconv.ParseBool(os.Getenv("DEBUG_BINANCE_STREAM"))
if debugBinanceStream {
log.Level = logrus.DebugLevel
}
} }
type StreamRequest struct { type StreamRequest struct {
@ -371,11 +368,13 @@ func (s *Stream) reconnector(ctx context.Context) {
case <-s.ReconnectC: case <-s.ReconnectC:
log.Warnf("received reconnect signal") log.Warnf("received reconnect signal")
time.Sleep(3 * time.Second) time.Sleep(reconnectCoolDownPeriod)
log.Warnf("re-connecting...") log.Warnf("re-connecting...")
if err := s.connect(ctx); err != nil { if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...") log.WithError(err).Errorf("re-connect error, try to reconnect after %s...", reconnectCoolDownPeriod)
// re-emit the re-connect signal if error
s.Reconnect() s.Reconnect()
} }
} }
@ -424,7 +423,7 @@ func (s *Stream) connect(ctx context.Context) error {
} }
go s.read(s.connCtx, conn) go s.read(s.connCtx, conn)
go s.ping(s.connCtx, conn, readTimeout/3) go s.ping(s.connCtx, conn, pingInterval)
return nil return nil
} }
@ -453,7 +452,6 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, interval time.D
} }
} }
func (s *Stream) read(ctx context.Context, conn *websocket.Conn) { func (s *Stream) read(ctx context.Context, conn *websocket.Conn) {
defer func() { defer func() {
// if we failed to read, we need to cancel the context // if we failed to read, we need to cancel the context
@ -588,8 +586,9 @@ func (s *Stream) Close() error {
return errors.Wrap(err, "websocket write close message error") return errors.Wrap(err, "websocket write close message error")
} }
err = conn.Close() // let the reader close the connection
return errors.Wrap(err, "websocket connection close error") <-time.After(time.Second)
return nil
} }
func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
@ -632,7 +631,7 @@ func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error
return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx) return s.Client.NewKeepaliveUserStreamService().ListenKey(listenKey).Do(ctx)
} }
func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err error) { func (s *Stream) closeListenKey(ctx context.Context, listenKey string) (err error) {
// should use background context to invalidate the user stream // should use background context to invalidate the user stream
log.Debugf("closing listen key: %s", util.MaskKey(listenKey)) log.Debugf("closing listen key: %s", util.MaskKey(listenKey))
@ -672,7 +671,7 @@ func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
// if we exit, we should invalidate the existing listen key // if we exit, we should invalidate the existing listen key
defer func() { defer func() {
log.Debugf("keepalive worker stopped") log.Debugf("keepalive worker stopped")
if err := s.invalidateListenKey(context.Background(), listenKey); err != nil { if err := s.closeListenKey(context.Background(), listenKey); err != nil {
log.WithError(err).Errorf("invalidate listen key error: %v key: %s", err, util.MaskKey(listenKey)) log.WithError(err).Errorf("invalidate listen key error: %v key: %s", err, util.MaskKey(listenKey))
} }
}() }()