okex: pull read timeout and adjust to 30 seconds

This commit is contained in:
c9s 2021-05-30 00:32:06 +08:00
parent 9a68cfd288
commit 1a05f6fbd4
2 changed files with 13 additions and 7 deletions

View File

@ -369,7 +369,7 @@ func (s *Stream) connect(ctx context.Context) error {
} }
func (s *Stream) ping(ctx context.Context) { func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(5 * time.Second) pingTicker := time.NewTicker(readTimeout / 2)
defer pingTicker.Stop() defer pingTicker.Stop()
for { for {

View File

@ -12,6 +12,8 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
const readTimeout = 15 * time.Second
type WebsocketOp struct { type WebsocketOp struct {
Op string `json:"op"` Op string `json:"op"`
Args interface{} `json:"args"` Args interface{} `json:"args"`
@ -264,9 +266,9 @@ func (s *Stream) connect(ctx context.Context) error {
} }
log.Infof("websocket connected") log.Infof("websocket connected")
conn.SetReadDeadline(time.Now().Add(15 * time.Second)) conn.SetReadDeadline(time.Now().Add(readTimeout))
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(15 * time.Second)) conn.SetReadDeadline(time.Now().Add(readTimeout))
return nil return nil
}) })
@ -295,11 +297,15 @@ func (s *Stream) read(ctx context.Context) {
return return
default: default:
if err := s.Conn.SetReadDeadline(time.Now().Add(30 * time.Second)); err != nil { s.connLock.Lock()
conn := s.Conn
s.connLock.Unlock()
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error()) log.WithError(err).Errorf("set read deadline error: %s", err.Error())
} }
mt, message, err := s.Conn.ReadMessage() mt, message, err := conn.ReadMessage()
if err != nil { if err != nil {
// if it's a network timeout error, we should re-connect // if it's a network timeout error, we should re-connect
switch err := err.(type) { switch err := err.(type) {
@ -361,7 +367,7 @@ func (s *Stream) read(ctx context.Context) {
} }
func (s *Stream) ping(ctx context.Context) { func (s *Stream) ping(ctx context.Context) {
pingTicker := time.NewTicker(5 * time.Second) pingTicker := time.NewTicker(readTimeout / 2)
defer pingTicker.Stop() defer pingTicker.Stop()
for { for {
@ -376,7 +382,7 @@ func (s *Stream) ping(ctx context.Context) {
conn := s.Conn conn := s.Conn
s.connLock.Unlock() s.connLock.Unlock()
if err := conn.WriteControl(websocket.PingMessage, []byte("hb"), time.Now().Add(3*time.Second)); err != nil { if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
s.Reconnect() s.Reconnect()
} }