pkg/exchange: support market trade streaming

This commit is contained in:
Edwin 2024-01-08 15:05:32 +08:00
parent 2afc72d14d
commit 2e34f7840a
6 changed files with 214 additions and 6 deletions

View File

@ -55,9 +55,9 @@ func toGlobalBalance(account *okexapi.Account) types.BalanceMap {
}
type WebsocketSubscription struct {
Channel string `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
Channel Channel `json:"channel"`
InstrumentID string `json:"instId,omitempty"`
InstrumentType string `json:"instType,omitempty"`
}
var CandleChannels = []string{
@ -92,18 +92,23 @@ func convertSubscription(s types.Subscription) (WebsocketSubscription, error) {
case types.KLineChannel:
// Channel names are:
return WebsocketSubscription{
Channel: convertIntervalToCandle(s.Options.Interval),
Channel: Channel(convertIntervalToCandle(s.Options.Interval)),
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
case types.BookChannel:
return WebsocketSubscription{
Channel: "books",
Channel: ChannelBooks,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
case types.BookTickerChannel:
return WebsocketSubscription{
Channel: "books5",
Channel: ChannelBook5,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
case types.MarketTradeChannel:
return WebsocketSubscription{
Channel: ChannelMarketTrades,
InstrumentID: toLocalSymbol(s.Symbol),
}, nil
}

View File

@ -21,6 +21,7 @@ const (
ChannelBook5 Channel = "book5"
ChannelCandlePrefix Channel = "candle"
ChannelAccount Channel = "account"
ChannelMarketTrades Channel = "trades"
ChannelOrders Channel = "orders"
)
@ -66,6 +67,14 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
bookEvent.Action = event.ActionType
return &bookEvent, nil
case ChannelMarketTrades:
var trade []MarketTradeEvent
err = json.Unmarshal(event.Data, &trade)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal data into MarketTradeEvent: %+v, err: %w", string(event.Data), err)
}
return trade, nil
case ChannelOrders:
// TODO: remove fastjson
return parseOrder(v)
@ -363,3 +372,54 @@ func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) {
return orderDetails, nil
}
func toGlobalSideType(side okexapi.SideType) (types.SideType, error) {
switch side {
case okexapi.SideTypeBuy:
return types.SideTypeBuy, nil
case okexapi.SideTypeSell:
return types.SideTypeSell, nil
default:
return types.SideType(side), fmt.Errorf("unexpected side: %s", side)
}
}
type MarketTradeEvent struct {
InstId string `json:"instId"`
TradeId types.StrInt64 `json:"tradeId"`
Px fixedpoint.Value `json:"px"`
Sz fixedpoint.Value `json:"sz"`
Side okexapi.SideType `json:"side"`
Timestamp types.MillisecondTimestamp `json:"ts"`
Count types.StrInt64 `json:"count"`
}
func (m *MarketTradeEvent) toGlobalTrade() (types.Trade, error) {
symbol := toGlobalSymbol(m.InstId)
if symbol == "" {
return types.Trade{}, fmt.Errorf("unexpected inst id: %s", m.InstId)
}
side, err := toGlobalSideType(m.Side)
if err != nil {
return types.Trade{}, err
}
return types.Trade{
ID: uint64(m.TradeId),
OrderID: 0, // not supported
Exchange: types.ExchangeOKEx,
Price: m.Px,
Quantity: m.Sz,
QuoteQuantity: m.Px.Mul(m.Sz),
Symbol: symbol,
Side: side,
IsBuyer: side == types.SideTypeBuy,
IsMaker: false, // not supported
Time: types.Time(m.Timestamp.Time()),
Fee: fixedpoint.Zero, // not supported
FeeCurrency: "", // not supported
}, nil
}

View File

@ -6,6 +6,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -575,3 +576,95 @@ func TestKLine_ToGlobal(t *testing.T) {
})
}
func Test_parseWebSocketEvent(t *testing.T) {
in := `
{
"arg": {
"channel": "trades",
"instId": "BTC-USDT"
},
"data": [
{
"instId": "BTC-USDT",
"tradeId": "130639474",
"px": "42219.9",
"sz": "0.12060306",
"side": "buy",
"ts": "1630048897897",
"count": "3"
}
]
}
`
exp := []MarketTradeEvent{{
InstId: "BTC-USDT",
TradeId: 130639474,
Px: fixedpoint.NewFromFloat(42219.9),
Sz: fixedpoint.NewFromFloat(0.12060306),
Side: okexapi.SideTypeBuy,
Timestamp: types.NewMillisecondTimestampFromInt(1630048897897),
Count: 3,
}}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.([]MarketTradeEvent)
assert.True(t, ok)
assert.Len(t, event, 1)
assert.Equal(t, exp, event)
}
func Test_toGlobalTrade(t *testing.T) {
// {
// "instId": "BTC-USDT",
// "tradeId": "130639474",
// "px": "42219.9",
// "sz": "0.12060306",
// "side": "buy",
// "ts": "1630048897897",
// "count": "3"
// }
marketTrade := MarketTradeEvent{
InstId: "BTC-USDT",
TradeId: 130639474,
Px: fixedpoint.NewFromFloat(42219.9),
Sz: fixedpoint.NewFromFloat(0.12060306),
Side: okexapi.SideTypeBuy,
Timestamp: types.NewMillisecondTimestampFromInt(1630048897897),
Count: 3,
}
t.Run("succeeds", func(t *testing.T) {
trade, err := marketTrade.toGlobalTrade()
assert.NoError(t, err)
assert.Equal(t, types.Trade{
ID: uint64(130639474),
OrderID: uint64(0),
Exchange: types.ExchangeOKEx,
Price: fixedpoint.NewFromFloat(42219.9),
Quantity: fixedpoint.NewFromFloat(0.12060306),
QuoteQuantity: marketTrade.Px.Mul(marketTrade.Sz),
Symbol: "BTCUSDT",
Side: types.SideTypeBuy,
IsBuyer: true,
IsMaker: false,
Time: types.Time(types.NewMillisecondTimestampFromInt(1630048897897)),
Fee: fixedpoint.Zero,
FeeCurrency: "",
FeeDiscounted: false,
}, trade)
})
t.Run("unexpected side", func(t *testing.T) {
newTrade := marketTrade
newTrade.Side = "both"
_, err := newTrade.toGlobalTrade()
assert.ErrorContains(t, err, "both")
})
t.Run("unexpected symbol", func(t *testing.T) {
newTrade := marketTrade
newTrade.InstId = ""
_, err := newTrade.toGlobalTrade()
assert.ErrorContains(t, err, "unexpected inst id")
})
}

View File

@ -2,6 +2,7 @@ package okex
import (
"context"
"golang.org/x/time/rate"
"strconv"
"time"
@ -9,6 +10,10 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
var (
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
)
type WebsocketOp struct {
Op string `json:"op"`
Args interface{} `json:"args"`
@ -33,6 +38,7 @@ type Stream struct {
eventCallbacks []func(event WebSocketEvent)
accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent)
}
func NewStream(client *okexapi.RestClient) *Stream {
@ -48,6 +54,7 @@ func NewStream(client *okexapi.RestClient) *Stream {
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
stream.OnEvent(stream.handleEvent)
stream.OnConnect(stream.handleConnect)
@ -166,6 +173,20 @@ func (s *Stream) handleBookEvent(data BookEvent) {
}
}
func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) {
for _, event := range data {
trade, err := event.toGlobalTrade()
if err != nil {
if tradeLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to market trade")
}
continue
}
s.EmitMarketTrade(trade)
}
}
func (s *Stream) handleKLineEvent(k KLineEvent) {
for _, event := range k.Events {
kline := event.ToGlobal(types.Interval(k.Interval), k.Symbol)
@ -207,5 +228,8 @@ func (s *Stream) dispatchEvent(e interface{}) {
case []okexapi.OrderDetails:
s.EmitOrderDetailsEvent(et)
case []MarketTradeEvent:
s.EmitMarketTradeEvent(et)
}
}

View File

@ -56,6 +56,16 @@ func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
}
}
func (s *Stream) OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent)) {
s.marketTradeEventCallbacks = append(s.marketTradeEventCallbacks, cb)
}
func (s *Stream) EmitMarketTradeEvent(tradeDetail []MarketTradeEvent) {
for _, cb := range s.marketTradeEventCallbacks {
cb(tradeDetail)
}
}
type StreamEventHub interface {
OnKLineEvent(cb func(candle KLineEvent))
@ -66,4 +76,6 @@ type StreamEventHub interface {
OnAccountEvent(cb func(account okexapi.Account))
OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails))
OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent))
}

View File

@ -48,6 +48,20 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})
t.Run("market trade test", func(t *testing.T) {
s.Subscribe(types.MarketTradeChannel, "BTCUSDT", types.SubscribeOptions{})
s.SetPublicOnly()
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnMarketTrade(func(trade types.Trade) {
t.Log("got trade upgrade", trade)
})
c := make(chan struct{})
<-c
})
t.Run("kline test", func(t *testing.T) {
s.Subscribe(types.KLineChannel, "LTC-USD-200327", types.SubscribeOptions{
Interval: types.Interval1m,