pkg/exchange: refactor book and kline

This commit is contained in:
Edwin 2024-01-02 23:18:57 +08:00
parent f2d5731acf
commit 0b906606fe
5 changed files with 875 additions and 285 deletions

View File

@ -4,7 +4,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"time" "time"
@ -15,18 +14,78 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
func parseWebSocketEvent(str []byte) (interface{}, error) { type Channel string
v, err := fastjson.ParseBytes(str)
const (
ChannelBooks Channel = "books"
ChannelBook5 Channel = "book5"
ChannelCandlePrefix Channel = "candle"
ChannelAccount Channel = "account"
ChannelOrders Channel = "orders"
)
type ActionType string
const (
ActionTypeSnapshot ActionType = "snapshot"
ActionTypeUpdate ActionType = "update"
)
func parseWebSocketEvent(in []byte) (interface{}, error) {
v, err := fastjson.ParseBytes(in)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if v.Exists("event") { var event WebSocketEvent
return parseEvent(v) err = json.Unmarshal(in, &event)
if err != nil {
return nil, err
}
if event.Event != "" {
// TODO: remove fastjson
return event, nil
} }
if v.Exists("data") { switch event.Arg.Channel {
return parseData(v) case ChannelAccount:
// TODO: remove fastjson
return parseAccount(v)
case ChannelBooks, ChannelBook5:
var bookEvent BookEvent
err = json.Unmarshal(event.Data, &bookEvent.Data)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into BookEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
}
instId := event.Arg.InstId
bookEvent.InstrumentID = instId
bookEvent.Symbol = toGlobalSymbol(instId)
bookEvent.channel = event.Arg.Channel
bookEvent.Action = event.ActionType
return &bookEvent, nil
case ChannelOrders:
// TODO: remove fastjson
return parseOrder(v)
default:
if strings.HasPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix)) {
// TODO: Support kline subscription. The kline requires another URL to subscribe, which is why we cannot
// support it at this time.
var kLineEvt KLineEvent
err = json.Unmarshal(event.Data, &kLineEvt.Events)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into KLineEvent, Arg: %+v Data: %s, err: %w", event.Arg, string(event.Data), err)
}
kLineEvt.Channel = event.Arg.Channel
kLineEvt.InstrumentID = event.Arg.InstId
kLineEvt.Symbol = toGlobalSymbol(event.Arg.InstId)
kLineEvt.Interval = strings.ToLower(strings.TrimPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix)))
return &kLineEvt, nil
}
} }
return nil, nil return nil, nil
@ -36,58 +95,58 @@ type WebSocketEvent struct {
Event string `json:"event"` Event string `json:"event"`
Code string `json:"code,omitempty"` Code string `json:"code,omitempty"`
Message string `json:"msg,omitempty"` Message string `json:"msg,omitempty"`
Arg interface{} `json:"arg,omitempty"` Arg struct {
} Channel Channel `json:"channel"`
InstId string `json:"instId"`
func parseEvent(v *fastjson.Value) (*WebSocketEvent, error) { } `json:"arg,omitempty"`
// event could be "subscribe", "unsubscribe" or "error" Data json.RawMessage `json:"data"`
event := string(v.GetStringBytes("event")) ActionType ActionType `json:"action"`
code := string(v.GetStringBytes("code"))
message := string(v.GetStringBytes("msg"))
arg := v.GetObject("arg")
return &WebSocketEvent{
Event: event,
Code: code,
Message: message,
Arg: arg,
}, nil
} }
type BookEvent struct { type BookEvent struct {
InstrumentID string InstrumentID string
Symbol string Symbol string
Action string Action ActionType
Bids []BookEntry channel Channel
Asks []BookEntry
MillisecondTimestamp int64 Data []struct {
Checksum int Bids PriceVolumeOrderSlice `json:"bids"`
channel string Asks PriceVolumeOrderSlice `json:"asks"`
MillisecondTimestamp types.MillisecondTimestamp `json:"ts"`
Checksum int `json:"checksum"`
}
} }
func (data *BookEvent) BookTicker() types.BookTicker { func (event *BookEvent) BookTicker() types.BookTicker {
ticker := types.BookTicker{ ticker := types.BookTicker{
Symbol: data.Symbol, Symbol: event.Symbol,
} }
if len(data.Bids) > 0 { if len(event.Data) > 0 {
ticker.Buy = data.Bids[0].Price if len(event.Data[0].Bids) > 0 {
ticker.BuySize = data.Bids[0].Volume ticker.Buy = event.Data[0].Bids[0].Price
ticker.BuySize = event.Data[0].Bids[0].Volume
} }
if len(data.Asks) > 0 { if len(event.Data[0].Asks) > 0 {
ticker.Sell = data.Asks[0].Price ticker.Sell = event.Data[0].Asks[0].Price
ticker.SellSize = data.Asks[0].Volume ticker.SellSize = event.Data[0].Asks[0].Volume
}
} }
return ticker return ticker
} }
func (data *BookEvent) Book() types.SliceOrderBook { func (event *BookEvent) Book() types.SliceOrderBook {
book := types.SliceOrderBook{ book := types.SliceOrderBook{
Symbol: data.Symbol, Symbol: event.Symbol,
Time: types.NewMillisecondTimestampFromInt(data.MillisecondTimestamp).Time(),
} }
if len(event.Data) > 0 {
book.Time = event.Data[0].MillisecondTimestamp.Time()
}
for _, data := range event.Data {
for _, bid := range data.Bids { for _, bid := range data.Bids {
book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume}) book.Bids = append(book.Bids, types.PriceVolume{Price: bid.Price, Volume: bid.Volume})
} }
@ -95,208 +154,186 @@ func (data *BookEvent) Book() types.SliceOrderBook {
for _, ask := range data.Asks { for _, ask := range data.Asks {
book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume}) book.Asks = append(book.Asks, types.PriceVolume{Price: ask.Price, Volume: ask.Volume})
} }
}
return book return book
} }
type BookEntry struct { type PriceVolumeOrder struct {
Price fixedpoint.Value types.PriceVolume
Volume fixedpoint.Value // NumLiquidated is part of a deprecated feature and it is always "0"
NumLiquidated int NumLiquidated int
// NumOrders is the number of orders at the price.
NumOrders int NumOrders int
} }
func parseBookEntry(v *fastjson.Value) (*BookEntry, error) { type PriceVolumeOrderSlice []PriceVolumeOrder
arr, err := v.Array()
func (slice *PriceVolumeOrderSlice) UnmarshalJSON(b []byte) error {
s, err := ParsePriceVolumeOrderSliceJSON(b)
if err != nil { if err != nil {
return nil, err return err
} }
if len(arr) < 4 { *slice = s
return nil, fmt.Errorf("unexpected book entry size: %d", len(arr)) return nil
}
price := fixedpoint.Must(fixedpoint.NewFromString(string(arr[0].GetStringBytes())))
volume := fixedpoint.Must(fixedpoint.NewFromString(string(arr[1].GetStringBytes())))
numLiquidated, err := strconv.Atoi(string(arr[2].GetStringBytes()))
if err != nil {
return nil, err
}
numOrders, err := strconv.Atoi(string(arr[3].GetStringBytes()))
if err != nil {
return nil, err
}
return &BookEntry{
Price: price,
Volume: volume,
NumLiquidated: numLiquidated,
NumOrders: numOrders,
}, nil
} }
func parseBookData(v *fastjson.Value) (*BookEvent, error) { // ParsePriceVolumeOrderSliceJSON tries to parse a 2 dimensional string array into a PriceVolumeOrderSlice
instrumentId := string(v.GetStringBytes("arg", "instId")) //
data := v.GetArray("data") // [["8476.98", "415", "0", "13"], ["8477", "7", "0", "2"], ... ]
if len(data) == 0 { func ParsePriceVolumeOrderSliceJSON(b []byte) (slice PriceVolumeOrderSlice, err error) {
return nil, errors.New("empty data payload") var as [][]fixedpoint.Value
}
// "snapshot" or "update" err = json.Unmarshal(b, &as)
action := string(v.GetStringBytes("action"))
millisecondTimestamp, err := strconv.ParseInt(string(data[0].GetStringBytes("ts")), 10, 64)
if err != nil { if err != nil {
return nil, err return slice, fmt.Errorf("failed to unmarshal price volume order slice: %w", err)
} }
checksum := data[0].GetInt("checksum") for _, a := range as {
var pv PriceVolumeOrder
pv.Price = a[0]
pv.Volume = a[1]
pv.NumLiquidated = a[2].Int()
pv.NumOrders = a[3].Int()
var asks []BookEntry slice = append(slice, pv)
var bids []BookEntry
for _, v := range data[0].GetArray("asks") {
entry, err := parseBookEntry(v)
if err != nil {
return nil, err
}
asks = append(asks, *entry)
} }
for _, v := range data[0].GetArray("bids") { return slice, nil
entry, err := parseBookEntry(v)
if err != nil {
return nil, err
}
bids = append(bids, *entry)
}
return &BookEvent{
InstrumentID: instrumentId,
Symbol: toGlobalSymbol(instrumentId),
Action: action,
Bids: bids,
Asks: asks,
Checksum: checksum,
MillisecondTimestamp: millisecondTimestamp,
}, nil
} }
type Candle struct { type KLine struct {
Channel string StartTime types.MillisecondTimestamp
OpenPrice fixedpoint.Value
HighestPrice fixedpoint.Value
LowestPrice fixedpoint.Value
ClosePrice fixedpoint.Value
// Volume trading volume, with a unit of contract.cccccbcvefkeibbhtrebbfklrbetukhrgjgkiilufbde
// If it is a derivatives contract, the value is the number of contracts.
// If it is SPOT/MARGIN, the value is the quantity in base currency.
Volume fixedpoint.Value
// VolumeCcy trading volume, with a unit of currency.
// If it is a derivatives contract, the value is the number of base currency.
// If it is SPOT/MARGIN, the value is the quantity in quote currency.
VolumeCcy fixedpoint.Value
// VolumeCcyQuote Trading volume, the value is the quantity in quote currency
// e.g. The unit is USDT for BTC-USDT and BTC-USDT-SWAP;
// The unit is USD for BTC-USD-SWAP
VolumeCcyQuote fixedpoint.Value
// The state of candlesticks.
// 0 represents that it is uncompleted, 1 represents that it is completed.
Confirm fixedpoint.Value
}
func (k KLine) ToGlobal(interval types.Interval, symbol string) types.KLine {
startTime := k.StartTime.Time()
return types.KLine{
Exchange: types.ExchangeOKEx,
Symbol: symbol,
StartTime: types.Time(startTime),
EndTime: types.Time(startTime.Add(interval.Duration() - time.Millisecond)),
Interval: interval,
Open: k.OpenPrice,
Close: k.ClosePrice,
High: k.HighestPrice,
Low: k.LowestPrice,
Volume: k.Volume,
QuoteVolume: k.VolumeCcy, // not supported
TakerBuyBaseAssetVolume: fixedpoint.Zero, // not supported
TakerBuyQuoteAssetVolume: fixedpoint.Zero, // not supported
LastTradeID: 0, // not supported
NumberOfTrades: 0, // not supported
Closed: !k.Confirm.IsZero(),
}
}
type KLineSlice []KLine
func (m *KLineSlice) UnmarshalJSON(b []byte) error {
if m == nil {
return errors.New("nil pointer of kline slice")
}
s, err := parseKLineSliceJSON(b)
if err != nil {
return err
}
*m = s
return nil
}
// parseKLineSliceJSON tries to parse a 2 dimensional string array into a KLineSlice
//
// [
// [
// "1597026383085",
// "8533.02",
// "8553.74",
// "8527.17",
// "8548.26",
// "45247",
// "529.5858061",
// "5529.5858061",
// "0"
// ]
// ]
func parseKLineSliceJSON(in []byte) (slice KLineSlice, err error) {
var rawKLines [][]json.RawMessage
err = json.Unmarshal(in, &rawKLines)
if err != nil {
return slice, err
}
for _, raw := range rawKLines {
if len(raw) != 9 {
return nil, fmt.Errorf("unexpected kline length: %d, data: %q", len(raw), raw)
}
var kline KLine
if err = json.Unmarshal(raw[0], &kline.StartTime); err != nil {
return nil, fmt.Errorf("failed to unmarshal into timestamp: %q", raw[0])
}
if err = json.Unmarshal(raw[1], &kline.OpenPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into open price: %q", raw[1])
}
if err = json.Unmarshal(raw[2], &kline.HighestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into highest price: %q", raw[2])
}
if err = json.Unmarshal(raw[3], &kline.LowestPrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into lowest price: %q", raw[3])
}
if err = json.Unmarshal(raw[4], &kline.ClosePrice); err != nil {
return nil, fmt.Errorf("failed to unmarshal into close price: %q", raw[4])
}
if err = json.Unmarshal(raw[5], &kline.Volume); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume: %q", raw[5])
}
if err = json.Unmarshal(raw[6], &kline.VolumeCcy); err != nil {
return nil, fmt.Errorf("failed to unmarshal into volume currency: %q", raw[6])
}
if err = json.Unmarshal(raw[7], &kline.VolumeCcyQuote); err != nil {
return nil, fmt.Errorf("failed to unmarshal into trading currency quote: %q", raw[7])
}
if err = json.Unmarshal(raw[8], &kline.Confirm); err != nil {
return nil, fmt.Errorf("failed to unmarshal into confirm: %q", raw[8])
}
slice = append(slice, kline)
}
return slice, nil
}
type KLineEvent struct {
Events KLineSlice
InstrumentID string InstrumentID string
Symbol string Symbol string
Interval string Interval string
Open fixedpoint.Value Channel Channel
High fixedpoint.Value
Low fixedpoint.Value
Close fixedpoint.Value
// Trading volume, with a unit of contact.
// If it is a derivatives contract, the value is the number of contracts.
// If it is SPOT/MARGIN, the value is the amount of trading currency.
Volume fixedpoint.Value
// Trading volume, with a unit of currency.
// If it is a derivatives contract, the value is the number of settlement currency.
// If it is SPOT/MARGIN, the value is the number of quote currency.
VolumeInCurrency fixedpoint.Value
MillisecondTimestamp int64
StartTime time.Time
}
func (c *Candle) KLine() types.KLine {
interval := types.Interval(c.Interval)
endTime := c.StartTime.Add(interval.Duration() - 1*time.Millisecond)
return types.KLine{
Exchange: types.ExchangeOKEx,
Interval: interval,
Open: c.Open,
High: c.High,
Low: c.Low,
Close: c.Close,
Volume: c.Volume,
QuoteVolume: c.VolumeInCurrency,
StartTime: types.Time(c.StartTime),
EndTime: types.Time(endTime),
}
}
func parseCandle(channel string, v *fastjson.Value) (*Candle, error) {
instrumentID := string(v.GetStringBytes("arg", "instId"))
data, err := v.Get("data").Array()
if err != nil {
return nil, err
}
if len(data) == 0 {
return nil, errors.New("candle data is empty")
}
arr, err := data[0].Array()
if err != nil {
return nil, err
}
if len(arr) < 7 {
return nil, fmt.Errorf("unexpected candle data length: %d", len(arr))
}
interval := strings.ToLower(strings.TrimPrefix(channel, "candle"))
timestamp, err := strconv.ParseInt(string(arr[0].GetStringBytes()), 10, 64)
if err != nil {
return nil, err
}
open, err := fixedpoint.NewFromString(string(arr[1].GetStringBytes()))
if err != nil {
return nil, err
}
high, err := fixedpoint.NewFromString(string(arr[2].GetStringBytes()))
if err != nil {
return nil, err
}
low, err := fixedpoint.NewFromString(string(arr[3].GetStringBytes()))
if err != nil {
return nil, err
}
cls, err := fixedpoint.NewFromString(string(arr[4].GetStringBytes()))
if err != nil {
return nil, err
}
vol, err := fixedpoint.NewFromString(string(arr[5].GetStringBytes()))
if err != nil {
return nil, err
}
volCurrency, err := fixedpoint.NewFromString(string(arr[6].GetStringBytes()))
if err != nil {
return nil, err
}
candleTime := time.Unix(0, timestamp*int64(time.Millisecond))
return &Candle{
Channel: channel,
InstrumentID: instrumentID,
Symbol: toGlobalSymbol(instrumentID),
Interval: interval,
Open: open,
High: high,
Low: low,
Close: cls,
Volume: vol,
VolumeInCurrency: volCurrency,
MillisecondTimestamp: timestamp,
StartTime: candleTime,
}, nil
} }
func parseAccount(v *fastjson.Value) (*okexapi.Account, error) { func parseAccount(v *fastjson.Value) (*okexapi.Account, error) {
@ -326,31 +363,3 @@ func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) {
return orderDetails, nil return orderDetails, nil
} }
func parseData(v *fastjson.Value) (interface{}, error) {
channel := string(v.GetStringBytes("arg", "channel"))
switch channel {
case "books5":
data, err := parseBookData(v)
data.channel = channel
return data, err
case "books":
data, err := parseBookData(v)
data.channel = channel
return data, err
case "account":
return parseAccount(v)
case "orders":
return parseOrder(v)
default:
if strings.HasPrefix(channel, "candle") {
data, err := parseCandle(channel, v)
return data, err
}
}
return nil, nil
}

