From 1a3f9ed4b2ec64fafe2872d80dcd43ae7ed84bb8 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 23 Dec 2021 01:54:53 +0800 Subject: [PATCH] kucoin: use returned ping interval instead of default --- doc/development/new-exchange.md | 6 ++++ pkg/exchange/kucoin/kucoinapi/bullet.go | 9 ++++++ pkg/exchange/kucoin/kucoinapi/websocket.go | 2 +- pkg/exchange/kucoin/stream.go | 34 +++++++++++++++------- 4 files changed, 39 insertions(+), 12 deletions(-) diff --git a/doc/development/new-exchange.md b/doc/development/new-exchange.md index d0b6deb4f..c64e12bea 100644 --- a/doc/development/new-exchange.md +++ b/doc/development/new-exchange.md @@ -91,3 +91,9 @@ func NewExchangeStandard(n types.ExchangeName, key, secret, passphrase, subAccou } ``` + +## Testing user data stream + +```shell +go run ./cmd/bbgo --config config/bbgo.yaml userdatastream --session kucoin +``` \ No newline at end of file diff --git a/pkg/exchange/kucoin/kucoinapi/bullet.go b/pkg/exchange/kucoin/kucoinapi/bullet.go index 95f40d68e..8a51bad0f 100644 --- a/pkg/exchange/kucoin/kucoinapi/bullet.go +++ b/pkg/exchange/kucoin/kucoinapi/bullet.go @@ -4,6 +4,7 @@ import ( "context" "net/http" "net/url" + "time" "github.com/c9s/bbgo/pkg/util" "github.com/pkg/errors" @@ -49,6 +50,14 @@ type Bullet struct { Token string `json:"token"` } +func (b *Bullet) PingInterval() time.Duration { + return time.Duration(b.InstanceServers[0].PingInterval) * time.Millisecond +} + +func (b *Bullet) PingTimeout() time.Duration { + return time.Duration(b.InstanceServers[0].PingTimeout) * time.Millisecond +} + func (b *Bullet) URL() (*url.URL, error) { if len(b.InstanceServers) == 0 { return nil, errors.New("InstanceServers is empty") diff --git a/pkg/exchange/kucoin/kucoinapi/websocket.go b/pkg/exchange/kucoin/kucoinapi/websocket.go index 7c9050adb..c6b3ba4df 100644 --- a/pkg/exchange/kucoin/kucoinapi/websocket.go +++ b/pkg/exchange/kucoin/kucoinapi/websocket.go @@ -14,4 +14,4 @@ func (c *WebSocketCommand) JSON() ([]byte, error) { type tt WebSocketCommand var a = (*tt)(c) return json.Marshal(a) -} \ No newline at end of file +} diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index c9d8df2fd..29498ed12 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -12,8 +12,7 @@ import ( "github.com/pkg/errors" ) -const readTimeout = 15 * time.Second -const pingInterval = 18000 * time.Millisecond +const readTimeout = 20 * time.Second type WebsocketOp struct { Op string `json:"op"` @@ -31,13 +30,13 @@ type WebsocketLogin struct { type Stream struct { types.StandardStream - client *kucoinapi.RestClient - conn *websocket.Conn - connLock sync.Mutex + client *kucoinapi.RestClient + conn *websocket.Conn + connLock sync.Mutex connCtx context.Context connCancel context.CancelFunc - bullet *kucoinapi.Bullet + bullet *kucoinapi.Bullet publicOnly bool } @@ -60,7 +59,7 @@ func (s *Stream) sendSubscriptions() error { } for _, cmd := range cmds { - if err := s.conn.WriteJSON(cmd) ; err != nil { + if err := s.conn.WriteJSON(cmd); err != nil { return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd) } } @@ -70,7 +69,7 @@ func (s *Stream) sendSubscriptions() error { func (s *Stream) handleConnect() { if s.publicOnly { - if err := s.sendSubscriptions() ; err != nil { + if err := s.sendSubscriptions(); err != nil { log.WithError(err).Errorf("subscription error") return } @@ -164,9 +163,11 @@ func (s *Stream) connect(ctx context.Context) error { // create a new context s.connCtx, s.connCancel = context.WithCancel(ctx) - conn.SetReadDeadline(time.Now().Add(readTimeout)) + + pingTimeout := s.bullet.PingTimeout() + conn.SetReadDeadline(time.Now().Add(pingTimeout)) conn.SetPongHandler(func(string) error { - conn.SetReadDeadline(time.Now().Add(readTimeout)) + conn.SetReadDeadline(time.Now().Add(pingTimeout)) return nil }) @@ -176,7 +177,7 @@ func (s *Stream) connect(ctx context.Context) error { s.EmitConnect() go s.read(s.connCtx) - go ping(s.connCtx, s, pingInterval) + go ping(s.connCtx, s, s.bullet.PingInterval()) return nil } @@ -268,6 +269,8 @@ type WebSocketConnector interface { } func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) { + log.Infof("starting ping worker with interval %s", interval) + pingTicker := time.NewTicker(interval) defer pingTicker.Stop() @@ -280,6 +283,15 @@ func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) { case <-pingTicker.C: conn := w.Conn() + + if err := conn.WriteJSON(kucoinapi.WebSocketCommand{ + Id: time.Now().UnixMilli(), + Type: "ping", + }); err != nil { + log.WithError(err).Error("websocket ping error", err) + w.Reconnect() + } + if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { log.WithError(err).Error("ping error", err) w.Reconnect()