From e9d0ce5bbfe505286899e595dac3d34aff031ad2 Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 9 Aug 2023 11:41:04 +0800 Subject: [PATCH 1/2] pkg/exchage: support k line rest api --- pkg/exchange/bybit/bybitapi/client_test.go | 11 + .../bybit/bybitapi/get_k_lines_request.go | 107 ++++++++ .../get_k_lines_request_requestgen.go | 237 ++++++++++++++++++ .../bybitapi/get_k_lines_request_test.go | 175 +++++++++++++ pkg/exchange/bybit/bybitapi/types.go | 36 +++ pkg/exchange/bybit/bybitapi/types_test.go | 41 +++ pkg/exchange/bybit/convert.go | 45 ++++ pkg/exchange/bybit/convert_test.go | 85 +++++++ pkg/exchange/bybit/exchange.go | 70 +++++- 9 files changed, 806 insertions(+), 1 deletion(-) create mode 100644 pkg/exchange/bybit/bybitapi/get_k_lines_request.go create mode 100644 pkg/exchange/bybit/bybitapi/get_k_lines_request_requestgen.go create mode 100644 pkg/exchange/bybit/bybitapi/get_k_lines_request_test.go create mode 100644 pkg/exchange/bybit/bybitapi/types_test.go diff --git a/pkg/exchange/bybit/bybitapi/client_test.go b/pkg/exchange/bybit/bybitapi/client_test.go index cf6a61215..8156dd2c5 100644 --- a/pkg/exchange/bybit/bybitapi/client_test.go +++ b/pkg/exchange/bybit/bybitapi/client_test.go @@ -5,6 +5,7 @@ import ( "os" "strconv" "testing" + "time" "github.com/google/uuid" "github.com/stretchr/testify/assert" @@ -162,4 +163,14 @@ func TestClient(t *testing.T) { assert.NoError(t, err) t.Logf("apiResp: %+v", apiResp) }) + + t.Run("GetKLinesRequest", func(t *testing.T) { + startTime := time.Date(2023, 8, 8, 9, 28, 0, 0, time.UTC) + endTime := time.Date(2023, 8, 8, 9, 45, 0, 0, time.UTC) + req := client.NewGetKLinesRequest(). + Symbol("BTCUSDT").Interval("15").StartTime(startTime).EndTime(endTime) + apiResp, err := req.Do(ctx) + assert.NoError(t, err) + t.Logf("apiResp: %+v", apiResp.List) + }) } diff --git a/pkg/exchange/bybit/bybitapi/get_k_lines_request.go b/pkg/exchange/bybit/bybitapi/get_k_lines_request.go new file mode 100644 index 000000000..de04b72d3 --- /dev/null +++ b/pkg/exchange/bybit/bybitapi/get_k_lines_request.go @@ -0,0 +1,107 @@ +package bybitapi + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/c9s/requestgen" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +//go:generate -command GetRequest requestgen -method GET -responseType .APIResponse -responseDataField Result +//go:generate -command PostRequest requestgen -method POST -responseType .APIResponse -responseDataField Result + +type IntervalSign string + +const ( + IntervalSignDay IntervalSign = "D" + IntervalSignWeek IntervalSign = "W" + IntervalSignMonth IntervalSign = "M" +) + +type KLinesResponse struct { + Symbol string `json:"symbol"` + // An string array of individual candle + // Sort in reverse by startTime + List []KLine `json:"list"` + Category Category `json:"category"` +} + +type KLine struct { + // list[0]: startTime, Start time of the candle (ms) + StartTime types.MillisecondTimestamp + // list[1]: openPrice + Open fixedpoint.Value + // list[2]: highPrice + High fixedpoint.Value + // list[3]: lowPrice + Low fixedpoint.Value + // list[4]: closePrice + Close fixedpoint.Value + // list[5]: volume, Trade volume. Unit of contract: pieces of contract. Unit of spot: quantity of coins + Volume fixedpoint.Value + // list[6]: turnover, Turnover. Unit of figure: quantity of quota coin + TurnOver fixedpoint.Value +} + +const KLinesArrayLen = 7 + +func (k *KLine) UnmarshalJSON(data []byte) error { + var jsonArr []json.RawMessage + err := json.Unmarshal(data, &jsonArr) + if err != nil { + return fmt.Errorf("failed to unmarshal jsonRawMessage: %v, err: %w", string(data), err) + } + if len(jsonArr) != KLinesArrayLen { + return fmt.Errorf("unexpected K Lines array length: %d, exp: %d", len(jsonArr), KLinesArrayLen) + } + + err = json.Unmarshal(jsonArr[0], &k.StartTime) + if err != nil { + return fmt.Errorf("failed to unmarshal resp index 0: %v, err: %w", string(jsonArr[0]), err) + } + + values := make([]fixedpoint.Value, len(jsonArr)-1) + for i, jsonRaw := range jsonArr[1:] { + err = json.Unmarshal(jsonRaw, &values[i]) + if err != nil { + return fmt.Errorf("failed to unmarshal resp index %d: %v, err: %w", i+1, string(jsonRaw), err) + } + } + k.Open = values[0] + k.High = values[1] + k.Low = values[2] + k.Close = values[3] + k.Volume = values[4] + k.TurnOver = values[5] + + return nil +} + +//go:generate GetRequest -url "/v5/market/kline" -type GetKLinesRequest -responseDataType .KLinesResponse +type GetKLinesRequest struct { + client requestgen.APIClient + + category Category `param:"category,query" validValues:"spot"` + symbol string `param:"symbol,query"` + // Kline interval. + // - 1,3,5,15,30,60,120,240,360,720: minute + // - D: day + // - M: month + // - W: week + interval string `param:"interval,query" validValues:"1,3,5,15,30,60,120,240,360,720,D,W,M"` + startTime *time.Time `param:"start,query,milliseconds"` + endTime *time.Time `param:"end,query,milliseconds"` + // Limit for data size per page. [1, 1000]. Default: 200 + limit *uint64 `param:"limit,query"` +} + +func (c *RestClient) NewGetKLinesRequest() *GetKLinesRequest { + return &GetKLinesRequest{ + client: c, + category: CategorySpot, + } +} diff --git a/pkg/exchange/bybit/bybitapi/get_k_lines_request_requestgen.go b/pkg/exchange/bybit/bybitapi/get_k_lines_request_requestgen.go new file mode 100644 index 000000000..d6b3d06c3 --- /dev/null +++ b/pkg/exchange/bybit/bybitapi/get_k_lines_request_requestgen.go @@ -0,0 +1,237 @@ +// Code generated by "requestgen -method GET -responseType .APIResponse -responseDataField Result -url /v5/market/kline -type GetKLinesRequest -responseDataType .KLinesResponse"; DO NOT EDIT. + +package bybitapi + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "reflect" + "regexp" + "strconv" + "time" +) + +func (g *GetKLinesRequest) Category(category Category) *GetKLinesRequest { + g.category = category + return g +} + +func (g *GetKLinesRequest) Symbol(symbol string) *GetKLinesRequest { + g.symbol = symbol + return g +} + +func (g *GetKLinesRequest) Interval(interval string) *GetKLinesRequest { + g.interval = interval + return g +} + +func (g *GetKLinesRequest) StartTime(startTime time.Time) *GetKLinesRequest { + g.startTime = &startTime + return g +} + +func (g *GetKLinesRequest) EndTime(endTime time.Time) *GetKLinesRequest { + g.endTime = &endTime + return g +} + +func (g *GetKLinesRequest) Limit(limit uint64) *GetKLinesRequest { + g.limit = &limit + return g +} + +// GetQueryParameters builds and checks the query parameters and returns url.Values +func (g *GetKLinesRequest) GetQueryParameters() (url.Values, error) { + var params = map[string]interface{}{} + // check category field -> json key category + category := g.category + + // TEMPLATE check-valid-values + switch category { + case "spot": + params["category"] = category + + default: + return nil, fmt.Errorf("category value %v is invalid", category) + + } + // END TEMPLATE check-valid-values + + // assign parameter of category + params["category"] = category + // check symbol field -> json key symbol + symbol := g.symbol + + // assign parameter of symbol + params["symbol"] = symbol + // check interval field -> json key interval + interval := g.interval + + // TEMPLATE check-valid-values + switch interval { + case "1", "3", "5", "15", "30", "60", "120", "240", "360", "720", "D", "W", "M": + params["interval"] = interval + + default: + return nil, fmt.Errorf("interval value %v is invalid", interval) + + } + // END TEMPLATE check-valid-values + + // assign parameter of interval + params["interval"] = interval + // check startTime field -> json key start + if g.startTime != nil { + startTime := *g.startTime + + // assign parameter of startTime + // convert time.Time to milliseconds time stamp + params["start"] = strconv.FormatInt(startTime.UnixNano()/int64(time.Millisecond), 10) + } else { + } + // check endTime field -> json key end + if g.endTime != nil { + endTime := *g.endTime + + // assign parameter of endTime + // convert time.Time to milliseconds time stamp + params["end"] = strconv.FormatInt(endTime.UnixNano()/int64(time.Millisecond), 10) + } else { + } + // check limit field -> json key limit + if g.limit != nil { + limit := *g.limit + + // assign parameter of limit + params["limit"] = limit + } else { + } + + query := url.Values{} + for _k, _v := range params { + query.Add(_k, fmt.Sprintf("%v", _v)) + } + + return query, nil +} + +// GetParameters builds and checks the parameters and return the result in a map object +func (g *GetKLinesRequest) GetParameters() (map[string]interface{}, error) { + var params = map[string]interface{}{} + + return params, nil +} + +// GetParametersQuery converts the parameters from GetParameters into the url.Values format +func (g *GetKLinesRequest) GetParametersQuery() (url.Values, error) { + query := url.Values{} + + params, err := g.GetParameters() + if err != nil { + return query, err + } + + for _k, _v := range params { + if g.isVarSlice(_v) { + g.iterateSlice(_v, func(it interface{}) { + query.Add(_k+"[]", fmt.Sprintf("%v", it)) + }) + } else { + query.Add(_k, fmt.Sprintf("%v", _v)) + } + } + + return query, nil +} + +// GetParametersJSON converts the parameters from GetParameters into the JSON format +func (g *GetKLinesRequest) GetParametersJSON() ([]byte, error) { + params, err := g.GetParameters() + if err != nil { + return nil, err + } + + return json.Marshal(params) +} + +// GetSlugParameters builds and checks the slug parameters and return the result in a map object +func (g *GetKLinesRequest) GetSlugParameters() (map[string]interface{}, error) { + var params = map[string]interface{}{} + + return params, nil +} + +func (g *GetKLinesRequest) applySlugsToUrl(url string, slugs map[string]string) string { + for _k, _v := range slugs { + needleRE := regexp.MustCompile(":" + _k + "\\b") + url = needleRE.ReplaceAllString(url, _v) + } + + return url +} + +func (g *GetKLinesRequest) iterateSlice(slice interface{}, _f func(it interface{})) { + sliceValue := reflect.ValueOf(slice) + for _i := 0; _i < sliceValue.Len(); _i++ { + it := sliceValue.Index(_i).Interface() + _f(it) + } +} + +func (g *GetKLinesRequest) isVarSlice(_v interface{}) bool { + rt := reflect.TypeOf(_v) + switch rt.Kind() { + case reflect.Slice: + return true + } + return false +} + +func (g *GetKLinesRequest) GetSlugsMap() (map[string]string, error) { + slugs := map[string]string{} + params, err := g.GetSlugParameters() + if err != nil { + return slugs, nil + } + + for _k, _v := range params { + slugs[_k] = fmt.Sprintf("%v", _v) + } + + return slugs, nil +} + +func (g *GetKLinesRequest) Do(ctx context.Context) (*KLinesResponse, error) { + + // no body params + var params interface{} + query, err := g.GetQueryParameters() + if err != nil { + return nil, err + } + + apiURL := "/v5/market/kline" + + req, err := g.client.NewRequest(ctx, "GET", apiURL, query, params) + if err != nil { + return nil, err + } + + response, err := g.client.SendRequest(req) + if err != nil { + return nil, err + } + + var apiResponse APIResponse + if err := response.DecodeJSON(&apiResponse); err != nil { + return nil, err + } + var data KLinesResponse + if err := json.Unmarshal(apiResponse.Result, &data); err != nil { + return nil, err + } + return &data, nil +} diff --git a/pkg/exchange/bybit/bybitapi/get_k_lines_request_test.go b/pkg/exchange/bybit/bybitapi/get_k_lines_request_test.go new file mode 100644 index 000000000..05a739c58 --- /dev/null +++ b/pkg/exchange/bybit/bybitapi/get_k_lines_request_test.go @@ -0,0 +1,175 @@ +package bybitapi + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +func TestKLinesResponse_UnmarshalJSON(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + data := `{ + "symbol": "BTCUSDT", + "category": "spot", + "list": [ + [ + "1670608800000", + "17071", + "17073", + "17027", + "17055.5", + "268611", + "15.74462667" + ], + [ + "1670605200000", + "17071.5", + "17071.5", + "17061", + "17071", + "4177", + "0.24469757" + ] + ] + }` + + expRes := &KLinesResponse{ + Symbol: "BTCUSDT", + List: []KLine{ + { + StartTime: types.NewMillisecondTimestampFromInt(1670608800000), + Open: fixedpoint.NewFromFloat(17071), + High: fixedpoint.NewFromFloat(17073), + Low: fixedpoint.NewFromFloat(17027), + Close: fixedpoint.NewFromFloat(17055.5), + Volume: fixedpoint.NewFromFloat(268611), + TurnOver: fixedpoint.NewFromFloat(15.74462667), + }, + { + StartTime: types.NewMillisecondTimestampFromInt(1670605200000), + Open: fixedpoint.NewFromFloat(17071.5), + High: fixedpoint.NewFromFloat(17071.5), + Low: fixedpoint.NewFromFloat(17061), + Close: fixedpoint.NewFromFloat(17071), + Volume: fixedpoint.NewFromFloat(4177), + TurnOver: fixedpoint.NewFromFloat(0.24469757), + }, + }, + Category: CategorySpot, + } + + kline := &KLinesResponse{} + err := json.Unmarshal([]byte(data), kline) + assert.NoError(t, err) + assert.Equal(t, expRes, kline) + }) + + t.Run("unexpected length", func(t *testing.T) { + data := `{ + "symbol": "BTCUSDT", + "category": "spot", + "list": [ + [ + "1670608800000", + "17071", + "17073", + "17027", + "17055.5", + "268611" + ] + ] + }` + kline := &KLinesResponse{} + err := json.Unmarshal([]byte(data), kline) + assert.Equal(t, fmt.Errorf("unexpected K Lines array length: 6, exp: %d", KLinesArrayLen), err) + }) + + t.Run("unexpected json array", func(t *testing.T) { + klineJson := `{}` + + data := fmt.Sprintf(`{ + "symbol": "BTCUSDT", + "category": "spot", + "list": [%s] + }`, klineJson) + + var jsonArr []json.RawMessage + expErr := json.Unmarshal([]byte(klineJson), &jsonArr) + assert.Error(t, expErr) + + kline := &KLinesResponse{} + err := json.Unmarshal([]byte(data), kline) + assert.Equal(t, fmt.Errorf("failed to unmarshal jsonRawMessage: %v, err: %w", klineJson, expErr), err) + }) + + t.Run("unexpected json 0", func(t *testing.T) { + klineJson := ` + [ + "a", + "17071.5", + "17071.5", + "17061", + "17071", + "4177", + "0.24469757" + ] + ` + + data := fmt.Sprintf(`{ + "symbol": "BTCUSDT", + "category": "spot", + "list": [%s] + }`, klineJson) + + var jsonArr []json.RawMessage + err := json.Unmarshal([]byte(klineJson), &jsonArr) + assert.NoError(t, err) + + timestamp := types.MillisecondTimestamp{} + expErr := json.Unmarshal(jsonArr[0], ×tamp) + assert.NoError(t, err) + + kline := &KLinesResponse{} + err = json.Unmarshal([]byte(data), kline) + assert.Equal(t, fmt.Errorf("failed to unmarshal resp index 0: %v, err: %w", string(jsonArr[0]), expErr), err) + }) + + t.Run("unexpected json 1", func(t *testing.T) { + // TODO: fix panic + t.Skip("test will result in a panic, skip it") + klineJson := ` + [ + "1670608800000", + "a", + "17071.5", + "17061", + "17071", + "4177", + "0.24469757" + ] + ` + + data := fmt.Sprintf(`{ + "symbol": "BTCUSDT", + "category": "spot", + "list": [%s] + }`, klineJson) + + var jsonArr []json.RawMessage + err := json.Unmarshal([]byte(klineJson), &jsonArr) + assert.NoError(t, err) + + var value fixedpoint.Value + expErr := json.Unmarshal(jsonArr[1], &value) + assert.NoError(t, err) + + kline := &KLinesResponse{} + err = json.Unmarshal([]byte(data), kline) + assert.Equal(t, fmt.Errorf("failed to unmarshal resp index 1: %v, err: %w", string(jsonArr[1]), expErr), err) + }) +} diff --git a/pkg/exchange/bybit/bybitapi/types.go b/pkg/exchange/bybit/bybitapi/types.go index d54e54d63..509b51ded 100644 --- a/pkg/exchange/bybit/bybitapi/types.go +++ b/pkg/exchange/bybit/bybitapi/types.go @@ -1,5 +1,41 @@ package bybitapi +import "github.com/c9s/bbgo/pkg/types" + +var ( + SupportedIntervals = map[types.Interval]int{ + types.Interval1m: 1 * 60, + types.Interval3m: 3 * 60, + types.Interval5m: 5 * 60, + types.Interval15m: 15 * 60, + types.Interval30m: 30 * 60, + types.Interval1h: 60 * 60, + types.Interval2h: 60 * 60 * 2, + types.Interval4h: 60 * 60 * 4, + types.Interval6h: 60 * 60 * 6, + types.Interval12h: 60 * 60 * 12, + types.Interval1d: 60 * 60 * 24, + types.Interval1w: 60 * 60 * 24 * 7, + types.Interval1mo: 60 * 60 * 24 * 30, + } + + ToGlobalInterval = map[string]types.Interval{ + "1": types.Interval1m, + "3": types.Interval3m, + "5": types.Interval5m, + "15": types.Interval15m, + "30": types.Interval30m, + "60": types.Interval1h, + "120": types.Interval2h, + "240": types.Interval4h, + "360": types.Interval6h, + "720": types.Interval12h, + "D": types.Interval1d, + "W": types.Interval1w, + "M": types.Interval1mo, + } +) + type Category string const ( diff --git a/pkg/exchange/bybit/bybitapi/types_test.go b/pkg/exchange/bybit/bybitapi/types_test.go new file mode 100644 index 000000000..ba39b3d84 --- /dev/null +++ b/pkg/exchange/bybit/bybitapi/types_test.go @@ -0,0 +1,41 @@ +package bybitapi + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/types" +) + +func Test_SupportedIntervals(t *testing.T) { + assert.Equal(t, SupportedIntervals[types.Interval1m], 60) + assert.Equal(t, SupportedIntervals[types.Interval3m], 180) + assert.Equal(t, SupportedIntervals[types.Interval5m], 300) + assert.Equal(t, SupportedIntervals[types.Interval15m], 15*60) + assert.Equal(t, SupportedIntervals[types.Interval30m], 30*60) + assert.Equal(t, SupportedIntervals[types.Interval1h], 60*60) + assert.Equal(t, SupportedIntervals[types.Interval2h], 60*60*2) + assert.Equal(t, SupportedIntervals[types.Interval4h], 60*60*4) + assert.Equal(t, SupportedIntervals[types.Interval6h], 60*60*6) + assert.Equal(t, SupportedIntervals[types.Interval12h], 60*60*12) + assert.Equal(t, SupportedIntervals[types.Interval1d], 60*60*24) + assert.Equal(t, SupportedIntervals[types.Interval1w], 60*60*24*7) + assert.Equal(t, SupportedIntervals[types.Interval1mo], 60*60*24*30) +} + +func Test_ToGlobalInterval(t *testing.T) { + assert.Equal(t, ToGlobalInterval["1"], types.Interval1m) + assert.Equal(t, ToGlobalInterval["3"], types.Interval3m) + assert.Equal(t, ToGlobalInterval["5"], types.Interval5m) + assert.Equal(t, ToGlobalInterval["15"], types.Interval15m) + assert.Equal(t, ToGlobalInterval["30"], types.Interval30m) + assert.Equal(t, ToGlobalInterval["60"], types.Interval1h) + assert.Equal(t, ToGlobalInterval["120"], types.Interval2h) + assert.Equal(t, ToGlobalInterval["240"], types.Interval4h) + assert.Equal(t, ToGlobalInterval["360"], types.Interval6h) + assert.Equal(t, ToGlobalInterval["720"], types.Interval12h) + assert.Equal(t, ToGlobalInterval["D"], types.Interval1d) + assert.Equal(t, ToGlobalInterval["W"], types.Interval1w) + assert.Equal(t, ToGlobalInterval["M"], types.Interval1mo) +} diff --git a/pkg/exchange/bybit/convert.go b/pkg/exchange/bybit/convert.go index 9f665ecec..19043ad23 100644 --- a/pkg/exchange/bybit/convert.go +++ b/pkg/exchange/bybit/convert.go @@ -283,3 +283,48 @@ func toGlobalBalanceMap(events []bybitapi.WalletBalances) types.BalanceMap { } return bm } + +func toLocalInterval(interval types.Interval) (string, error) { + if _, found := bybitapi.SupportedIntervals[interval]; !found { + return "", fmt.Errorf("interval not supported: %s", interval) + } + + switch interval { + + case types.Interval1d: + return string(bybitapi.IntervalSignDay), nil + + case types.Interval1w: + return string(bybitapi.IntervalSignWeek), nil + + case types.Interval1mo: + return string(bybitapi.IntervalSignMonth), nil + + default: + return fmt.Sprintf("%d", interval.Minutes()), nil + + } +} + +func toGlobalKLines(symbol string, interval types.Interval, klines []bybitapi.KLine) []types.KLine { + gKLines := make([]types.KLine, len(klines)) + for i, kline := range klines { + endTime := types.Time(kline.StartTime.Time().Add(interval.Duration())) + gKLines[i] = types.KLine{ + Exchange: types.ExchangeBybit, + Symbol: symbol, + StartTime: types.Time(kline.StartTime), + EndTime: endTime, + Interval: interval, + Open: kline.Open, + Close: kline.Close, + High: kline.High, + Low: kline.Low, + Volume: kline.Volume, + QuoteVolume: kline.TurnOver, + // Bybit doesn't support close flag in REST API + Closed: false, + } + } + return gKLines +} diff --git a/pkg/exchange/bybit/convert_test.go b/pkg/exchange/bybit/convert_test.go index e6eb5b049..d2dab1632 100644 --- a/pkg/exchange/bybit/convert_test.go +++ b/pkg/exchange/bybit/convert_test.go @@ -459,3 +459,88 @@ func Test_toGlobalTrade(t *testing.T) { assert.NoError(t, err) assert.Equal(t, res, &exp) } + +func Test_toGlobalKLines(t *testing.T) { + symbol := "BTCUSDT" + interval := types.Interval15m + + resp := bybitapi.KLinesResponse{ + Symbol: symbol, + List: []bybitapi.KLine{ + /* + [ + { + "StartTime": "2023-08-08 17:30:00 +0800 CST", + "OpenPrice": 29045.3, + "HighPrice": 29228.56, + "LowPrice": 29045.3, + "ClosePrice": 29228.56, + "Volume": 9.265593, + "TurnOver": 270447.43520753 + }, + { + "StartTime": "2023-08-08 17:15:00 +0800 CST", + "OpenPrice": 29167.33, + "HighPrice": 29229.08, + "LowPrice": 29000, + "ClosePrice": 29045.3, + "Volume": 9.295508, + "TurnOver": 270816.87513775 + } + ] + */ + { + StartTime: types.NewMillisecondTimestampFromInt(1691486100000), + Open: fixedpoint.NewFromFloat(29045.3), + High: fixedpoint.NewFromFloat(29228.56), + Low: fixedpoint.NewFromFloat(29045.3), + Close: fixedpoint.NewFromFloat(29228.56), + Volume: fixedpoint.NewFromFloat(9.265593), + TurnOver: fixedpoint.NewFromFloat(270447.43520753), + }, + { + StartTime: types.NewMillisecondTimestampFromInt(1691487000000), + Open: fixedpoint.NewFromFloat(29167.33), + High: fixedpoint.NewFromFloat(29229.08), + Low: fixedpoint.NewFromFloat(29000), + Close: fixedpoint.NewFromFloat(29045.3), + Volume: fixedpoint.NewFromFloat(9.295508), + TurnOver: fixedpoint.NewFromFloat(270816.87513775), + }, + }, + Category: bybitapi.CategorySpot, + } + + expKlines := []types.KLine{ + { + Exchange: types.ExchangeBybit, + Symbol: resp.Symbol, + StartTime: types.Time(resp.List[0].StartTime.Time()), + EndTime: types.Time(resp.List[0].StartTime.Time().Add(interval.Duration())), + Interval: interval, + Open: fixedpoint.NewFromFloat(29045.3), + Close: fixedpoint.NewFromFloat(29228.56), + High: fixedpoint.NewFromFloat(29228.56), + Low: fixedpoint.NewFromFloat(29045.3), + Volume: fixedpoint.NewFromFloat(9.265593), + QuoteVolume: fixedpoint.NewFromFloat(270447.43520753), + Closed: false, + }, + { + Exchange: types.ExchangeBybit, + Symbol: resp.Symbol, + StartTime: types.Time(resp.List[1].StartTime.Time()), + EndTime: types.Time(resp.List[1].StartTime.Time().Add(interval.Duration())), + Interval: interval, + Open: fixedpoint.NewFromFloat(29167.33), + Close: fixedpoint.NewFromFloat(29045.3), + High: fixedpoint.NewFromFloat(29229.08), + Low: fixedpoint.NewFromFloat(29000), + Volume: fixedpoint.NewFromFloat(9.295508), + QuoteVolume: fixedpoint.NewFromFloat(270816.87513775), + Closed: false, + }, + } + + assert.Equal(t, toGlobalKLines(symbol, interval, resp.List), expKlines) +} diff --git a/pkg/exchange/bybit/exchange.go b/pkg/exchange/bybit/exchange.go index 6998f5407..962958589 100644 --- a/pkg/exchange/bybit/exchange.go +++ b/pkg/exchange/bybit/exchange.go @@ -19,6 +19,7 @@ import ( const ( maxOrderIdLen = 36 defaultQueryLimit = 50 + defaultKLineLimit = 1000 halfYearDuration = 6 * 30 * 24 * time.Hour ) @@ -38,7 +39,12 @@ var ( "exchange": "bybit", }) - _ types.ExchangeAccountService = &Exchange{} + _ types.ExchangeAccountService = &Exchange{} + _ types.ExchangeMarketDataService = &Exchange{} + _ types.CustomIntervalProvider = &Exchange{} + _ types.ExchangeMinimal = &Exchange{} + _ types.ExchangeTradeService = &Exchange{} + _ types.Exchange = &Exchange{} ) type Exchange struct { @@ -422,6 +428,68 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, return toGlobalBalanceMap(accounts.List), nil } + +/* +QueryKLines queries for historical klines (also known as candles/candlesticks). Charts are returned in groups based +on the requested interval. + +A k-line's start time is inclusive, but end time is not(startTime + interval - 1 millisecond). +e.q. 15m interval k line can be represented as 00:00:00.000 ~ 00:14:59.999 +*/ +func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { + req := e.client.NewGetKLinesRequest().Symbol(symbol) + intervalStr, err := toLocalInterval(interval) + if err != nil { + return nil, err + } + req.Interval(intervalStr) + + limit := uint64(options.Limit) + if limit > defaultKLineLimit || limit <= 0 { + log.Debugf("limtit is exceeded or zero, update to %d, got: %d", defaultKLineLimit, options.Limit) + limit = defaultKLineLimit + } + req.Limit(limit) + + if options.StartTime != nil { + req.StartTime(*options.StartTime) + } + + if options.EndTime != nil { + req.EndTime(*options.EndTime) + } + + if err := sharedRateLimiter.Wait(ctx); err != nil { + return nil, fmt.Errorf("query klines rate limiter wait error: %w", err) + } + + resp, err := req.Do(ctx) + if err != nil { + return nil, fmt.Errorf("failed to call k line, err: %w", err) + } + + if resp.Category != bybitapi.CategorySpot { + return nil, fmt.Errorf("unexpected category: %s", resp.Category) + } + + if resp.Symbol != symbol { + return nil, fmt.Errorf("unexpected symbol: %s, exp: %s", resp.Category, symbol) + } + + kLines := toGlobalKLines(symbol, interval, resp.List) + return types.SortKLinesAscending(kLines), nil + +} + +func (e *Exchange) SupportedInterval() map[types.Interval]int { + return bybitapi.SupportedIntervals +} + +func (e *Exchange) IsSupportedInterval(interval types.Interval) bool { + _, ok := bybitapi.SupportedIntervals[interval] + return ok +} + func (e *Exchange) NewStream() types.Stream { return NewStream(e.key, e.secret) } From 4cee22ce31aa96c2f513f831aa842afd9d5e466d Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 9 Aug 2023 11:41:16 +0800 Subject: [PATCH 2/2] pkg/exchage: support k line websocket event --- pkg/exchange/bybit/stream.go | 62 +++++++++++++-- pkg/exchange/bybit/stream_callbacks.go | 10 +++ pkg/exchange/bybit/stream_test.go | 103 +++++++++++++++++++++++-- pkg/exchange/bybit/types.go | 62 +++++++++++++++ pkg/exchange/bybit/types_test.go | 71 +++++++++++++++++ 5 files changed, 297 insertions(+), 11 deletions(-) diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index dcaa158fe..1fe90298d 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -34,6 +34,7 @@ type Stream struct { bookEventCallbacks []func(e BookEvent) walletEventCallbacks []func(e []bybitapi.WalletBalances) + kLineEventCallbacks []func(e KLineEvent) orderEventCallbacks []func(e []OrderEvent) } @@ -52,6 +53,7 @@ func NewStream(key, secret string) *Stream { stream.OnConnect(stream.handlerConnect) stream.OnBookEvent(stream.handleBookEvent) + stream.OnKLineEvent(stream.handleKLineEvent) stream.OnWalletEvent(stream.handleWalletEvent) stream.OnOrderEvent(stream.handleOrderEvent) return stream @@ -80,6 +82,9 @@ func (s *Stream) dispatchEvent(event interface{}) { case []bybitapi.WalletBalances: s.EmitWalletEvent(e) + case *KLineEvent: + s.EmitKLineEvent(*e) + case []OrderEvent: s.EmitOrderEvent(e) } @@ -99,6 +104,7 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { case e.IsTopic(): switch getTopicType(e.Topic) { + case TopicTypeOrderBook: var book BookEvent err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book) @@ -109,6 +115,20 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { book.Type = e.WebSocketTopicEvent.Type return &book, nil + case TopicTypeKLine: + var kLines []KLine + err = json.Unmarshal(e.WebSocketTopicEvent.Data, &kLines) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, : %w", string(e.WebSocketTopicEvent.Data), err) + } + + symbol, err := getSymbolFromTopic(e.Topic) + if err != nil { + return nil, err + } + + return &KLineEvent{KLines: kLines, Symbol: symbol, Type: e.WebSocketTopicEvent.Type}, nil + case TopicTypeWallet: var wallets []bybitapi.WalletBalances return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets) @@ -165,7 +185,7 @@ func (s *Stream) handlerConnect() { var topics []string for _, subscription := range s.Subscriptions { - topic, err := convertSubscription(subscription) + topic, err := s.convertSubscription(subscription) if err != nil { log.WithError(err).Errorf("subscription convert error") continue @@ -213,17 +233,27 @@ func (s *Stream) handlerConnect() { } } -func convertSubscription(s types.Subscription) (string, error) { - switch s.Channel { +func (s *Stream) convertSubscription(sub types.Subscription) (string, error) { + switch sub.Channel { + case types.BookChannel: depth := types.DepthLevel1 - if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 { + if len(sub.Options.Depth) > 0 && sub.Options.Depth == types.DepthLevel50 { depth = types.DepthLevel50 } - return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil + return genTopic(TopicTypeOrderBook, depth, sub.Symbol), nil + + case types.KLineChannel: + interval, err := toLocalInterval(sub.Options.Interval) + if err != nil { + return "", err + } + + return genTopic(TopicTypeKLine, interval, sub.Symbol), nil + } - return "", fmt.Errorf("unsupported stream channel: %s", s.Channel) + return "", fmt.Errorf("unsupported stream channel: %s", sub.Channel) } func (s *Stream) handleBookEvent(e BookEvent) { @@ -257,3 +287,23 @@ func (s *Stream) handleOrderEvent(events []OrderEvent) { s.StandardStream.EmitOrderUpdate(*gOrder) } } + +func (s *Stream) handleKLineEvent(klineEvent KLineEvent) { + if klineEvent.Type != DataTypeSnapshot { + return + } + + for _, event := range klineEvent.KLines { + kline, err := event.toGlobalKLine(klineEvent.Symbol) + if err != nil { + log.WithError(err).Error("failed to convert to global k line") + continue + } + + if kline.Closed { + s.EmitKLineClosed(kline) + } else { + s.EmitKLine(kline) + } + } +} diff --git a/pkg/exchange/bybit/stream_callbacks.go b/pkg/exchange/bybit/stream_callbacks.go index 2669dcfa7..0c7df8042 100644 --- a/pkg/exchange/bybit/stream_callbacks.go +++ b/pkg/exchange/bybit/stream_callbacks.go @@ -26,6 +26,16 @@ func (s *Stream) EmitWalletEvent(e []bybitapi.WalletBalances) { } } +func (s *Stream) OnKLineEvent(cb func(e KLineEvent)) { + s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb) +} + +func (s *Stream) EmitKLineEvent(e KLineEvent) { + for _, cb := range s.kLineEventCallbacks { + cb(e) + } +} + func (s *Stream) OnOrderEvent(cb func(e []OrderEvent)) { s.orderEventCallbacks = append(s.orderEventCallbacks, cb) } diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index 07ced5e43..1c6a3badf 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -2,6 +2,7 @@ package bybit import ( "context" + "errors" "fmt" "os" "strconv" @@ -76,6 +77,23 @@ func TestStream(t *testing.T) { c := make(chan struct{}) <-c }) + + t.Run("kline test", func(t *testing.T) { + s.Subscribe(types.KLineChannel, "BTCUSDT", types.SubscribeOptions{ + Interval: types.Interval30m, + Depth: "", + Speed: "", + }) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnKLine(func(kline types.KLine) { + t.Log(kline) + }) + c := make(chan struct{}) + <-c + }) } func TestStream_parseWebSocketEvent(t *testing.T) { @@ -151,6 +169,80 @@ func TestStream_parseWebSocketEvent(t *testing.T) { }, *book) }) + t.Run("TopicTypeKLine with snapshot", func(t *testing.T) { + input := `{ + "topic": "kline.5.BTCUSDT", + "data": [ + { + "start": 1672324800000, + "end": 1672325099999, + "interval": "5", + "open": "16649.5", + "close": "16677", + "high": "16677", + "low": "16608", + "volume": "2.081", + "turnover": "34666.4005", + "confirm": false, + "timestamp": 1672324988882 + } + ], + "ts": 1672324988882, + "type": "snapshot" +}` + + res, err := s.parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + book, ok := res.(*KLineEvent) + assert.True(t, ok) + assert.Equal(t, KLineEvent{ + Symbol: "BTCUSDT", + Type: DataTypeSnapshot, + KLines: []KLine{ + { + StartTime: types.NewMillisecondTimestampFromInt(1672324800000), + EndTime: types.NewMillisecondTimestampFromInt(1672325099999), + Interval: "5", + OpenPrice: fixedpoint.NewFromFloat(16649.5), + ClosePrice: fixedpoint.NewFromFloat(16677), + HighPrice: fixedpoint.NewFromFloat(16677), + LowPrice: fixedpoint.NewFromFloat(16608), + Volume: fixedpoint.NewFromFloat(2.081), + Turnover: fixedpoint.NewFromFloat(34666.4005), + Confirm: false, + Timestamp: types.NewMillisecondTimestampFromInt(1672324988882), + }, + }, + }, *book) + }) + + t.Run("TopicTypeKLine with invalid topic", func(t *testing.T) { + input := `{ + "topic": "kline.5", + "data": [ + { + "start": 1672324800000, + "end": 1672325099999, + "interval": "5", + "open": "16649.5", + "close": "16677", + "high": "16677", + "low": "16608", + "volume": "2.081", + "turnover": "34666.4005", + "confirm": false, + "timestamp": 1672324988882 + } + ], + "ts": 1672324988882, + "type": "snapshot" +}` + + res, err := s.parseWebSocketEvent([]byte(input)) + assert.Equal(t, errors.New("unexpected topic: kline.5"), err) + assert.Nil(t, res) + }) + t.Run("Parse fails", func(t *testing.T) { input := `{ "topic":"orderbook.50.BTCUSDT", @@ -168,8 +260,9 @@ func TestStream_parseWebSocketEvent(t *testing.T) { } func Test_convertSubscription(t *testing.T) { + s := Stream{} t.Run("BookChannel.DepthLevel1", func(t *testing.T) { - res, err := convertSubscription(types.Subscription{ + res, err := s.convertSubscription(types.Subscription{ Symbol: "BTCUSDT", Channel: types.BookChannel, Options: types.SubscribeOptions{ @@ -180,7 +273,7 @@ func Test_convertSubscription(t *testing.T) { assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res) }) t.Run("BookChannel. with default depth", func(t *testing.T) { - res, err := convertSubscription(types.Subscription{ + res, err := s.convertSubscription(types.Subscription{ Symbol: "BTCUSDT", Channel: types.BookChannel, }) @@ -188,7 +281,7 @@ func Test_convertSubscription(t *testing.T) { assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res) }) t.Run("BookChannel.DepthLevel50", func(t *testing.T) { - res, err := convertSubscription(types.Subscription{ + res, err := s.convertSubscription(types.Subscription{ Symbol: "BTCUSDT", Channel: types.BookChannel, Options: types.SubscribeOptions{ @@ -199,7 +292,7 @@ func Test_convertSubscription(t *testing.T) { assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res) }) t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) { - res, err := convertSubscription(types.Subscription{ + res, err := s.convertSubscription(types.Subscription{ Symbol: "BTCUSDT", Channel: types.BookChannel, Options: types.SubscribeOptions{ @@ -211,7 +304,7 @@ func Test_convertSubscription(t *testing.T) { }) t.Run("unsupported channel", func(t *testing.T) { - res, err := convertSubscription(types.Subscription{ + res, err := s.convertSubscription(types.Subscription{ Symbol: "BTCUSDT", Channel: "unsupported", }) diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 8046630b8..d817c8e7b 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -82,6 +82,7 @@ const ( TopicTypeOrderBook TopicType = "orderbook" TopicTypeWallet TopicType = "wallet" TopicTypeOrder TopicType = "order" + TopicTypeKLine TopicType = "kline" ) type DataType string @@ -143,8 +144,69 @@ func getTopicType(topic string) TopicType { return TopicType(slice[0]) } +func getSymbolFromTopic(topic string) (string, error) { + slice := strings.Split(topic, topicSeparator) + if len(slice) != 3 { + return "", fmt.Errorf("unexpected topic: %s", topic) + } + return slice[2], nil +} + type OrderEvent struct { bybitapi.Order Category bybitapi.Category `json:"category"` } + +type KLineEvent struct { + KLines []KLine + + // internal use + // Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type + Type DataType + // Symbol. Copied from WebSocketTopicEvent.Topic + Symbol string +} + +type KLine struct { + // The start timestamp (ms) + StartTime types.MillisecondTimestamp `json:"start"` + // The end timestamp (ms) + EndTime types.MillisecondTimestamp `json:"end"` + // Kline interval + Interval string `json:"interval"` + OpenPrice fixedpoint.Value `json:"open"` + ClosePrice fixedpoint.Value `json:"close"` + HighPrice fixedpoint.Value `json:"high"` + LowPrice fixedpoint.Value `json:"low"` + // Trade volume + Volume fixedpoint.Value `json:"volume"` + // Turnover. Unit of figure: quantity of quota coin + Turnover fixedpoint.Value `json:"turnover"` + // Weather the tick is ended or not + Confirm bool `json:"confirm"` + // The timestamp (ms) of the last matched order in the candle + Timestamp types.MillisecondTimestamp `json:"timestamp"` +} + +func (k *KLine) toGlobalKLine(symbol string) (types.KLine, error) { + interval, found := bybitapi.ToGlobalInterval[k.Interval] + if !found { + return types.KLine{}, fmt.Errorf("unexpected k line interval: %+v", k) + } + + return types.KLine{ + Exchange: types.ExchangeBybit, + Symbol: symbol, + StartTime: types.Time(k.StartTime.Time()), + EndTime: types.Time(k.EndTime.Time()), + Interval: interval, + Open: k.OpenPrice, + Close: k.ClosePrice, + High: k.HighPrice, + Low: k.LowPrice, + Volume: k.Volume, + QuoteVolume: k.Turnover, + Closed: k.Confirm, + }, nil +} diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 3332814cf..22aec43bb 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -386,3 +386,74 @@ func Test_getTopicName(t *testing.T) { exp := TopicTypeOrderBook assert.Equal(t, exp, getTopicType("orderbook.50.BTCUSDT")) } + +func Test_getSymbolFromTopic(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + exp := "BTCUSDT" + res, err := getSymbolFromTopic("kline.1.BTCUSDT") + assert.NoError(t, err) + assert.Equal(t, exp, res) + }) + + t.Run("unexpected topic", func(t *testing.T) { + res, err := getSymbolFromTopic("kline.1") + assert.Empty(t, res) + assert.Equal(t, err, fmt.Errorf("unexpected topic: kline.1")) + }) +} + +func TestKLine_toGlobalKLine(t *testing.T) { + t.Run("succeeds", func(t *testing.T) { + k := KLine{ + StartTime: types.NewMillisecondTimestampFromInt(1691486100000), + EndTime: types.NewMillisecondTimestampFromInt(1691487000000), + Interval: "1", + OpenPrice: fixedpoint.NewFromFloat(29045.3), + ClosePrice: fixedpoint.NewFromFloat(29228.56), + HighPrice: fixedpoint.NewFromFloat(29228.56), + LowPrice: fixedpoint.NewFromFloat(29045.3), + Volume: fixedpoint.NewFromFloat(9.265593), + Turnover: fixedpoint.NewFromFloat(270447.43520753), + Confirm: false, + Timestamp: types.NewMillisecondTimestampFromInt(1691486100000), + } + + gKline, err := k.toGlobalKLine("BTCUSDT") + assert.NoError(t, err) + + assert.Equal(t, types.KLine{ + Exchange: types.ExchangeBybit, + Symbol: "BTCUSDT", + StartTime: types.Time(k.StartTime.Time()), + EndTime: types.Time(k.EndTime.Time()), + Interval: types.Interval1m, + Open: fixedpoint.NewFromFloat(29045.3), + Close: fixedpoint.NewFromFloat(29228.56), + High: fixedpoint.NewFromFloat(29228.56), + Low: fixedpoint.NewFromFloat(29045.3), + Volume: fixedpoint.NewFromFloat(9.265593), + QuoteVolume: fixedpoint.NewFromFloat(270447.43520753), + Closed: false, + }, gKline) + }) + + t.Run("interval not supported", func(t *testing.T) { + k := KLine{ + StartTime: types.NewMillisecondTimestampFromInt(1691486100000), + EndTime: types.NewMillisecondTimestampFromInt(1691487000000), + Interval: "112", + OpenPrice: fixedpoint.NewFromFloat(29045.3), + ClosePrice: fixedpoint.NewFromFloat(29228.56), + HighPrice: fixedpoint.NewFromFloat(29228.56), + LowPrice: fixedpoint.NewFromFloat(29045.3), + Volume: fixedpoint.NewFromFloat(9.265593), + Turnover: fixedpoint.NewFromFloat(270447.43520753), + Confirm: false, + Timestamp: types.NewMillisecondTimestampFromInt(1691486100000), + } + + gKline, err := k.toGlobalKLine("BTCUSDT") + assert.Equal(t, fmt.Errorf("unexpected k line interval: %+v", &k), err) + assert.Equal(t, gKline, types.KLine{}) + }) +}