View File

@ -0,0 +1,577 @@
package okex
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func TestParsePriceVolumeOrderSliceJSON(t *testing.T) {
t.Run("snapshot", func(t *testing.T) {
in := `
{
"arg": {
"channel": "books",
"instId": "BTC-USDT"
},
"action": "snapshot",
"data": [
{
"asks": [
["8476.98", "415", "0", "13"],
["8477", "7", "0", "2"]
],
"bids": [
["8476", "256", "0", "12"]
],
"ts": "1597026383085",
"checksum": -855196043,
"prevSeqId": -1,
"seqId": 123456
}
]
}
`
asks := PriceVolumeOrderSlice{
{
PriceVolume: types.PriceVolume{
Price: fixedpoint.NewFromFloat(8476.98),
Volume: fixedpoint.NewFromFloat(415),
},
NumLiquidated: fixedpoint.Zero.Int(),
NumOrders: fixedpoint.NewFromFloat(13).Int(),
},
{
PriceVolume: types.PriceVolume{
Price: fixedpoint.NewFromFloat(8477),
Volume: fixedpoint.NewFromFloat(7),
},
NumLiquidated: fixedpoint.Zero.Int(),
NumOrders: fixedpoint.NewFromFloat(2).Int(),
},
}
bids := PriceVolumeOrderSlice{
{
PriceVolume: types.PriceVolume{
Price: fixedpoint.NewFromFloat(8476),
Volume: fixedpoint.NewFromFloat(256),
},
NumLiquidated: fixedpoint.Zero.Int(),
NumOrders: fixedpoint.NewFromFloat(12).Int(),
},
}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*BookEvent)
assert.True(t, ok)
assert.Equal(t, "BTCUSDT", event.Symbol)
assert.Equal(t, ChannelBooks, event.channel)
assert.Equal(t, ActionTypeSnapshot, event.Action)
assert.Len(t, event.Data, 1)
assert.Len(t, event.Data[0].Asks, 2)
assert.Equal(t, asks, event.Data[0].Asks)
assert.Len(t, event.Data[0].Bids, 1)
assert.Equal(t, bids, event.Data[0].Bids)
})
t.Run("unexpected asks", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "books",
"instId": "BTC-USDT"
},
"action": "snapshot",
"data": [
{
"asks": [
["XYZ", "415", "0", "13"]
],
"bids": [
["8476", "256", "0", "12"]
],
"ts": "1597026383085",
"checksum": -855196043,
"prevSeqId": -1,
"seqId": 123456
}
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "price volume order")
})
}
func TestBookEvent_BookTicker(t *testing.T) {
in := `
{
"arg": {
"channel": "books",
"instId": "BTC-USDT"
},
"action": "snapshot",
"data": [
{
"asks": [
["8476.98", "415", "0", "13"],
["8477", "7", "0", "2"]
],
"bids": [
["8476", "256", "0", "12"]
],
"ts": "1597026383085",
"checksum": -855196043,
"prevSeqId": -1,
"seqId": 123456
}
]
}
`
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*BookEvent)
assert.True(t, ok)
ticker := event.BookTicker()
assert.Equal(t, types.BookTicker{
Symbol: "BTCUSDT",
Buy: fixedpoint.NewFromFloat(8476),
BuySize: fixedpoint.NewFromFloat(256),
Sell: fixedpoint.NewFromFloat(8476.98),
SellSize: fixedpoint.NewFromFloat(415),
}, ticker)
}
func TestBookEvent_Book(t *testing.T) {
in := `
{
"arg": {
"channel": "books",
"instId": "BTC-USDT"
},
"action": "snapshot",
"data": [
{
"asks": [
["8476.98", "415", "0", "13"],
["8477", "7", "0", "2"]
],
"bids": [
["8476", "256", "0", "12"]
],
"ts": "1597026383085",
"checksum": -855196043,
"prevSeqId": -1,
"seqId": 123456
}
]
}
`
bids := types.PriceVolumeSlice{
{
Price: fixedpoint.NewFromFloat(8476),
Volume: fixedpoint.NewFromFloat(256),
},
}
asks := types.PriceVolumeSlice{
{
Price: fixedpoint.NewFromFloat(8476.98),
Volume: fixedpoint.NewFromFloat(415),
},
{
Price: fixedpoint.NewFromFloat(8477),
Volume: fixedpoint.NewFromFloat(7),
},
}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*BookEvent)
assert.True(t, ok)
book := event.Book()
assert.Equal(t, types.SliceOrderBook{
Symbol: "BTCUSDT",
Time: types.NewMillisecondTimestampFromInt(1597026383085).Time(),
Bids: bids,
Asks: asks,
}, book)
}
func Test_parseKLineSliceJSON(t *testing.T) {
t.Run("snapshot", func(t *testing.T) {
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
exp := &KLineEvent{
Events: KLineSlice{
{
StartTime: types.NewMillisecondTimestampFromInt(1597026383085),
OpenPrice: fixedpoint.NewFromFloat(8533),
HighestPrice: fixedpoint.NewFromFloat(8553.74),
LowestPrice: fixedpoint.NewFromFloat(8527.17),
ClosePrice: fixedpoint.NewFromFloat(8548.26),
Volume: fixedpoint.NewFromFloat(45247),
VolumeCcy: fixedpoint.NewFromFloat(529.5858061),
VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061),
Confirm: fixedpoint.Zero,
},
},
InstrumentID: "BTC-USDT",
Symbol: "BTCUSDT",
Interval: "1d",
Channel: "candle1D",
}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*KLineEvent)
assert.True(t, ok)
assert.Len(t, event.Events, 1)
assert.Equal(t, exp, event)
})
t.Run("failed to convert timestamp", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"x",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "timestamp")
})
t.Run("failed to convert open price", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"x",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "open price")
})
t.Run("failed to convert highest price", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"x",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "highest price")
})
t.Run("failed to convert lowest price", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"x",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "lowest price")
})
t.Run("failed to convert close price", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"x",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "close price")
})
t.Run("failed to convert volume", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"x",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "volume")
})
t.Run("failed to convert volume currency", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"x",
"529.5858061",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "volume currency")
})
t.Run("failed to convert trading currency quote ", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"x",
"0"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "trading currency")
})
t.Run("failed to convert confirm", func(t *testing.T) {
t.Skip("this will cause panic, so i skip it")
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"g"
]
]
}
`
_, err := parseWebSocketEvent([]byte(in))
assert.ErrorContains(t, err, "confirm")
})
}
func TestKLine_ToGlobal(t *testing.T) {
t.Run("snapshot", func(t *testing.T) {
in := `
{
"arg": {
"channel": "candle1D",
"instId": "BTC-USDT"
},
"data": [
[
"1597026383085",
"8533",
"8553.74",
"8527.17",
"8548.26",
"45247",
"529.5858061",
"529.5858061",
"0"
]
]
}
`
exp := &KLineEvent{
Events: KLineSlice{
{
StartTime: types.NewMillisecondTimestampFromInt(1597026383085),
OpenPrice: fixedpoint.NewFromFloat(8533),
HighestPrice: fixedpoint.NewFromFloat(8553.74),
LowestPrice: fixedpoint.NewFromFloat(8527.17),
ClosePrice: fixedpoint.NewFromFloat(8548.26),
Volume: fixedpoint.NewFromFloat(45247),
VolumeCcy: fixedpoint.NewFromFloat(529.5858061),
VolumeCcyQuote: fixedpoint.NewFromFloat(529.5858061),
Confirm: fixedpoint.Zero,
},
},
InstrumentID: "BTC-USDT",
Symbol: "BTCUSDT",
Interval: "1d",
Channel: "candle1D",
}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.(*KLineEvent)
assert.True(t, ok)
assert.Equal(t, types.KLine{
Exchange: types.ExchangeOKEx,
Symbol: "BTCUSDT",
StartTime: types.Time(types.NewMillisecondTimestampFromInt(1597026383085)),
EndTime: types.Time(types.NewMillisecondTimestampFromInt(1597026383085).Time().Add(types.Interval(exp.Interval).Duration() - time.Millisecond)),
Interval: types.Interval(exp.Interval),
Open: exp.Events[0].OpenPrice,
Close: exp.Events[0].ClosePrice,
High: exp.Events[0].HighestPrice,
Low: exp.Events[0].LowestPrice,
Volume: exp.Events[0].Volume,
QuoteVolume: exp.Events[0].VolumeCcy,
TakerBuyBaseAssetVolume: fixedpoint.Zero,
TakerBuyQuoteAssetVolume: fixedpoint.Zero,
LastTradeID: 0,
NumberOfTrades: 0,
Closed: false,
}, event.Events[0].ToGlobal(types.Interval(event.Interval), event.Symbol))
})
}

View File

@ -28,32 +28,24 @@ type Stream struct {
client *okexapi.RestClient client *okexapi.RestClient
// public callbacks // public callbacks
candleEventCallbacks []func(candle Candle) kLineEventCallbacks []func(candle KLineEvent)
bookEventCallbacks []func(book BookEvent) bookEventCallbacks []func(book BookEvent)
eventCallbacks []func(event WebSocketEvent) eventCallbacks []func(event WebSocketEvent)
accountEventCallbacks []func(account okexapi.Account) accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails) orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
lastCandle map[CandleKey]Candle
}
type CandleKey struct {
InstrumentID string
Channel string
} }
func NewStream(client *okexapi.RestClient) *Stream { func NewStream(client *okexapi.RestClient) *Stream {
stream := &Stream{ stream := &Stream{
client: client, client: client,
StandardStream: types.NewStandardStream(), StandardStream: types.NewStandardStream(),
lastCandle: make(map[CandleKey]Candle),
} }
stream.SetParser(parseWebSocketEvent) stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent) stream.SetDispatcher(stream.dispatchEvent)
stream.SetEndpointCreator(stream.createEndpoint) stream.SetEndpointCreator(stream.createEndpoint)
stream.OnCandleEvent(stream.handleCandleEvent) stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnBookEvent(stream.handleBookEvent) stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountEvent(stream.handleAccountEvent) stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent) stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
@ -167,27 +159,22 @@ func (s *Stream) handleAccountEvent(account okexapi.Account) {
func (s *Stream) handleBookEvent(data BookEvent) { func (s *Stream) handleBookEvent(data BookEvent) {
book := data.Book() book := data.Book()
switch data.Action { switch data.Action {
case "snapshot": case ActionTypeSnapshot:
s.EmitBookSnapshot(book) s.EmitBookSnapshot(book)
case "update": case ActionTypeUpdate:
s.EmitBookUpdate(book) s.EmitBookUpdate(book)
} }
} }
func (s *Stream) handleCandleEvent(candle Candle) { func (s *Stream) handleKLineEvent(k KLineEvent) {
key := CandleKey{Channel: candle.Channel, InstrumentID: candle.InstrumentID} for _, event := range k.Events {
kline := candle.KLine() kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol)
if kline.Closed {
// check if we need to close previous kline s.EmitKLineClosed(kline)
lastCandle, ok := s.lastCandle[key] } else {
if ok && candle.StartTime.After(lastCandle.StartTime) {
lastKline := lastCandle.KLine()
lastKline.Closed = true
s.EmitKLineClosed(lastKline)
}
s.EmitKLine(kline) s.EmitKLine(kline)
s.lastCandle[key] = candle }
}
} }
func (s *Stream) createEndpoint(ctx context.Context) (string, error) { func (s *Stream) createEndpoint(ctx context.Context) (string, error) {
@ -207,12 +194,12 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *BookEvent: case *BookEvent:
// there's "books" for 400 depth and books5 for 5 depth // there's "books" for 400 depth and books5 for 5 depth
if et.channel != "books5" { if et.channel != ChannelBook5 {
s.EmitBookEvent(*et) s.EmitBookEvent(*et)
} }
s.EmitBookTickerUpdate(et.BookTicker()) s.EmitBookTickerUpdate(et.BookTicker())
case *Candle: case *KLineEvent:
s.EmitCandleEvent(*et) s.EmitKLineEvent(*et)
case *okexapi.Account: case *okexapi.Account:
s.EmitAccountEvent(*et) s.EmitAccountEvent(*et)

View File

@ -6,12 +6,12 @@ import (
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi" "github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
) )
func (s *Stream) OnCandleEvent(cb func(candle Candle)) { func (s *Stream) OnKLineEvent(cb func(candle KLineEvent)) {
s.candleEventCallbacks = append(s.candleEventCallbacks, cb) s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
} }
func (s *Stream) EmitCandleEvent(candle Candle) { func (s *Stream) EmitKLineEvent(candle KLineEvent) {
for _, cb := range s.candleEventCallbacks { for _, cb := range s.kLineEventCallbacks {
cb(candle) cb(candle)
} }
} }
@ -57,7 +57,7 @@ func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
} }
type StreamEventHub interface { type StreamEventHub interface {
OnCandleEvent(cb func(candle Candle)) OnKLineEvent(cb func(candle KLineEvent))
OnBookEvent(cb func(book BookEvent)) OnBookEvent(cb func(book BookEvent))

View File

@ -48,4 +48,21 @@ func TestStream(t *testing.T) {
c := make(chan struct{}) c := make(chan struct{})
<-c <-c
}) })
t.Run("kline test", func(t *testing.T) {
s.Subscribe(types.KLineChannel, "LTC-USD-200327", types.SubscribeOptions{
Interval: types.Interval1m,
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnKLine(func(kline types.KLine) {
t.Log("got update", kline)
})
s.OnKLineClosed(func(kline types.KLine) {
t.Log("got closed", kline)
})
c := make(chan struct{})
<-c
})
} }