From f505c0e2c6c71904914a35dcb6d95325ac53abac Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 15 Jan 2021 14:56:11 +0800 Subject: [PATCH] split go routine for keep alive and ping tickers --- pkg/exchange/binance/stream.go | 63 ++++++++++++++++++++++------------ 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index c79592868..4974a2adc 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -192,12 +192,15 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) { func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { if s.useMargin { - log.Infof("margin mode is enabled, requesting margin user stream listen key...") - req := s.Client.NewStartMarginUserStreamService() if s.useMarginIsolated { - req.Isolated(s.useMarginIsolatedSymbol) + log.Infof("isolated margin %s is enabled, requesting margin user stream listen key...", s.useMarginIsolatedSymbol) + req := s.Client.NewStartIsolatedMarginUserStreamService() + req.Symbol(s.useMarginIsolatedSymbol) + return req.Do(ctx) } + log.Infof("margin mode is enabled, requesting margin user stream listen key...") + req := s.Client.NewStartMarginUserStreamService() return req.Do(ctx) } @@ -206,11 +209,13 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) { func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error { if s.useMargin { - req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey) if s.useMarginIsolated { - req.Isolated(s.useMarginIsolatedSymbol) + req := s.Client.NewKeepaliveIsolatedMarginUserStreamService().ListenKey(listenKey) + req.Symbol(s.useMarginIsolatedSymbol) + return req.Do(ctx) } + req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey) return req.Do(ctx) } @@ -271,32 +276,43 @@ func (s *Stream) Connect(ctx context.Context) error { func (s *Stream) read(ctx context.Context) { - pingTicker := time.NewTicker(1 * time.Minute) + pingTicker := time.NewTicker(20 * time.Second) defer pingTicker.Stop() keepAliveTicker := time.NewTicker(5 * time.Minute) defer keepAliveTicker.Stop() + go func() { + for { + select { + + case <-ctx.Done(): + return + + case <-pingTicker.C: + if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(1*time.Second)); err != nil { + log.WithError(err).Error("ping error", err) + } + + case <-keepAliveTicker.C: + if !s.publicOnly { + if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { + log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey)) + } + } + + } + } + }() + for { select { case <-ctx.Done(): return - case <-keepAliveTicker.C: - if !s.publicOnly { - if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil { - log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey)) - } - } - - case <-pingTicker.C: - if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(1*time.Second)); err != nil { - log.WithError(err).Error("ping error", err) - } - default: - if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { + if err := s.Conn.SetReadDeadline(time.Now().Add(1 * time.Minute)); err != nil { log.WithError(err).Errorf("set read deadline error: %s", err.Error()) } @@ -374,12 +390,15 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err log.Info("closing listen key") if s.useMargin { - req := s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey) if s.useMarginIsolated { - req.Isolated(s.useMarginIsolatedSymbol) + req := s.Client.NewCloseIsolatedMarginUserStreamService().ListenKey(listenKey) + req.Symbol(s.useMarginIsolatedSymbol) + err = req.Do(ctx) + } else { + req := s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey) + err = req.Do(ctx) } - err = req.Do(ctx) } else { err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx) }