pkg/exchage: support k line websocket event

This commit is contained in:
Edwin 2023-08-09 11:41:16 +08:00
parent e9d0ce5bbf
commit 4cee22ce31
5 changed files with 297 additions and 11 deletions

View File

@ -34,6 +34,7 @@ type Stream struct {
bookEventCallbacks []func(e BookEvent)
walletEventCallbacks []func(e []bybitapi.WalletBalances)
kLineEventCallbacks []func(e KLineEvent)
orderEventCallbacks []func(e []OrderEvent)
}
@ -52,6 +53,7 @@ func NewStream(key, secret string) *Stream {
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnWalletEvent(stream.handleWalletEvent)
stream.OnOrderEvent(stream.handleOrderEvent)
return stream
@ -80,6 +82,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
case []bybitapi.WalletBalances:
s.EmitWalletEvent(e)
case *KLineEvent:
s.EmitKLineEvent(*e)
case []OrderEvent:
s.EmitOrderEvent(e)
}
@ -99,6 +104,7 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
case e.IsTopic():
switch getTopicType(e.Topic) {
case TopicTypeOrderBook:
var book BookEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
@ -109,6 +115,20 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
book.Type = e.WebSocketTopicEvent.Type
return &book, nil
case TopicTypeKLine:
var kLines []KLine
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &kLines)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
}
symbol, err := getSymbolFromTopic(e.Topic)
if err != nil {
return nil, err
}
return &KLineEvent{KLines: kLines, Symbol: symbol, Type: e.WebSocketTopicEvent.Type}, nil
case TopicTypeWallet:
var wallets []bybitapi.WalletBalances
return wallets, json.Unmarshal(e.WebSocketTopicEvent.Data, &wallets)
@ -165,7 +185,7 @@ func (s *Stream) handlerConnect() {
var topics []string
for _, subscription := range s.Subscriptions {
topic, err := convertSubscription(subscription)
topic, err := s.convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
@ -213,17 +233,27 @@ func (s *Stream) handlerConnect() {
}
}
func convertSubscription(s types.Subscription) (string, error) {
switch s.Channel {
func (s *Stream) convertSubscription(sub types.Subscription) (string, error) {
switch sub.Channel {
case types.BookChannel:
depth := types.DepthLevel1
if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 {
if len(sub.Options.Depth) > 0 && sub.Options.Depth == types.DepthLevel50 {
depth = types.DepthLevel50
}
return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil
return genTopic(TopicTypeOrderBook, depth, sub.Symbol), nil
case types.KLineChannel:
interval, err := toLocalInterval(sub.Options.Interval)
if err != nil {
return "", err
}
return "", fmt.Errorf("unsupported stream channel: %s", s.Channel)
return genTopic(TopicTypeKLine, interval, sub.Symbol), nil
}
return "", fmt.Errorf("unsupported stream channel: %s", sub.Channel)
}
func (s *Stream) handleBookEvent(e BookEvent) {
@ -257,3 +287,23 @@ func (s *Stream) handleOrderEvent(events []OrderEvent) {
s.StandardStream.EmitOrderUpdate(*gOrder)
}
}
func (s *Stream) handleKLineEvent(klineEvent KLineEvent) {
if klineEvent.Type != DataTypeSnapshot {
return
}
for _, event := range klineEvent.KLines {
kline, err := event.toGlobalKLine(klineEvent.Symbol)
if err != nil {
log.WithError(err).Error("failed to convert to global k line")
continue
}
if kline.Closed {
s.EmitKLineClosed(kline)
} else {
s.EmitKLine(kline)
}
}
}

View File

@ -26,6 +26,16 @@ func (s *Stream) EmitWalletEvent(e []bybitapi.WalletBalances) {
}
}
func (s *Stream) OnKLineEvent(cb func(e KLineEvent)) {
s.kLineEventCallbacks = append(s.kLineEventCallbacks, cb)
}
func (s *Stream) EmitKLineEvent(e KLineEvent) {
for _, cb := range s.kLineEventCallbacks {
cb(e)
}
}
func (s *Stream) OnOrderEvent(cb func(e []OrderEvent)) {
s.orderEventCallbacks = append(s.orderEventCallbacks, cb)
}

View File

@ -2,6 +2,7 @@ package bybit
import (
"context"
"errors"
"fmt"
"os"
"strconv"
@ -76,6 +77,23 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})
t.Run("kline test", func(t *testing.T) {
s.Subscribe(types.KLineChannel, "BTCUSDT", types.SubscribeOptions{
Interval: types.Interval30m,
Depth: "",
Speed: "",
})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnKLine(func(kline types.KLine) {
t.Log(kline)
})
c := make(chan struct{})
<-c
})
}
func TestStream_parseWebSocketEvent(t *testing.T) {
@ -151,6 +169,80 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
}, *book)
})
t.Run("TopicTypeKLine with snapshot", func(t *testing.T) {
input := `{
"topic": "kline.5.BTCUSDT",
"data": [
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882
}
],
"ts": 1672324988882,
"type": "snapshot"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
book, ok := res.(*KLineEvent)
assert.True(t, ok)
assert.Equal(t, KLineEvent{
Symbol: "BTCUSDT",
Type: DataTypeSnapshot,
KLines: []KLine{
{
StartTime: types.NewMillisecondTimestampFromInt(1672324800000),
EndTime: types.NewMillisecondTimestampFromInt(1672325099999),
Interval: "5",
OpenPrice: fixedpoint.NewFromFloat(16649.5),
ClosePrice: fixedpoint.NewFromFloat(16677),
HighPrice: fixedpoint.NewFromFloat(16677),
LowPrice: fixedpoint.NewFromFloat(16608),
Volume: fixedpoint.NewFromFloat(2.081),
Turnover: fixedpoint.NewFromFloat(34666.4005),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1672324988882),
},
},
}, *book)
})
t.Run("TopicTypeKLine with invalid topic", func(t *testing.T) {
input := `{
"topic": "kline.5",
"data": [
{
"start": 1672324800000,
"end": 1672325099999,
"interval": "5",
"open": "16649.5",
"close": "16677",
"high": "16677",
"low": "16608",
"volume": "2.081",
"turnover": "34666.4005",
"confirm": false,
"timestamp": 1672324988882
}
],
"ts": 1672324988882,
"type": "snapshot"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.Equal(t, errors.New("unexpected topic: kline.5"), err)
assert.Nil(t, res)
})
t.Run("Parse fails", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
@ -168,8 +260,9 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
}
func Test_convertSubscription(t *testing.T) {
s := Stream{}
t.Run("BookChannel.DepthLevel1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -180,7 +273,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel. with default depth", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
})
@ -188,7 +281,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel.DepthLevel50", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -199,7 +292,7 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res)
})
t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
@ -211,7 +304,7 @@ func Test_convertSubscription(t *testing.T) {
})
t.Run("unsupported channel", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: "unsupported",
})

