From 730ce31e679856656873944cdedc69f396ea1a30 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 23 Dec 2021 01:32:02 +0800 Subject: [PATCH] kucoin: implement NewStream --- pkg/exchange/kucoin/convert.go | 38 +++++++++++++-- pkg/exchange/kucoin/exchange.go | 2 +- pkg/exchange/kucoin/stream.go | 86 +++++++++++---------------------- 3 files changed, 63 insertions(+), 63 deletions(-) diff --git a/pkg/exchange/kucoin/convert.go b/pkg/exchange/kucoin/convert.go index bf008e80a..f4c9aecd0 100644 --- a/pkg/exchange/kucoin/convert.go +++ b/pkg/exchange/kucoin/convert.go @@ -1,8 +1,10 @@ package kucoin import ( + "fmt" "math" "strings" + "time" "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" "github.com/c9s/bbgo/pkg/types" @@ -64,6 +66,36 @@ func toGlobalTicker(s kucoinapi.Ticker24H) types.Ticker { } } -func convertSubscription(s types.Subscription) (WebsocketSubscription, error) { - return WebsocketSubscription{}, nil -} \ No newline at end of file + +// convertSubscriptions global subscription to local websocket command +func convertSubscriptions(ss []types.Subscription) ([]kucoinapi.WebSocketCommand, error) { + var id = time.Now().UnixMilli() + var cmds []kucoinapi.WebSocketCommand + for _, s := range ss { + id++ + + var subscribeType string + switch s.Channel { + case types.BookChannel: + // see https://docs.kucoin.com/#level-2-market-data + subscribeType = "/market/level2" + ":" + toLocalSymbol(s.Symbol) + + case types.KLineChannel: + subscribeType = "/market/candles" + ":" + toLocalSymbol(s.Symbol) + "_" + s.Options.Interval + + default: + return nil, fmt.Errorf("websocket channel %s is not supported by kucoin", s.Channel) + } + + cmds = append(cmds, kucoinapi.WebSocketCommand{ + Id: id, + Type: subscribeType, + Topic: "subscribe", + PrivateChannel: false, + Response: true, + }) + } + + return cmds, nil +} + diff --git a/pkg/exchange/kucoin/exchange.go b/pkg/exchange/kucoin/exchange.go index 9c85b43f6..05d2205bd 100644 --- a/pkg/exchange/kucoin/exchange.go +++ b/pkg/exchange/kucoin/exchange.go @@ -138,5 +138,5 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro } func (e *Exchange) NewStream() types.Stream { - panic("implement me") + return NewStream(e.client) } diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index 62c5899c9..c9d8df2fd 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -9,6 +9,7 @@ import ( "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" "github.com/c9s/bbgo/pkg/types" "github.com/gorilla/websocket" + "github.com/pkg/errors" ) const readTimeout = 15 * time.Second @@ -40,8 +41,6 @@ type Stream struct { publicOnly bool } -type WebsocketSubscription struct{} - func NewStream(client *kucoinapi.RestClient) *Stream { stream := &Stream{ client: client, @@ -50,63 +49,34 @@ func NewStream(client *kucoinapi.RestClient) *Stream { }, } - stream.OnConnect(func() { - if stream.publicOnly { - var subs []WebsocketSubscription - for _, subscription := range stream.Subscriptions { - sub, err := convertSubscription(subscription) - if err != nil { - log.WithError(err).Errorf("subscription convert error") - continue - } - - subs = append(subs, sub) - } - if len(subs) == 0 { - return - } - - log.Infof("subscribing channels: %+v", subs) - err := stream.conn.WriteJSON(WebsocketOp{ - Op: "subscribe", - Args: subs, - }) - - if err != nil { - log.WithError(err).Error("subscribe error") - } - } else { - // login as private channel - // sign example: - // sign=CryptoJS.enc.Base64.Stringify(CryptoJS.HmacSHA256(timestamp +'GET'+'/users/self/verify', secretKey)) - /* - msTimestamp := strconv.FormatFloat(float64(time.Now().UnixNano())/float64(time.Second), 'f', -1, 64) - payload := msTimestamp + "GET" + "/users/self/verify" - sign := okexapi.Sign(payload, stream.client.Secret) - op := WebsocketOp{ - Op: "login", - Args: []WebsocketLogin{ - { - Key: stream.client.Key, - Passphrase: stream.client.Passphrase, - Timestamp: msTimestamp, - Sign: sign, - }, - }, - } - - log.Infof("sending login request: %+v", op) - err := stream.conn.WriteJSON(op) - if err != nil { - log.WithError(err).Errorf("can not send login message") - } - */ - } - }) - + stream.OnConnect(stream.handleConnect) return stream } +func (s *Stream) sendSubscriptions() error { + cmds, err := convertSubscriptions(s.Subscriptions) + if err != nil { + return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions) + } + + for _, cmd := range cmds { + if err := s.conn.WriteJSON(cmd) ; err != nil { + return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd) + } + } + + return nil +} + +func (s *Stream) handleConnect() { + if s.publicOnly { + if err := s.sendSubscriptions() ; err != nil { + log.WithError(err).Errorf("subscription error") + return + } + } +} + func (s *Stream) SetPublicOnly() { s.publicOnly = true } @@ -225,9 +195,7 @@ func (s *Stream) read(ctx context.Context) { return default: - s.connLock.Lock() - conn := s.conn - s.connLock.Unlock() + conn := s.Conn() if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil { log.WithError(err).Errorf("set read deadline error: %s", err.Error())