From e76dd1cbc4477fa7c69b24f9532b0cda93952e93 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 23 Dec 2021 17:32:56 +0800 Subject: [PATCH] kucoin: fix kline parsing and subscription --- examples/kucoin/websocket.go | 51 ++++++++---- pkg/cmd/kline.go | 34 +++++--- pkg/cmd/userdatastream.go | 2 + pkg/exchange/kucoin/convert.go | 34 ++++++-- pkg/exchange/kucoin/parse.go | 78 ++++++++++++++----- pkg/exchange/kucoin/stream.go | 73 ++++++++--------- pkg/exchange/kucoin/stream_callbacks.go | 44 +++++------ .../kucoin/{kucoinapi => }/websocket.go | 54 ++++++++++--- pkg/types/time.go | 22 ++++++ 9 files changed, 273 insertions(+), 119 deletions(-) rename pkg/exchange/kucoin/{kucoinapi => }/websocket.go (72%) diff --git a/examples/kucoin/websocket.go b/examples/kucoin/websocket.go index 2a624685f..af67b4feb 100644 --- a/examples/kucoin/websocket.go +++ b/examples/kucoin/websocket.go @@ -7,6 +7,7 @@ import ( "os/signal" "time" + "github.com/c9s/bbgo/pkg/exchange/kucoin" "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" "github.com/gorilla/websocket" "github.com/sirupsen/logrus" @@ -73,25 +74,33 @@ var websocketCmd = &cobra.Command{ defer c.Close() - wsCmd := &kucoinapi.WebSocketCommand{ - Id: time.Now().UnixMilli(), - Type: "subscribe", - Topic: "/market/ticker:ETH-USDT", - PrivateChannel: false, - Response: true, + id := time.Now().UnixMilli() + wsCmds := []kucoin.WebSocketCommand{ + /* + { + Id: id+1, + Type: "subscribe", + Topic: "/market/ticker:ETH-USDT", + PrivateChannel: false, + Response: true, + }, + */ + { + Id: id+2, + Type: "subscribe", + Topic: "/market/candles:ETH-USDT_1min", + PrivateChannel: false, + Response: true, + }, } - msg, err := wsCmd.JSON() - if err != nil { - return err + for _, wsCmd := range wsCmds { + err = c.WriteJSON(wsCmd) + if err != nil { + return err + } } - err = c.WriteMessage(websocket.TextMessage, msg) - if err != nil { - return err - } - - done := make(chan struct{}) go func() { defer close(done) @@ -106,11 +115,23 @@ var websocketCmd = &cobra.Command{ } }() + pingTicker := time.NewTicker(bullet.PingInterval()) + defer pingTicker.Stop() + for { select { case <-done: return nil + case <-pingTicker.C: + if err := c.WriteJSON(kucoin.WebSocketCommand{ + Id: time.Now().UnixMilli(), + Type: "ping", + }); err != nil { + logrus.WithError(err).Error("websocket ping error", err) + } + + case <-interrupt: logrus.Infof("interrupt") diff --git a/pkg/cmd/kline.go b/pkg/cmd/kline.go index f65dedb91..a55852cdc 100644 --- a/pkg/cmd/kline.go +++ b/pkg/cmd/kline.go @@ -5,8 +5,10 @@ import ( "fmt" "syscall" + "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/types" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" "github.com/spf13/cobra" ) @@ -18,19 +20,23 @@ var klineCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - exName, err := cmd.Flags().GetString("exchange") - if err != nil { - return fmt.Errorf("can not get exchange from flags: %w", err) + if userConfig == nil { + return errors.New("--config option or config file is missing") } - exchangeName, err := types.ValidExchangeName(exName) + environ := bbgo.NewEnvironment() + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + sessionName, err := cmd.Flags().GetString("session") if err != nil { return err } - ex, err := cmdutil.NewExchange(exchangeName) - if err != nil { - return err + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) } symbol, err := cmd.Flags().GetString("symbol") @@ -47,7 +53,7 @@ var klineCmd = &cobra.Command{ return err } - s := ex.NewStream() + s := session.Exchange.NewStream() s.SetPublicOnly() s.Subscribe(types.KLineChannel, symbol, types.SubscribeOptions{Interval: interval}) @@ -61,9 +67,17 @@ var klineCmd = &cobra.Command{ log.Infof("connecting...") if err := s.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect to %s", exchangeName) + return err } + log.Infof("connected") + defer func() { + log.Infof("closing connection...") + if err := s.Close(); err != nil { + log.WithError(err).Errorf("connection close error") + } + }() + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) return nil }, @@ -71,7 +85,7 @@ var klineCmd = &cobra.Command{ func init() { // since the public data does not require trading authentication, we use --exchange option here. - klineCmd.Flags().String("exchange", "", "the exchange name") + klineCmd.Flags().String("session", "", "session name") klineCmd.Flags().String("symbol", "", "the trading pair. e.g, BTCUSDT, LTCUSDT...") klineCmd.Flags().String("interval", "1m", "interval of the kline (candle), .e.g, 1m, 3m, 15m") RootCmd.AddCommand(klineCmd) diff --git a/pkg/cmd/userdatastream.go b/pkg/cmd/userdatastream.go index ce281d4f7..b90f0dd0d 100644 --- a/pkg/cmd/userdatastream.go +++ b/pkg/cmd/userdatastream.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "syscall" + "time" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/cmd/cmdutil" @@ -63,6 +64,7 @@ var userDataStreamCmd = &cobra.Command{ if err := s.Close(); err != nil { log.WithError(err).Errorf("connection close error") } + time.Sleep(1 * time.Second) }() cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) diff --git a/pkg/exchange/kucoin/convert.go b/pkg/exchange/kucoin/convert.go index f1e681237..6786c75a8 100644 --- a/pkg/exchange/kucoin/convert.go +++ b/pkg/exchange/kucoin/convert.go @@ -67,11 +67,35 @@ func toGlobalTicker(s kucoinapi.Ticker24H) types.Ticker { } } +func toLocalInterval(i types.Interval) string { + switch i { + case types.Interval1m: + return "1min" + + case types.Interval15m: + return "15min" + + case types.Interval30m: + return "30min" + + case types.Interval1h: + return "1hour" + + case types.Interval2h: + return "2hour" + + case types.Interval4h: + return "4hour" + + } + + return "1h" +} // convertSubscriptions global subscription to local websocket command -func convertSubscriptions(ss []types.Subscription) ([]kucoinapi.WebSocketCommand, error) { +func convertSubscriptions(ss []types.Subscription) ([]WebSocketCommand, error) { var id = time.Now().UnixMilli() - var cmds []kucoinapi.WebSocketCommand + var cmds []WebSocketCommand for _, s := range ss { id++ @@ -82,15 +106,15 @@ func convertSubscriptions(ss []types.Subscription) ([]kucoinapi.WebSocketCommand subscribeTopic = "/market/level2" + ":" + toLocalSymbol(s.Symbol) case types.KLineChannel: - subscribeTopic = "/market/candles" + ":" + toLocalSymbol(s.Symbol) + "_" + s.Options.Interval + subscribeTopic = "/market/candles" + ":" + toLocalSymbol(s.Symbol) + "_" + toLocalInterval(types.Interval(s.Options.Interval)) default: return nil, fmt.Errorf("websocket channel %s is not supported by kucoin", s.Channel) } - cmds = append(cmds, kucoinapi.WebSocketCommand{ + cmds = append(cmds, WebSocketCommand{ Id: id, - Type: kucoinapi.WebSocketMessageTypeSubscribe, + Type: WebSocketMessageTypeSubscribe, Topic: subscribeTopic, PrivateChannel: false, Response: true, diff --git a/pkg/exchange/kucoin/parse.go b/pkg/exchange/kucoin/parse.go index 35ddab230..a3bd18257 100644 --- a/pkg/exchange/kucoin/parse.go +++ b/pkg/exchange/kucoin/parse.go @@ -2,53 +2,61 @@ package kucoin import ( "encoding/json" + "strings" - "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" + "github.com/c9s/bbgo/pkg/types" ) -func parseWebsocketPayload(in []byte) (*kucoinapi.WebSocketEvent, error) { - var resp kucoinapi.WebSocketEvent +func parseWebsocketPayload(in []byte) (*WebSocketEvent, error) { + var resp WebSocketEvent var err = json.Unmarshal(in, &resp) if err != nil { return nil, err } switch resp.Type { - case kucoinapi.WebSocketMessageTypeAck: + case WebSocketMessageTypeAck: return &resp, nil - case kucoinapi.WebSocketMessageTypeMessage: + case WebSocketMessageTypeError: + resp.Object = string(resp.Data) + return &resp, nil + + case WebSocketMessageTypeMessage: switch resp.Subject { - case kucoinapi.WebSocketSubjectOrderChange: - var o kucoinapi.WebSocketPrivateOrder + case WebSocketSubjectOrderChange: + var o WebSocketPrivateOrderEvent if err := json.Unmarshal(resp.Data, &o); err != nil { return &resp, err } resp.Object = &o - case kucoinapi.WebSocketSubjectAccountBalance: - var o kucoinapi.WebSocketAccountBalance + case WebSocketSubjectAccountBalance: + var o WebSocketAccountBalanceEvent if err := json.Unmarshal(resp.Data, &o); err != nil { return &resp, err } resp.Object = &o - case kucoinapi.WebSocketSubjectTradeCandlesUpdate: - var o kucoinapi.WebSocketCandle + case WebSocketSubjectTradeCandlesUpdate, WebSocketSubjectTradeCandlesAdd: + var o WebSocketCandleEvent + if err := json.Unmarshal(resp.Data, &o); err != nil { + return &resp, err + } + + o.Interval = extractIntervalFromTopic(resp.Topic) + o.Add = resp.Subject == WebSocketSubjectTradeCandlesAdd + resp.Object = &o + + case WebSocketSubjectTradeL2Update: + var o WebSocketOrderBookL2Event if err := json.Unmarshal(resp.Data, &o); err != nil { return &resp, err } resp.Object = &o - case kucoinapi.WebSocketSubjectTradeL2Update: - var o kucoinapi.WebSocketOrderBookL2 - if err := json.Unmarshal(resp.Data, &o); err != nil { - return &resp, err - } - resp.Object = &o - - case kucoinapi.WebSocketSubjectTradeTicker: - var o kucoinapi.WebSocketTicker + case WebSocketSubjectTradeTicker: + var o WebSocketTickerEvent if err := json.Unmarshal(resp.Data, &o); err != nil { return &resp, err } @@ -62,3 +70,33 @@ func parseWebsocketPayload(in []byte) (*kucoinapi.WebSocketEvent, error) { return &resp, nil } + +func extractIntervalFromTopic(topic string) types.Interval { + ta := strings.Split(topic, ":") + tb := strings.Split(ta[1], "_") + interval := tb[1] + return toGlobalInterval(interval) +} + +func toGlobalInterval(a string) types.Interval { + switch a { + case "1min": + return types.Interval1m + case "15min": + return types.Interval15m + case "30min": + return types.Interval30m + case "1hour": + return types.Interval1h + case "2hour": + return types.Interval2h + case "4hour": + return types.Interval4h + case "6hour": + return types.Interval6h + case "12hour": + return types.Interval12h + + } + return "" +} diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index 5e13d7bd5..a40fe058d 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -14,18 +14,6 @@ import ( const readTimeout = 30 * time.Second -type WebsocketOp struct { - Op string `json:"op"` - Args interface{} `json:"args"` -} - -type WebsocketLogin struct { - Key string `json:"apiKey"` - Passphrase string `json:"passphrase"` - Timestamp string `json:"timestamp"` - Sign string `json:"sign"` -} - //go:generate callbackgen -type Stream -interface type Stream struct { types.StandardStream @@ -36,13 +24,15 @@ type Stream struct { connCtx context.Context connCancel context.CancelFunc - bullet *kucoinapi.Bullet + bullet *kucoinapi.Bullet - candleEventCallbacks []func(e *kucoinapi.WebSocketCandle) - orderBookL2EventCallbacks []func(e *kucoinapi.WebSocketOrderBookL2) - tickerEventCallbacks []func(e *kucoinapi.WebSocketTicker) - accountBalanceEventCallbacks []func(e *kucoinapi.WebSocketAccountBalance) - privateOrderEventCallbacks []func(e *kucoinapi.WebSocketPrivateOrder) + candleEventCallbacks []func(candle *WebSocketCandleEvent, e *WebSocketEvent) + orderBookL2EventCallbacks []func(e *WebSocketOrderBookL2Event) + tickerEventCallbacks []func(e *WebSocketTickerEvent) + accountBalanceEventCallbacks []func(e *WebSocketAccountBalanceEvent) + privateOrderEventCallbacks []func(e *WebSocketPrivateOrderEvent) + + lastCandle map[string]types.KLine } func NewStream(client *kucoinapi.RestClient) *Stream { @@ -51,6 +41,7 @@ func NewStream(client *kucoinapi.RestClient) *Stream { StandardStream: types.StandardStream{ ReconnectC: make(chan struct{}, 1), }, + lastCandle: make(map[string]types.KLine), } stream.OnConnect(stream.handleConnect) @@ -58,16 +49,27 @@ func NewStream(client *kucoinapi.RestClient) *Stream { stream.OnOrderBookL2Event(stream.handleOrderBookL2Event) stream.OnTickerEvent(stream.handleTickerEvent) stream.OnPrivateOrderEvent(stream.handlePrivateOrderEvent) + stream.OnAccountBalanceEvent(stream.handleAccountBalanceEvent) return stream } -func (s *Stream) handleCandleEvent(e *kucoinapi.WebSocketCandle) {} +func (s *Stream) handleCandleEvent(candle *WebSocketCandleEvent, e *WebSocketEvent) { + kline := candle.KLine() + last, ok := s.lastCandle[e.Topic] + if ok && kline.StartTime.After(last.StartTime.Time()) || e.Subject == WebSocketSubjectTradeCandlesAdd { + last.Closed = true + s.EmitKLineClosed(last) + } -func (s *Stream) handleOrderBookL2Event(e *kucoinapi.WebSocketOrderBookL2) {} + s.EmitKLine(kline) + s.lastCandle[e.Topic] = kline +} -func (s *Stream) handleTickerEvent(e *kucoinapi.WebSocketTicker) {} +func (s *Stream) handleOrderBookL2Event(e *WebSocketOrderBookL2Event) {} -func (s *Stream) handleAccountBalanceEvent(e *kucoinapi.WebSocketAccountBalance) { +func (s *Stream) handleTickerEvent(e *WebSocketTickerEvent) {} + +func (s *Stream) handleAccountBalanceEvent(e *WebSocketAccountBalanceEvent) { bm := types.BalanceMap{} bm[e.Currency] = types.Balance{ Currency: e.Currency, @@ -77,7 +79,7 @@ func (s *Stream) handleAccountBalanceEvent(e *kucoinapi.WebSocketAccountBalance) s.StandardStream.EmitBalanceUpdate(bm) } -func (s *Stream) handlePrivateOrderEvent(e *kucoinapi.WebSocketPrivateOrder) { +func (s *Stream) handlePrivateOrderEvent(e *WebSocketPrivateOrderEvent) { if e.Type == "match" { s.StandardStream.EmitTradeUpdate(types.Trade{ OrderID: hashStringID(e.OrderId), @@ -136,17 +138,17 @@ func (s *Stream) handleConnect() { } } else { id := time.Now().UnixMilli() - cmds := []kucoinapi.WebSocketCommand{ + cmds := []WebSocketCommand{ { Id: id, - Type: kucoinapi.WebSocketMessageTypeSubscribe, + Type: WebSocketMessageTypeSubscribe, Topic: "/spotMarket/tradeOrders", PrivateChannel: true, Response: true, }, { Id: id + 1, - Type: kucoinapi.WebSocketMessageTypeSubscribe, + Type: WebSocketMessageTypeSubscribe, Topic: "/account/balance", PrivateChannel: true, Response: true, @@ -160,7 +162,6 @@ func (s *Stream) handleConnect() { } } - func (s *Stream) Close() error { conn := s.Conn() return conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) @@ -333,7 +334,7 @@ func (s *Stream) read(ctx context.Context) { } // used for debugging - // fmt.Println(string(message)) + // log.Println(string(message)) e, err := parseWebsocketPayload(message) if err != nil { @@ -352,22 +353,22 @@ func (s *Stream) read(ctx context.Context) { } } -func (s *Stream) dispatchEvent(e *kucoinapi.WebSocketEvent) { +func (s *Stream) dispatchEvent(e *WebSocketEvent) { switch et := e.Object.(type) { - case *kucoinapi.WebSocketTicker: + case *WebSocketTickerEvent: s.EmitTickerEvent(et) - case *kucoinapi.WebSocketOrderBookL2: + case *WebSocketOrderBookL2Event: s.EmitOrderBookL2Event(et) - case *kucoinapi.WebSocketCandle: - s.EmitCandleEvent(et) + case *WebSocketCandleEvent: + s.EmitCandleEvent(et, e) - case *kucoinapi.WebSocketAccountBalance: + case *WebSocketAccountBalanceEvent: s.EmitAccountBalanceEvent(et) - case *kucoinapi.WebSocketPrivateOrder: + case *WebSocketPrivateOrderEvent: s.EmitPrivateOrderEvent(et) default: @@ -404,7 +405,7 @@ func ping(ctx context.Context, w WebSocketConnector, interval time.Duration) { case <-pingTicker.C: conn := w.Conn() - if err := conn.WriteJSON(kucoinapi.WebSocketCommand{ + if err := conn.WriteJSON(WebSocketCommand{ Id: time.Now().UnixMilli(), Type: "ping", }); err != nil { diff --git a/pkg/exchange/kucoin/stream_callbacks.go b/pkg/exchange/kucoin/stream_callbacks.go index de60ad871..944d15948 100644 --- a/pkg/exchange/kucoin/stream_callbacks.go +++ b/pkg/exchange/kucoin/stream_callbacks.go @@ -2,68 +2,64 @@ package kucoin -import ( - "github.com/c9s/bbgo/pkg/exchange/kucoin/kucoinapi" -) - -func (s *Stream) OnCandleEvent(cb func(c *kucoinapi.WebSocketCandle)) { +func (s *Stream) OnCandleEvent(cb func(candle *WebSocketCandleEvent, e *WebSocketEvent)) { s.candleEventCallbacks = append(s.candleEventCallbacks, cb) } -func (s *Stream) EmitCandleEvent(c *kucoinapi.WebSocketCandle) { +func (s *Stream) EmitCandleEvent(candle *WebSocketCandleEvent, e *WebSocketEvent) { for _, cb := range s.candleEventCallbacks { - cb(c) + cb(candle, e) } } -func (s *Stream) OnOrderBookL2Event(cb func(c *kucoinapi.WebSocketOrderBookL2)) { +func (s *Stream) OnOrderBookL2Event(cb func(e *WebSocketOrderBookL2Event)) { s.orderBookL2EventCallbacks = append(s.orderBookL2EventCallbacks, cb) } -func (s *Stream) EmitOrderBookL2Event(c *kucoinapi.WebSocketOrderBookL2) { +func (s *Stream) EmitOrderBookL2Event(e *WebSocketOrderBookL2Event) { for _, cb := range s.orderBookL2EventCallbacks { - cb(c) + cb(e) } } -func (s *Stream) OnTickerEvent(cb func(c *kucoinapi.WebSocketTicker)) { +func (s *Stream) OnTickerEvent(cb func(e *WebSocketTickerEvent)) { s.tickerEventCallbacks = append(s.tickerEventCallbacks, cb) } -func (s *Stream) EmitTickerEvent(c *kucoinapi.WebSocketTicker) { +func (s *Stream) EmitTickerEvent(e *WebSocketTickerEvent) { for _, cb := range s.tickerEventCallbacks { - cb(c) + cb(e) } } -func (s *Stream) OnAccountBalanceEvent(cb func(c *kucoinapi.WebSocketAccountBalance)) { +func (s *Stream) OnAccountBalanceEvent(cb func(e *WebSocketAccountBalanceEvent)) { s.accountBalanceEventCallbacks = append(s.accountBalanceEventCallbacks, cb) } -func (s *Stream) EmitAccountBalanceEvent(c *kucoinapi.WebSocketAccountBalance) { +func (s *Stream) EmitAccountBalanceEvent(e *WebSocketAccountBalanceEvent) { for _, cb := range s.accountBalanceEventCallbacks { - cb(c) + cb(e) } } -func (s *Stream) OnPrivateOrderEvent(cb func(c *kucoinapi.WebSocketPrivateOrder)) { +func (s *Stream) OnPrivateOrderEvent(cb func(e *WebSocketPrivateOrderEvent)) { s.privateOrderEventCallbacks = append(s.privateOrderEventCallbacks, cb) } -func (s *Stream) EmitPrivateOrderEvent(c *kucoinapi.WebSocketPrivateOrder) { +func (s *Stream) EmitPrivateOrderEvent(e *WebSocketPrivateOrderEvent) { for _, cb := range s.privateOrderEventCallbacks { - cb(c) + cb(e) } } type StreamEventHub interface { - OnCandleEvent(cb func(c *kucoinapi.WebSocketCandle)) + OnCandleEvent(cb func(candle *WebSocketCandleEvent, e *WebSocketEvent)) - OnOrderBookL2Event(cb func(c *kucoinapi.WebSocketOrderBookL2)) + OnOrderBookL2Event(cb func(e *WebSocketOrderBookL2Event)) - OnTickerEvent(cb func(c *kucoinapi.WebSocketTicker)) + OnTickerEvent(cb func(e *WebSocketTickerEvent)) - OnAccountBalanceEvent(cb func(c *kucoinapi.WebSocketAccountBalance)) + OnAccountBalanceEvent(cb func(e *WebSocketAccountBalanceEvent)) - OnPrivateOrderEvent(cb func(c *kucoinapi.WebSocketPrivateOrder)) + OnPrivateOrderEvent(cb func(e *WebSocketPrivateOrderEvent)) } diff --git a/pkg/exchange/kucoin/kucoinapi/websocket.go b/pkg/exchange/kucoin/websocket.go similarity index 72% rename from pkg/exchange/kucoin/kucoinapi/websocket.go rename to pkg/exchange/kucoin/websocket.go index b213473b3..521e019c3 100644 --- a/pkg/exchange/kucoin/kucoinapi/websocket.go +++ b/pkg/exchange/kucoin/websocket.go @@ -1,10 +1,12 @@ -package kucoinapi +package kucoin import ( "encoding/json" + "time" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" ) type WebSocketMessageType string @@ -14,6 +16,7 @@ const ( WebSocketMessageTypeSubscribe WebSocketMessageType = "subscribe" WebSocketMessageTypeUnsubscribe WebSocketMessageType = "unsubscribe" WebSocketMessageTypeAck WebSocketMessageType = "ack" + WebSocketMessageTypeError WebSocketMessageType = "error" WebSocketMessageTypePong WebSocketMessageType = "pong" WebSocketMessageTypeWelcome WebSocketMessageType = "welcome" WebSocketMessageTypeMessage WebSocketMessageType = "message" @@ -27,6 +30,7 @@ const ( WebSocketSubjectTradeL2Update WebSocketSubject = "trade.l2update" // order book L2 WebSocketSubjectLevel2 WebSocketSubject = "level2" // level2 WebSocketSubjectTradeCandlesUpdate WebSocketSubject = "trade.candles.update" + WebSocketSubjectTradeCandlesAdd WebSocketSubject = "trade.candles.add" // private subjects WebSocketSubjectOrderChange WebSocketSubject = "orderChange" @@ -53,12 +57,13 @@ type WebSocketEvent struct { Topic string `json:"topic"` Subject WebSocketSubject `json:"subject"` Data json.RawMessage `json:"data"` + Code int `json:"code"` // used in type error // Object is used for storing the parsed Data Object interface{} `json:"-"` } -type WebSocketTicker struct { +type WebSocketTickerEvent struct { Sequence string `json:"sequence"` Price fixedpoint.Value `json:"price"` Size fixedpoint.Value `json:"size"` @@ -68,7 +73,7 @@ type WebSocketTicker struct { BestBidSize fixedpoint.Value `json:"bestBidSize"` } -type WebSocketOrderBookL2 struct { +type WebSocketOrderBookL2Event struct { SequenceStart int64 `json:"sequenceStart"` SequenceEnd int64 `json:"sequenceEnd"` Symbol string `json:"symbol"` @@ -78,13 +83,44 @@ type WebSocketOrderBookL2 struct { } `json:"changes"` } -type WebSocketCandle struct { - Symbol string `json:"symbol"` - Candles []string `json:"candles"` - Time int64 `json:"time"` +type WebSocketCandleEvent struct { + Symbol string `json:"symbol"` + Candles []string `json:"candles"` + Time types.MillisecondTimestamp `json:"time"` + + // Interval is an injected field (not from the payload) + Interval types.Interval + + // Is a new candle or not + Add bool } -type WebSocketPrivateOrder struct { +func (e *WebSocketCandleEvent) KLine() types.KLine { + startTime := types.MustParseUnixTimestamp(e.Candles[0]) + openPrice := util.MustParseFloat(e.Candles[1]) + closePrice := util.MustParseFloat(e.Candles[2]) + highPrice := util.MustParseFloat(e.Candles[3]) + lowPrice := util.MustParseFloat(e.Candles[4]) + volume := util.MustParseFloat(e.Candles[5]) + quoteVolume := util.MustParseFloat(e.Candles[6]) + kline := types.KLine{ + Exchange: types.ExchangeKucoin, + Symbol: toGlobalSymbol(e.Symbol), + StartTime: types.Time(startTime), + EndTime: types.Time(startTime.Add(e.Interval.Duration() - time.Millisecond)), + Interval: e.Interval, + Open: openPrice, + Close: closePrice, + High: highPrice, + Low: lowPrice, + Volume: volume, + QuoteVolume: quoteVolume, + Closed: false, + } + return kline +} + +type WebSocketPrivateOrderEvent struct { OrderId string `json:"orderId"` TradeId string `json:"tradeId"` Symbol string `json:"symbol"` @@ -105,7 +141,7 @@ type WebSocketPrivateOrder struct { Ts types.MillisecondTimestamp `json:"ts"` } -type WebSocketAccountBalance struct { +type WebSocketAccountBalanceEvent struct { Total fixedpoint.Value `json:"total"` Available fixedpoint.Value `json:"available"` AvailableChange fixedpoint.Value `json:"availableChange"` diff --git a/pkg/types/time.go b/pkg/types/time.go index 6d2d4c09c..d5bf5573c 100644 --- a/pkg/types/time.go +++ b/pkg/types/time.go @@ -10,6 +10,28 @@ import ( type MillisecondTimestamp time.Time +func NewMillisecondTimestampFromInt(i int64) MillisecondTimestamp { + return MillisecondTimestamp(time.Unix(0, i * int64(time.Millisecond))) +} + +func MustParseMillisecondTimestamp(a string) MillisecondTimestamp { + m, err := strconv.ParseInt(a, 10, 64) // startTime + if err != nil { + panic(fmt.Errorf("millisecond timestamp parse error %v", err)) + } + + return NewMillisecondTimestampFromInt(m) +} + +func MustParseUnixTimestamp(a string) time.Time { + m, err := strconv.ParseInt(a, 10, 64) // startTime + if err != nil { + panic(fmt.Errorf("millisecond timestamp parse error %v", err)) + } + + return time.Unix(m, 0) +} + func (t MillisecondTimestamp) String() string { return time.Time(t).String() }