Merge pull request #1507 from c9s/edwin/okx/refactor-kline-api

REFACTOR: [okx] refactor kline api
This commit is contained in:
bailantaotao 2024-01-30 09:33:41 +08:00 committed by GitHub
commit 2d3f8fb923
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 410 additions and 314 deletions

View File

@ -32,6 +32,7 @@ var (
queryOpenOrderLimiter = rate.NewLimiter(rate.Every(30*time.Millisecond), 30)
queryClosedOrderRateLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 10)
queryTradeLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 10)
queryKLineLimiter = rate.NewLimiter(rate.Every(100*time.Millisecond), 20)
)
const (
@ -379,24 +380,24 @@ func (e *Exchange) NewStream() types.Stream {
}
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
if err := marketDataLimiter.Wait(ctx); err != nil {
return nil, err
if err := queryKLineLimiter.Wait(ctx); err != nil {
return nil, fmt.Errorf("query k line rate limiter wait error: %w", err)
}
intervalParam, err := toLocalInterval(interval)
if err != nil {
return nil, fmt.Errorf("fail to get interval: %w", err)
return nil, fmt.Errorf("failed to get interval: %w", err)
}
req := e.client.NewCandlesticksRequest(toLocalSymbol(symbol))
req := e.client.NewGetCandlesRequest().InstrumentID(toLocalSymbol(symbol))
req.Bar(intervalParam)
if options.StartTime != nil {
req.After(options.StartTime.Unix())
req.After(*options.StartTime)
}
if options.EndTime != nil {
req.Before(options.EndTime.Unix())
req.Before(*options.EndTime)
}
candles, err := req.Do(ctx)
@ -406,20 +407,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
var klines []types.KLine
for _, candle := range candles {
klines = append(klines, types.KLine{
Exchange: types.ExchangeOKEx,
Symbol: symbol,
Interval: interval,
Open: candle.Open,
High: candle.High,
Low: candle.Low,
Close: candle.Close,
Closed: true,
Volume: candle.Volume,
QuoteVolume: candle.VolumeInCurrency,
StartTime: types.Time(candle.Time),
EndTime: types.Time(candle.Time.Add(interval.Duration() - time.Millisecond)),
})
klines = append(klines, kLineToGlobal(candle, interval, symbol))
}
return klines, nil

View File

@ -307,3 +307,12 @@ func TestClient_GetOrderDetailsRequest(t *testing.T) {
assert.NotEmpty(t, orderDetail)
t.Logf("order detail: %+v", orderDetail)
}
func TestClient_CandlesTicksRequest(t *testing.T) {
client := getTestClientOrSkip(t)
ctx := context.Background()
req := client.NewGetCandlesRequest().InstrumentID("BTC-USDT")
res, err := req.Do(ctx)
assert.NoError(t, err)
t.Log(res)
}

View File

@ -0,0 +1,137 @@
package okexapi
//go:generate -command GetRequest requestgen -method GET -responseType .APIResponse -responseDataField Data
//go:generate -command PostRequest requestgen -method POST -responseType .APIResponse -responseDataField Data
import (
"encoding/json"
"errors"
"fmt"
"time"
"github.com/c9s/requestgen"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type KLine struct {
StartTime types.MillisecondTimestamp
OpenPrice fixedpoint.Value
HighestPrice fixedpoint.Value
LowestPrice fixedpoint.Value
ClosePrice fixedpoint.Value
// Volume trading volume, with a unit of contract.
// If it is a derivatives contract, the value is the number of contracts.
// If it is SPOT/MARGIN, the value is the quantity in base currency.
Volume fixedpoint.Value
// VolumeInCurrency trading volume, with a unit of currency.
// If it is a derivatives contract, the value is the number of base currency.
// If it is SPOT/MARGIN, the value is the quantity in quote currency.
VolumeInCurrency fixedpoint.Value
// VolumeInCurrencyQuote Trading volume, the value is the quantity in quote currency
// e.g. The unit is USDT for BTC-USDT and BTC-USDT-SWAP;
// The unit is USD for BTC-USD-SWAP
// ** REMARK: To prevent overflow, we need to avoid unmarshaling it. **
//VolumeInCurrencyQuote fixedpoint.Value
// The state of candlesticks.
// 0 represents that it is uncompleted, 1 represents that it is completed.
Confirm fixedpoint.Value
}
type KLineSlice []KLine
func (m *KLineSlice) UnmarshalJSON(b []byte) error {
if m == nil {
return errors.New("nil pointer of kline slice")
}
s, err := parseKLineSliceJSON(b)
if err != nil {
return err
}
*m = s
return nil
}
// parseKLineSliceJSON tries to parse a 2 dimensional string array into a KLineSlice
//
// [
// [
// "1597026383085",
// "8533.02",
// "8553.74",
// "8527.17",
// "8548.26",
// "45247",
// "529.5858061",
// "5529.5858061",
// "0"
// ]
// ]
func parseKLineSliceJSON(in []byte) (slice KLineSlice, err error) {
var rawKLines [][]json.RawMessage
err = json.Unmarshal(in, &rawKLines)
if err != nil {
return slice, err
}
for _, raw := range rawKLines {
if len(raw) != 9 {
return nil, fmt.Errorf("unexpected kline length: %d, data: %q", len(raw), raw)
}
var kline KLine
if err = json.Unmarshal(raw[0], &kline.StartTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal into timestamp: %q", raw[0])
}
if err = json.Unmarshal(raw[1], &kline.OpenPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into open price: %q", raw[1])
}
if err = json.Unmarshal(raw[2], &kline.HighestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into highest price: %q", raw[2])
}
if err = json.Unmarshal(raw[3], &kline.LowestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into lowest price: %q", raw[3])
}
if err = json.Unmarshal(raw[4], &kline.ClosePrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into close price: %q", raw[4])
}
if err = json.Unmarshal(raw[5], &kline.Volume); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume: %q", raw[5])
}
if err = json.Unmarshal(raw[6], &kline.VolumeInCurrency); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume currency: %q", raw[6])
}
//if err = json.Unmarshal(raw[7], &kline.VolumeInCurrencyQuote); err != nil {
// return nil, fmt.Errorf("failed to unmarshal into trading currency quote: %q", raw[7])
//}
if err = json.Unmarshal(raw[8], &kline.Confirm); err != nil {
return nil, fmt.Errorf("failed to unmarshal into confirm: %q", raw[8])
}
slice = append(slice, kline)
}
return slice, nil
}
//go:generate GetRequest -url "/api/v5/market/candles" -type GetCandlesRequest -responseDataType KLineSlice
type GetCandlesRequest struct {
client requestgen.APIClient
instrumentID string `param:"instId,query"`
limit *int `param:"limit,query"`
bar *string `param:"bar,query"`
after *time.Time `param:"after,query,milliseconds"`
before *time.Time `param:"before,query,milliseconds"`
}
func (c *RestClient) NewGetCandlesRequest() *GetCandlesRequest {
return &GetCandlesRequest{client: c}
}

View File

@ -0,0 +1,226 @@
// Code generated by "requestgen -method GET -responseType .APIResponse -responseDataField Data -url /api/v5/market/candles -type GetCandlesRequest -responseDataType KLineSlice"; DO NOT EDIT.
package okexapi
import (
"context"
"encoding/json"
"fmt"
"net/url"
"reflect"
"regexp"
"strconv"
"time"
)
func (g *GetCandlesRequest) InstrumentID(instrumentID string) *GetCandlesRequest {
g.instrumentID = instrumentID
return g
}
func (g *GetCandlesRequest) Limit(limit int) *GetCandlesRequest {
g.limit = &limit
return g
}
func (g *GetCandlesRequest) Bar(bar string) *GetCandlesRequest {
g.bar = &bar
return g
}
func (g *GetCandlesRequest) After(after time.Time) *GetCandlesRequest {
g.after = &after
return g
}
func (g *GetCandlesRequest) Before(before time.Time) *GetCandlesRequest {
g.before = &before
return g
}
// GetQueryParameters builds and checks the query parameters and returns url.Values
func (g *GetCandlesRequest) GetQueryParameters() (url.Values, error) {
var params = map[string]interface{}{}
// check instrumentID field -> json key instId
instrumentID := g.instrumentID
// assign parameter of instrumentID
params["instId"] = instrumentID
// check limit field -> json key limit
if g.limit != nil {
limit := *g.limit
// assign parameter of limit
params["limit"] = limit
} else {
}
// check bar field -> json key bar
if g.bar != nil {
bar := *g.bar
// assign parameter of bar
params["bar"] = bar
} else {
}
// check after field -> json key after
if g.after != nil {
after := *g.after
// assign parameter of after
// convert time.Time to milliseconds time stamp
params["after"] = strconv.FormatInt(after.UnixNano()/int64(time.Millisecond), 10)
} else {
}
// check before field -> json key before
if g.before != nil {
before := *g.before
// assign parameter of before
// convert time.Time to milliseconds time stamp
params["before"] = strconv.FormatInt(before.UnixNano()/int64(time.Millisecond), 10)
} else {
}
query := url.Values{}
for _k, _v := range params {
query.Add(_k, fmt.Sprintf("%v", _v))
}
return query, nil
}
// GetParameters builds and checks the parameters and return the result in a map object
func (g *GetCandlesRequest) GetParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
// GetParametersQuery converts the parameters from GetParameters into the url.Values format
func (g *GetCandlesRequest) GetParametersQuery() (url.Values, error) {
query := url.Values{}
params, err := g.GetParameters()
if err != nil {
return query, err
}
for _k, _v := range params {
if g.isVarSlice(_v) {
g.iterateSlice(_v, func(it interface{}) {
query.Add(_k+"[]", fmt.Sprintf("%v", it))
})
} else {
query.Add(_k, fmt.Sprintf("%v", _v))
}
}
return query, nil
}
// GetParametersJSON converts the parameters from GetParameters into the JSON format
func (g *GetCandlesRequest) GetParametersJSON() ([]byte, error) {
params, err := g.GetParameters()
if err != nil {
return nil, err
}
return json.Marshal(params)
}
// GetSlugParameters builds and checks the slug parameters and return the result in a map object
func (g *GetCandlesRequest) GetSlugParameters() (map[string]interface{}, error) {
var params = map[string]interface{}{}
return params, nil
}
func (g *GetCandlesRequest) applySlugsToUrl(url string, slugs map[string]string) string {
for _k, _v := range slugs {
needleRE := regexp.MustCompile(":" + _k + "\\b")
url = needleRE.ReplaceAllString(url, _v)
}
return url
}
func (g *GetCandlesRequest) iterateSlice(slice interface{}, _f func(it interface{})) {
sliceValue := reflect.ValueOf(slice)
for _i := 0; _i < sliceValue.Len(); _i++ {
it := sliceValue.Index(_i).Interface()
_f(it)
}
}
func (g *GetCandlesRequest) isVarSlice(_v interface{}) bool {
rt := reflect.TypeOf(_v)
switch rt.Kind() {
case reflect.Slice:
return true
}
return false
}
func (g *GetCandlesRequest) GetSlugsMap() (map[string]string, error) {
slugs := map[string]string{}
params, err := g.GetSlugParameters()
if err != nil {
return slugs, nil
}
for _k, _v := range params {
slugs[_k] = fmt.Sprintf("%v", _v)
}
return slugs, nil
}
// GetPath returns the request path of the API
func (g *GetCandlesRequest) GetPath() string {
return "/api/v5/market/candles"
}
// Do generates the request object and send the request object to the API endpoint
func (g *GetCandlesRequest) Do(ctx context.Context) (KLineSlice, error) {
// no body params
var params interface{}
query, err := g.GetQueryParameters()
if err != nil {
return nil, err
}
var apiURL string
apiURL = g.GetPath()
req, err := g.client.NewRequest(ctx, "GET", apiURL, query, params)
if err != nil {
return nil, err
}
response, err := g.client.SendRequest(req)
if err != nil {
return nil, err
}
var apiResponse APIResponse
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
type responseValidator interface {
Validate() error
}
validator, ok := interface{}(apiResponse).(responseValidator)
if ok {
if err := validator.Validate(); err != nil {
return nil, err
}
}
var data KLineSlice
if err := json.Unmarshal(apiResponse.Data, &data); err != nil {
return nil, err
}
return data, nil
}

View File

@ -5,163 +5,8 @@ import (
"encoding/json"
"fmt"
"net/url"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
type Candle struct {
InstrumentID string
Interval string
Time time.Time
Open fixedpoint.Value
High fixedpoint.Value
Low fixedpoint.Value
Close fixedpoint.Value
Volume fixedpoint.Value
VolumeInCurrency fixedpoint.Value
}
type CandlesticksRequest struct {
client *RestClient
instId string `param:"instId"`
limit *int `param:"limit"`
bar *string `param:"bar"`
after *int64 `param:"after,seconds"`
before *int64 `param:"before,seconds"`
}
func (r *CandlesticksRequest) After(after int64) *CandlesticksRequest {
r.after = &after
return r
}
func (r *CandlesticksRequest) Before(before int64) *CandlesticksRequest {
r.before = &before
return r
}
func (r *CandlesticksRequest) Bar(bar string) *CandlesticksRequest {
r.bar = &bar
return r
}
func (r *CandlesticksRequest) Limit(limit int) *CandlesticksRequest {
r.limit = &limit
return r
}
func (r *CandlesticksRequest) InstrumentID(instId string) *CandlesticksRequest {
r.instId = instId
return r
}
func (r *CandlesticksRequest) Do(ctx context.Context) ([]Candle, error) {
// SPOT, SWAP, FUTURES, OPTION
var params = url.Values{}
params.Add("instId", r.instId)
if r.bar != nil {
params.Add("bar", *r.bar)
}
if r.before != nil {
params.Add("before", strconv.FormatInt(*r.before, 10))
}
if r.after != nil {
params.Add("after", strconv.FormatInt(*r.after, 10))
}
if r.limit != nil {
params.Add("limit", strconv.Itoa(*r.limit))
}
req, err := r.client.NewRequest(ctx, "GET", "/api/v5/market/candles", params, nil)
if err != nil {
return nil, err
}
response, err := r.client.SendRequest(req)
if err != nil {
return nil, err
}
type candleEntry [7]string
var apiResponse APIResponse
if err := response.DecodeJSON(&apiResponse); err != nil {
return nil, err
}
var data []candleEntry
if err := json.Unmarshal(apiResponse.Data, &data); err != nil {
return nil, err
}
var candles []Candle
for _, entry := range data {
timestamp, err := strconv.ParseInt(entry[0], 10, 64)
if err != nil {
return candles, err
}
open, err := fixedpoint.NewFromString(entry[1])
if err != nil {
return candles, err
}
high, err := fixedpoint.NewFromString(entry[2])
if err != nil {
return candles, err
}
low, err := fixedpoint.NewFromString(entry[3])
if err != nil {
return candles, err
}
cls, err := fixedpoint.NewFromString(entry[4])
if err != nil {
return candles, err
}
vol, err := fixedpoint.NewFromString(entry[5])
if err != nil {
return candles, err
}
volCcy, err := fixedpoint.NewFromString(entry[6])
if err != nil {
return candles, err
}
var interval = "1m"
if r.bar != nil {
interval = *r.bar
}
candles = append(candles, Candle{
InstrumentID: r.instId,
Interval: interval,
Time: time.Unix(0, timestamp*int64(time.Millisecond)),
Open: open,
High: high,
Low: low,
Close: cls,
Volume: vol,
VolumeInCurrency: volCcy,
})
}
return candles, nil
}
type MarketTickersRequest struct {
client *RestClient
@ -255,10 +100,3 @@ func (c *RestClient) NewMarketTickersRequest(instType string) *MarketTickersRequ
instType: instType,
}
}
func (c *RestClient) NewCandlesticksRequest(instId string) *CandlesticksRequest {
return &CandlesticksRequest{
client: c,
instId: instId,
}
}

View File

@ -2,7 +2,6 @@ package okex
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
@ -243,31 +242,7 @@ func ParsePriceVolumeOrderSliceJSON(b []byte) (slice PriceVolumeOrderSlice, err
return slice, nil
}
type KLine struct {
StartTime types.MillisecondTimestamp
OpenPrice fixedpoint.Value
HighestPrice fixedpoint.Value
LowestPrice fixedpoint.Value
ClosePrice fixedpoint.Value
// Volume trading volume, with a unit of contract.cccccbcvefkeibbhtrebbfklrbetukhrgjgkiilufbde
// If it is a derivatives contract, the value is the number of contracts.
// If it is SPOT/MARGIN, the value is the quantity in base currency.
Volume fixedpoint.Value
// VolumeCcy trading volume, with a unit of currency.
// If it is a derivatives contract, the value is the number of base currency.
// If it is SPOT/MARGIN, the value is the quantity in quote currency.
VolumeCcy fixedpoint.Value
// VolumeCcyQuote Trading volume, the value is the quantity in quote currency
// e.g. The unit is USDT for BTC-USDT and BTC-USDT-SWAP;
// The unit is USD for BTC-USD-SWAP
VolumeCcyQuote fixedpoint.Value
// The state of candlesticks.
// 0 represents that it is uncompleted, 1 represents that it is completed.
Confirm fixedpoint.Value
}
func (k KLine) ToGlobal(interval types.Interval, symbol string) types.KLine {
func kLineToGlobal(k okexapi.KLine, interval types.Interval, symbol string) types.KLine {
startTime := k.StartTime.Time()
return types.KLine{
@ -281,7 +256,7 @@ func (k KLine) ToGlobal(interval types.Interval, symbol string) types.KLine {
High: k.HighestPrice,
Low: k.LowestPrice,
Volume: k.Volume,
QuoteVolume: k.VolumeCcy, // not supported
QuoteVolume: k.VolumeInCurrency, // not supported
TakerBuyBaseAssetVolume: fixedpoint.Zero, // not supported
TakerBuyQuoteAssetVolume: fixedpoint.Zero, // not supported
LastTradeID: 0, // not supported
@ -290,85 +265,8 @@ func (k KLine) ToGlobal(interval types.Interval, symbol string) types.KLine {
}
}
type KLineSlice []KLine
func (m *KLineSlice) UnmarshalJSON(b []byte) error {
if m == nil {
return errors.New("nil pointer of kline slice")
}
s, err := parseKLineSliceJSON(b)
if err != nil {
return err
}
*m = s
return nil
}
// parseKLineSliceJSON tries to parse a 2 dimensional string array into a KLineSlice
//
// [
// [
// "1597026383085",
// "8533.02",
// "8553.74",
// "8527.17",
// "8548.26",
// "45247",
// "529.5858061",
// "5529.5858061",
// "0"
// ]
// ]
func parseKLineSliceJSON(in []byte) (slice KLineSlice, err error) {
var rawKLines [][]json.RawMessage
err = json.Unmarshal(in, &rawKLines)
if err != nil {
return slice, err
}
for _, raw := range rawKLines {
if len(raw) != 9 {
return nil, fmt.Errorf("unexpected kline length: %d, data: %q", len(raw), raw)
}
var kline KLine
if err = json.Unmarshal(raw[0], &kline.StartTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal into timestamp: %q", raw[0])
}
if err = json.Unmarshal(raw[1], &kline.OpenPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into open price: %q", raw[1])
}
if err = json.Unmarshal(raw[2], &kline.HighestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into highest price: %q", raw[2])
}
if err = json.Unmarshal(raw[3], &kline.LowestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into lowest price: %q", raw[3])
}
if err = json.Unmarshal(raw[4], &kline.ClosePrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into close price: %q", raw[4])
}
if err = json.Unmarshal(raw[5], &kline.Volume); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume: %q", raw[5])
}
if err = json.Unmarshal(raw[6], &kline.VolumeCcy); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume currency: %q", raw[6])
}
if err = json.Unmarshal(raw[7], &kline.VolumeCcyQuote); err != nil {
return nil, fmt.Errorf("failed to unmarshal into trading currency quote: %q", raw[7])
}
if err = json.Unmarshal(raw[8], &kline.Confirm); err != nil {
return nil, fmt.Errorf("failed to unmarshal into confirm: %q", raw[8])
}
slice = append(slice, kline)
}
return slice, nil
}
type KLineEvent struct {
Events KLineSlice
Events okexapi.KLineSlice
InstrumentID string
Symbol string

View File

@ -352,7 +352,7 @@ func Test_parseKLineSliceJSON(t *testing.T) {
}
`
exp := &KLineEvent{
Events: KLineSlice{
Events: okexapi.KLineSlice{
{
StartTime: types.NewMillisecondTimestampFromInt(1597026383085),
OpenPrice: fixedpoint.NewFromFloat(8533),
@ -360,8 +360,8 @@ func Test_parseKLineSliceJSON(t *testing.T) {
LowestPrice: fixedpoint.NewFromFloat(8527.17),
ClosePrice: fixedpoint.NewFromFloat(8548.26),
Volume: fixedpoint.NewFromFloat(45247),
VolumeCcy: fixedpoint.NewFromFloat(529.5858061),
VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061),
VolumeInCurrency: fixedpoint.NewFromFloat(529.5858061),
//VolumeInCurrencyQuote: fixedpoint.NewFromFloat(529.5858061),
Confirm: fixedpoint.Zero,
},
},
@ -651,7 +651,7 @@ func TestKLine_ToGlobal(t *testing.T) {
}
`
exp := &KLineEvent{
Events: KLineSlice{
Events: okexapi.KLineSlice{
{
StartTime: types.NewMillisecondTimestampFromInt(1597026383085),
OpenPrice: fixedpoint.NewFromFloat(8533),
@ -659,8 +659,8 @@ func TestKLine_ToGlobal(t *testing.T) {
LowestPrice: fixedpoint.NewFromFloat(8527.17),
ClosePrice: fixedpoint.NewFromFloat(8548.26),
Volume: fixedpoint.NewFromFloat(45247),
VolumeCcy: fixedpoint.NewFromFloat(529.5858061),
VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061),
VolumeInCurrency: fixedpoint.NewFromFloat(529.5858061),
//VolumeInCurrencyQuote: fixedpoint.NewFromFloat(529.5858061),
Confirm: fixedpoint.Zero,
},
},
@ -686,13 +686,13 @@ func TestKLine_ToGlobal(t *testing.T) {
High: exp.Events[0].HighestPrice,
Low: exp.Events[0].LowestPrice,
Volume: exp.Events[0].Volume,
QuoteVolume: exp.Events[0].VolumeCcy,
QuoteVolume: exp.Events[0].VolumeInCurrency,
TakerBuyBaseAssetVolume: fixedpoint.Zero,
TakerBuyQuoteAssetVolume: fixedpoint.Zero,
LastTradeID: 0,
NumberOfTrades: 0,
Closed: false,
}, event.Events[0].ToGlobal(types.Interval(event.Interval), event.Symbol))
}, kLineToGlobal(event.Events[0], types.Interval(event.Interval), event.Symbol))
})
}

View File

@ -252,7 +252,7 @@ func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) {
func (s *Stream) handleKLineEvent(k KLineEvent) {
for _, event := range k.Events {
kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol)
kline := kLineToGlobal(event, types.Interval(k.Interval), k.Symbol)
if kline.Closed {
s.EmitKLineClosed(kline)
} else {