binance: adjust read timeout and increase read buffer size

This commit is contained in:
c9s 2021-06-07 16:57:49 +08:00
parent 3fd170a4ff
commit ec6c10a96a

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"math/rand" "math/rand"
"net" "net"
"net/http"
"os" "os"
"strconv" "strconv"
"strings" "strings"
@ -18,7 +19,24 @@ import (
var debugBinanceDepth bool var debugBinanceDepth bool
const readTimeout = 30 * time.Second var defaultDialer = &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 45 * time.Second,
ReadBufferSize: 4096 * 2,
}
// from Binance document:
// The websocket server will send a ping frame every 3 minutes.
// If the websocket server does not receive a pong frame back from the connection within a 10 minute period, the connection will be disconnected.
// Unsolicited pong frames are allowed.
// WebSocket connections have a limit of 5 incoming messages per second. A message is considered:
// A PING frame
// A PONG frame
// A JSON controlled message (e.g. subscribe, unsubscribe)
const readTimeout = 60 * time.Second
const pongWaitTime = 10 * time.Second
func init() { func init() {
// randomize pulling // randomize pulling
@ -230,7 +248,7 @@ func (s *Stream) dial(listenKey string) (*websocket.Conn, error) {
url = "wss://stream.binance.com:9443/ws/" + listenKey url = "wss://stream.binance.com:9443/ws/" + listenKey
} }
conn, _, err := websocket.DefaultDialer.Dial(url, nil) conn, _, err := defaultDialer.Dial(url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -342,9 +360,10 @@ func (s *Stream) connect(ctx context.Context) error {
// create a new context // create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx) s.connCtx, s.connCancel = context.WithCancel(ctx)
conn.SetReadDeadline(time.Now().Add(readTimeout))
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(readTimeout)) if err := conn.SetReadDeadline(time.Now().Add(readTimeout * 2)); err != nil {
log.WithError(err).Error("pong handler can not set read deadline")
}
return nil return nil
}) })
@ -378,7 +397,7 @@ func (s *Stream) ping(ctx context.Context) {
conn := s.Conn conn := s.Conn
s.ConnLock.Unlock() s.ConnLock.Unlock()
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(pongWaitTime)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
s.Reconnect() s.Reconnect()
} }
@ -453,18 +472,21 @@ func (s *Stream) read(ctx context.Context) {
return return
} }
_ = conn.Close()
// for unexpected close error, we should re-connect // for unexpected close error, we should re-connect
// emit reconnect to start a new connection // emit reconnect to start a new connection
s.Reconnect() s.Reconnect()
return return
case net.Error: case net.Error:
log.WithError(err).Error("network error") log.WithError(err).Error("websocket network error")
_ = conn.Close()
s.Reconnect() s.Reconnect()
return return
default: default:
log.WithError(err).Error("unexpected connection error") log.WithError(err).Error("unexpected connection error")
_ = conn.Close()
s.Reconnect() s.Reconnect()
return return
} }