Merge pull request #1308 from bailantaotao/edwin/add-trade-stream

FEATURE: [bybit] support market trade
This commit is contained in:
bailantaotao 2023-09-11 18:02:03 +08:00 committed by GitHub
commit 388b9c3f9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 207 additions and 12 deletions

View File

@ -42,11 +42,12 @@ type Stream struct {
// TODO: update the fee rate at 7:00 am UTC; rotation required.
symbolFeeDetails map[string]*symbolFeeDetail
bookEventCallbacks []func(e BookEvent)
walletEventCallbacks []func(e []bybitapi.WalletBalances)
kLineEventCallbacks []func(e KLineEvent)
orderEventCallbacks []func(e []OrderEvent)
tradeEventCallbacks []func(e []TradeEvent)
bookEventCallbacks []func(e BookEvent)
marketTradeEventCallbacks []func(e []MarketTradeEvent)
walletEventCallbacks []func(e []bybitapi.WalletBalances)
kLineEventCallbacks []func(e KLineEvent)
orderEventCallbacks []func(e []OrderEvent)
tradeEventCallbacks []func(e []TradeEvent)
}
func NewStream(key, secret string, marketProvider MarketInfoProvider) *Stream {
@ -66,6 +67,7 @@ func NewStream(key, secret string, marketProvider MarketInfoProvider) *Stream {
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnWalletEvent(stream.handleWalletEvent)
stream.OnOrderEvent(stream.handleOrderEvent)
@ -139,6 +141,9 @@ func (s *Stream) dispatchEvent(event interface{}) {
case *BookEvent:
s.EmitBookEvent(*e)
case []MarketTradeEvent:
s.EmitMarketTradeEvent(e)
case []bybitapi.WalletBalances:
s.EmitWalletEvent(e)
@ -173,18 +178,28 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
var book BookEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &book)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
return nil, fmt.Errorf("failed to unmarshal data into BookEvent: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err)
}
book.Type = e.WebSocketTopicEvent.Type
book.ServerTime = e.WebSocketTopicEvent.Ts.Time()
return &book, nil
case TopicTypeMarketTrade:
// snapshot only
var trade []MarketTradeEvent
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &trade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err)
}
return trade, nil
case TopicTypeKLine:
var kLines []KLine
err = json.Unmarshal(e.WebSocketTopicEvent.Data, &kLines)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, : %w", string(e.WebSocketTopicEvent.Data), err)
return nil, fmt.Errorf("failed to unmarshal data into KLine: %+v, err: %w", string(e.WebSocketTopicEvent.Data), err)
}
symbol, err := getSymbolFromTopic(e.Topic)
@ -292,6 +307,9 @@ func (s *Stream) convertSubscription(sub types.Subscription) (string, error) {
}
return genTopic(TopicTypeOrderBook, depth, sub.Symbol), nil
case types.MarketTradeChannel:
return genTopic(TopicTypeMarketTrade, sub.Symbol), nil
case types.KLineChannel:
interval, err := toLocalInterval(sub.Options.Interval)
if err != nil {
@ -318,6 +336,18 @@ func (s *Stream) handleBookEvent(e BookEvent) {
}
}
func (s *Stream) handleMarketTradeEvent(events []MarketTradeEvent) {
for _, event := range events {
trade, err := event.toGlobalTrade()
if err != nil {
log.WithError(err).Error("failed to convert to market trade")
continue
}
s.StandardStream.EmitMarketTrade(trade)
}
}
func (s *Stream) handleWalletEvent(events []bybitapi.WalletBalances) {
s.StandardStream.EmitBalanceSnapshot(toGlobalBalanceMap(events))
}

View File

@ -16,6 +16,16 @@ func (s *Stream) EmitBookEvent(e BookEvent) {
}
}
func (s *Stream) OnMarketTradeEvent(cb func(e []MarketTradeEvent)) {
s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb)
}
func (s *Stream) EmitMarketTradeEvent(e []MarketTradeEvent) {
for _, cb := range s.marketTradeEventCallbacks {
cb(e)
}
}
func (s *Stream) OnWalletEvent(cb func(e []bybitapi.WalletBalances)) {
s.walletEventCallbacks = append(s.walletEventCallbacks, cb)
}

View File

