adjust read timeout

This commit is contained in:
c9s 2021-05-29 20:40:47 +08:00
parent 70284a8c0f
commit d962dbe542

View File

@ -19,6 +19,8 @@ import (
var debugBinanceDepth bool var debugBinanceDepth bool
const readTimeout = 30 * time.Second
func init() { func init() {
// randomize pulling // randomize pulling
rand.Seed(time.Now().UnixNano()) rand.Seed(time.Now().UnixNano())
@ -238,6 +240,10 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
} }
// use the default ping handler // use the default ping handler
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from the connection within a 10 minute period,
// the connection will be disconnected.
// Unsolicited pong frames are allowed.
conn.SetPingHandler(nil) conn.SetPingHandler(nil)
return conn, nil return conn, nil
@ -346,9 +352,9 @@ func (s *Stream) connect(ctx context.Context) error {
log.Infof("websocket connected") log.Infof("websocket connected")
conn.SetReadDeadline(time.Now().Add(15 * time.Second)) conn.SetReadDeadline(time.Now().Add(readTimeout))
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(15 * time.Second)) conn.SetReadDeadline(time.Now().Add(readTimeout))
return nil return nil
}) })
@ -374,7 +380,11 @@ func (s *Stream) ping(ctx context.Context) {
return return
case <-pingTicker.C: case <-pingTicker.C:
if err := s.Conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { s.ConnLock.Lock()
conn := s.Conn
s.ConnLock.Unlock()
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
s.Reconnect() s.Reconnect()
} }
@ -382,13 +392,16 @@ func (s *Stream) ping(ctx context.Context) {
} }
} }
// From Binance
// Keepalive a user data stream to prevent a time out. User data streams will close after 60 minutes.
// It's recommended to send a ping about every 30 minutes.
func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) { func (s *Stream) listenKeyKeepAlive(ctx context.Context, listenKey string) {
keepAliveTicker := time.NewTicker(5 * time.Minute) keepAliveTicker := time.NewTicker(5 * time.Minute)
defer keepAliveTicker.Stop() defer keepAliveTicker.Stop()
// if we exit, we should invalidate the existing listen key // if we exit, we should invalidate the existing listen key
defer func() { defer func() {
log.Info("keepalive worker stopped") log.Debugf("keepalive worker stopped")
if err := s.invalidateListenKey(context.Background(), listenKey); err != nil { if err := s.invalidateListenKey(context.Background(), listenKey); err != nil {
log.WithError(err).Error("invalidate listen key error") log.WithError(err).Error("invalidate listen key error")
} }
@ -426,12 +439,16 @@ func (s *Stream) read(ctx context.Context) {
return return
default: default:
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil {
s.ConnLock.Lock()
conn := s.Conn
s.ConnLock.Unlock()
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := conn.ReadMessage()
if err != nil { if err != nil {
// if it's a network timeout error, we should re-connect // if it's a network timeout error, we should re-connect
switch err := err.(type) { switch err := err.(type) {
@ -539,5 +556,3 @@ func maskListenKey(listenKey string) string {
maskKey := listenKey[0:5] maskKey := listenKey[0:5]
return maskKey + strings.Repeat("*", len(listenKey)-1-5) return maskKey + strings.Repeat("*", len(listenKey)-1-5)
} }
//go:generate callbackgen -type DepthFrame