From 0b906606fe15c27a440656cca16cc42600b8cdb3 Mon Sep 17 00:00:00 2001 From: Edwin Date: Tue, 2 Jan 2024 23:18:57 +0800 Subject: [PATCH] pkg/exchange: refactor book and kline --- pkg/exchange/okex/parse.go | 513 ++++++++++++----------- pkg/exchange/okex/parse_test.go | 577 ++++++++++++++++++++++++++ pkg/exchange/okex/stream.go | 43 +- pkg/exchange/okex/stream_callbacks.go | 10 +- pkg/exchange/okex/stream_test.go | 17 + 5 files changed, 875 insertions(+), 285 deletions(-) create mode 100644 pkg/exchange/okex/parse_test.go diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 93a6b0c93..cd2cbffe1 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -4,7 +4,6 @@ import ( "encoding/json" "errors" "fmt" - "strconv" "strings" "time" @@ -15,288 +14,326 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -func parseWebSocketEvent(str []byte) (interface{}, error) { - v, err := fastjson.ParseBytes(str) +type Channel string + +const ( + ChannelBooks Channel = "books" + ChannelBook5 Channel = "book5" + ChannelCandlePrefix Channel = "candle" + ChannelAccount Channel = "account" + ChannelOrders Channel = "orders" +) + +type ActionType string + +const ( + ActionTypeSnapshot ActionType = "snapshot" + ActionTypeUpdate ActionType = "update" +) + +func parseWebSocketEvent(in []byte) (interface{}, error) { + v, err := fastjson.ParseBytes(in) if err != nil { return nil, err } - if v.Exists("event") { - return parseEvent(v) + var event WebSocketEvent + err = json.Unmarshal(in, &event) + if err != nil { + return nil, err + } + if event.Event != "" { + // TODO: remove fastjson + return event, nil } - if v.Exists("data") { - return parseData(v) + switch event.Arg.Channel { + case ChannelAccount: + // TODO: remove fastjson + return parseAccount(v) + + case ChannelBooks, ChannelBook5: + var bookEvent BookEvent + err = json.Unmarshal(event.Data, &bookEvent.Data) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into BookEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err) + } + + instId := event.Arg.InstId + bookEvent.InstrumentID = instId + bookEvent.Symbol = toGlobalSymbol(instId) + bookEvent.channel = event.Arg.Channel + bookEvent.Action = event.ActionType + return &bookEvent, nil + + case ChannelOrders: + // TODO: remove fastjson + return parseOrder(v) + + default: + if strings.HasPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix)) { + // TODO: Support kline subscription. The kline requires another URL to subscribe, which is why we cannot + // support it at this time. + var kLineEvt KLineEvent + err = json.Unmarshal(event.Data, &kLineEvt.Events) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into KLineEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err) + } + + kLineEvt.Channel = event.Arg.Channel + kLineEvt.InstrumentID = event.Arg.InstId + kLineEvt.Symbol = toGlobalSymbol(event.Arg.InstId) + kLineEvt.Interval = strings.ToLower(strings.TrimPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix))) + return &kLineEvt, nil + } } return nil, nil } type WebSocketEvent struct { - Event string `json:"event"` - Code string `json:"code,omitempty"` - Message string `json:"msg,omitempty"` - Arg interface{} `json:"arg,omitempty"` -} - -func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { - // event could be "subscribe", "unsubscribe" or "error" - event := string(v.GetStringBytes("event")) - code := string(v.GetStringBytes("code")) - message := string(v.GetStringBytes("msg")) - arg := v.GetObject("arg") - return &WebSocketEvent{ - Event: event, - Code: code, - Message: message, - Arg: arg, - }, nil + Event string `json:"event"` + Code string `json:"code,omitempty"` + Message string `json:"msg,omitempty"` + Arg struct { + Channel Channel `json:"channel"` + InstId string `json:"instId"` + } `json:"arg,omitempty"` + Data json.RawMessage `json:"data"` + ActionType ActionType `json:"action"` } type BookEvent struct { - InstrumentID string - Symbol string - Action string - Bids []BookEntry - Asks []BookEntry - MillisecondTimestamp int64 - Checksum int - channel string + InstrumentID string + Symbol string + Action ActionType + channel Channel + + Data []struct { + Bids PriceVolumeOrderSlice `json:"bids"` + Asks PriceVolumeOrderSlice `json:"asks"` + MillisecondTimestamp types.MillisecondTimestamp `json:"ts"` + Checksum int `json:"checksum"` + } } -func (data *BookEvent) BookTicker() types.BookTicker { +func (event *BookEvent) BookTicker() types.BookTicker { ticker := types.BookTicker{ - Symbol: data.Symbol, + Symbol: event.Symbol, } - if len(data.Bids) > 0 { - ticker.Buy = data.Bids[0].Price - ticker.BuySize = data.Bids[0].Volume - } + if len(event.Data) > 0 { + if len(event.Data[0].Bids) > 0 { + ticker.Buy = event.Data[0].Bids[0].Price + ticker.BuySize = event.Data[0].Bids[0].Volume + } - if len(data.Asks) > 0 { - ticker.Sell = data.Asks[0].Price - ticker.SellSize = data.Asks[0].Volume + if len(event.Data[0].Asks) > 0 { + ticker.Sell = event.Data[0].Asks[0].Price + ticker.SellSize = event.Data[0].Asks[0].Volume + } } return ticker } -func (data *BookEvent) Book() types.SliceOrderBook { +func (event *BookEvent) Book() types.SliceOrderBook { book := types.SliceOrderBook{ - Symbol: data.Symbol, - Time: types.NewMillisecondTimestampFromInt(data.MillisecondTimestamp).Time(), + Symbol: event.Symbol, } - for _, bid := range data.Bids { - book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume}) + if len(event.Data) > 0 { + book.Time = event.Data[0].MillisecondTimestamp.Time() } - for _, ask := range data.Asks { - book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume}) + for _, data := range event.Data { + for _, bid := range data.Bids { + book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume}) + } + + for _, ask := range data.Asks { + book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume}) + } } return book } -type BookEntry struct { - Price fixedpoint.Value - Volume fixedpoint.Value +type PriceVolumeOrder struct { + types.PriceVolume + // NumLiquidated is part of a deprecated feature and it is always "0" NumLiquidated int - NumOrders int + // NumOrders is the number of orders at the price. + NumOrders int } -func parseBookEntry(v *fastjson.Value) (*BookEntry, error) { - arr, err := v.Array() +type PriceVolumeOrderSlice []PriceVolumeOrder + +func (slice *PriceVolumeOrderSlice) UnmarshalJSON(b []byte) error { + s, err := ParsePriceVolumeOrderSliceJSON(b) if err != nil { - return nil, err + return err } - if len(arr) < 4 { - return nil, fmt.Errorf("unexpected book entry size: %d", len(arr)) - } - - price := fixedpoint.Must(fixedpoint.NewFromString(string(arr[0].GetStringBytes()))) - volume := fixedpoint.Must(fixedpoint.NewFromString(string(arr[1].GetStringBytes()))) - numLiquidated, err := strconv.Atoi(string(arr[2].GetStringBytes())) - if err != nil { - return nil, err - } - - numOrders, err := strconv.Atoi(string(arr[3].GetStringBytes())) - if err != nil { - return nil, err - } - - return &BookEntry{ - Price: price, - Volume: volume, - NumLiquidated: numLiquidated, - NumOrders: numOrders, - }, nil + *slice = s + return nil } -func parseBookData(v *fastjson.Value) (*BookEvent, error) { - instrumentId := string(v.GetStringBytes("arg", "instId")) - data := v.GetArray("data") - if len(data) == 0 { - return nil, errors.New("empty data payload") - } +// ParsePriceVolumeOrderSliceJSON tries to parse a 2 dimensional string array into a PriceVolumeOrderSlice +// +// [["8476.98", "415", "0", "13"], ["8477", "7", "0", "2"], ... ] +func ParsePriceVolumeOrderSliceJSON(b []byte) (slice PriceVolumeOrderSlice, err error) { + var as [][]fixedpoint.Value - // "snapshot" or "update" - action := string(v.GetStringBytes("action")) - - millisecondTimestamp, err := strconv.ParseInt(string(data[0].GetStringBytes("ts")), 10, 64) + err = json.Unmarshal(b, &as) if err != nil { - return nil, err + return slice, fmt.Errorf("failed to unmarshal price volume order slice: %w", err) } - checksum := data[0].GetInt("checksum") + for _, a := range as { + var pv PriceVolumeOrder + pv.Price = a[0] + pv.Volume = a[1] + pv.NumLiquidated = a[2].Int() + pv.NumOrders = a[3].Int() - var asks []BookEntry - var bids []BookEntry + slice = append(slice, pv) + } - for _, v := range data[0].GetArray("asks") { - entry, err := parseBookEntry(v) - if err != nil { - return nil, err + return slice, nil +} + +type KLine struct { + StartTime types.MillisecondTimestamp + OpenPrice fixedpoint.Value + HighestPrice fixedpoint.Value + LowestPrice fixedpoint.Value + ClosePrice fixedpoint.Value + // Volume trading volume, with a unit of contract.cccccbcvefkeibbhtrebbfklrbetukhrgjgkiilufbde + + // If it is a derivatives contract, the value is the number of contracts. + // If it is SPOT/MARGIN, the value is the quantity in base currency. + Volume fixedpoint.Value + // VolumeCcy trading volume, with a unit of currency. + // If it is a derivatives contract, the value is the number of base currency. + // If it is SPOT/MARGIN, the value is the quantity in quote currency. + VolumeCcy fixedpoint.Value + // VolumeCcyQuote Trading volume, the value is the quantity in quote currency + // e.g. The unit is USDT for BTC-USDT and BTC-USDT-SWAP; + // The unit is USD for BTC-USD-SWAP + VolumeCcyQuote fixedpoint.Value + // The state of candlesticks. + // 0 represents that it is uncompleted, 1 represents that it is completed. + Confirm fixedpoint.Value +} + +func (k KLine) ToGlobal(interval types.Interval, symbol string) types.KLine { + startTime := k.StartTime.Time() + + return types.KLine{ + Exchange: types.ExchangeOKEx, + Symbol: symbol, + StartTime: types.Time(startTime), + EndTime: types.Time(startTime.Add(interval.Duration() - time.Millisecond)), + Interval: interval, + Open: k.OpenPrice, + Close: k.ClosePrice, + High: k.HighestPrice, + Low: k.LowestPrice, + Volume: k.Volume, + QuoteVolume: k.VolumeCcy, // not supported + TakerBuyBaseAssetVolume: fixedpoint.Zero, // not supported + TakerBuyQuoteAssetVolume: fixedpoint.Zero, // not supported + LastTradeID: 0, // not supported + NumberOfTrades: 0, // not supported + Closed: !k.Confirm.IsZero(), + } +} + +type KLineSlice []KLine + +func (m *KLineSlice) UnmarshalJSON(b []byte) error { + if m == nil { + return errors.New("nil pointer of kline slice") + } + s, err := parseKLineSliceJSON(b) + if err != nil { + return err + } + + *m = s + return nil +} + +// parseKLineSliceJSON tries to parse a 2 dimensional string array into a KLineSlice +// +// [ +// [ +// "1597026383085", +// "8533.02", +// "8553.74", +// "8527.17", +// "8548.26", +// "45247", +// "529.5858061", +// "5529.5858061", +// "0" +// ] +// ] +func parseKLineSliceJSON(in []byte) (slice KLineSlice, err error) { + var rawKLines [][]json.RawMessage + + err = json.Unmarshal(in, &rawKLines) + if err != nil { + return slice, err + } + + for _, raw := range rawKLines { + if len(raw) != 9 { + return nil, fmt.Errorf("unexpected kline length: %d, data: %q", len(raw), raw) } - asks = append(asks, *entry) - } - - for _, v := range data[0].GetArray("bids") { - entry, err := parseBookEntry(v) - if err != nil { - return nil, err + var kline KLine + if err = json.Unmarshal(raw[0], &kline.StartTime); err != nil { + return nil, fmt.Errorf("failed to unmarshal into timestamp: %q", raw[0]) } - bids = append(bids, *entry) + if err = json.Unmarshal(raw[1], &kline.OpenPrice); err != nil { + return nil, fmt.Errorf("failed to unmarshal into open price: %q", raw[1]) + } + if err = json.Unmarshal(raw[2], &kline.HighestPrice); err != nil { + return nil, fmt.Errorf("failed to unmarshal into highest price: %q", raw[2]) + } + if err = json.Unmarshal(raw[3], &kline.LowestPrice); err != nil { + return nil, fmt.Errorf("failed to unmarshal into lowest price: %q", raw[3]) + } + if err = json.Unmarshal(raw[4], &kline.ClosePrice); err != nil { + return nil, fmt.Errorf("failed to unmarshal into close price: %q", raw[4]) + } + if err = json.Unmarshal(raw[5], &kline.Volume); err != nil { + return nil, fmt.Errorf("failed to unmarshal into volume: %q", raw[5]) + } + if err = json.Unmarshal(raw[6], &kline.VolumeCcy); err != nil { + return nil, fmt.Errorf("failed to unmarshal into volume currency: %q", raw[6]) + } + if err = json.Unmarshal(raw[7], &kline.VolumeCcyQuote); err != nil { + return nil, fmt.Errorf("failed to unmarshal into trading currency quote: %q", raw[7]) + } + if err = json.Unmarshal(raw[8], &kline.Confirm); err != nil { + return nil, fmt.Errorf("failed to unmarshal into confirm: %q", raw[8]) + } + + slice = append(slice, kline) } - return &BookEvent{ - InstrumentID: instrumentId, - Symbol: toGlobalSymbol(instrumentId), - Action: action, - Bids: bids, - Asks: asks, - Checksum: checksum, - MillisecondTimestamp: millisecondTimestamp, - }, nil + return slice, nil } -type Candle struct { - Channel string +type KLineEvent struct { + Events KLineSlice + InstrumentID string Symbol string Interval string - Open fixedpoint.Value - High fixedpoint.Value - Low fixedpoint.Value - Close fixedpoint.Value - - // Trading volume, with a unit of contact. - // If it is a derivatives contract, the value is the number of contracts. - // If it is SPOT/MARGIN, the value is the amount of trading currency. - Volume fixedpoint.Value - - // Trading volume, with a unit of currency. - // If it is a derivatives contract, the value is the number of settlement currency. - // If it is SPOT/MARGIN, the value is the number of quote currency. - VolumeInCurrency fixedpoint.Value - - MillisecondTimestamp int64 - - StartTime time.Time -} - -func (c *Candle) KLine() types.KLine { - interval := types.Interval(c.Interval) - endTime := c.StartTime.Add(interval.Duration() - 1*time.Millisecond) - return types.KLine{ - Exchange: types.ExchangeOKEx, - Interval: interval, - Open: c.Open, - High: c.High, - Low: c.Low, - Close: c.Close, - Volume: c.Volume, - QuoteVolume: c.VolumeInCurrency, - StartTime: types.Time(c.StartTime), - EndTime: types.Time(endTime), - } -} - -func parseCandle(channel string, v *fastjson.Value) (*Candle, error) { - instrumentID := string(v.GetStringBytes("arg", "instId")) - data, err := v.Get("data").Array() - if err != nil { - return nil, err - } - - if len(data) == 0 { - return nil, errors.New("candle data is empty") - } - - arr, err := data[0].Array() - if err != nil { - return nil, err - } - - if len(arr) < 7 { - return nil, fmt.Errorf("unexpected candle data length: %d", len(arr)) - } - - interval := strings.ToLower(strings.TrimPrefix(channel, "candle")) - - timestamp, err := strconv.ParseInt(string(arr[0].GetStringBytes()), 10, 64) - if err != nil { - return nil, err - } - - open, err := fixedpoint.NewFromString(string(arr[1].GetStringBytes())) - if err != nil { - return nil, err - } - - high, err := fixedpoint.NewFromString(string(arr[2].GetStringBytes())) - if err != nil { - return nil, err - } - - low, err := fixedpoint.NewFromString(string(arr[3].GetStringBytes())) - if err != nil { - return nil, err - } - - cls, err := fixedpoint.NewFromString(string(arr[4].GetStringBytes())) - if err != nil { - return nil, err - } - - vol, err := fixedpoint.NewFromString(string(arr[5].GetStringBytes())) - if err != nil { - return nil, err - } - - volCurrency, err := fixedpoint.NewFromString(string(arr[6].GetStringBytes())) - if err != nil { - return nil, err - } - - candleTime := time.Unix(0, timestamp*int64(time.Millisecond)) - return &Candle{ - Channel: channel, - InstrumentID: instrumentID, - Symbol: toGlobalSymbol(instrumentID), - Interval: interval, - Open: open, - High: high, - Low: low, - Close: cls, - Volume: vol, - VolumeInCurrency: volCurrency, - MillisecondTimestamp: timestamp, - StartTime: candleTime, - }, nil + Channel Channel } func parseAccount(v *fastjson.Value) (*okexapi.Account, error) { @@ -326,31 +363,3 @@ func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) { return orderDetails, nil } - -func parseData(v *fastjson.Value) (interface{}, error) { - - channel := string(v.GetStringBytes("arg", "channel")) - - switch channel { - case "books5": - data, err := parseBookData(v) - data.channel = channel - return data, err - case "books": - data, err := parseBookData(v) - data.channel = channel - return data, err - case "account": - return parseAccount(v) - case "orders": - return parseOrder(v) - default: - if strings.HasPrefix(channel, "candle") { - data, err := parseCandle(channel, v) - return data, err - } - - } - - return nil, nil -} diff --git a/pkg/exchange/okex/parse_test.go b/pkg/exchange/okex/parse_test.go new file mode 100644 index 000000000..42c41cf2d --- /dev/null +++ b/pkg/exchange/okex/parse_test.go @@ -0,0 +1,577 @@ +package okex + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +func TestParsePriceVolumeOrderSliceJSON(t *testing.T) { + t.Run("snapshot", func(t *testing.T) { + in := ` +{ + "arg": { + "channel": "books", + "instId": "BTC-USDT" + }, + "action": "snapshot", + "data": [ + { + "asks": [ + ["8476.98", "415", "0", "13"], + ["8477", "7", "0", "2"] + ], + "bids": [ + ["8476", "256", "0", "12"] + ], + "ts": "1597026383085", + "checksum": -855196043, + "prevSeqId": -1, + "seqId": 123456 + } + ] +} +` + + asks := PriceVolumeOrderSlice{ + { + PriceVolume: types.PriceVolume{ + Price: fixedpoint.NewFromFloat(8476.98), + Volume: fixedpoint.NewFromFloat(415), + }, + NumLiquidated: fixedpoint.Zero.Int(), + NumOrders: fixedpoint.NewFromFloat(13).Int(), + }, + { + PriceVolume: types.PriceVolume{ + Price: fixedpoint.NewFromFloat(8477), + Volume: fixedpoint.NewFromFloat(7), + }, + NumLiquidated: fixedpoint.Zero.Int(), + NumOrders: fixedpoint.NewFromFloat(2).Int(), + }, + } + bids := PriceVolumeOrderSlice{ + { + PriceVolume: types.PriceVolume{ + Price: fixedpoint.NewFromFloat(8476), + Volume: fixedpoint.NewFromFloat(256), + }, + NumLiquidated: fixedpoint.Zero.Int(), + NumOrders: fixedpoint.NewFromFloat(12).Int(), + }, + } + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.(*BookEvent) + assert.True(t, ok) + assert.Equal(t, "BTCUSDT", event.Symbol) + assert.Equal(t, ChannelBooks, event.channel) + assert.Equal(t, ActionTypeSnapshot, event.Action) + assert.Len(t, event.Data, 1) + assert.Len(t, event.Data[0].Asks, 2) + assert.Equal(t, asks, event.Data[0].Asks) + assert.Len(t, event.Data[0].Bids, 1) + assert.Equal(t, bids, event.Data[0].Bids) + }) + + t.Run("unexpected asks", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "books", + "instId": "BTC-USDT" + }, + "action": "snapshot", + "data": [ + { + "asks": [ + ["XYZ", "415", "0", "13"] + ], + "bids": [ + ["8476", "256", "0", "12"] + ], + "ts": "1597026383085", + "checksum": -855196043, + "prevSeqId": -1, + "seqId": 123456 + } + ] +} +` + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "price volume order") + }) +} + +func TestBookEvent_BookTicker(t *testing.T) { + in := ` +{ + "arg": { + "channel": "books", + "instId": "BTC-USDT" + }, + "action": "snapshot", + "data": [ + { + "asks": [ + ["8476.98", "415", "0", "13"], + ["8477", "7", "0", "2"] + ], + "bids": [ + ["8476", "256", "0", "12"] + ], + "ts": "1597026383085", + "checksum": -855196043, + "prevSeqId": -1, + "seqId": 123456 + } + ] +} +` + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.(*BookEvent) + assert.True(t, ok) + + ticker := event.BookTicker() + assert.Equal(t, types.BookTicker{ + Symbol: "BTCUSDT", + Buy: fixedpoint.NewFromFloat(8476), + BuySize: fixedpoint.NewFromFloat(256), + Sell: fixedpoint.NewFromFloat(8476.98), + SellSize: fixedpoint.NewFromFloat(415), + }, ticker) +} + +func TestBookEvent_Book(t *testing.T) { + in := ` +{ + "arg": { + "channel": "books", + "instId": "BTC-USDT" + }, + "action": "snapshot", + "data": [ + { + "asks": [ + ["8476.98", "415", "0", "13"], + ["8477", "7", "0", "2"] + ], + "bids": [ + ["8476", "256", "0", "12"] + ], + "ts": "1597026383085", + "checksum": -855196043, + "prevSeqId": -1, + "seqId": 123456 + } + ] +} +` + bids := types.PriceVolumeSlice{ + { + Price: fixedpoint.NewFromFloat(8476), + Volume: fixedpoint.NewFromFloat(256), + }, + } + asks := types.PriceVolumeSlice{ + { + Price: fixedpoint.NewFromFloat(8476.98), + Volume: fixedpoint.NewFromFloat(415), + }, + { + Price: fixedpoint.NewFromFloat(8477), + Volume: fixedpoint.NewFromFloat(7), + }, + } + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.(*BookEvent) + assert.True(t, ok) + + book := event.Book() + assert.Equal(t, types.SliceOrderBook{ + Symbol: "BTCUSDT", + Time: types.NewMillisecondTimestampFromInt(1597026383085).Time(), + Bids: bids, + Asks: asks, + }, book) +} + +func Test_parseKLineSliceJSON(t *testing.T) { + t.Run("snapshot", func(t *testing.T) { + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + exp := &KLineEvent{ + Events: KLineSlice{ + { + StartTime: types.NewMillisecondTimestampFromInt(1597026383085), + OpenPrice: fixedpoint.NewFromFloat(8533), + HighestPrice: fixedpoint.NewFromFloat(8553.74), + LowestPrice: fixedpoint.NewFromFloat(8527.17), + ClosePrice: fixedpoint.NewFromFloat(8548.26), + Volume: fixedpoint.NewFromFloat(45247), + VolumeCcy: fixedpoint.NewFromFloat(529.5858061), + VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061), + Confirm: fixedpoint.Zero, + }, + }, + InstrumentID: "BTC-USDT", + Symbol: "BTCUSDT", + Interval: "1d", + Channel: "candle1D", + } + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.(*KLineEvent) + assert.True(t, ok) + assert.Len(t, event.Events, 1) + assert.Equal(t, exp, event) + }) + + t.Run("failed to convert timestamp", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "x", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "timestamp") + }) + + t.Run("failed to convert open price", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "x", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "open price") + }) + + t.Run("failed to convert highest price", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "x", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "highest price") + }) + t.Run("failed to convert lowest price", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "x", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "lowest price") + }) + t.Run("failed to convert close price", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "x", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "close price") + }) + t.Run("failed to convert volume", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "x", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "volume") + }) + t.Run("failed to convert volume currency", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "x", + "529.5858061", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "volume currency") + }) + t.Run("failed to convert trading currency quote ", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "x", + "0" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "trading currency") + }) + t.Run("failed to convert confirm", func(t *testing.T) { + t.Skip("this will cause panic, so i skip it") + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "g" + ] + ] +} +` + + _, err := parseWebSocketEvent([]byte(in)) + assert.ErrorContains(t, err, "confirm") + }) + +} + +func TestKLine_ToGlobal(t *testing.T) { + t.Run("snapshot", func(t *testing.T) { + in := ` +{ + "arg": { + "channel": "candle1D", + "instId": "BTC-USDT" + }, + "data": [ + [ + "1597026383085", + "8533", + "8553.74", + "8527.17", + "8548.26", + "45247", + "529.5858061", + "529.5858061", + "0" + ] + ] +} +` + exp := &KLineEvent{ + Events: KLineSlice{ + { + StartTime: types.NewMillisecondTimestampFromInt(1597026383085), + OpenPrice: fixedpoint.NewFromFloat(8533), + HighestPrice: fixedpoint.NewFromFloat(8553.74), + LowestPrice: fixedpoint.NewFromFloat(8527.17), + ClosePrice: fixedpoint.NewFromFloat(8548.26), + Volume: fixedpoint.NewFromFloat(45247), + VolumeCcy: fixedpoint.NewFromFloat(529.5858061), + VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061), + Confirm: fixedpoint.Zero, + }, + }, + InstrumentID: "BTC-USDT", + Symbol: "BTCUSDT", + Interval: "1d", + Channel: "candle1D", + } + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.(*KLineEvent) + assert.True(t, ok) + + assert.Equal(t, types.KLine{ + Exchange: types.ExchangeOKEx, + Symbol: "BTCUSDT", + StartTime: types.Time(types.NewMillisecondTimestampFromInt(1597026383085)), + EndTime: types.Time(types.NewMillisecondTimestampFromInt(1597026383085).Time().Add(types.Interval(exp.Interval).Duration() - time.Millisecond)), + Interval: types.Interval(exp.Interval), + Open: exp.Events[0].OpenPrice, + Close: exp.Events[0].ClosePrice, + High: exp.Events[0].HighestPrice, + Low: exp.Events[0].LowestPrice, + Volume: exp.Events[0].Volume, + QuoteVolume: exp.Events[0].VolumeCcy, + TakerBuyBaseAssetVolume: fixedpoint.Zero, + TakerBuyQuoteAssetVolume: fixedpoint.Zero, + LastTradeID: 0, + NumberOfTrades: 0, + Closed: false, + }, event.Events[0].ToGlobal(types.Interval(event.Interval), event.Symbol)) + }) + +} diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 7d7c7a77e..9d45ae3b2 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -28,32 +28,24 @@ type Stream struct { client *okexapi.RestClient // public callbacks - candleEventCallbacks []func(candle Candle) + kLineEventCallbacks []func(candle KLineEvent) bookEventCallbacks []func(book BookEvent) eventCallbacks []func(event WebSocketEvent) accountEventCallbacks []func(account okexapi.Account) orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails) - - lastCandle map[CandleKey]Candle -} - -type CandleKey struct { - InstrumentID string - Channel string } func NewStream(client *okexapi.RestClient) *Stream { stream := &Stream{ client: client, StandardStream: types.NewStandardStream(), - lastCandle: make(map[CandleKey]Candle), } stream.SetParser(parseWebSocketEvent) stream.SetDispatcher(stream.dispatchEvent) stream.SetEndpointCreator(stream.createEndpoint) - stream.OnCandleEvent(stream.handleCandleEvent) + stream.OnKLineEvent(stream.handleKLineEvent) stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountEvent(stream.handleAccountEvent) stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent) @@ -167,27 +159,22 @@ func (s *Stream) handleAccountEvent(account okexapi.Account) { func (s *Stream) handleBookEvent(data BookEvent) { book := data.Book() switch data.Action { - case "snapshot": + case ActionTypeSnapshot: s.EmitBookSnapshot(book) - case "update": + case ActionTypeUpdate: s.EmitBookUpdate(book) } } -func (s *Stream) handleCandleEvent(candle Candle) { - key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID} - kline := candle.KLine() - - // check if we need to close previous kline - lastCandle, ok := s.lastCandle[key] - if ok && candle.StartTime.After(lastCandle.StartTime) { - lastKline := lastCandle.KLine() - lastKline.Closed = true - s.EmitKLineClosed(lastKline) +func (s *Stream) handleKLineEvent(k KLineEvent) { + for _, event := range k.Events { + kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol) + if kline.Closed { + s.EmitKLineClosed(kline) + } else { + s.EmitKLine(kline) + } } - - s.EmitKLine(kline) - s.lastCandle[key] = candle } func (s *Stream) createEndpoint(ctx context.Context) (string, error) { @@ -207,12 +194,12 @@ func (s *Stream) dispatchEvent(e interface{}) { case *BookEvent: // there's "books" for 400 depth and books5 for 5 depth - if et.channel != "books5" { + if et.channel != ChannelBook5 { s.EmitBookEvent(*et) } s.EmitBookTickerUpdate(et.BookTicker()) - case *Candle: - s.EmitCandleEvent(*et) + case *KLineEvent: + s.EmitKLineEvent(*et) case *okexapi.Account: s.EmitAccountEvent(*et) diff --git a/pkg/exchange/okex/stream_callbacks.go b/pkg/exchange/okex/stream_callbacks.go index 6fb6e7231..750614b7c 100644 --- a/pkg/exchange/okex/stream_callbacks.go +++ b/pkg/exchange/okex/stream_callbacks.go @@ -6,12 +6,12 @@ import ( "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" ) -func (s *Stream) OnCandleEvent(cb func(candle Candle)) { - s.candleEventCallbacks = append(s.candleEventCallbacks, cb) +func (s *Stream) OnKLineEvent(cb func(candle KLineEvent)) { + s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb) } -func (s *Stream) EmitCandleEvent(candle Candle) { - for _, cb := range s.candleEventCallbacks { +func (s *Stream) EmitKLineEvent(candle KLineEvent) { + for _, cb := range s.kLineEventCallbacks { cb(candle) } } @@ -57,7 +57,7 @@ func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { } type StreamEventHub interface { - OnCandleEvent(cb func(candle Candle)) + OnKLineEvent(cb func(candle KLineEvent)) OnBookEvent(cb func(book BookEvent)) diff --git a/pkg/exchange/okex/stream_test.go b/pkg/exchange/okex/stream_test.go index 596b4b978..1cc4e5e5d 100644 --- a/pkg/exchange/okex/stream_test.go +++ b/pkg/exchange/okex/stream_test.go @@ -48,4 +48,21 @@ func TestStream(t *testing.T) { c := make(chan struct{}) <-c }) + t.Run("kline test", func(t *testing.T) { + s.Subscribe(types.KLineChannel, "LTC-USD-200327", types.SubscribeOptions{ + Interval: types.Interval1m, + }) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnKLine(func(kline types.KLine) { + t.Log("got update", kline) + }) + s.OnKLineClosed(func(kline types.KLine) { + t.Log("got closed", kline) + }) + c := make(chan struct{}) + <-c + }) }