@ -112,6 +112,19 @@ func TestStream(t *testing.T) {
<-c
})
t.Run("market trade test", func(t *testing.T) {
s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnMarketTrade(func(trade types.Trade) {
t.Log("got update", trade)
})
c := make(chan struct{})
<-c
})
t.Run("wallet test", func(t *testing.T) {
err := s.Connect(context.Background())
assert.NoError(t, err)
@ -237,6 +250,42 @@ func TestStream_parseWebSocketEvent(t *testing.T) {
}, *book)
})
t.Run("TopicTypeMarketTrade with snapshot", func(t *testing.T) {
input := `{
"topic":"publicTrade.BTCUSDT",
"ts":1694348711526,
"type":"snapshot",
"data":[
{
"i":"2290000000068683805",
"T":1694348711524,
"p":"25816.27",
"v":"0.000083",
"S":"Sell",
"s":"BTCUSDT",
"BT":false
}
]
}`
res, err := s.parseWebSocketEvent([]byte(input))
assert.NoError(t, err)
book, ok := res.([]MarketTradeEvent)
assert.True(t, ok)
assert.Equal(t, []MarketTradeEvent{
{
Timestamp: types.NewMillisecondTimestampFromInt(1694348711524),
Symbol: "BTCUSDT",
Side: bybitapi.SideSell,
Quantity: fixedpoint.NewFromFloat(0.000083),
Price: fixedpoint.NewFromFloat(25816.27),
Direction: "",
TradeId: "2290000000068683805",
BlockTrade: false,
},
}, book)
})
t.Run("TopicTypeKLine with snapshot", func(t *testing.T) {
input := `{
"topic": "kline.5.BTCUSDT",
@ -379,6 +428,16 @@ func Test_convertSubscription(t *testing.T) {
assert.Equal(t, fmt.Errorf("unsupported stream channel: %s", "unsupported"), err)
assert.Equal(t, "", res)
})
t.Run("MarketTradeChannel", func(t *testing.T) {
res, err := s.convertSubscription(types.Subscription{
Symbol: "BTCUSDT",
Channel: types.MarketTradeChannel,
Options: types.SubscribeOptions{},
})
assert.NoError(t, err)
assert.Equal(t, genTopic(TopicTypeMarketTrade, "BTCUSDT"), res)
})
}
func TestStream_getFeeRate(t *testing.T) {

View File

@ -91,11 +91,12 @@ func (w *WebSocketOpEvent) IsValid() error {
type TopicType string
const (
TopicTypeOrderBook TopicType = "orderbook"
TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
TopicTypeKLine TopicType = "kline"
TopicTypeTrade TopicType = "execution"
TopicTypeOrderBook TopicType = "orderbook"
TopicTypeMarketTrade TopicType = "publicTrade"
TopicTypeWallet TopicType = "wallet"
TopicTypeOrder TopicType = "order"
TopicTypeKLine TopicType = "kline"
TopicTypeTrade TopicType = "execution"
)
type DataType string
@ -143,6 +144,52 @@ func (e *BookEvent) OrderBook() (snapshot types.SliceOrderBook) {
return snapshot
}
type MarketTradeEvent struct {
// Timestamp is the timestamp (ms) that the order is filled
Timestamp types.MillisecondTimestamp `json:"T"`
Symbol string `json:"s"`
// Side of taker. Buy,Sell
Side bybitapi.Side `json:"S"`
// Quantity is the trade size
Quantity fixedpoint.Value `json:"v"`
// Price is the trade price
Price fixedpoint.Value `json:"p"`
// L is the direction of price change. Unique field for future
Direction string `json:"L"`
// trade ID
TradeId string `json:"i"`
// Whether it is a block trade order or not
BlockTrade bool `json:"BT"`
}
func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) {
tradeId, err := strconv.ParseUint(m.TradeId, 10, 64)
if err != nil {
return types.Trade{}, fmt.Errorf("unexpected trade id: %s, err: %w", m.TradeId, err)
}
side, err := toGlobalSideType(m.Side)
if err != nil {
return types.Trade{}, err
}
return types.Trade{
ID: tradeId,
OrderID: 0, // not supported
Exchange: types.ExchangeBybit,
Price: m.Price,
Quantity: m.Quantity,
QuoteQuantity: m.Price.Mul(m.Quantity),
Symbol: m.Symbol,
Side: side,
IsBuyer: side == types.SideTypeBuy,
IsMaker: false, // not supported
Time: types.Time(m.Timestamp.Time()),
Fee: fixedpoint.Zero, // not supported
FeeCurrency: "", // not supported
}, nil
}
const topicSeparator = "."
func genTopic(in ...interface{}) string {

View File

@ -406,6 +406,55 @@ func TestBookEvent_OrderBook(t *testing.T) {
}
func TestMarketTradeEvent_Trade(t *testing.T) {
qty := fixedpoint.NewFromFloat(0.002289)
price := fixedpoint.NewFromFloat(28829.7600)
tradeId := uint64(2290000000068683542)
tradeTime := types.NewMillisecondTimestampFromInt(1691486100000)
event := MarketTradeEvent{
Timestamp: tradeTime,
Symbol: "BTCUSDT",
Side: bybitapi.SideSell,
Quantity: qty,
Price: price,
Direction: "",
TradeId: strconv.FormatUint(tradeId, 10),
BlockTrade: false,
}
t.Run("succeeds", func(t *testing.T) {
expEvent := types.Trade{
ID: tradeId,
Exchange: types.ExchangeBybit,
Price: price,
Quantity: qty,
QuoteQuantity: price.Mul(qty),
Symbol: event.Symbol,
Side: types.SideTypeSell,
IsBuyer: false,
IsMaker: false,
Time: types.Time(tradeTime.Time()),
}
trade, err := event.toGlobalTrade()
assert.NoError(t, err)
assert.Equal(t, expEvent, trade)
})
t.Run("invalid side", func(t *testing.T) {
newEvent := event
newEvent.Side = "invalid"
_, err := newEvent.toGlobalTrade()
assert.ErrorContains(t, err, "unexpected side")
})
t.Run("invalid trade id", func(t *testing.T) {
newEvent := event
newEvent.TradeId = "invalid"
_, err := newEvent.toGlobalTrade()
assert.ErrorContains(t, err, "unexpected trade id")
})
}
func Test_genTopicName(t *testing.T) {
exp := "orderbook.50.BTCUSDT"
assert.Equal(t, exp, genTopic(TopicTypeOrderBook, types.DepthLevel50, "BTCUSDT"))