bbgo_origin/pkg/exchange/kucoin/stream.go

294 lines
5.7 KiB
Go
Raw Normal View History

2021-12-22 13:06:21 +00:00
package kucoin
import (
"context"
"net"
"sync"
"time"
"github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi"
"github.com/c9s/bbgo/pkg/types"
"github.com/gorilla/websocket"
2021-12-22 17:32:02 +00:00
"github.com/pkg/errors"
2021-12-22 13:06:21 +00:00
)
const readTimeout = 15 * time.Second
2021-12-22 16:51:50 +00:00
const pingInterval = 18000 * time.Millisecond
2021-12-22 13:06:21 +00:00
type WebsocketOp struct {
Op string `json:"op"`
Args interface{} `json:"args"`
}
type WebsocketLogin struct {
Key string `json:"apiKey"`
Passphrase string `json:"passphrase"`
Timestamp string `json:"timestamp"`
Sign string `json:"sign"`
}
//go:generate callbackgen -type Stream -interface
type Stream struct {
types.StandardStream
2021-12-22 16:51:50 +00:00
client *kucoinapi.RestClient
conn *websocket.Conn
connLock sync.Mutex
2021-12-22 13:06:21 +00:00
connCtx context.Context
connCancel context.CancelFunc
2021-12-22 16:51:50 +00:00
bullet *kucoinapi.Bullet
2021-12-22 13:06:21 +00:00
publicOnly bool
}
func NewStream(client *kucoinapi.RestClient) *Stream {
stream := &Stream{
2021-12-22 16:51:50 +00:00
client: client,
2021-12-22 13:06:21 +00:00
StandardStream: types.StandardStream{
ReconnectC: make(chan struct{}, 1),
},
}
2021-12-22 17:32:02 +00:00
stream.OnConnect(stream.handleConnect)
return stream
}
2021-12-22 13:06:21 +00:00
2021-12-22 17:32:02 +00:00
func (s *Stream) sendSubscriptions() error {
cmds, err := convertSubscriptions(s.Subscriptions)
if err != nil {
return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions)
}
2021-12-22 13:06:21 +00:00
2021-12-22 17:32:02 +00:00
for _, cmd := range cmds {
if err := s.conn.WriteJSON(cmd) ; err != nil {
return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd)
}
}
2021-12-22 13:06:21 +00:00
2021-12-22 17:32:02 +00:00
return nil
}
2021-12-22 13:06:21 +00:00
2021-12-22 17:32:02 +00:00
func (s *Stream) handleConnect() {
if s.publicOnly {
if err := s.sendSubscriptions() ; err != nil {
log.WithError(err).Errorf("subscription error")
return
2021-12-22 13:06:21 +00:00
}
2021-12-22 17:32:02 +00:00
}
2021-12-22 13:06:21 +00:00
}
func (s *Stream) SetPublicOnly() {
s.publicOnly = true
}
func (s *Stream) Close() error {
return nil
}
func (s *Stream) Connect(ctx context.Context) error {
err := s.connect(ctx)
if err != nil {
return err
}
// start one re-connector goroutine with the base context
go s.Reconnector(ctx)
s.EmitStart()
return nil
}
func (s *Stream) Reconnector(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-s.ReconnectC:
log.Warnf("received reconnect signal, reconnecting...")
time.Sleep(3 * time.Second)
if err := s.connect(ctx); err != nil {
log.WithError(err).Errorf("connect error, try to reconnect again...")
s.Reconnect()
}
}
}
}
2021-12-22 16:45:59 +00:00
// getEndpoint use the publicOnly flag to check whether we should allocate a public bullet or private bullet
func (s *Stream) getEndpoint() (string, error) {
var bullet *kucoinapi.Bullet
var err error
2021-12-22 13:06:21 +00:00
if s.publicOnly {
2021-12-22 16:51:50 +00:00
bullet, err = s.client.BulletService.NewGetPublicBulletRequest().Do(nil)
2021-12-22 13:06:21 +00:00
} else {
2021-12-22 16:51:50 +00:00
bullet, err = s.client.BulletService.NewGetPrivateBulletRequest().Do(nil)
2021-12-22 13:06:21 +00:00
}
2021-12-22 16:45:59 +00:00
if err != nil {
return "", err
}
url, err := bullet.URL()
if err != nil {
return "", err
}
2021-12-22 16:51:50 +00:00
s.bullet = bullet
2021-12-22 16:45:59 +00:00
return url.String(), nil
2021-12-22 13:06:21 +00:00
}
func (s *Stream) connect(ctx context.Context) error {
2021-12-22 16:45:59 +00:00
url, err := s.getEndpoint()
if err != nil {
return err
}
2021-12-22 13:06:21 +00:00
conn, err := s.StandardStream.Dial(url)
if err != nil {
return err
}
log.Infof("websocket connected: %s", url)
// should only start one connection one time, so we lock the mutex
s.connLock.Lock()
// ensure the previous context is cancelled
if s.connCancel != nil {
s.connCancel()
}
// create a new context
s.connCtx, s.connCancel = context.WithCancel(ctx)
conn.SetReadDeadline(time.Now().Add(readTimeout))
conn.SetPongHandler(func(string) error {
conn.SetReadDeadline(time.Now().Add(readTimeout))
return nil
})
2021-12-22 16:51:50 +00:00
s.conn = conn
2021-12-22 13:06:21 +00:00
s.connLock.Unlock()
s.EmitConnect()
go s.read(s.connCtx)
2021-12-22 16:53:55 +00:00
go ping(s.connCtx, s, pingInterval)
2021-12-22 13:06:21 +00:00
return nil
}
func (s *Stream) read(ctx context.Context) {
defer func() {
if s.connCancel != nil {
s.connCancel()
}
s.EmitDisconnect()
}()
for {
select {
case <-ctx.Done():
return
default:
2021-12-22 17:32:02 +00:00
conn := s.Conn()
2021-12-22 13:06:21 +00:00
if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
log.WithError(err).Errorf("set read deadline error: %s", err.Error())
}
mt, message, err := conn.ReadMessage()
if err != nil {
// if it's a network timeout error, we should re-connect
switch err := err.(type) {
// if it's a websocket related error
case *websocket.CloseError:
if err.Code == websocket.CloseNormalClosure {
return
}
// for unexpected close error, we should re-connect
// emit reconnect to start a new connection
s.Reconnect()
return
case net.Error:
log.WithError(err).Error("network error")
s.Reconnect()
return
default:
log.WithError(err).Error("unexpected connection error")
s.Reconnect()
return
}
}
// skip non-text messages
if mt != websocket.TextMessage {
continue
}
e, err := parseWebsocketPayload(string(message))
if err != nil {
log.WithError(err).Error("message parse error")
}
if e != nil {
switch et := e.(type) {
/*
case *AccountEvent:
s.EmitOrderDetails(et)
*/
default:
log.Warnf("unhandled event: %+v", et)
}
}
}
}
}
2021-12-22 16:53:55 +00:00
func (s *Stream) Conn() *websocket.Conn {
2021-12-22 13:06:21 +00:00
s.connLock.Lock()
2021-12-22 16:51:50 +00:00
conn := s.conn
2021-12-22 13:06:21 +00:00
s.connLock.Unlock()
return conn
}
2021-12-22 16:53:55 +00:00
type WebSocketConnector interface {
Conn() *websocket.Conn
Reconnect()
}
func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) {
2021-12-22 16:51:50 +00:00
pingTicker := time.NewTicker(interval)
2021-12-22 13:06:21 +00:00
defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
log.Debug("ping worker stopped")
return
case <-pingTicker.C:
2021-12-22 16:53:55 +00:00
conn := w.Conn()
2021-12-22 13:06:21 +00:00
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(3*time.Second)); err != nil {
log.WithError(err).Error("ping error", err)
2021-12-22 16:53:55 +00:00
w.Reconnect()
2021-12-22 13:06:21 +00:00
}
}
}
}
func parseWebsocketPayload(in string) (interface{}, error) {
return nil, nil
}