adjust keep alive interval

This commit is contained in:
c9s 2021-12-29 17:27:37 +08:00
parent eec699cbc9
commit b637d46c83

View File

@ -25,8 +25,8 @@ var debugBinanceDepth bool
var defaultDialer = &websocket.Dialer{ var defaultDialer = &websocket.Dialer{
Proxy: http.ProxyFromEnvironment, Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second, HandshakeTimeout: 10 * time.Second,
ReadBufferSize: 4096 * 2, ReadBufferSize: 4096,
} }
// from Binance document: // from Binance document:
@ -39,10 +39,8 @@ var defaultDialer = &websocket.Dialer{
// A PONG frame // A PONG frame
// A JSON controlled message (e.g. subscribe, unsubscribe) // A JSON controlled message (e.g. subscribe, unsubscribe)
const readTimeout = 3 * time.Minute const readTimeout = 3 * time.Minute
const writeTimeout = 10 * time.Second
const pongWaitTime = 15 * time.Second const listenKeyKeepAliveInterval = 10 * time.Minute
const listenKeyKeepAliveInterval = 1 * time.Minute
func init() { func init() {
// randomize pulling // randomize pulling
@ -310,6 +308,12 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
// the connection will be disconnected. // the connection will be disconnected.
// Unsolicited pong frames are allowed. // Unsolicited pong frames are allowed.
conn.SetPingHandler(nil) conn.SetPingHandler(nil)
conn.SetPongHandler(func(string) error {
if err := conn.SetReadDeadline(time.Now().Add(readTimeout * 2)); err != nil {
log.WithError(err).Error("pong handler can not set read deadline")
}
return nil
})
return conn, nil return conn, nil
} }
@ -417,17 +421,9 @@ func (s *Stream) connect(ctx context.Context) error {
// create a new context // create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx) s.connCtx, s.connCancel = context.WithCancel(ctx)
conn.SetPongHandler(func(string) error {
log.Infof("pong")
if err := conn.SetReadDeadline(time.Now().Add(readTimeout * 2)); err != nil {
log.WithError(err).Error("pong handler can not set read deadline")
}
return nil
})
s.Conn = conn s.Conn = conn
s.ConnLock.Unlock() s.ConnLock.Unlock()
s.EmitConnect() s.EmitConnect()
if !s.PublicOnly { if !s.PublicOnly {
@ -456,7 +452,7 @@ func (s *Stream) ping(ctx context.Context) {
s.ConnLock.Unlock() s.ConnLock.Unlock()
log.Infof("websocket ping") log.Infof("websocket ping")
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(pongWaitTime)); err != nil { if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
s.Reconnect() s.Reconnect()
} }