kucoin: use returned ping interval instead of default

This commit is contained in:
c9s 2021-12-23 01:54:53 +08:00
parent 730ce31e67
commit 1a3f9ed4b2
4 changed files with 39 additions and 12 deletions

View File

@ -91,3 +91,9 @@ func NewExchangeStandard(n types.ExchangeName, key, secret, passphrase, subAccou
} }
``` ```
## Testing user data stream
```shell
go run ./cmd/bbgo --config config/bbgo.yaml userdatastream --session kucoin
```

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"net/http" "net/http"
"net/url" "net/url"
"time"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -49,6 +50,14 @@ type Bullet struct {
Token string `json:"token"` Token string `json:"token"`
} }
func (b *Bullet) PingInterval() time.Duration {
return time.Duration(b.InstanceServers[0].PingInterval) * time.Millisecond
}
func (b *Bullet) PingTimeout() time.Duration {
return time.Duration(b.InstanceServers[0].PingTimeout) * time.Millisecond
}
func (b *Bullet) URL() (*url.URL, error) { func (b *Bullet) URL() (*url.URL, error) {
if len(b.InstanceServers) == 0 { if len(b.InstanceServers) == 0 {
return nil, errors.New("InstanceServers is empty") return nil, errors.New("InstanceServers is empty")

View File

@ -12,8 +12,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
) )
const readTimeout = 15 * time.Second const readTimeout = 20 * time.Second
const pingInterval = 18000 * time.Millisecond
type WebsocketOp struct { type WebsocketOp struct {
Op string `json:"op"` Op string `json:"op"`
@ -31,13 +30,13 @@ type WebsocketLogin struct {
type Stream struct { type Stream struct {
types.StandardStream types.StandardStream
client *kucoinapi.RestClient client *kucoinapi.RestClient
conn *websocket.Conn conn *websocket.Conn
connLock sync.Mutex connLock sync.Mutex
connCtx context.Context connCtx context.Context
connCancel context.CancelFunc connCancel context.CancelFunc
bullet *kucoinapi.Bullet bullet *kucoinapi.Bullet
publicOnly bool publicOnly bool
} }
@ -60,7 +59,7 @@ func (s *Stream) sendSubscriptions() error {
} }
for _, cmd := range cmds { for _, cmd := range cmds {
if err := s.conn.WriteJSON(cmd) ; err != nil { if err := s.conn.WriteJSON(cmd); err != nil {
return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd) return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd)
} }
} }
@ -70,7 +69,7 @@ func (s *Stream) sendSubscriptions() error {
func (s *Stream) handleConnect() { func (s *Stream) handleConnect() {
if s.publicOnly { if s.publicOnly {
if err := s.sendSubscriptions() ; err != nil { if err := s.sendSubscriptions(); err != nil {
log.WithError(err).Errorf("subscription error") log.WithError(err).Errorf("subscription error")
return return
} }
@ -164,9 +163,11 @@ func (s *Stream) connect(ctx context.Context) error {
// create a new context // create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx) s.connCtx, s.connCancel = context.WithCancel(ctx)
conn.SetReadDeadline(time.Now().Add(readTimeout))
pingTimeout := s.bullet.PingTimeout()
conn.SetReadDeadline(time.Now().Add(pingTimeout))
conn.SetPongHandler(func(string) error { conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(readTimeout)) conn.SetReadDeadline(time.Now().Add(pingTimeout))
return nil return nil
}) })
@ -176,7 +177,7 @@ func (s *Stream) connect(ctx context.Context) error {
s.EmitConnect() s.EmitConnect()
go s.read(s.connCtx) go s.read(s.connCtx)
go ping(s.connCtx, s, pingInterval) go ping(s.connCtx, s, s.bullet.PingInterval())
return nil return nil
} }
@ -268,6 +269,8 @@ type WebSocketConnector interface {
} }
func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) { func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) {
log.Infof("starting ping worker with interval %s", interval)
pingTicker := time.NewTicker(interval) pingTicker := time.NewTicker(interval)
defer pingTicker.Stop() defer pingTicker.Stop()
@ -280,6 +283,15 @@ func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) {
case <-pingTicker.C: case <-pingTicker.C:
conn := w.Conn() conn := w.Conn()
if err := conn.WriteJSON(kucoinapi.WebSocketCommand{
Id: time.Now().UnixMilli(),
Type: "ping",
}); err != nil {
log.WithError(err).Error("websocket ping error", err)
w.Reconnect()
}
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil { if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err) log.WithError(err).Error("ping error", err)
w.Reconnect() w.Reconnect()