View File

@ -82,6 +82,7 @@ const (
TopicTypeOrderBook TopicType = "orderbook"
TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
TopicTypeKLine TopicType = "kline"
)
type DataType string
@ -143,8 +144,69 @@ func getTopicType(topic string) TopicType {
return TopicType(slice[0])
}
func getSymbolFromTopic(topic string) (string, error) {
slice := strings.Split(topic, topicSeparator)
if len(slice) != 3 {
return "", fmt.Errorf("unexpected topic: %s", topic)
}
return slice[2], nil
}
type OrderEvent struct {
bybitapi.Order
Category bybitapi.Category `json:"category"`
}
type KLineEvent struct {
KLines []KLine
// internal use
// Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type
Type DataType
// Symbol. Copied from WebSocketTopicEvent.Topic
Symbol string
}
type KLine struct {
// The start timestamp (ms)
StartTime types.MillisecondTimestamp `json:"start"`
// The end timestamp (ms)
EndTime types.MillisecondTimestamp `json:"end"`
// Kline interval
Interval string `json:"interval"`
OpenPrice fixedpoint.Value `json:"open"`
ClosePrice fixedpoint.Value `json:"close"`
HighPrice fixedpoint.Value `json:"high"`
LowPrice fixedpoint.Value `json:"low"`
// Trade volume
Volume fixedpoint.Value `json:"volume"`
// Turnover. Unit of figure: quantity of quota coin
Turnover fixedpoint.Value `json:"turnover"`
// Weather the tick is ended or not
Confirm bool `json:"confirm"`
// The timestamp (ms) of the last matched order in the candle
Timestamp types.MillisecondTimestamp `json:"timestamp"`
}
func (k *KLine) toGlobalKLine(symbol string) (types.KLine, error) {
interval, found := bybitapi.ToGlobalInterval[k.Interval]
if !found {
return types.KLine{}, fmt.Errorf("unexpected k line interval: %+v", k)
}
return types.KLine{
Exchange: types.ExchangeBybit,
Symbol: symbol,
StartTime: types.Time(k.StartTime.Time()),
EndTime: types.Time(k.EndTime.Time()),
Interval: interval,
Open: k.OpenPrice,
Close: k.ClosePrice,
High: k.HighPrice,
Low: k.LowPrice,
Volume: k.Volume,
QuoteVolume: k.Turnover,
Closed: k.Confirm,
}, nil
}

