pkg/exchange: refactor order trade event by json.Unmarshal

This commit is contained in:
Edwin 2024-01-16 15:29:12 +08:00
parent 735123b3a2
commit 91913f021c
8 changed files with 512 additions and 54 deletions

View File

@ -175,6 +175,25 @@ func tradeToGlobal(trade okexapi.Trade) types.Trade {
}
}
func processMarketBuySize(o *okexapi.OrderDetail) (fixedpoint.Value, error) {
switch o.State {
case okexapi.OrderStateLive, okexapi.OrderStateCanceled:
return fixedpoint.Zero, nil
case okexapi.OrderStatePartiallyFilled:
if o.FillPrice.IsZero() {
return fixedpoint.Zero, fmt.Errorf("fillPrice for a partialFilled should not be zero")
}
return o.Size.Div(o.FillPrice), nil
case okexapi.OrderStateFilled:
return o.AccumulatedFillSize, nil
default:
return fixedpoint.Zero, fmt.Errorf("unexpected status: %s", o.State)
}
}
func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) {
side := toGlobalSide(order.Side)
@ -196,6 +215,17 @@ func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) {
return nil, err
}
size := order.Size
if order.Side == okexapi.SideTypeBuy &&
order.OrderType == okexapi.OrderTypeMarket &&
order.TargetCurrency == okexapi.TargetCurrencyQuote {
size, err = processMarketBuySize(order)
if err != nil {
return nil, err
}
}
return &types.Order{
SubmitOrder: types.SubmitOrder{
ClientOrderID: order.ClientOrderId,
@ -203,7 +233,8 @@ func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) {
Side: side,
Type: orderType,
Price: order.Price,
Quantity: order.Size,
Quantity: size,
AveragePrice: order.AvgPrice,
TimeInForce: timeInForce,
},
Exchange: types.ExchangeOKEx,

View File

@ -68,6 +68,26 @@ func Test_orderDetailToGlobal(t *testing.T) {
assert.Equal(expOrder, order)
})
t.Run("succeeds with market/buy/targetQuoteCurrency", func(t *testing.T) {
newOrder := *openOrder
newOrder.OrderType = okexapi.OrderTypeMarket
newOrder.Side = okexapi.SideTypeBuy
newOrder.TargetCurrency = okexapi.TargetCurrencyQuote
newOrder.FillPrice = fixedpoint.NewFromFloat(100)
newOrder.Size = fixedpoint.NewFromFloat(10000)
newOrder.State = okexapi.OrderStatePartiallyFilled
newExpOrder := *expOrder
newExpOrder.Side = types.SideTypeBuy
newExpOrder.Type = types.OrderTypeMarket
newExpOrder.Quantity = fixedpoint.NewFromFloat(100)
newExpOrder.Status = types.OrderStatusPartiallyFilled
newExpOrder.OriginalStatus = string(okexapi.OrderStatePartiallyFilled)
order, err := orderDetailToGlobal(&newOrder)
assert.NoError(err)
assert.Equal(&newExpOrder, order)
})
t.Run("unexpected order status", func(t *testing.T) {
newOrder := *openOrder
newOrder.State = "xxx"
@ -172,3 +192,54 @@ func Test_tradeToGlobal(t *testing.T) {
})
})
}
func Test_processMarketBuyQuantity(t *testing.T) {
var (
assert = assert.New(t)
)
t.Run("zero", func(t *testing.T) {
size, err := processMarketBuySize(&okexapi.OrderDetail{State: okexapi.OrderStateLive})
assert.NoError(err)
assert.Equal(fixedpoint.Zero, size)
size, err = processMarketBuySize(&okexapi.OrderDetail{State: okexapi.OrderStateCanceled})
assert.NoError(err)
assert.Equal(fixedpoint.Zero, size)
})
t.Run("estimated size", func(t *testing.T) {
size, err := processMarketBuySize(&okexapi.OrderDetail{
FillPrice: fixedpoint.NewFromFloat(2),
Size: fixedpoint.NewFromFloat(4),
State: okexapi.OrderStatePartiallyFilled,
})
assert.NoError(err)
assert.Equal(fixedpoint.NewFromFloat(2), size)
})
t.Run("unexpected fill price", func(t *testing.T) {
_, err := processMarketBuySize(&okexapi.OrderDetail{
FillPrice: fixedpoint.Zero,
Size: fixedpoint.NewFromFloat(4),
State: okexapi.OrderStatePartiallyFilled,
})
assert.ErrorContains(err, "fillPrice")
})
t.Run("accumulatedFillsize", func(t *testing.T) {
size, err := processMarketBuySize(&okexapi.OrderDetail{
AccumulatedFillSize: fixedpoint.NewFromFloat(1000),
State: okexapi.OrderStateFilled,
})
assert.NoError(err)
assert.Equal(fixedpoint.NewFromFloat(1000), size)
})
t.Run("unexpected status", func(t *testing.T) {
_, err := processMarketBuySize(&okexapi.OrderDetail{
State: "XXXXXXX",
})
assert.ErrorContains(err, "unexpected")
})
}

