mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-10 09:11:55 +00:00
split go routine for keep alive and ping tickers
This commit is contained in:
parent
d4774f5f0e
commit
f505c0e2c6
|
@ -192,12 +192,15 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
|
|||
|
||||
func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
|
||||
if s.useMargin {
|
||||
log.Infof("margin mode is enabled, requesting margin user stream listen key...")
|
||||
req := s.Client.NewStartMarginUserStreamService()
|
||||
if s.useMarginIsolated {
|
||||
req.Isolated(s.useMarginIsolatedSymbol)
|
||||
log.Infof("isolated margin %s is enabled, requesting margin user stream listen key...", s.useMarginIsolatedSymbol)
|
||||
req := s.Client.NewStartIsolatedMarginUserStreamService()
|
||||
req.Symbol(s.useMarginIsolatedSymbol)
|
||||
return req.Do(ctx)
|
||||
}
|
||||
|
||||
log.Infof("margin mode is enabled, requesting margin user stream listen key...")
|
||||
req := s.Client.NewStartMarginUserStreamService()
|
||||
return req.Do(ctx)
|
||||
}
|
||||
|
||||
|
@ -206,11 +209,13 @@ func (s *Stream) fetchListenKey(ctx context.Context) (string, error) {
|
|||
|
||||
func (s *Stream) keepaliveListenKey(ctx context.Context, listenKey string) error {
|
||||
if s.useMargin {
|
||||
req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey)
|
||||
if s.useMarginIsolated {
|
||||
req.Isolated(s.useMarginIsolatedSymbol)
|
||||
req := s.Client.NewKeepaliveIsolatedMarginUserStreamService().ListenKey(listenKey)
|
||||
req.Symbol(s.useMarginIsolatedSymbol)
|
||||
return req.Do(ctx)
|
||||
}
|
||||
|
||||
req := s.Client.NewKeepaliveMarginUserStreamService().ListenKey(listenKey)
|
||||
return req.Do(ctx)
|
||||
}
|
||||
|
||||
|
@ -271,32 +276,43 @@ func (s *Stream) Connect(ctx context.Context) error {
|
|||
|
||||
func (s *Stream) read(ctx context.Context) {
|
||||
|
||||
pingTicker := time.NewTicker(1 * time.Minute)
|
||||
pingTicker := time.NewTicker(20 * time.Second)
|
||||
defer pingTicker.Stop()
|
||||
|
||||
keepAliveTicker := time.NewTicker(5 * time.Minute)
|
||||
defer keepAliveTicker.Stop()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-pingTicker.C:
|
||||
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(1*time.Second)); err != nil {
|
||||
log.WithError(err).Error("ping error", err)
|
||||
}
|
||||
|
||||
case <-keepAliveTicker.C:
|
||||
if !s.publicOnly {
|
||||
if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil {
|
||||
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey))
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
||||
case <-keepAliveTicker.C:
|
||||
if !s.publicOnly {
|
||||
if err := s.keepaliveListenKey(ctx, s.ListenKey); err != nil {
|
||||
log.WithError(err).Errorf("listen key keep-alive error: %v key: %s", err, maskListenKey(s.ListenKey))
|
||||
}
|
||||
}
|
||||
|
||||
case <-pingTicker.C:
|
||||
if err := s.Conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(1*time.Second)); err != nil {
|
||||
log.WithError(err).Error("ping error", err)
|
||||
}
|
||||
|
||||
default:
|
||||
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
|
||||
if err := s.Conn.SetReadDeadline(time.Now().Add(1 * time.Minute)); err != nil {
|
||||
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
|
||||
}
|
||||
|
||||
|
@ -374,12 +390,15 @@ func (s *Stream) invalidateListenKey(ctx context.Context, listenKey string) (err
|
|||
log.Info("closing listen key")
|
||||
|
||||
if s.useMargin {
|
||||
req := s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey)
|
||||
if s.useMarginIsolated {
|
||||
req.Isolated(s.useMarginIsolatedSymbol)
|
||||
req := s.Client.NewCloseIsolatedMarginUserStreamService().ListenKey(listenKey)
|
||||
req.Symbol(s.useMarginIsolatedSymbol)
|
||||
err = req.Do(ctx)
|
||||
} else {
|
||||
req := s.Client.NewCloseMarginUserStreamService().ListenKey(listenKey)
|
||||
err = req.Do(ctx)
|
||||
}
|
||||
|
||||
err = req.Do(ctx)
|
||||
} else {
|
||||
err = s.Client.NewCloseUserStreamService().ListenKey(listenKey).Do(ctx)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue
Block a user