View File

@ -386,3 +386,74 @@ func Test_getTopicName(t *testing.T) {
exp := TopicTypeOrderBook
assert.Equal(t, exp, getTopicType("orderbook.50.BTCUSDT"))
}
func Test_getSymbolFromTopic(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
exp := "BTCUSDT"
res, err := getSymbolFromTopic("kline.1.BTCUSDT")
assert.NoError(t, err)
assert.Equal(t, exp, res)
})
t.Run("unexpected topic", func(t *testing.T) {
res, err := getSymbolFromTopic("kline.1")
assert.Empty(t, res)
assert.Equal(t, err, fmt.Errorf("unexpected topic: kline.1"))
})
}
func TestKLine_toGlobalKLine(t *testing.T) {
t.Run("succeeds", func(t *testing.T) {
k := KLine{
StartTime: types.NewMillisecondTimestampFromInt(1691486100000),
EndTime: types.NewMillisecondTimestampFromInt(1691487000000),
Interval: "1",
OpenPrice: fixedpoint.NewFromFloat(29045.3),
ClosePrice: fixedpoint.NewFromFloat(29228.56),
HighPrice: fixedpoint.NewFromFloat(29228.56),
LowPrice: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
Turnover: fixedpoint.NewFromFloat(270447.43520753),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1691486100000),
}
gKline, err := k.toGlobalKLine("BTCUSDT")
assert.NoError(t, err)
assert.Equal(t, types.KLine{
Exchange: types.ExchangeBybit,
Symbol: "BTCUSDT",
StartTime: types.Time(k.StartTime.Time()),
EndTime: types.Time(k.EndTime.Time()),
Interval: types.Interval1m,
Open: fixedpoint.NewFromFloat(29045.3),
Close: fixedpoint.NewFromFloat(29228.56),
High: fixedpoint.NewFromFloat(29228.56),
Low: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
QuoteVolume: fixedpoint.NewFromFloat(270447.43520753),
Closed: false,
}, gKline)
})
t.Run("interval not supported", func(t *testing.T) {
k := KLine{
StartTime: types.NewMillisecondTimestampFromInt(1691486100000),
EndTime: types.NewMillisecondTimestampFromInt(1691487000000),
Interval: "112",
OpenPrice: fixedpoint.NewFromFloat(29045.3),
ClosePrice: fixedpoint.NewFromFloat(29228.56),
HighPrice: fixedpoint.NewFromFloat(29228.56),
LowPrice: fixedpoint.NewFromFloat(29045.3),
Volume: fixedpoint.NewFromFloat(9.265593),
Turnover: fixedpoint.NewFromFloat(270447.43520753),
Confirm: false,
Timestamp: types.NewMillisecondTimestampFromInt(1691486100000),
}
gKline, err := k.toGlobalKLine("BTCUSDT")
assert.Equal(t, fmt.Errorf("unexpected k line interval: %+v", &k), err)
assert.Equal(t, gKline, types.KLine{})
})
}