pkg/exchange: implement bybit stream ping

This commit is contained in:
Edwin 2023-08-02 16:57:30 +08:00
parent 8118e55b72
commit a6047f629d
7 changed files with 572 additions and 46 deletions

View File

@ -1,35 +1,19 @@
package bybit package bybit
import ( import (
"context"
"fmt" "fmt"
"math" "math"
"strconv" "strconv"
"testing" "testing"
"time" "time"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi" "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3" v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
) )
func TestU(t *testing.T) {
e := returnErr()
t.Log(errors.Is(e, context.DeadlineExceeded))
}
func returnErr() error {
var err error
return multierr.Append(multierr.Append(err, fmt.Errorf("got err: %w", context.DeadlineExceeded)), fmt.Errorf("GG"))
}
func TestToGlobalMarket(t *testing.T) { func TestToGlobalMarket(t *testing.T) {
// sample: // sample:
//{ //{

View File

@ -3,6 +3,7 @@ package bybit
import ( import (
"context" "context"
"encoding/json" "encoding/json"
"fmt"
"time" "time"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
@ -15,10 +16,16 @@ const (
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds // Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
// to maintain the WebSocket connection. // to maintain the WebSocket connection.
pingInterval = 20 * time.Second pingInterval = 20 * time.Second
// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
spotArgsLimit = 10
) )
//go:generate callbackgen -type Stream
type Stream struct { type Stream struct {
types.StandardStream types.StandardStream
bookEventCallbacks []func(e BookEvent)
} }
func NewStream() *Stream { func NewStream() *Stream {
@ -31,6 +38,8 @@ func NewStream() *Stream {
stream.SetDispatcher(stream.dispatchEvent) stream.SetDispatcher(stream.dispatchEvent)
stream.SetHeartBeat(stream.ping) stream.SetHeartBeat(stream.ping)
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
return stream return stream
} }
@ -46,16 +55,43 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) {
func (s *Stream) dispatchEvent(event interface{}) { func (s *Stream) dispatchEvent(event interface{}) {
switch e := event.(type) { switch e := event.(type) {
case *WebSocketEvent: case *WebSocketOpEvent:
if err := e.IsValid(); err != nil { if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err) log.Errorf("invalid event: %v", err)
} }
case *BookEvent:
s.EmitBookEvent(*e)
} }
} }
func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
var resp WebSocketEvent var e WsEvent
return &resp, json.Unmarshal(in, &resp)
err := json.Unmarshal(in, &e)
if err != nil {
return nil, err
}
switch {
case e.IsOp():
return e.WebSocketOpEvent, nil
case e.IsTopic():
switch getTopicType(e.Topic) {
case TopicTypeOrderBook:
var book BookEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
}
book.Type = e.WebSocketTopicEvent.Type
return &book, nil
}
}
return nil, fmt.Errorf("unhandled websocket event: %+v", string(in))
} }
// ping implements the Bybit text message of WebSocket PingPong. // ping implements the Bybit text message of WebSocket PingPong.
@ -94,3 +130,56 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc cont
} }
} }
} }
func (s *Stream) handlerConnect() {
if s.PublicOnly {
var topics []string
for _, subscription := range s.Subscriptions {
topic, err := convertSubscription(subscription)
if err != nil {
log.WithError(err).Errorf("subscription convert error")
continue
}
topics = append(topics, topic)
}
if len(topics) > spotArgsLimit {
log.Debugf("topics exceeds limit: %d, drop of: %v", spotArgsLimit, topics[spotArgsLimit:])
topics = topics[:spotArgsLimit]
}
log.Infof("subscribing channels: %+v", topics)
if err := s.Conn.WriteJSON(WebsocketOp{
Op: "subscribe",
Args: topics,
}); err != nil {
log.WithError(err).Error("failed to send subscription request")
}
}
}
func convertSubscription(s types.Subscription) (string, error) {
switch s.Channel {
case types.BookChannel:
depth := types.DepthLevel1
if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 {
depth = types.DepthLevel50
}
return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil
}
return "", fmt.Errorf("unsupported stream channel: %s", s.Channel)
}
func (s *Stream) handleBookEvent(e BookEvent) {
orderBook := e.OrderBook()
switch {
// Occasionally, you'll receive "UpdateId"=1, which is a snapshot data due to the restart of
// the service. So please overwrite your local orderbook
case e.Type == DataTypeSnapshot || e.UpdateId.Int() == 1:
s.EmitBookSnapshot(orderBook)
case e.Type == DataTypeDelta:
s.EmitBookUpdate(orderBook)
}
}

