From a6047f629d83557c55b23463197ff7ecdabe874e Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 2 Aug 2023 16:57:30 +0800 Subject: [PATCH] pkg/exchange: implement bybit stream ping --- pkg/exchange/bybit/convert_test.go | 18 +- pkg/exchange/bybit/stream.go | 95 ++++++++++- pkg/exchange/bybit/stream_callbacks.go | 15 ++ pkg/exchange/bybit/stream_test.go | 155 +++++++++++++++++ pkg/exchange/bybit/types.go | 113 ++++++++++++- pkg/exchange/bybit/types_test.go | 221 ++++++++++++++++++++++--- pkg/types/stream.go | 1 + 7 files changed, 572 insertions(+), 46 deletions(-) create mode 100644 pkg/exchange/bybit/stream_callbacks.go create mode 100644 pkg/exchange/bybit/stream_test.go diff --git a/pkg/exchange/bybit/convert_test.go b/pkg/exchange/bybit/convert_test.go index 3051a3376..118853de0 100644 --- a/pkg/exchange/bybit/convert_test.go +++ b/pkg/exchange/bybit/convert_test.go @@ -1,35 +1,19 @@ package bybit import ( - "context" "fmt" "math" "strconv" "testing" "time" - "github.com/pkg/errors" - "github.com/stretchr/testify/assert" - "go.uber.org/multierr" - "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi" v3 "github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi/v3" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" + "github.com/stretchr/testify/assert" ) -func TestU(t *testing.T) { - e := returnErr() - - t.Log(errors.Is(e, context.DeadlineExceeded)) - -} - -func returnErr() error { - var err error - return multierr.Append(multierr.Append(err, fmt.Errorf("got err: %w", context.DeadlineExceeded)), fmt.Errorf("GG")) -} - func TestToGlobalMarket(t *testing.T) { // sample: //{ diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 186cc3eeb..2f07181b2 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -3,6 +3,7 @@ package bybit import ( "context" "encoding/json" + "fmt" "time" "github.com/gorilla/websocket" @@ -15,10 +16,16 @@ const ( // Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds // to maintain the WebSocket connection. pingInterval = 20 * time.Second + + // spotArgsLimit can input up to 10 args for each subscription request sent to one connection. + spotArgsLimit = 10 ) +//go:generate callbackgen -type Stream type Stream struct { types.StandardStream + + bookEventCallbacks []func(e BookEvent) } func NewStream() *Stream { @@ -31,6 +38,8 @@ func NewStream() *Stream { stream.SetDispatcher(stream.dispatchEvent) stream.SetHeartBeat(stream.ping) + stream.OnConnect(stream.handlerConnect) + stream.OnBookEvent(stream.handleBookEvent) return stream } @@ -46,16 +55,43 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) { func (s *Stream) dispatchEvent(event interface{}) { switch e := event.(type) { - case *WebSocketEvent: + case *WebSocketOpEvent: if err := e.IsValid(); err != nil { log.Errorf("invalid event: %v", err) } + + case *BookEvent: + s.EmitBookEvent(*e) } } func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { - var resp WebSocketEvent - return &resp, json.Unmarshal(in, &resp) + var e WsEvent + + err := json.Unmarshal(in, &e) + if err != nil { + return nil, err + } + + switch { + case e.IsOp(): + return e.WebSocketOpEvent, nil + + case e.IsTopic(): + switch getTopicType(e.Topic) { + case TopicTypeOrderBook: + var book BookEvent + err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err) + } + + book.Type = e.WebSocketTopicEvent.Type + return &book, nil + } + } + + return nil, fmt.Errorf("unhandled websocket event: %+v", string(in)) } // ping implements the Bybit text message of WebSocket PingPong. @@ -94,3 +130,56 @@ func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc cont } } } + +func (s *Stream) handlerConnect() { + if s.PublicOnly { + var topics []string + + for _, subscription := range s.Subscriptions { + topic, err := convertSubscription(subscription) + if err != nil { + log.WithError(err).Errorf("subscription convert error") + continue + } + + topics = append(topics, topic) + } + if len(topics) > spotArgsLimit { + log.Debugf("topics exceeds limit: %d, drop of: %v", spotArgsLimit, topics[spotArgsLimit:]) + topics = topics[:spotArgsLimit] + } + log.Infof("subscribing channels: %+v", topics) + if err := s.Conn.WriteJSON(WebsocketOp{ + Op: "subscribe", + Args: topics, + }); err != nil { + log.WithError(err).Error("failed to send subscription request") + } + } +} + +func convertSubscription(s types.Subscription) (string, error) { + switch s.Channel { + case types.BookChannel: + depth := types.DepthLevel1 + if len(s.Options.Depth) > 0 && s.Options.Depth == types.DepthLevel50 { + depth = types.DepthLevel50 + } + return genTopic(TopicTypeOrderBook, depth, s.Symbol), nil + } + + return "", fmt.Errorf("unsupported stream channel: %s", s.Channel) +} + +func (s *Stream) handleBookEvent(e BookEvent) { + orderBook := e.OrderBook() + switch { + // Occasionally, you'll receive "UpdateId"=1, which is a snapshot data due to the restart of + // the service. So please overwrite your local orderbook + case e.Type == DataTypeSnapshot || e.UpdateId.Int() == 1: + s.EmitBookSnapshot(orderBook) + + case e.Type == DataTypeDelta: + s.EmitBookUpdate(orderBook) + } +} diff --git a/pkg/exchange/bybit/stream_callbacks.go b/pkg/exchange/bybit/stream_callbacks.go new file mode 100644 index 000000000..7940157b8 --- /dev/null +++ b/pkg/exchange/bybit/stream_callbacks.go @@ -0,0 +1,15 @@ +// Code generated by "callbackgen -type Stream"; DO NOT EDIT. + +package bybit + +import () + +func (s *Stream) OnBookEvent(cb func(e BookEvent)) { + s.bookEventCallbacks = append(s.bookEventCallbacks, cb) +} + +func (s *Stream) EmitBookEvent(e BookEvent) { + for _, cb := range s.bookEventCallbacks { + cb(e) + } +} diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go new file mode 100644 index 000000000..ea4183e04 --- /dev/null +++ b/pkg/exchange/bybit/stream_test.go @@ -0,0 +1,155 @@ +package bybit + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" +) + +func TestStream_parseWebSocketEvent(t *testing.T) { + s := Stream{} + + t.Run("op", func(t *testing.T) { + input := `{ + "success":true, + "ret_msg":"subscribe", + "conn_id":"a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5", + "op":"subscribe" + }` + res, err := s.parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + opEvent, ok := res.(*WebSocketOpEvent) + assert.True(t, ok) + expSucceeds := true + expRetMsg := "subscribe" + assert.Equal(t, WebSocketOpEvent{ + Success: &expSucceeds, + RetMsg: &expRetMsg, + ReqId: nil, + ConnId: "a403c8e5-e2b6-4edd-a8f0-1a64fa7227a5", + Op: WsOpTypeSubscribe, + Args: nil, + }, *opEvent) + }) + t.Run("TopicTypeOrderBook with delta", func(t *testing.T) { + input := `{ + "topic":"orderbook.50.BTCUSDT", + "ts":1691130685111, + "type":"delta", + "data":{ + "s":"BTCUSDT", + "b":[ + + ], + "a":[ + [ + "29239.37", + "0.082356" + ], + [ + "29236.1", + "0" + ] + ], + "u":1854104, + "seq":10559247733 + } + }` + + res, err := s.parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + book, ok := res.(*BookEvent) + assert.True(t, ok) + assert.Equal(t, BookEvent{ + Symbol: "BTCUSDT", + Bids: nil, + Asks: types.PriceVolumeSlice{ + { + fixedpoint.NewFromFloat(29239.37), + fixedpoint.NewFromFloat(0.082356), + }, + { + fixedpoint.NewFromFloat(29236.1), + fixedpoint.NewFromFloat(0), + }, + }, + UpdateId: fixedpoint.NewFromFloat(1854104), + SequenceId: fixedpoint.NewFromFloat(10559247733), + Type: DataTypeDelta, + }, *book) + }) + + t.Run("Parse fails", func(t *testing.T) { + input := `{ + "topic":"orderbook.50.BTCUSDT", + "ts":1691130685111, + "type":"delta", + "data":{ + "GG": "test", + } + }` + + res, err := s.parseWebSocketEvent([]byte(input)) + assert.Error(t, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", `{ + "GG": "test", + }`, err), err) + assert.Equal(t, nil, res) + }) +} + +func Test_convertSubscription(t *testing.T) { + t.Run("BookChannel.DepthLevel1", func(t *testing.T) { + res, err := convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: types.BookChannel, + Options: types.SubscribeOptions{ + Depth: types.DepthLevel1, + }, + }) + assert.NoError(t, err) + assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res) + }) + t.Run("BookChannel. with default depth", func(t *testing.T) { + res, err := convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: types.BookChannel, + }) + assert.NoError(t, err) + assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res) + }) + t.Run("BookChannel.DepthLevel50", func(t *testing.T) { + res, err := convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: types.BookChannel, + Options: types.SubscribeOptions{ + Depth: types.DepthLevel50, + }, + }) + assert.NoError(t, err) + assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"), res) + }) + t.Run("BookChannel. not support depth, use default level 1", func(t *testing.T) { + res, err := convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: types.BookChannel, + Options: types.SubscribeOptions{ + Depth: "20", + }, + }) + assert.NoError(t, err) + assert.Equal(t, genTopic(TopicTypeOrderBook, types.DepthLevel1, "BTCUSDT"), res) + }) + + t.Run("unsupported channel", func(t *testing.T) { + res, err := convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: "unsupported", + }) + assert.Error(t, fmt.Errorf("unsupported stream channel: %s", "unsupported"), err) + assert.Equal(t, "", res) + }) +} diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 750e3d3d4..8986451a2 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -1,17 +1,42 @@ package bybit import ( + "encoding/json" "fmt" + "strings" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" ) +type WsEvent struct { + // "op" and "topic" are exclusive. + *WebSocketOpEvent + *WebSocketTopicEvent +} + +func (w *WsEvent) IsOp() bool { + return w.WebSocketOpEvent != nil && w.WebSocketTopicEvent == nil +} + +func (w *WsEvent) IsTopic() bool { + return w.WebSocketOpEvent == nil && w.WebSocketTopicEvent != nil +} + type WsOpType string const ( - WsOpTypePing WsOpType = "ping" - WsOpTypePong WsOpType = "pong" + WsOpTypePing WsOpType = "ping" + WsOpTypePong WsOpType = "pong" + WsOpTypeSubscribe WsOpType = "subscribe" ) -type WebSocketEvent struct { +type WebsocketOp struct { + Op string `json:"op"` + Args []string `json:"args"` +} + +type WebSocketOpEvent struct { Success *bool `json:"success,omitempty"` RetMsg *string `json:"ret_msg,omitempty"` ReqId *string `json:"req_id,omitempty"` @@ -21,19 +46,95 @@ type WebSocketEvent struct { Args []string `json:"args"` } -func (w *WebSocketEvent) IsValid() error { +func (w *WebSocketOpEvent) IsValid() error { switch w.Op { case WsOpTypePing: // public event if (w.Success != nil && !*w.Success) || (w.RetMsg != nil && WsOpType(*w.RetMsg) != WsOpTypePong) { - return fmt.Errorf("unexpeted response of pong: %#v", w) + return fmt.Errorf("unexpeted response of pong: %+v", w) } return nil case WsOpTypePong: // private event return nil + case WsOpTypeSubscribe: + if w.Success != nil && !*w.Success { + return fmt.Errorf("unexpected subscribe result: %+v", w) + } + return nil default: - return fmt.Errorf("unexpected op type: %#v", w) + return fmt.Errorf("unexpected op type: %+v", w) } } + +type TopicType string + +const ( + TopicTypeOrderBook TopicType = "orderbook" +) + +type DataType string + +const ( + DataTypeSnapshot DataType = "snapshot" + DataTypeDelta DataType = "delta" +) + +type WebSocketTopicEvent struct { + Topic string `json:"topic"` + Type DataType `json:"type"` + // The timestamp (ms) that the system generates the data + Ts types.MillisecondTimestamp `json:"ts"` + Data json.RawMessage `json:"data"` +} + +// PriceVolumeSlice represents a slice of price and value. +// +// index 0 is Bid/Ask price. +// index 1 is Bid/Ask size. The *delta data* has size=0, which means that all quotations for this price have been filled or cancelled +type PriceVolumeSlice [2]fixedpoint.Value + +type BookEvent struct { + // Symbol name + Symbol string `json:"s"` + // Bids. For snapshot stream, the element is sorted by price in descending order + Bids types.PriceVolumeSlice `json:"b"` + // Asks. For snapshot stream, the element is sorted by price in ascending order + Asks types.PriceVolumeSlice `json:"a"` + // Update ID. Is a sequence. Occasionally, you'll receive "u"=1, which is a snapshot data due to the restart of + // the service. So please overwrite your local orderbook + UpdateId fixedpoint.Value `json:"u"` + // Cross sequence. You can use this field to compare different levels orderbook data, and for the smaller seq, + // then it means the data is generated earlier. + SequenceId fixedpoint.Value `json:"seq"` + + // internal use + // Type can be one of snapshot or delta. Copied from WebSocketTopicEvent.Type + Type DataType +} + +func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook) { + snapshot.Symbol = e.Symbol + snapshot.Bids = e.Bids + snapshot.Asks = e.Asks + return snapshot +} + +const topicSeparator = "." + +func genTopic(in ...interface{}) string { + out := make([]string, len(in)) + for k, v := range in { + out[k] = fmt.Sprintf("%v", v) + } + return strings.Join(out, topicSeparator) +} + +func getTopicType(topic string) TopicType { + slice := strings.Split(topic, topicSeparator) + if len(slice) == 0 { + return "" + } + return TopicType(slice[0]) +} diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 8cdbf86c8..27dd31054 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -5,6 +5,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" ) func Test_parseWebSocketEvent(t *testing.T) { @@ -16,9 +19,9 @@ func Test_parseWebSocketEvent(t *testing.T) { expSucceeds := true expRetMsg := string(WsOpTypePong) - e, ok := raw.(*WebSocketEvent) + e, ok := raw.(*WebSocketOpEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketEvent{ + assert.Equal(t, &WebSocketOpEvent{ Success: &expSucceeds, RetMsg: &expRetMsg, ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", @@ -39,9 +42,9 @@ func Test_parseWebSocketEvent(t *testing.T) { expSucceeds := true expRetMsg := string(WsOpTypePong) expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - e, ok := raw.(*WebSocketEvent) + e, ok := raw.(*WebSocketOpEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketEvent{ + assert.Equal(t, &WebSocketOpEvent{ Success: &expSucceeds, RetMsg: &expRetMsg, ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", @@ -59,9 +62,9 @@ func Test_parseWebSocketEvent(t *testing.T) { raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) - e, ok := raw.(*WebSocketEvent) + e, ok := raw.(*WebSocketOpEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketEvent{ + assert.Equal(t, &WebSocketOpEvent{ Success: nil, RetMsg: nil, ConnId: "civn4p1dcjmtvb69ome0-yrt1", @@ -80,9 +83,9 @@ func Test_parseWebSocketEvent(t *testing.T) { assert.NoError(t, err) expReqId := "78d36b57-a142-47b7-9143-5843df77d44d" - e, ok := raw.(*WebSocketEvent) + e, ok := raw.(*WebSocketOpEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketEvent{ + assert.Equal(t, &WebSocketOpEvent{ Success: nil, RetMsg: nil, ConnId: "civn4p1dcjmtvb69ome0-yrt1", @@ -101,7 +104,7 @@ func Test_WebSocketEventIsValid(t *testing.T) { expRetMsg := string(WsOpTypePong) expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ Success: &expSucceeds, RetMsg: &expRetMsg, ReqId: &expReqId, @@ -113,7 +116,7 @@ func Test_WebSocketEventIsValid(t *testing.T) { }) t.Run("[private] valid op ping", func(t *testing.T) { - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ Success: nil, RetMsg: nil, ReqId: nil, @@ -129,7 +132,7 @@ func Test_WebSocketEventIsValid(t *testing.T) { expRetMsg := string(WsOpTypePong) expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ Success: &expSucceeds, RetMsg: &expRetMsg, ReqId: &expReqId, @@ -137,21 +140,21 @@ func Test_WebSocketEventIsValid(t *testing.T) { Op: WsOpTypePing, Args: nil, } - assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) + assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid()) }) t.Run("[public] missing Success field", func(t *testing.T) { expRetMsg := string(WsOpTypePong) expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ RetMsg: &expRetMsg, ReqId: &expReqId, ConnId: "test-conndid", Op: WsOpTypePing, Args: nil, } - assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) + assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid()) }) t.Run("[public] invalid ret msg", func(t *testing.T) { @@ -159,7 +162,7 @@ func Test_WebSocketEventIsValid(t *testing.T) { expRetMsg := "PINGPONGPINGPONG" expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ Success: &expSucceeds, RetMsg: &expRetMsg, ReqId: &expReqId, @@ -167,25 +170,203 @@ func Test_WebSocketEventIsValid(t *testing.T) { Op: WsOpTypePing, Args: nil, } - assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) + assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid()) }) t.Run("[public] missing RetMsg field", func(t *testing.T) { expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ ReqId: &expReqId, ConnId: "test-conndid", Op: WsOpTypePing, Args: nil, } - assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid()) + assert.Error(t, fmt.Errorf("unexpeted response of pong: %+v", w), w.IsValid()) }) t.Run("unexpected op type", func(t *testing.T) { - w := &WebSocketEvent{ + w := &WebSocketOpEvent{ Op: WsOpType("unexpected"), } - assert.Error(t, fmt.Errorf("unexpected op type: %#v", w), w.IsValid()) + assert.Error(t, fmt.Errorf("unexpected op type: %+v", w), w.IsValid()) + }) + + t.Run("[subscribe] valid", func(t *testing.T) { + expSucceeds := true + expRetMsg := "" + w := &WebSocketOpEvent{ + Success: &expSucceeds, + RetMsg: &expRetMsg, + ReqId: nil, + ConnId: "test-conndid", + Op: WsOpTypeSubscribe, + Args: nil, + } + assert.NoError(t, w.IsValid()) + }) + + t.Run("[subscribe] un-succeeds", func(t *testing.T) { + expSucceeds := false + expRetMsg := "" + w := &WebSocketOpEvent{ + Success: &expSucceeds, + RetMsg: &expRetMsg, + ReqId: nil, + ConnId: "test-conndid", + Op: WsOpTypeSubscribe, + Args: nil, + } + assert.Error(t, fmt.Errorf("unexpected subscribe result: %+v", w), w.IsValid()) }) } + +func TestBookEvent_OrderBook(t *testing.T) { + t.Run("snapshot", func(t *testing.T) { + /* + { + "topic":"orderbook.50.BTCUSDT", + "ts":1691129753071, + "type":"snapshot", + "data":{ + "s":"BTCUSDT", + "b":[ + [ + "29230.81", + "4.713817" + ], + [ + "29230", + "0.1646" + ], + [ + "29229.92", + "0.036" + ], + ], + "a":[ + [ + "29230.82", + "2.745421" + ], + [ + "29231.41", + "1.6" + ], + [ + "29231.42", + "0.513654" + ], + ], + "u":1841364, + "seq":10558648910 + } + } + */ + event := &BookEvent{ + Symbol: "BTCUSDT", + Bids: types.PriceVolumeSlice{ + { + fixedpoint.NewFromFloat(29230.81), + fixedpoint.NewFromFloat(4.713817), + }, + { + fixedpoint.NewFromFloat(29230), + fixedpoint.NewFromFloat(0.1646), + }, + { + fixedpoint.NewFromFloat(29229.92), + fixedpoint.NewFromFloat(0.036), + }, + }, + Asks: types.PriceVolumeSlice{ + { + fixedpoint.NewFromFloat(29230.82), + fixedpoint.NewFromFloat(2.745421), + }, + { + fixedpoint.NewFromFloat(29231.41), + fixedpoint.NewFromFloat(1.6), + }, + { + fixedpoint.NewFromFloat(29231.42), + fixedpoint.NewFromFloat(0.513654), + }, + }, + UpdateId: fixedpoint.NewFromFloat(1841364), + SequenceId: fixedpoint.NewFromFloat(10558648910), + Type: DataTypeSnapshot, + } + + expSliceOrderBook := types.SliceOrderBook{ + Symbol: event.Symbol, + Bids: event.Bids, + Asks: event.Asks, + } + + assert.Equal(t, expSliceOrderBook, event.OrderBook()) + }) + t.Run("delta", func(t *testing.T) { + /* + { + "topic":"orderbook.50.BTCUSDT", + "ts":1691130685111, + "type":"delta", + "data":{ + "s":"BTCUSDT", + "b":[ + + ], + "a":[ + [ + "29239.37", + "0.082356" + ], + [ + "29236.1", + "0" + ] + ], + "u":1854104, + "seq":10559247733 + } + } + */ + event := &BookEvent{ + Symbol: "BTCUSDT", + Bids: types.PriceVolumeSlice{}, + Asks: types.PriceVolumeSlice{ + { + fixedpoint.NewFromFloat(29239.37), + fixedpoint.NewFromFloat(0.082356), + }, + { + fixedpoint.NewFromFloat(29236.1), + fixedpoint.NewFromFloat(0), + }, + }, + UpdateId: fixedpoint.NewFromFloat(1854104), + SequenceId: fixedpoint.NewFromFloat(10559247733), + Type: DataTypeDelta, + } + + expSliceOrderBook := types.SliceOrderBook{ + Symbol: event.Symbol, + Bids: types.PriceVolumeSlice{}, + Asks: event.Asks, + } + + assert.Equal(t, expSliceOrderBook, event.OrderBook()) + }) + +} + +func Test_genTopicName(t *testing.T) { + exp := "orderbook.50.BTCUSDT" + assert.Equal(t, exp, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT")) +} + +func Test_getTopicName(t *testing.T) { + exp := TopicTypeOrderBook + assert.Equal(t, exp, getTopicType("orderbook.50.BTCUSDT")) +} diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 1baa3700a..6bcd5e39e 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -440,6 +440,7 @@ const ( DepthLevel1 Depth = "1" DepthLevel5 Depth = "5" DepthLevel20 Depth = "20" + DepthLevel50 Depth = "50" ) type Speed string