View File

@ -30,7 +30,7 @@ type OrderDetail struct {
Side SideType `json:"side"`
State OrderState `json:"state"`
Size fixedpoint.Value `json:"sz"`
TargetCurrency string `json:"tgtCcy"`
TargetCurrency TargetCurrency `json:"tgtCcy"`
UpdatedTime types.MillisecondTimestamp `json:"uTime"`
// Margin currency
@ -69,7 +69,7 @@ type OrderDetail struct {
// Self trade prevention mode. Return "" if self trade prevention is not applicable
StpMode string `json:"stpMode"`
Tag string `json:"tag"`
TradeMode string `json:"tdMode"`
TradeMode TradeMode `json:"tdMode"`
TpOrdPx fixedpoint.Value `json:"tpOrdPx"`
TpTriggerPx fixedpoint.Value `json:"tpTriggerPx"`
TpTriggerPxType string `json:"tpTriggerPxType"`

View File

@ -4,11 +4,10 @@ import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/valyala/fastjson"
"github.com/c9s/bbgo/pkg/exchange/okex/okexapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
@ -22,7 +21,7 @@ const (
ChannelCandlePrefix Channel = "candle"
ChannelAccount Channel = "account"
ChannelMarketTrades Channel = "trades"
ChannelOrders Channel = "orders"
ChannelOrderTrades Channel = "orders"
)
type ActionType string
@ -33,13 +32,8 @@ const (
)
func parseWebSocketEvent(in []byte) (interface{}, error) {
v, err := fastjson.ParseBytes(in)
if err != nil {
return nil, err
}
var event WebSocketEvent
err = json.Unmarshal(in, &event)
err := json.Unmarshal(in, &event)
if err != nil {
return nil, err
}
@ -73,9 +67,14 @@ func parseWebSocketEvent(in []byte) (interface{}, error) {
}
return trade, nil
case ChannelOrders:
// TODO: remove fastjson
return parseOrder(v)
case ChannelOrderTrades:
var orderTrade []OrderTradeEvent
err := json.Unmarshal(event.Data, &orderTrade)
if err != nil {
return nil, err
}
return orderTrade, nil
default:
if strings.HasPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix)) {
@ -391,16 +390,67 @@ func parseAccount(v []byte) (*okexapi.Account, error) {
return &accounts[0], nil
}
func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) {
data := v.Get("data").MarshalTo(nil)
type OrderTradeEvent struct {
okexapi.OrderDetail
var orderDetails []okexapi.OrderDetails
err := json.Unmarshal(data, &orderDetails)
Code types.StrInt64 `json:"code"`
Msg string `json:"msg"`
AmendResult string `json:"amendResult"`
ExecutionType okexapi.LiquidityType `json:"execType"`
// FillFee last filled fee amount or rebate amount:
// Negative number represents the user transaction fee charged by the platform;
// Positive number represents rebate
FillFee fixedpoint.Value `json:"fillFee"`
// FillFeeCurrency last filled fee currency or rebate currency.
// It is fee currency when fillFee is less than 0; It is rebate currency when fillFee>=0.
FillFeeCurrency string `json:"fillFeeCcy"`
// FillNotionalUsd Filled notional value in USD of order
FillNotionalUsd fixedpoint.Value `json:"fillNotionalUsd"`
FillPnl fixedpoint.Value `json:"fillPnl"`
// NotionalUsd Estimated national value in USD of order
NotionalUsd fixedpoint.Value `json:"notionalUsd"`
// ReqId Client Request ID as assigned by the client for order amendment. "" will be returned if there is no order amendment.
ReqId string `json:"reqId"`
LastPrice fixedpoint.Value `json:"lastPx"`
// QuickMgnType Quick Margin type, Only applicable to Quick Margin Mode of isolated margin
// manual, auto_borrow, auto_repay
QuickMgnType string `json:"quickMgnType"`
// AmendSource Source of the order amendation.
AmendSource string `json:"amendSource"`
// CancelSource Source of the order cancellation.
CancelSource string `json:"cancelSource"`
// Only applicable to options; return "" for other instrument types
FillPriceVolume string `json:"fillPxVol"`
FillPriceUsd string `json:"fillPxUsd"`
FillMarkVolume string `json:"fillMarkVol"`
FillFwdPrice string `json:"fillFwdPx"`
FillMarkPrice string `json:"fillMarkPx"`
}
func (o *OrderTradeEvent) toGlobalTrade() (types.Trade, error) {
side := toGlobalSide(o.Side)
tradeId, err := strconv.ParseUint(o.TradeId, 10, 64)
if err != nil {
return nil, err
return types.Trade{}, fmt.Errorf("unexpected trade id [%s] format: %w", o.TradeId, err)
}
return orderDetails, nil
return types.Trade{
ID: tradeId,
OrderID: uint64(o.OrderId),
Exchange: types.ExchangeOKEx,
Price: o.FillPrice,
Quantity: o.FillSize,
QuoteQuantity: o.FillPrice.Mul(o.FillSize),
Symbol: toGlobalSymbol(o.InstrumentID),
Side: side,
IsBuyer: side == types.SideTypeBuy,
IsMaker: o.ExecutionType == okexapi.LiquidityTypeMaker,
Time: types.Time(o.FillTime.Time()),
// charged by the platform is positive in our design, so added the `Neg()`.
Fee: o.FillFee.Neg(),
FeeCurrency: o.FeeCurrency,
FeeDiscounted: false,
}, nil
}
func toGlobalSideType(side okexapi.SideType) (types.SideType, error) {

View File

@ -850,3 +850,292 @@ func TestWebSocketEvent_IsValid(t *testing.T) {
assert.ErrorContains(t, opEvent.IsValid(), "unexpected event type")
})
}
func TestOrderTradeEvent(t *testing.T) {
//{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"0","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"0","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"","fee":"0","feeCcy":"OKB","fillFee":"0","fillFeeCcy":"","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"","fillPnl":"0","fillPx":"","fillPxUsd":"","fillPxVol":"","fillSz":"0","fillTime":"","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"live","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"","uTime":"1705384184502"}]}
//{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"1.84","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"54","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"T","fee":"-0.001844337","feeCcy":"OKB","fillFee":"-0.001844337","fillFeeCcy":"OKB","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"99.9","fillPnl":"0","fillPx":"54","fillPxUsd":"","fillPxVol":"","fillSz":"1.844337","fillTime":"1705384184503","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"partially_filled","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"590957341","uTime":"1705384184503"}]}
//{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"1.84","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"54","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"","fee":"-0.001844337","feeCcy":"OKB","fillFee":"0","fillFeeCcy":"","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"99.9","fillPnl":"0","fillPx":"","fillPxUsd":"","fillPxVol":"","fillSz":"0","fillTime":"","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"filled","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"","uTime":"1705384184504"}]}
t.Run("succeeds", func(t *testing.T) {
in := `
{
"arg":{
"channel":"orders",
"instType":"SPOT",
"uid":"530315546680502420"
},
"data":[
{
"accFillSz":"1.84",
"algoClOrdId":"",
"algoId":"",
"amendResult":"",
"amendSource":"",
"attachAlgoClOrdId":"",
"attachAlgoOrds":[
],
"avgPx":"54",
"cTime":"1705384184502",
"cancelSource":"",
"category":"normal",
"ccy":"",
"clOrdId":"",
"code":"0",
"execType":"T",
"fee":"-0.001844337",
"feeCcy":"OKB",
"fillFee":"-0.001844337",
"fillFeeCcy":"OKB",
"fillFwdPx":"",
"fillMarkPx":"",
"fillMarkVol":"",
"fillNotionalUsd":"99.9",
"fillPnl":"0",
"fillPx":"54",
"fillPxUsd":"",
"fillPxVol":"",
"fillSz":"1.84",
"fillTime":"1705384184503",
"instId":"OKB-USDT",
"instType":"SPOT",
"lastPx":"54",
"lever":"0",
"msg":"",
"notionalUsd":"99.929",
"ordId":"667364871905857536",
"ordType":"market",
"pnl":"0",
"posSide":"",
"px":"",
"pxType":"",
"pxUsd":"",
"pxVol":"",
"quickMgnType":"",
"rebate":"0",
"rebateCcy":"USDT",
"reduceOnly":"false",
"reqId":"",
"side":"buy",
"slOrdPx":"",
"slTriggerPx":"",
"slTriggerPxType":"",
"source":"",
"state":"partially_filled",
"stpId":"",
"stpMode":"",
"sz":"100",
"tag":"",
"tdMode":"cash",
"tgtCcy":"quote_ccy",
"tpOrdPx":"",
"tpTriggerPx":"",
"tpTriggerPxType":"",
"tradeId":"590957341",
"uTime":"1705384184503"
}
]
}
`
exp := []OrderTradeEvent{
{
OrderDetail: okexapi.OrderDetail{
AccumulatedFillSize: fixedpoint.NewFromFloat(1.84),
AvgPrice: fixedpoint.NewFromFloat(54),
CreatedTime: types.NewMillisecondTimestampFromInt(1705384184502),
Category: "normal",
ClientOrderId: "",
Fee: fixedpoint.NewFromFloat(-0.001844337),
FeeCurrency: "OKB",
FillTime: types.NewMillisecondTimestampFromInt(1705384184503),
InstrumentID: "OKB-USDT",
InstrumentType: okexapi.InstrumentTypeSpot,
OrderId: types.StrInt64(667364871905857536),
OrderType: okexapi.OrderTypeMarket,
Price: fixedpoint.Zero,
Side: okexapi.SideTypeBuy,
State: okexapi.OrderStatePartiallyFilled,
Size: fixedpoint.NewFromFloat(100),
TargetCurrency: okexapi.TargetCurrencyQuote,
UpdatedTime: types.NewMillisecondTimestampFromInt(1705384184503),
Currency: "",
TradeId: "590957341",
FillPrice: fixedpoint.NewFromFloat(54),
FillSize: fixedpoint.NewFromFloat(1.84),
Lever: "0",
Pnl: fixedpoint.Zero,
PositionSide: "",
PriceUsd: fixedpoint.Zero,
PriceVol: fixedpoint.Zero,
PriceType: "",
Rebate: fixedpoint.Zero,
RebateCcy: "USDT",
AttachAlgoClOrdId: "",
SlOrdPx: fixedpoint.Zero,
SlTriggerPx: fixedpoint.Zero,
SlTriggerPxType: "",
AttachAlgoOrds: []interface{}{},
Source: "",
StpId: "",
StpMode: "",
Tag: "",
TradeMode: okexapi.TradeModeCash,
TpOrdPx: fixedpoint.Zero,
TpTriggerPx: fixedpoint.Zero,
TpTriggerPxType: "",
ReduceOnly: "false",
AlgoClOrdId: "",
AlgoId: "",
},
Code: types.StrInt64(0),
Msg: "",
AmendResult: "",
ExecutionType: okexapi.LiquidityTypeTaker,
FillFee: fixedpoint.NewFromFloat(-0.001844337),
FillFeeCurrency: "OKB",
FillNotionalUsd: fixedpoint.NewFromFloat(99.9),
FillPnl: fixedpoint.Zero,
NotionalUsd: fixedpoint.NewFromFloat(99.929),
ReqId: "",
LastPrice: fixedpoint.NewFromFloat(54),
QuickMgnType: "",
AmendSource: "",
CancelSource: "",
FillPriceVolume: "",
FillPriceUsd: "",
FillMarkVolume: "",
FillFwdPrice: "",
FillMarkPrice: "",
},
}
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.([]OrderTradeEvent)
assert.True(t, ok)
assert.Equal(t, exp, event)
})
}
func TestOrderTradeEvent_toGlobalTrade1(t *testing.T) {
var (
in = `
{
"arg":{
"channel":"orders",
"instType":"SPOT",
"uid":"530315546680502420"
},
"data":[
{
"accFillSz":"1.84",
"algoClOrdId":"",
"algoId":"",
"amendResult":"",
"amendSource":"",
"attachAlgoClOrdId":"",
"attachAlgoOrds":[
],
"avgPx":"54",
"cTime":"1705384184502",
"cancelSource":"",
"category":"normal",
"ccy":"",
"clOrdId":"",
"code":"0",
"execType":"T",
"fee":"-0.001844337",
"feeCcy":"OKB",
"fillFee":"-0.001844337",
"fillFeeCcy":"OKB",
"fillFwdPx":"",
"fillMarkPx":"",
"fillMarkVol":"",
"fillNotionalUsd":"99.9",
"fillPnl":"0",
"fillPx":"54",
"fillPxUsd":"",
"fillPxVol":"",
"fillSz":"1.84",
"fillTime":"1705384184503",
"instId":"OKB-USDT",
"instType":"SPOT",
"lastPx":"54",
"lever":"0",
"msg":"",
"notionalUsd":"99.929",
"ordId":"667364871905857536",
"ordType":"market",
"pnl":"0",
"posSide":"",
"px":"",
"pxType":"",
"pxUsd":"",
"pxVol":"",
"quickMgnType":"",
"rebate":"0",
"rebateCcy":"USDT",
"reduceOnly":"false",
"reqId":"",
"side":"buy",
"slOrdPx":"",
"slTriggerPx":"",
"slTriggerPxType":"",
"source":"",
"state":"partially_filled",
"stpId":"",
"stpMode":"",
"sz":"100",
"tag":"",
"tdMode":"cash",
"tgtCcy":"quote_ccy",
"tpOrdPx":"",
"tpTriggerPx":"",
"tpTriggerPxType":"",
"tradeId":"590957341",
"uTime":"1705384184503"
}
]
}
`
expTrade = types.Trade{
ID: uint64(590957341),
OrderID: uint64(667364871905857536),
Exchange: types.ExchangeOKEx,
Price: fixedpoint.NewFromFloat(54),
Quantity: fixedpoint.NewFromFloat(1.84),
QuoteQuantity: fixedpoint.NewFromFloat(54).Mul(fixedpoint.NewFromFloat(1.84)),
Symbol: "OKBUSDT",
Side: types.SideTypeBuy,
IsBuyer: true,
IsMaker: false,
Time: types.Time(types.NewMillisecondTimestampFromInt(1705384184503).Time()),
Fee: fixedpoint.NewFromFloat(0.001844337),
FeeCurrency: "OKB",
}
)
t.Run("succeeds", func(t *testing.T) {
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.([]OrderTradeEvent)
assert.True(t, ok)
trade, err := event[0].toGlobalTrade()
assert.NoError(t, err)
assert.Equal(t, expTrade, trade)
})
t.Run("unexpected trade id", func(t *testing.T) {
res, err := parseWebSocketEvent([]byte(in))
assert.NoError(t, err)
event, ok := res.([]OrderTradeEvent)
assert.True(t, ok)
event[0].TradeId = "XXXX"
_, err = event[0].toGlobalTrade()
assert.ErrorContains(t, err, "trade id")
})
}

View File

@ -12,7 +12,8 @@ import (
)
var (
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
)
type WebsocketOp struct {
@ -34,11 +35,11 @@ type Stream struct {
client *okexapi.RestClient
// public callbacks
kLineEventCallbacks []func(candle KLineEvent)
bookEventCallbacks []func(book BookEvent)
accountEventCallbacks []func(account okexapi.Account)
orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails)
marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent)
kLineEventCallbacks []func(candle KLineEvent)
bookEventCallbacks []func(book BookEvent)
accountEventCallbacks []func(account okexapi.Account)
orderTradesEventCallbacks []func(orderTrades []OrderTradeEvent)
marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent)
}
func NewStream(client *okexapi.RestClient) *Stream {
@ -55,7 +56,7 @@ func NewStream(client *okexapi.RestClient) *Stream {
stream.OnBookEvent(stream.handleBookEvent)
stream.OnAccountEvent(stream.handleAccountEvent)
stream.OnMarketTradeEvent(stream.handleMarketTradeEvent)
stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent)
stream.OnOrderTradesEvent(stream.handleOrderDetailsEvent)
stream.OnConnect(stream.handleConnect)
stream.OnAuth(stream.handleAuth)
return stream
@ -167,24 +168,26 @@ func (s *Stream) handleAuth() {
}
}
func (s *Stream) handleOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
detailTrades, detailOrders := segmentOrderDetails(orderDetails)
trades, err := toGlobalTrades(detailTrades)
if err != nil {
log.WithError(err).Errorf("error converting order details into trades")
} else {
for _, trade := range trades {
s.EmitTradeUpdate(trade)
func (s *Stream) handleOrderDetailsEvent(orderTrades []OrderTradeEvent) {
for _, evt := range orderTrades {
if evt.TradeId != "" {
trade, err := evt.toGlobalTrade()
if err != nil {
if tradeLogLimiter.Allow() {
log.WithError(err).Errorf("failed to convert global trade")
}
} else {
s.EmitTradeUpdate(trade)
}
}
}
orders, err := toGlobalOrders(detailOrders)
if err != nil {
log.WithError(err).Errorf("error converting order details into orders")
} else {
for _, order := range orders {
s.EmitOrderUpdate(order)
order, err := orderDetailToGlobal(&evt.OrderDetail)
if err != nil {
if tradeLogLimiter.Allow() {
log.WithError(err).Errorf("failed to convert global order")
}
} else {
s.EmitOrderUpdate(*order)
}
}
}
@ -208,7 +211,7 @@ func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) {
for _, event := range data {
trade, err := event.toGlobalTrade()
if err != nil {
if tradeLogLimiter.Allow() {
if marketTradeLogLimiter.Allow() {
log.WithError(err).Error("failed to convert to market trade")
}
continue
@ -262,8 +265,8 @@ func (s *Stream) dispatchEvent(e interface{}) {
case *okexapi.Account:
s.EmitAccountEvent(*et)
case []okexapi.OrderDetails:
s.EmitOrderDetailsEvent(et)
case []OrderTradeEvent:
s.EmitOrderTradesEvent(et)
case []MarketTradeEvent:
s.EmitMarketTradeEvent(et)

View File

@ -36,13 +36,13 @@ func (s *Stream) EmitAccountEvent(account okexapi.Account) {
}
}
func (s *Stream) OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) {
s.orderDetailsEventCallbacks = append(s.orderDetailsEventCallbacks, cb)
func (s *Stream) OnOrderTradesEvent(cb func(orderTrades []OrderTradeEvent)) {
s.orderTradesEventCallbacks = append(s.orderTradesEventCallbacks, cb)
}
func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) {
for _, cb := range s.orderDetailsEventCallbacks {
cb(orderDetails)
func (s *Stream) EmitOrderTradesEvent(orderTrades []OrderTradeEvent) {
for _, cb := range s.orderTradesEventCallbacks {
cb(orderTrades)
}
}
@ -63,7 +63,7 @@ type StreamEventHub interface {
OnAccountEvent(cb func(account okexapi.Account))
OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails))
OnOrderTradesEvent(cb func(orderTrades []OrderTradeEvent))
OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent))
}

View File

@ -140,4 +140,18 @@ func TestStream(t *testing.T) {
c := make(chan struct{})
<-c
})
t.Run("order trade test", func(t *testing.T) {
err := s.Connect(context.Background())
assert.NoError(t, err)
s.OnOrderUpdate(func(order types.Order) {
t.Log("order update", order)
})
s.OnTradeUpdate(func(trade types.Trade) {
t.Log("trade update", trade)
})
c := make(chan struct{})
<-c
})
}