View File

@ -0,0 +1,15 @@
// Code generated by "callbackgen -type Stream"; DO NOT EDIT.
package bybit
import ()
func (s *Stream) OnBookEvent(cb func(e BookEvent)) {
s.bookEventCallbacks = append(s.bookEventCallbacks, cb)
}
func (s *Stream) EmitBookEvent(e BookEvent) {
for _, cb := range s.bookEventCallbacks {
cb(e)
}
}

View File

@ -0,0 +1,155 @@
package bybit
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
func TestStream_parseWebSocketEvent(t *testing.T) {
s := Stream{}
t.Run("op", func(t *testing.T) {
input := `{
"success":true,
"ret_msg":"subscribe",
"conn_id":"a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5",
"op":"subscribe"
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
opEvent, ok := res.(*WebSocketOpEvent)
assert.True(t, ok)
expSucceeds := true
expRetMsg := "subscribe"
assert.Equal(t, WebSocketOpEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: nil,
ConnId: "a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5",
Op: WsOpTypeSubscribe,
Args: nil,
}, *opEvent)
})
t.Run("TopicTypeOrderBook with delta", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
"ts":1691130685111,
"type":"delta",
"data":{
"s":"BTCUSDT",
"b":[
],
"a":[
[
"29239.37",
"0.082356"
],
[
"29236.1",
"0"
]
],
"u":1854104,
"seq":10559247733
}
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
book, ok := res.(*BookEvent)
assert.True(t, ok)
assert.Equal(t, BookEvent{
Symbol: "BTCUSDT",
Bids: nil,
Asks: types.PriceVolumeSlice{
{
fixedpoint.NewFromFloat(29239.37),
fixedpoint.NewFromFloat(0.082356),
},
{
fixedpoint.NewFromFloat(29236.1),
fixedpoint.NewFromFloat(0),
},
},
UpdateId: fixedpoint.NewFromFloat(1854104),
SequenceId: fixedpoint.NewFromFloat(10559247733),
Type: DataTypeDelta,
}, *book)
})
t.Run("Parse fails", func(t *testing.T) {
input := `{
"topic":"orderbook.50.BTCUSDT",
"ts":1691130685111,
"type":"delta",
"data":{
"GG": "test",
}
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.Error(t, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", `{
"GG": "test",
}`, err), err)
assert.Equal(t, nil, res)
})
}
func Test_convertSubscription(t *testing.T) {
t.Run("BookChannel.DepthLevel1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel1,
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel. with default depth", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("BookChannel.DepthLevel50", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: types.DepthLevel50,
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res)
})
t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.BookChannel,
Options: types.SubscribeOptions{
Depth: "20",
},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res)
})
t.Run("unsupported channel", func(t *testing.T) {
res, err := convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: "unsupported",
})
assert.Error(t, fmt.Errorf("unsupported stream channel: %s", "unsupported"), err)
assert.Equal(t, "", res)
})
}

View File

@ -1,17 +1,42 @@
package bybit package bybit
import ( import (
"encoding/json"
"fmt" "fmt"
"strings"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
) )
type WsEvent struct {
// "op" and "topic" are exclusive.
*WebSocketOpEvent
*WebSocketTopicEvent
}
func (w *WsEvent) IsOp() bool {
return w.WebSocketOpEvent != nil && w.WebSocketTopicEvent == nil
}
func (w *WsEvent) IsTopic() bool {
return w.WebSocketOpEvent == nil && w.WebSocketTopicEvent != nil
}
type WsOpType string type WsOpType string
const ( const (
WsOpTypePing WsOpType = "ping" WsOpTypePing WsOpType = "ping"
WsOpTypePong WsOpType = "pong" WsOpTypePong WsOpType = "pong"
WsOpTypeSubscribe WsOpType = "subscribe"
) )
type WebSocketEvent struct { type WebsocketOp struct {
Op string `json:"op"`
Args []string `json:"args"`
}
type WebSocketOpEvent struct {
Success *bool `json:"success,omitempty"` Success *bool `json:"success,omitempty"`
RetMsg *string `json:"ret_msg,omitempty"` RetMsg *string `json:"ret_msg,omitempty"`
ReqId *string `json:"req_id,omitempty"` ReqId *string `json:"req_id,omitempty"`
@ -21,19 +46,95 @@ type WebSocketEvent struct {
Args []string `json:"args"` Args []string `json:"args"`
} }
func (w *WebSocketEvent) IsValid() error { func (w *WebSocketOpEvent) IsValid() error {
switch w.Op { switch w.Op {
case WsOpTypePing: case WsOpTypePing:
// public event // public event
if (w.Success != nil && !*w.Success) || if (w.Success != nil && !*w.Success) ||
(w.RetMsg != nil && WsOpType(*w.RetMsg) != WsOpTypePong) { (w.RetMsg != nil && WsOpType(*w.RetMsg) != WsOpTypePong) {
return fmt.Errorf("unexpeted response of pong: %#v", w) return fmt.Errorf("unexpeted response of pong: %+v", w)
} }
return nil return nil
case WsOpTypePong: case WsOpTypePong:
// private event // private event
return nil return nil
case WsOpTypeSubscribe:
if w.Success != nil && !*w.Success {
return fmt.Errorf("unexpected subscribe result: %+v", w)
}
return nil
default: default:
return fmt.Errorf("unexpected op type: %#v", w) return fmt.Errorf("unexpected op type: %+v", w)
} }
} }
type TopicType string
const (
TopicTypeOrderBook TopicType = "orderbook"
)
type DataType string
const (
DataTypeSnapshot DataType = "snapshot"
DataTypeDelta DataType = "delta"
)
type WebSocketTopicEvent struct {
Topic string `json:"topic"`
Type DataType `json:"type"`
// The timestamp (ms) that the system generates the data
Ts types.MillisecondTimestamp `json:"ts"`
Data json.RawMessage `json:"data"`
}
// PriceVolumeSlice represents a slice of price and value.
//
// index 0 is Bid/Ask price.
// index 1 is Bid/Ask size. The *delta data* has size=0, which means that all quotations for this price have been filled or cancelled
type PriceVolumeSlice [2]fixedpoint.Value
type BookEvent struct {
// Symbol name
Symbol string `json:"s"`
// Bids. For snapshot stream, the element is sorted by price in descending order
Bids types.PriceVolumeSlice `json:"b"`
// Asks. For snapshot stream, the element is sorted by price in ascending order
Asks types.PriceVolumeSlice `json:"a"`
// Update ID. Is a sequence. Occasionally, you'll receive "u"=1, which is a snapshot data due to the restart of
// the service. So please overwrite your local orderbook
UpdateId fixedpoint.Value `json:"u"`
// Cross sequence. You can use this field to compare different levels orderbook data, and for the smaller seq,
// then it means the data is generated earlier.
SequenceId fixedpoint.Value `json:"seq"`
// internal use
// Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type
Type DataType
}
func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook) {
snapshot.Symbol = e.Symbol
snapshot.Bids = e.Bids
snapshot.Asks = e.Asks
return snapshot
}
const topicSeparator = "."
func genTopic(in ...interface{}) string {
out := make([]string, len(in))
for k, v := range in {
out[k] = fmt.Sprintf("%v", v)
}
return strings.Join(out, topicSeparator)
}
func getTopicType(topic string) TopicType {
slice := strings.Split(topic, topicSeparator)
if len(slice) == 0 {
return ""
}
return TopicType(slice[0])
}

View File

@ -5,6 +5,9 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
) )
func Test_parseWebSocketEvent(t *testing.T) { func Test_parseWebSocketEvent(t *testing.T) {
@ -16,9 +19,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
expSucceeds := true expSucceeds := true
expRetMsg := string(WsOpTypePong) expRetMsg := string(WsOpTypePong)
e, ok := raw.(*WebSocketEvent) e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{ assert.Equal(t, &WebSocketOpEvent{
Success: &expSucceeds, Success: &expSucceeds,
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
@ -39,9 +42,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
expSucceeds := true expSucceeds := true
expRetMsg := string(WsOpTypePong) expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
e, ok := raw.(*WebSocketEvent) e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{ assert.Equal(t, &WebSocketOpEvent{
Success: &expSucceeds, Success: &expSucceeds,
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
@ -59,9 +62,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
raw, err := s.parseWebSocketEvent([]byte(msg)) raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err) assert.NoError(t, err)
e, ok := raw.(*WebSocketEvent) e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{ assert.Equal(t, &WebSocketOpEvent{
Success: nil, Success: nil,
RetMsg: nil, RetMsg: nil,
ConnId: "civn4p1dcjmtvb69ome0-yrt1", ConnId: "civn4p1dcjmtvb69ome0-yrt1",
@ -80,9 +83,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
expReqId := "78d36b57-a142-47b7-9143-5843df77d44d" expReqId := "78d36b57-a142-47b7-9143-5843df77d44d"
e, ok := raw.(*WebSocketEvent) e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{ assert.Equal(t, &WebSocketOpEvent{
Success: nil, Success: nil,
RetMsg: nil, RetMsg: nil,
ConnId: "civn4p1dcjmtvb69ome0-yrt1", ConnId: "civn4p1dcjmtvb69ome0-yrt1",
@ -101,7 +104,7 @@ func Test_WebSocketEventIsValid(t *testing.T) {
expRetMsg := string(WsOpTypePong) expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{ w := &WebSocketOpEvent{
Success: &expSucceeds, Success: &expSucceeds,
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ReqId: &expReqId, ReqId: &expReqId,
@ -113,7 +116,7 @@ func Test_WebSocketEventIsValid(t *testing.T) {
}) })
t.Run("[private] valid op ping", func(t *testing.T) { t.Run("[private] valid op ping", func(t *testing.T) {
w := &WebSocketEvent{ w := &WebSocketOpEvent{
Success: nil, Success: nil,
RetMsg: nil, RetMsg: nil,
ReqId: nil, ReqId: nil,
@ -129,7 +132,7 @@ func Test_WebSocketEventIsValid(t *testing.T) {
expRetMsg := string(WsOpTypePong) expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{ w := &WebSocketOpEvent{
Success: &expSucceeds, Success: &expSucceeds,
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ReqId: &expReqId, ReqId: &expReqId,
@ -137,21 +140,21 @@ func Test_WebSocketEventIsValid(t *testing.T) {
Op: WsOpTypePing, Op: WsOpTypePing,
Args: nil, Args: nil,
} }
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid())
}) })
t.Run("[public] missing Success field", func(t *testing.T) { t.Run("[public] missing Success field", func(t *testing.T) {
expRetMsg := string(WsOpTypePong) expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{ w := &WebSocketOpEvent{
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ReqId: &expReqId, ReqId: &expReqId,
ConnId: "test-conndid", ConnId: "test-conndid",
Op: WsOpTypePing, Op: WsOpTypePing,
Args: nil, Args: nil,
} }
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid())
}) })
t.Run("[public] invalid ret msg", func(t *testing.T) { t.Run("[public] invalid ret msg", func(t *testing.T) {
@ -159,7 +162,7 @@ func Test_WebSocketEventIsValid(t *testing.T) {
expRetMsg := "PINGPONGPINGPONG" expRetMsg := "PINGPONGPINGPONG"
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{ w := &WebSocketOpEvent{
Success: &expSucceeds, Success: &expSucceeds,
RetMsg: &expRetMsg, RetMsg: &expRetMsg,
ReqId: &expReqId, ReqId: &expReqId,
@ -167,25 +170,203 @@ func Test_WebSocketEventIsValid(t *testing.T) {
Op: WsOpTypePing, Op: WsOpTypePing,
Args: nil, Args: nil,
} }
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid())
}) })
t.Run("[public] missing RetMsg field", func(t *testing.T) { t.Run("[public] missing RetMsg field", func(t *testing.T) {
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{ w := &WebSocketOpEvent{
ReqId: &expReqId, ReqId: &expReqId,
ConnId: "test-conndid", ConnId: "test-conndid",
Op: WsOpTypePing, Op: WsOpTypePing,
Args: nil, Args: nil,
} }
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid())
}) })
t.Run("unexpected op type", func(t *testing.T) { t.Run("unexpected op type", func(t *testing.T) {
w := &WebSocketEvent{ w := &WebSocketOpEvent{
Op: WsOpType("unexpected"), Op: WsOpType("unexpected"),
} }
assert.Error(t, fmt.Errorf("unexpected op type: %#v", w), w.IsValid()) assert.Error(t, fmt.Errorf("unexpected op type: %+v", w), w.IsValid())
})
t.Run("[subscribe] valid", func(t *testing.T) {
expSucceeds := true
expRetMsg := ""
w := &WebSocketOpEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: nil,
ConnId: "test-conndid",
Op: WsOpTypeSubscribe,
Args: nil,
}
assert.NoError(t, w.IsValid())
})
t.Run("[subscribe] un-succeeds", func(t *testing.T) {
expSucceeds := false
expRetMsg := ""
w := &WebSocketOpEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: nil,
ConnId: "test-conndid",
Op: WsOpTypeSubscribe,
Args: nil,
}
assert.Error(t, fmt.Errorf("unexpected subscribe result: %+v", w), w.IsValid())
}) })
} }
func TestBookEvent_OrderBook(t *testing.T) {
t.Run("snapshot", func(t *testing.T) {
/*
{
"topic":"orderbook.50.BTCUSDT",
"ts":1691129753071,
"type":"snapshot",
"data":{
"s":"BTCUSDT",
"b":[
[
"29230.81",
"4.713817"
],
[
"29230",
"0.1646"
],
[
"29229.92",
"0.036"
],
],
"a":[
[
"29230.82",
"2.745421"
],
[
"29231.41",
"1.6"
],
[
"29231.42",
"0.513654"
],
],
"u":1841364,
"seq":10558648910
}
}
*/
event := &BookEvent{
Symbol: "BTCUSDT",
Bids: types.PriceVolumeSlice{
{
fixedpoint.NewFromFloat(29230.81),
fixedpoint.NewFromFloat(4.713817),
},
{
fixedpoint.NewFromFloat(29230),
fixedpoint.NewFromFloat(0.1646),
},
{
fixedpoint.NewFromFloat(29229.92),
fixedpoint.NewFromFloat(0.036),
},
},
Asks: types.PriceVolumeSlice{
{
fixedpoint.NewFromFloat(29230.82),
fixedpoint.NewFromFloat(2.745421),
},
{
fixedpoint.NewFromFloat(29231.41),
fixedpoint.NewFromFloat(1.6),
},
{
fixedpoint.NewFromFloat(29231.42),
fixedpoint.NewFromFloat(0.513654),
},
},
UpdateId: fixedpoint.NewFromFloat(1841364),
SequenceId: fixedpoint.NewFromFloat(10558648910),
Type: DataTypeSnapshot,
}
expSliceOrderBook := types.SliceOrderBook{
Symbol: event.Symbol,
Bids: event.Bids,
Asks: event.Asks,
}
assert.Equal(t, expSliceOrderBook, event.OrderBook())
})
t.Run("delta", func(t *testing.T) {
/*
{
"topic":"orderbook.50.BTCUSDT",
"ts":1691130685111,
"type":"delta",
"data":{
"s":"BTCUSDT",
"b":[
],
"a":[
[
"29239.37",
"0.082356"
],
[
"29236.1",
"0"
]
],
"u":1854104,
"seq":10559247733
}
}
*/
event := &BookEvent{
Symbol: "BTCUSDT",
Bids: types.PriceVolumeSlice{},
Asks: types.PriceVolumeSlice{
{
fixedpoint.NewFromFloat(29239.37),
fixedpoint.NewFromFloat(0.082356),
},
{
fixedpoint.NewFromFloat(29236.1),
fixedpoint.NewFromFloat(0),
},
},
UpdateId: fixedpoint.NewFromFloat(1854104),
SequenceId: fixedpoint.NewFromFloat(10559247733),
Type: DataTypeDelta,
}
expSliceOrderBook := types.SliceOrderBook{
Symbol: event.Symbol,
Bids: types.PriceVolumeSlice{},
Asks: event.Asks,
}
assert.Equal(t, expSliceOrderBook, event.OrderBook())
})
}
func Test_genTopicName(t *testing.T) {
exp := "orderbook.50.BTCUSDT"
assert.Equal(t, exp, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"))
}
func Test_getTopicName(t *testing.T) {
exp := TopicTypeOrderBook
assert.Equal(t, exp, getTopicType("orderbook.50.BTCUSDT"))
}

View File

@ -440,6 +440,7 @@ const (
DepthLevel1 Depth = "1" DepthLevel1 Depth = "1"
DepthLevel5 Depth = "5" DepthLevel5 Depth = "5"
DepthLevel20 Depth = "20" DepthLevel20 Depth = "20"
DepthLevel50 Depth = "50"
) )
type Speed string type Speed string