diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 5b9ced6eb..db7f25770 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -42,11 +42,12 @@ type Stream struct { // TODO: update the fee rate at 7:00 am UTC; rotation required. symbolFeeDetails map[string]*symbolFeeDetail - bookEventCallbacks []func(e BookEvent) - walletEventCallbacks []func(e []bybitapi.WalletBalances) - kLineEventCallbacks []func(e KLineEvent) - orderEventCallbacks []func(e []OrderEvent) - tradeEventCallbacks []func(e []TradeEvent) + bookEventCallbacks []func(e BookEvent) + marketTradeEventCallbacks []func(e []MarketTradeEvent) + walletEventCallbacks []func(e []bybitapi.WalletBalances) + kLineEventCallbacks []func(e KLineEvent) + orderEventCallbacks []func(e []OrderEvent) + tradeEventCallbacks []func(e []TradeEvent) } func NewStream(key, secret string, marketProvider MarketInfoProvider) *Stream { @@ -66,6 +67,7 @@ func NewStream(key, secret string, marketProvider MarketInfoProvider) *Stream { stream.OnConnect(stream.handlerConnect) stream.OnBookEvent(stream.handleBookEvent) + stream.OnMarketTradeEvent(stream.handleMarketTradeEvent) stream.OnKLineEvent(stream.handleKLineEvent) stream.OnWalletEvent(stream.handleWalletEvent) stream.OnOrderEvent(stream.handleOrderEvent) @@ -139,6 +141,9 @@ func (s *Stream) dispatchEvent(event interface{}) { case *BookEvent: s.EmitBookEvent(*e) + case []MarketTradeEvent: + s.EmitMarketTradeEvent(e) + case []bybitapi.WalletBalances: s.EmitWalletEvent(e) @@ -173,18 +178,28 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { var book BookEvent err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book) if err != nil { - return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err) + return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err) } book.Type = e.WebSocketTopicEvent.Type book.ServerTime = e.WebSocketTopicEvent.Ts.Time() return &book, nil + case TopicTypeMarketTrade: + // snapshot only + var trade []MarketTradeEvent + err = json.Unmarshal(e.WebSocketTopicEvent.Data, &trade) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err) + } + + return trade, nil + case TopicTypeKLine: var kLines []KLine err = json.Unmarshal(e.WebSocketTopicEvent.Data, &kLines) if err != nil { - return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, : %w", string(e.WebSocketTopicEvent.Data), err) + return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err) } symbol, err := getSymbolFromTopic(e.Topic) @@ -292,6 +307,9 @@ func (s *Stream) convertSubscription(sub types.Subscription) (string, error) { } return genTopic(TopicTypeOrderBook, depth, sub.Symbol), nil + case types.MarketTradeChannel: + return genTopic(TopicTypeMarketTrade, sub.Symbol), nil + case types.KLineChannel: interval, err := toLocalInterval(sub.Options.Interval) if err != nil { @@ -318,6 +336,18 @@ func (s *Stream) handleBookEvent(e BookEvent) { } } +func (s *Stream) handleMarketTradeEvent(events []MarketTradeEvent) { + for _, event := range events { + trade, err := event.toGlobalTrade() + if err != nil { + log.WithError(err).Error("failed to convert to market trade") + continue + } + + s.StandardStream.EmitMarketTrade(trade) + } +} + func (s *Stream) handleWalletEvent(events []bybitapi.WalletBalances) { s.StandardStream.EmitBalanceSnapshot(toGlobalBalanceMap(events)) } diff --git a/pkg/exchange/bybit/stream_callbacks.go b/pkg/exchange/bybit/stream_callbacks.go index fcd1f6859..463e7dbd7 100644 --- a/pkg/exchange/bybit/stream_callbacks.go +++ b/pkg/exchange/bybit/stream_callbacks.go @@ -16,6 +16,16 @@ func (s *Stream) EmitBookEvent(e BookEvent) { } } +func (s *Stream) OnMarketTradeEvent(cb func(e []MarketTradeEvent)) { + s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb) +} + +func (s *Stream) EmitMarketTradeEvent(e []MarketTradeEvent) { + for _, cb := range s.marketTradeEventCallbacks { + cb(e) + } +} + func (s *Stream) OnWalletEvent(cb func(e []bybitapi.WalletBalances)) { s.walletEventCallbacks = append(s.walletEventCallbacks, cb) } diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index 12e483765..daf839872 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -112,6 +112,19 @@ func TestStream(t *testing.T) { <-c }) + t.Run("market trade test", func(t *testing.T) { + s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{}) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnMarketTrade(func(trade types.Trade) { + t.Log("got update", trade) + }) + c := make(chan struct{}) + <-c + }) + t.Run("wallet test", func(t *testing.T) { err := s.Connect(context.Background()) assert.NoError(t, err) @@ -237,6 +250,42 @@ func TestStream_parseWebSocketEvent(t *testing.T) { }, *book) }) + t.Run("TopicTypeMarketTrade with snapshot", func(t *testing.T) { + input := `{ + "topic":"publicTrade.BTCUSDT", + "ts":1694348711526, + "type":"snapshot", + "data":[ + { + "i":"2290000000068683805", + "T":1694348711524, + "p":"25816.27", + "v":"0.000083", + "S":"Sell", + "s":"BTCUSDT", + "BT":false + } + ] +}` + + res, err := s.parseWebSocketEvent([]byte(input)) + assert.NoError(t, err) + book, ok := res.([]MarketTradeEvent) + assert.True(t, ok) + assert.Equal(t, []MarketTradeEvent{ + { + Timestamp: types.NewMillisecondTimestampFromInt(1694348711524), + Symbol: "BTCUSDT", + Side: bybitapi.SideSell, + Quantity: fixedpoint.NewFromFloat(0.000083), + Price: fixedpoint.NewFromFloat(25816.27), + Direction: "", + TradeId: "2290000000068683805", + BlockTrade: false, + }, + }, book) + }) + t.Run("TopicTypeKLine with snapshot", func(t *testing.T) { input := `{ "topic": "kline.5.BTCUSDT", @@ -379,6 +428,16 @@ func Test_convertSubscription(t *testing.T) { assert.Equal(t, fmt.Errorf("unsupported stream channel: %s", "unsupported"), err) assert.Equal(t, "", res) }) + + t.Run("MarketTradeChannel", func(t *testing.T) { + res, err := s.convertSubscription(types.Subscription{ + Symbol: "BTCUSDT", + Channel: types.MarketTradeChannel, + Options: types.SubscribeOptions{}, + }) + assert.NoError(t, err) + assert.Equal(t, genTopic(TopicTypeMarketTrade, "BTCUSDT"), res) + }) } func TestStream_getFeeRate(t *testing.T) { diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index c5acd4916..f5caedf8d 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -91,11 +91,12 @@ func (w *WebSocketOpEvent) IsValid() error { type TopicType string const ( - TopicTypeOrderBook TopicType = "orderbook" - TopicTypeWallet TopicType = "wallet" - TopicTypeOrder TopicType = "order" - TopicTypeKLine TopicType = "kline" - TopicTypeTrade TopicType = "execution" + TopicTypeOrderBook TopicType = "orderbook" + TopicTypeMarketTrade TopicType = "publicTrade" + TopicTypeWallet TopicType = "wallet" + TopicTypeOrder TopicType = "order" + TopicTypeKLine TopicType = "kline" + TopicTypeTrade TopicType = "execution" ) type DataType string @@ -143,6 +144,52 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook) { return snapshot } +type MarketTradeEvent struct { + // Timestamp is the timestamp (ms) that the order is filled + Timestamp types.MillisecondTimestamp `json:"T"` + Symbol string `json:"s"` + // Side of taker. Buy,Sell + Side bybitapi.Side `json:"S"` + // Quantity is the trade size + Quantity fixedpoint.Value `json:"v"` + // Price is the trade price + Price fixedpoint.Value `json:"p"` + // L is the direction of price change. Unique field for future + Direction string `json:"L"` + // trade ID + TradeId string `json:"i"` + // Whether it is a block trade order or not + BlockTrade bool `json:"BT"` +} + +func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) { + tradeId, err := strconv.ParseUint(m.TradeId, 10, 64) + if err != nil { + return types.Trade{}, fmt.Errorf("unexpected trade id: %s, err: %w", m.TradeId, err) + } + + side, err := toGlobalSideType(m.Side) + if err != nil { + return types.Trade{}, err + } + + return types.Trade{ + ID: tradeId, + OrderID: 0, // not supported + Exchange: types.ExchangeBybit, + Price: m.Price, + Quantity: m.Quantity, + QuoteQuantity: m.Price.Mul(m.Quantity), + Symbol: m.Symbol, + Side: side, + IsBuyer: side == types.SideTypeBuy, + IsMaker: false, // not supported + Time: types.Time(m.Timestamp.Time()), + Fee: fixedpoint.Zero, // not supported + FeeCurrency: "", // not supported + }, nil +} + const topicSeparator = "." func genTopic(in ...interface{}) string { diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 21fc04f4f..5f32abd77 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -406,6 +406,55 @@ func TestBookEvent_OrderBook(t *testing.T) { } +func TestMarketTradeEvent_Trade(t *testing.T) { + qty := fixedpoint.NewFromFloat(0.002289) + price := fixedpoint.NewFromFloat(28829.7600) + tradeId := uint64(2290000000068683542) + tradeTime := types.NewMillisecondTimestampFromInt(1691486100000) + event := MarketTradeEvent{ + Timestamp: tradeTime, + Symbol: "BTCUSDT", + Side: bybitapi.SideSell, + Quantity: qty, + Price: price, + Direction: "", + TradeId: strconv.FormatUint(tradeId, 10), + BlockTrade: false, + } + t.Run("succeeds", func(t *testing.T) { + expEvent := types.Trade{ + ID: tradeId, + Exchange: types.ExchangeBybit, + Price: price, + Quantity: qty, + QuoteQuantity: price.Mul(qty), + Symbol: event.Symbol, + Side: types.SideTypeSell, + IsBuyer: false, + IsMaker: false, + Time: types.Time(tradeTime.Time()), + } + + trade, err := event.toGlobalTrade() + assert.NoError(t, err) + assert.Equal(t, expEvent, trade) + }) + + t.Run("invalid side", func(t *testing.T) { + newEvent := event + newEvent.Side = "invalid" + _, err := newEvent.toGlobalTrade() + assert.ErrorContains(t, err, "unexpected side") + }) + + t.Run("invalid trade id", func(t *testing.T) { + newEvent := event + newEvent.TradeId = "invalid" + _, err := newEvent.toGlobalTrade() + assert.ErrorContains(t, err, "unexpected trade id") + }) +} + func Test_genTopicName(t *testing.T) { exp := "orderbook.50.BTCUSDT" assert.Equal(t, exp, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"))