diff --git a/bbgo/exchange/binance/stream.go b/bbgo/exchange/binance/stream.go index 4603f7244..8d6c60487 100644 --- a/bbgo/exchange/binance/stream.go +++ b/bbgo/exchange/binance/stream.go @@ -95,8 +95,11 @@ func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) er func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { defer close(eventC) - ticker := time.NewTicker(10 * time.Minute) - defer ticker.Stop() + pingTicker := time.NewTicker(1 * time.Minute) + defer pingTicker.Stop() + + keepAliveTicker := time.NewTicker(5 * time.Minute) + defer keepAliveTicker.Stop() for { select { @@ -104,17 +107,17 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { case <-ctx.Done(): return - case <-ticker.C: + case <-keepAliveTicker.C: + err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx) + if err != nil { + log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey)) + } + + case <-pingTicker.C: if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(1*time.Second)); err != nil { log.WithError(err).Error("ping error", err) } - err := s.Client.NewKeepaliveUserStreamService().ListenKey(s.ListenKey).Do(ctx) - if err != nil { - maskKey := s.ListenKey[0:5] - maskKey = maskKey + strings.Repeat("*", len(s.ListenKey)-1-5) - log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskKey) - } default: if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { @@ -229,3 +232,8 @@ func (s *PrivateStream) Close() error { log.Infof("[binance] user data stream closed") return err } + +func maskListenKey(listenKey string) string { + maskKey := listenKey[0:5] + return maskKey + strings.Repeat("*", len(listenKey)-1-5) +}