Merge pull request #1277 from bailantaotao/edwin/add-kline-api

FEATURE: [bybit] add k line api
This commit is contained in:
bailantaotao 2023-08-10 11:44:28 +08:00 committed by GitHub
commit 6379cab65e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 1103 additions and 12 deletions

View File

@ -5,6 +5,7 @@ import (
"os"
"strconv"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@ -162,4 +163,14 @@ func TestClient(t *testing.T) {
assert.NoError(t, err)
t.Logf("apiResp: %+v", apiResp)
})
t.Run("GetKLinesRequest", func(t *testing.T) {
startTime := time.Date(2023, 8, 8, 9, 28, 0, 0, time.UTC)
endTime := time.Date(2023, 8, 8, 9, 45, 0, 0, time.UTC)
req := client.NewGetKLinesRequest().
Symbol("BTCUSDT").Interval("15").StartTime(startTime).EndTime(endTime)
apiResp, err := req.Do(ctx)
assert.NoError(t, err)
t.Logf("apiResp: %+v", apiResp.List)
})
}

View File

@ -0,0 +1,107 @@
package bybitapi
import (
"encoding/json"
"fmt"
"time"
"github.com/c9s/requestgen"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
//go:generate -command GetRequest requestgen -method GET -responseType .APIResponse -responseDataField Result
//go:generate -command PostRequest requestgen -method POST -responseType .APIResponse -responseDataField Result
type IntervalSign string
const (
IntervalSignDay IntervalSign = "D"
IntervalSignWeek IntervalSign = "W"
IntervalSignMonth IntervalSign = "M"
)
type KLinesResponse struct {
Symbol string `json:"symbol"`
// An string array of individual candle
// Sort in reverse by startTime
List []KLine `json:"list"`
Category Category `json:"category"`
}
type KLine struct {
// list[0]: startTime, Start time of the candle (ms)
StartTime types.MillisecondTimestamp
// list[1]: openPrice
Open fixedpoint.Value
// list[2]: highPrice
High fixedpoint.Value
// list[3]: lowPrice
Low fixedpoint.Value
// list[4]: closePrice
Close fixedpoint.Value
// list[5]: volume, Trade volume. Unit of contract: pieces of contract. Unit of spot: quantity of coins
Volume fixedpoint.Value
// list[6]: turnover, Turnover. Unit of figure: quantity of quota coin
TurnOver fixedpoint.Value
}
const KLinesArrayLen = 7
func (k *KLine) UnmarshalJSON(data []byte) error {
var jsonArr []json.RawMessage
err := json.Unmarshal(data, &jsonArr)
if err != nil {
return fmt.Errorf("failed to unmarshal jsonRawMessage: %v, err: %w", string(data), err)
}
if len(jsonArr) != KLinesArrayLen {
return fmt.Errorf("unexpected K Lines array length: %d, exp: %d", len(jsonArr), KLinesArrayLen)
}
err = json.Unmarshal(jsonArr[0], &k.StartTime)
if err != nil {
return fmt.Errorf("failed to unmarshal resp index 0: %v, err: %w", string(jsonArr[0]), err)
}
values := make([]fixedpoint.Value, len(jsonArr)-1)
for i, jsonRaw := range jsonArr[1:] {
err = json.Unmarshal(jsonRaw, &values[i])
if err != nil {
return fmt.Errorf("failed to unmarshal resp index %d: %v, err: %w", i+1, string(jsonRaw), err)
}
}
k.Open = values[0]
k.High = values[1]
k.Low = values[2]
k.Close = values[3]
k.Volume = values[4]
k.TurnOver = values[5]
return nil
}
//go:generate GetRequest -url "/v5/market/kline" -type GetKLinesRequest -responseDataType .KLinesResponse
type GetKLinesRequest struct {
client requestgen.APIClient
category Category `param:"category,query" validValues:"spot"`
symbol string `param:"symbol,query"`
// Kline interval.
// - 1,3,5,15,30,60,120,240,360,720: minute
// - D: day
// - M: month
// - W: week
interval string `param:"interval,query" validValues:"1,3,5,15,30,60,120,240,360,720,D,W,M"`
startTime *time.Time `param:"start,query,milliseconds"`
endTime *time.Time `param:"end,query,milliseconds"`
// Limit for data size per page. [1, 1000]. Default: 200
limit *uint64 `param:"limit,query"`
}
func (c *RestClient) NewGetKLinesRequest() *GetKLinesRequest {
return &GetKLinesRequest{
client: c,
category: CategorySpot,
}
}

View File

@ -0,0 +1,237 @@
// Code generated by "requestgen -method GET -responseType .APIResponse -responseDataField Result -url /v5/market/kline -type GetKLinesRequest -responseDataType .KLinesResponse"; DO NOT EDIT.
package bybitapi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"
"strconv"
"time"
)
func (g *GetKLinesRequest) Category(category Category) *GetKLinesRequest {
g.category = category
return g
}
func (g *GetKLinesRequest) Symbol(symbol string) *GetKLinesRequest {
g.symbol = symbol
return g
}
func (g *GetKLinesRequest) Interval(interval string) *GetKLinesRequest {
g.interval = interval
return g
}
func (g *GetKLinesRequest) StartTime(startTime time.Time) *GetKLinesRequest {
g.startTime = &startTime
return g
}
func (g *GetKLinesRequest) EndTime(endTime time.Time) *GetKLinesRequest {
g.endTime = &endTime
return g
}
func (g *GetKLinesRequest) Limit(limit uint64) *GetKLinesRequest {
g.limit = &limit
return g
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (g *GetKLinesRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
// check category field -> json key category
category := g.category
// TEMPLATE check-valid-values
switch category {
case "spot":
params["category"] = category
default:
return nil, fmt.Errorf("category value %v is invalid", category)
}
// END TEMPLATE check-valid-values
// assign parameter of category
params["category"] = category
// check symbol field -> json key symbol
symbol := g.symbol
// assign parameter of symbol
params["symbol"] = symbol
// check interval field -> json key interval
interval := g.interval
// TEMPLATE check-valid-values
switch interval {
case "1", "3", "5", "15", "30", "60", "120", "240", "360", "720", "D", "W", "M":
params["interval"] = interval
default:
return nil, fmt.Errorf("interval value %v is invalid", interval)
}
// END TEMPLATE check-valid-values
// assign parameter of interval
params["interval"] = interval
// check startTime field -> json key start
if g.startTime != nil {
startTime := *g.startTime
// assign parameter of startTime
// convert time.Time to milliseconds time stamp
params["start"] = strconv.FormatInt(startTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check endTime field -> json key end
if g.endTime != nil {
endTime := *g.endTime
// assign parameter of endTime
// convert time.Time to milliseconds time stamp
params["end"] = strconv.FormatInt(endTime.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check limit field -> json key limit
if g.limit != nil {
limit := *g.limit
// assign parameter of limit
params["limit"] = limit
} else {
}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (g *GetKLinesRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (g *GetKLinesRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := g.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if g.isVarSlice(_v) {
g.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (g *GetKLinesRequest) GetParametersJSON() ([]byte, error) {
params, err := g.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (g *GetKLinesRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (g *GetKLinesRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (g *GetKLinesRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (g *GetKLinesRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (g *GetKLinesRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := g.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
func (g *GetKLinesRequest) Do(ctx context.Context) (*KLinesResponse, error) {
// no body params
var params interface{}
query, err := g.GetQueryParameters()
if err != nil {
return nil, err
}
apiURL := "/v5/market/kline"
req, err := g.client.NewRequest(ctx, "GET", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := g.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse APIResponse
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
var data KLinesResponse
if err := json.Unmarshal(apiResponse.Result, &data); err != nil {
return nil, err
}
return &data, nil
}

View File

@ -0,0 +1,175 @@
package bybitapi
import (
"encoding/json"
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func TestKLinesResponse_UnmarshalJSON(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
data := `{
"symbol": "BTCUSDT",
"category": "spot",
"list": [
[
"1670608800000",
"17071",
"17073",
"17027",
"17055.5",
"268611",
"15.74462667"
],
[
"1670605200000",
"17071.5",
"17071.5",
"17061",
"17071",
"4177",
"0.24469757"
]
]
}`
expRes := &KLinesResponse{
Symbol: "BTCUSDT",
List: []KLine{
{
StartTime: types.NewMillisecondTimestampFromInt(1670608800000),
Open: fixedpoint.NewFromFloat(17071),
High: fixedpoint.NewFromFloat(17073),
Low: fixedpoint.NewFromFloat(17027),
Close: fixedpoint.NewFromFloat(17055.5),
Volume: fixedpoint.NewFromFloat(268611),
TurnOver: fixedpoint.NewFromFloat(15.74462667),
},
{
StartTime: types.NewMillisecondTimestampFromInt(1670605200000),
Open: fixedpoint.NewFromFloat(17071.5),
High: fixedpoint.NewFromFloat(17071.5),
Low: fixedpoint.NewFromFloat(17061),
Close: fixedpoint.NewFromFloat(17071),
Volume: fixedpoint.NewFromFloat(4177),
TurnOver: fixedpoint.NewFromFloat(0.24469757),
},
},
Category: CategorySpot,
}
kline := &KLinesResponse{}
err := json.Unmarshal([]byte(data), kline)
assert.NoError(t, err)
assert.Equal(t, expRes, kline)
})
t.Run("unexpected length", func(t *testing.T) {
data := `{
"symbol": "BTCUSDT",
"category": "spot",
"list": [
[
"1670608800000",
"17071",
"17073",
"17027",
"17055.5",
"268611"
]
]
}`
kline := &KLinesResponse{}
err := json.Unmarshal([]byte(data), kline)
assert.Equal(t, fmt.Errorf("unexpected K Lines array length: 6, exp: %d", KLinesArrayLen), err)
})
t.Run("unexpected json array", func(t *testing.T) {
klineJson := `{}`
data := fmt.Sprintf(`{
"symbol": "BTCUSDT",
"category": "spot",
"list": [%s]
}`, klineJson)
var jsonArr []json.RawMessage
expErr := json.Unmarshal([]byte(klineJson), &jsonArr)
assert.Error(t, expErr)
kline := &KLinesResponse{}
err := json.Unmarshal([]byte(data), kline)
assert.Equal(t, fmt.Errorf("failed to unmarshal jsonRawMessage: %v, err: %w", klineJson, expErr), err)
})
t.Run("unexpected json 0", func(t *testing.T) {
klineJson := `
[
"a",
"17071.5",
"17071.5",
"17061",
"17071",
"4177",
"0.24469757"
]
`
data := fmt.Sprintf(`{
"symbol": "BTCUSDT",
"category": "spot",
"list": [%s]
}`, klineJson)
var jsonArr []json.RawMessage
err := json.Unmarshal([]byte(klineJson), &jsonArr)
assert.NoError(t, err)
timestamp := types.MillisecondTimestamp{}
expErr := json.Unmarshal(jsonArr[0], &timestamp)
assert.NoError(t, err)
kline := &KLinesResponse{}
err = json.Unmarshal([]byte(data), kline)
assert.Equal(t, fmt.Errorf("failed to unmarshal resp index 0: %v, err: %w", string(jsonArr[0]), expErr), err)
})
t.Run("unexpected json 1", func(t *testing.T) {
// TODO: fix panic
t.Skip("test will result in a panic, skip it")
klineJson := `
[
"1670608800000",
"a",
"17071.5",
"17061",
"17071",
"4177",
"0.24469757"
]
`
data := fmt.Sprintf(`{
"symbol": "BTCUSDT",
"category": "spot",
"list": [%s]
}`, klineJson)
var jsonArr []json.RawMessage
err := json.Unmarshal([]byte(klineJson), &jsonArr)
assert.NoError(t, err)
var value fixedpoint.Value
expErr := json.Unmarshal(jsonArr[1], &value)
assert.NoError(t, err)
kline := &KLinesResponse{}
err = json.Unmarshal([]byte(data), kline)
assert.Equal(t, fmt.Errorf("failed to unmarshal resp index 1: %v, err: %w", string(jsonArr[1]), expErr), err)
})
}

View File

@ -1,5 +1,41 @@
package bybitapi
import "github.com/c9s/bbgo/pkg/types"
var (
SupportedIntervals = map[types.Interval]int{
types.Interval1m: 1 * 60,
types.Interval3m: 3 * 60,
types.Interval5m: 5 * 60,
types.Interval15m: 15 * 60,
types.Interval30m: 30 * 60,
types.Interval1h: 60 * 60,
types.Interval2h: 60 * 60 * 2,
types.Interval4h: 60 * 60 * 4,
types.Interval6h: 60 * 60 * 6,
types.Interval12h: 60 * 60 * 12,
types.Interval1d: 60 * 60 * 24,
types.Interval1w: 60 * 60 * 24 * 7,
types.Interval1mo: 60 * 60 * 24 * 30,
}
ToGlobalInterval = map[string]types.Interval{
"1": types.Interval1m,
"3": types.Interval3m,
"5": types.Interval5m,
"15": types.Interval15m,
"30": types.Interval30m,
"60": types.Interval1h,
"120": types.Interval2h,
"240": types.Interval4h,
"360": types.Interval6h,
"720": types.Interval12h,
"D": types.Interval1d,
"W": types.Interval1w,
"M": types.Interval1mo,
}
)
type Category string
const (

View File

@ -0,0 +1,41 @@
package bybitapi
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/types"
)
func Test_SupportedIntervals(t *testing.T) {
assert.Equal(t, SupportedIntervals[types.Interval1m], 60)
assert.Equal(t, SupportedIntervals[types.Interval3m], 180)
assert.Equal(t, SupportedIntervals[types.Interval5m], 300)
assert.Equal(t, SupportedIntervals[types.Interval15m], 15*60)
assert.Equal(t, SupportedIntervals[types.Interval30m], 30*60)
assert.Equal(t, SupportedIntervals[types.Interval1h], 60*60)
assert.Equal(t, SupportedIntervals[types.Interval2h], 60*60*2)
assert.Equal(t, SupportedIntervals[types.Interval4h], 60*60*4)
assert.Equal(t, SupportedIntervals[types.Interval6h], 60*60*6)
assert.Equal(t, SupportedIntervals[types.Interval12h], 60*60*12)
assert.Equal(t, SupportedIntervals[types.Interval1d], 60*60*24)
assert.Equal(t, SupportedIntervals[types.Interval1w], 60*60*24*7)
assert.Equal(t, SupportedIntervals[types.Interval1mo], 60*60*24*30)
}
func Test_ToGlobalInterval(t *testing.T) {
assert.Equal(t, ToGlobalInterval["1"], types.Interval1m)
assert.Equal(t, ToGlobalInterval["3"], types.Interval3m)
assert.Equal(t, ToGlobalInterval["5"], types.Interval5m)
assert.Equal(t, ToGlobalInterval["15"], types.Interval15m)
assert.Equal(t, ToGlobalInterval["30"], types.Interval30m)
assert.Equal(t, ToGlobalInterval["60"], types.Interval1h)
assert.Equal(t, ToGlobalInterval["120"], types.Interval2h)
assert.Equal(t, ToGlobalInterval["240"], types.Interval4h)
assert.Equal(t, ToGlobalInterval["360"], types.Interval6h)
assert.Equal(t, ToGlobalInterval["720"], types.Interval12h)
assert.Equal(t, ToGlobalInterval["D"], types.Interval1d)
assert.Equal(t, ToGlobalInterval["W"], types.Interval1w)
assert.Equal(t, ToGlobalInterval["M"], types.Interval1mo)
}

View File

@ -283,3 +283,48 @@ func toGlobalBalanceMap(events []bybitapi.WalletBalances) types.BalanceMap {
}
return bm
}
func toLocalInterval(interval types.Interval) (string, error) {
if _, found := bybitapi.SupportedIntervals[interval]; !found {
return "", fmt.Errorf("interval not supported: %s", interval)
}
switch interval {
case types.Interval1d:
return string(bybitapi.IntervalSignDay), nil
case types.Interval1w:
return string(bybitapi.IntervalSignWeek), nil
case types.Interval1mo:
return string(bybitapi.IntervalSignMonth), nil
default:
return fmt.Sprintf("%d", interval.Minutes()), nil
}
}
func toGlobalKLines(symbol string, interval types.Interval, klines []bybitapi.KLine) []types.KLine {
gKLines := make([]types.KLine, len(klines))
for i, kline := range klines {
endTime := types.Time(kline.StartTime.Time().Add(interval.Duration()))
gKLines[i] = types.KLine{
Exchange: types.ExchangeBybit,
Symbol: symbol,
StartTime: types.Time(kline.StartTime),
EndTime: endTime,
Interval: interval,
Open: kline.Open,
Close: kline.Close,
High: kline.High,
Low: kline.Low,
Volume: kline.Volume,
QuoteVolume: kline.TurnOver,
// Bybit doesn't support close flag in REST API
Closed: false,
}
}
return gKLines
}

View File

@ -459,3 +459,88 @@ func Test_toGlobalTrade(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, res, &exp)
}
func Test_toGlobalKLines(t *testing.T) {
symbol := "BTCUSDT"
interval := types.Interval15m
resp := bybitapi.KLinesResponse{
Symbol: symbol,
List: []bybitapi.KLine{
/*
[
{
"StartTime": "2023-08-08 17:30:00 +0800 CST",
"OpenPrice": 29045.3,
"HighPrice": 29228.56,
"LowPrice": 29045.3,
"ClosePrice": 29228.56,
"Volume": 9.265593,
"TurnOver": 270447.43520753
},
{
"StartTime": "2023-08-08 17:15:00 +0800 CST",
"OpenPrice": 29167.33,
"HighPrice": 29229.08,
"LowPrice": 29000,
"ClosePrice": 29045.3,
"Volume": 9.295508,
"TurnOver": 270816.87513775
}
]
*/
{
StartTime: types.NewMillisecondTimestampFromInt(1691486100000),
Open: fixedpoint.NewFromFloat(29045.3),
High: fixedpoint.NewFromFloat(29228.56),
Low: fixedpoint.NewFromFloat(29045.3),
Close: fixedpoint.NewFromFloat(29228.56),
Volume: fixedpoint.NewFromFloat(9.265593),
TurnOver: fixedpoint.NewFromFloat(270447.43520753),
},
{
StartTime: types.NewMillisecondTimestampFromInt(1691487000000),
Open: fixedpoint.NewFromFloat(29167.33),
High: fixedpoint.NewFromFloat(29229.08),
Low: fixedpoint.NewFromFloat(29000),
Close: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.295508),
TurnOver: fixedpoint.NewFromFloat(270816.87513775),
},
},
Category: bybitapi.CategorySpot,
}
expKlines := []types.KLine{
{
Exchange: types.ExchangeBybit,
Symbol: resp.Symbol,
StartTime: types.Time(resp.List[0].StartTime.Time()),
EndTime: types.Time(resp.List[0].StartTime.Time().Add(interval.Duration())),
Interval: interval,
Open: fixedpoint.NewFromFloat(29045.3),
Close: fixedpoint.NewFromFloat(29228.56),
High: fixedpoint.NewFromFloat(29228.56),
Low: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
QuoteVolume: fixedpoint.NewFromFloat(270447.43520753),
Closed: false,
},
{
Exchange: types.ExchangeBybit,
Symbol: resp.Symbol,
StartTime: types.Time(resp.List[1].StartTime.Time()),
EndTime: types.Time(resp.List[1].StartTime.Time().Add(interval.Duration())),
Interval: interval,
Open: fixedpoint.NewFromFloat(29167.33),
Close: fixedpoint.NewFromFloat(29045.3),
High: fixedpoint.NewFromFloat(29229.08),
Low: fixedpoint.NewFromFloat(29000),
Volume: fixedpoint.NewFromFloat(9.295508),
QuoteVolume: fixedpoint.NewFromFloat(270816.87513775),
Closed: false,
},
}
assert.Equal(t, toGlobalKLines(symbol, interval, resp.List), expKlines)
}

View File

@ -19,6 +19,7 @@ import (
const (
maxOrderIdLen = 36
defaultQueryLimit = 50
defaultKLineLimit = 1000
halfYearDuration = 6 * 30 * 24 * time.Hour
)
@ -39,6 +40,11 @@ var (
})
_ types.ExchangeAccountService = &Exchange{}
_ types.ExchangeMarketDataService = &Exchange{}
_ types.CustomIntervalProvider = &Exchange{}
_ types.ExchangeMinimal = &Exchange{}
_ types.ExchangeTradeService = &Exchange{}
_ types.Exchange = &Exchange{}
)
type Exchange struct {
@ -422,6 +428,68 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap,
return toGlobalBalanceMap(accounts.List), nil
}
/*
QueryKLines queries for historical klines (also known as candles/candlesticks). Charts are returned in groups based
on the requested interval.
A k-line's start time is inclusive, but end time is not(startTime + interval - 1 millisecond).
e.q. 15m interval k line can be represented as 00:00:00.000 ~ 00:14:59.999
*/
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
req := e.client.NewGetKLinesRequest().Symbol(symbol)
intervalStr, err := toLocalInterval(interval)
if err != nil {
return nil, err
}
req.Interval(intervalStr)
limit := uint64(options.Limit)
if limit > defaultKLineLimit || limit <= 0 {
log.Debugf("limtit is exceeded or zero, update to %d, got: %d", defaultKLineLimit, options.Limit)
limit = defaultKLineLimit
}
req.Limit(limit)
if options.StartTime != nil {
req.StartTime(*options.StartTime)
}
if options.EndTime != nil {
req.EndTime(*options.EndTime)
}
if err := sharedRateLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query klines rate limiter wait error: %w", err)
}
resp, err := req.Do(ctx)
if err != nil {
return nil, fmt.Errorf("failed to call k line, err: %w", err)
}
if resp.Category != bybitapi.CategorySpot {
return nil, fmt.Errorf("unexpected category: %s", resp.Category)
}
if resp.Symbol != symbol {
return nil, fmt.Errorf("unexpected symbol: %s, exp: %s", resp.Category, symbol)
}
kLines := toGlobalKLines(symbol, interval, resp.List)
return types.SortKLinesAscending(kLines), nil
}
func (e *Exchange) SupportedInterval() map[types.Interval]int {
return bybitapi.SupportedIntervals
}
func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
_, ok := bybitapi.SupportedIntervals[interval]
return ok
}
func (e *Exchange) NewStream() types.Stream {
return NewStream(e.key, e.secret)
}

View File

@ -34,6 +34,7 @@ type Stream struct {
bookEventCallbacks []func(e BookEvent)
walletEventCallbacks []func(e []bybitapi.WalletBalances)
kLineEventCallbacks []func(e KLineEvent)
orderEventCallbacks []func(e []OrderEvent)
}
@ -52,6 +53,7 @@ func NewStream(key, secret string) *Stream {
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnWalletEvent(stream.handleWalletEvent)
stream.OnOrderEvent(stream.handleOrderEvent)
return stream
@ -80,6 +82,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
case []bybitapi.WalletBalances:
s.EmitWalletEvent(e)
case *KLineEvent:
s.EmitKLineEvent(*e)
case []OrderEvent:
s.EmitOrderEvent(e)
}
@ -99,6 +104,7 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
case e.IsTopic():
switch getTopicType(e.Topic) {
case TopicTypeOrderBook:
var book BookEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
@ -109,6 +115,20 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
book.Type = e.WebSocketTopicEvent.Type
return &book, nil
case TopicTypeKLine:
var kLines []KLine
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &kLines)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
}
symbol, err := getSymbolFromTopic(e.Topic)
if err != nil {
return nil, err
}
return &KLineEvent{KLines: kLines, Symbol: symbol, Type: e.WebSocketTopicEvent.Type}, nil
case TopicTypeWallet:
var wallets []bybitapi.WalletBalances
return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets)
@ -165,7 +185,7 @@ func (s *Stream) handlerConnect() {
var topics []string
for _, subscription := range s.Subscriptions {
topic, err := convertSubscription(subscription)
topic, err := s.convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
@ -213,17 +233,27 @@ func (s *Stream) handlerConnect() {
}
}
func convertSubscription(s types.Subscription) (string, error) {
switch s.Channel {
func (s *Stream) convertSubscription(sub types.Subscription) (string, error) {
switch sub.Channel {
case types.BookChannel:
depth := types.DepthLevel1
if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 {
if len(sub.Options.Depth) > 0 && sub.Options.Depth == types.DepthLevel50 {
depth = types.DepthLevel50
}
return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil
return genTopic(TopicTypeOrderBook, depth, sub.Symbol), nil
case types.KLineChannel:
interval, err := toLocalInterval(sub.Options.Interval)
if err != nil {
return "", err
}
return "", fmt.Errorf("unsupported stream channel: %s", s.Channel)
return genTopic(TopicTypeKLine, interval, sub.Symbol), nil
}
return "", fmt.Errorf("unsupported stream channel: %s", sub.Channel)
}
func (s *Stream) handleBookEvent(e BookEvent) {
@ -257,3 +287,23 @@ func (s *Stream) handleOrderEvent(events []OrderEvent) {
s.StandardStream.EmitOrderUpdate(*gOrder)
}
}
func (s *Stream) handleKLineEvent(klineEvent KLineEvent) {
if klineEvent.Type != DataTypeSnapshot {
return
}
for _, event := range klineEvent.KLines {
kline, err := event.toGlobalKLine(klineEvent.Symbol)
if err != nil {
log.WithError(err).Error("failed to convert to global k line")
continue
}
if kline.Closed {
s.EmitKLineClosed(kline)
} else {
s.EmitKLine(kline)
}
}
}

View File

@ -26,6 +26,16 @@ func (s *Stream) EmitWalletEvent(e []bybitapi.WalletBalances) {
}
}
func (s *Stream) OnKLineEvent(cb func(e KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *Stream) EmitKLineEvent(e KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(e)
}
}
func (s *Stream) OnOrderEvent(cb func(e []OrderEvent)) {
s.orderEventCallbacks = append(s.orderEventCallbacks, cb)
}

View File

@ -2,6 +2,7 @@ package bybit
import (
"context"
"errors"
"fmt"
"os"
"strconv"
@ -76,6 +77,23 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})
t.Run("kline test", func(t *testing.T) {
s.Subscribe(types.KLineChannel, "BTCUSDT", types.SubscribeOptions{
Interval: types.Interval30m,
Depth: "",
Speed: "",
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnKLine(func(kline types.KLine) {
t.Log(kline)
})
c := make(chan struct{})
<-c
})
}
func TestStream_parseWebSocketEvent(t *testing.T) {
@ -151,6 +169,80 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
}, *book)
})
t.Run("TopicTypeKLine with snapshot", func(t *testing.T) {
input := `{
"topic": "kline.5.BTCUSDT",
"data": [
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882
}
],
"ts": 1672324988882,
"type": "snapshot"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
book, ok := res.(*KLineEvent)
assert.True(t, ok)
assert.Equal(t, KLineEvent{
Symbol: "BTCUSDT",
Type: DataTypeSnapshot,
KLines: []KLine{
{
StartTime: types.NewMillisecondTimestampFromInt(1672324800000),
EndTime: types.NewMillisecondTimestampFromInt(1672325099999),
Interval: "5",
OpenPrice: fixedpoint.NewFromFloat(16649.5),
ClosePrice: fixedpoint.NewFromFloat(16677),
HighPrice: fixedpoint.NewFromFloat(16677),
LowPrice: fixedpoint.NewFromFloat(16608),
Volume: fixedpoint.NewFromFloat(2.081),
Turnover: fixedpoint.NewFromFloat(34666.4005),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1672324988882),
},
},
}, *book)
})
t.Run("TopicTypeKLine with invalid topic", func(t *testing.T) {
input := `{
"topic": "kline.5",
"data": [
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882
}
],
"ts": 1672324988882,
"type": "snapshot"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.Equal(t, errors.New("unexpected topic: kline.5"), err)
assert.Nil(t, res)
})
t.Run("Parse fails", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
@ -168,8 +260,9 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
}
func Test_convertSubscription(t *testing.T) {
s := Stream{}
t.Run("BookChannel.DepthLevel1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -180,7 +273,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel. with default depth", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
})
@ -188,7 +281,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel.DepthLevel50", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -199,7 +292,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res)
})
t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -211,7 +304,7 @@ func Test_convertSubscription(t *testing.T) {
})
t.Run("unsupported channel", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: "unsupported",
})

View File

@ -82,6 +82,7 @@ const (
TopicTypeOrderBook TopicType = "orderbook"
TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
TopicTypeKLine TopicType = "kline"
)
type DataType string
@ -143,8 +144,69 @@ func getTopicType(topic string) TopicType {
return TopicType(slice[0])
}
func getSymbolFromTopic(topic string) (string, error) {
slice := strings.Split(topic, topicSeparator)
if len(slice) != 3 {
return "", fmt.Errorf("unexpected topic: %s", topic)
}
return slice[2], nil
}
type OrderEvent struct {
bybitapi.Order
Category bybitapi.Category `json:"category"`
}
type KLineEvent struct {
KLines []KLine
// internal use
// Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type
Type DataType
// Symbol. Copied from WebSocketTopicEvent.Topic
Symbol string
}
type KLine struct {
// The start timestamp (ms)
StartTime types.MillisecondTimestamp `json:"start"`
// The end timestamp (ms)
EndTime types.MillisecondTimestamp `json:"end"`
// Kline interval
Interval string `json:"interval"`
OpenPrice fixedpoint.Value `json:"open"`
ClosePrice fixedpoint.Value `json:"close"`
HighPrice fixedpoint.Value `json:"high"`
LowPrice fixedpoint.Value `json:"low"`
// Trade volume
Volume fixedpoint.Value `json:"volume"`
// Turnover. Unit of figure: quantity of quota coin
Turnover fixedpoint.Value `json:"turnover"`
// Weather the tick is ended or not
Confirm bool `json:"confirm"`
// The timestamp (ms) of the last matched order in the candle
Timestamp types.MillisecondTimestamp `json:"timestamp"`
}
func (k *KLine) toGlobalKLine(symbol string) (types.KLine, error) {
interval, found := bybitapi.ToGlobalInterval[k.Interval]
if !found {
return types.KLine{}, fmt.Errorf("unexpected k line interval: %+v", k)
}
return types.KLine{
Exchange: types.ExchangeBybit,
Symbol: symbol,
StartTime: types.Time(k.StartTime.Time()),
EndTime: types.Time(k.EndTime.Time()),
Interval: interval,
Open: k.OpenPrice,
Close: k.ClosePrice,
High: k.HighPrice,
Low: k.LowPrice,
Volume: k.Volume,
QuoteVolume: k.Turnover,
Closed: k.Confirm,
}, nil
}

View File

@ -386,3 +386,74 @@ func Test_getTopicName(t *testing.T) {
exp := TopicTypeOrderBook
assert.Equal(t, exp, getTopicType("orderbook.50.BTCUSDT"))
}
func Test_getSymbolFromTopic(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
exp := "BTCUSDT"
res, err := getSymbolFromTopic("kline.1.BTCUSDT")
assert.NoError(t, err)
assert.Equal(t, exp, res)
})
t.Run("unexpected topic", func(t *testing.T) {
res, err := getSymbolFromTopic("kline.1")
assert.Empty(t, res)
assert.Equal(t, err, fmt.Errorf("unexpected topic: kline.1"))
})
}
func TestKLine_toGlobalKLine(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
k := KLine{
StartTime: types.NewMillisecondTimestampFromInt(1691486100000),
EndTime: types.NewMillisecondTimestampFromInt(1691487000000),
Interval: "1",
OpenPrice: fixedpoint.NewFromFloat(29045.3),
ClosePrice: fixedpoint.NewFromFloat(29228.56),
HighPrice: fixedpoint.NewFromFloat(29228.56),
LowPrice: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
Turnover: fixedpoint.NewFromFloat(270447.43520753),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1691486100000),
}
gKline, err := k.toGlobalKLine("BTCUSDT")
assert.NoError(t, err)
assert.Equal(t, types.KLine{
Exchange: types.ExchangeBybit,
Symbol: "BTCUSDT",
StartTime: types.Time(k.StartTime.Time()),
EndTime: types.Time(k.EndTime.Time()),
Interval: types.Interval1m,
Open: fixedpoint.NewFromFloat(29045.3),
Close: fixedpoint.NewFromFloat(29228.56),
High: fixedpoint.NewFromFloat(29228.56),
Low: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
QuoteVolume: fixedpoint.NewFromFloat(270447.43520753),
Closed: false,
}, gKline)
})
t.Run("interval not supported", func(t *testing.T) {
k := KLine{
StartTime: types.NewMillisecondTimestampFromInt(1691486100000),
EndTime: types.NewMillisecondTimestampFromInt(1691487000000),
Interval: "112",
OpenPrice: fixedpoint.NewFromFloat(29045.3),
ClosePrice: fixedpoint.NewFromFloat(29228.56),
HighPrice: fixedpoint.NewFromFloat(29228.56),
LowPrice: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
Turnover: fixedpoint.NewFromFloat(270447.43520753),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1691486100000),
}
gKline, err := k.toGlobalKLine("BTCUSDT")
assert.Equal(t, fmt.Errorf("unexpected k line interval: %+v", &k), err)
assert.Equal(t, gKline, types.KLine{})
})
}