diff --git a/pkg/exchange/okex/convert.go b/pkg/exchange/okex/convert.go index b21f285d6..6a455a774 100644 --- a/pkg/exchange/okex/convert.go +++ b/pkg/exchange/okex/convert.go @@ -175,6 +175,25 @@ func tradeToGlobal(trade okexapi.Trade) types.Trade { } } +func processMarketBuySize(o *okexapi.OrderDetail) (fixedpoint.Value, error) { + switch o.State { + case okexapi.OrderStateLive, okexapi.OrderStateCanceled: + return fixedpoint.Zero, nil + + case okexapi.OrderStatePartiallyFilled: + if o.FillPrice.IsZero() { + return fixedpoint.Zero, fmt.Errorf("fillPrice for a partialFilled should not be zero") + } + return o.Size.Div(o.FillPrice), nil + + case okexapi.OrderStateFilled: + return o.AccumulatedFillSize, nil + + default: + return fixedpoint.Zero, fmt.Errorf("unexpected status: %s", o.State) + } +} + func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) { side := toGlobalSide(order.Side) @@ -196,6 +215,17 @@ func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) { return nil, err } + size := order.Size + if order.Side == okexapi.SideTypeBuy && + order.OrderType == okexapi.OrderTypeMarket && + order.TargetCurrency == okexapi.TargetCurrencyQuote { + + size, err = processMarketBuySize(order) + if err != nil { + return nil, err + } + } + return &types.Order{ SubmitOrder: types.SubmitOrder{ ClientOrderID: order.ClientOrderId, @@ -203,7 +233,8 @@ func orderDetailToGlobal(order *okexapi.OrderDetail) (*types.Order, error) { Side: side, Type: orderType, Price: order.Price, - Quantity: order.Size, + Quantity: size, + AveragePrice: order.AvgPrice, TimeInForce: timeInForce, }, Exchange: types.ExchangeOKEx, diff --git a/pkg/exchange/okex/convert_test.go b/pkg/exchange/okex/convert_test.go index 84288cc40..2a23f4a6a 100644 --- a/pkg/exchange/okex/convert_test.go +++ b/pkg/exchange/okex/convert_test.go @@ -68,6 +68,26 @@ func Test_orderDetailToGlobal(t *testing.T) { assert.Equal(expOrder, order) }) + t.Run("succeeds with market/buy/targetQuoteCurrency", func(t *testing.T) { + newOrder := *openOrder + newOrder.OrderType = okexapi.OrderTypeMarket + newOrder.Side = okexapi.SideTypeBuy + newOrder.TargetCurrency = okexapi.TargetCurrencyQuote + newOrder.FillPrice = fixedpoint.NewFromFloat(100) + newOrder.Size = fixedpoint.NewFromFloat(10000) + newOrder.State = okexapi.OrderStatePartiallyFilled + + newExpOrder := *expOrder + newExpOrder.Side = types.SideTypeBuy + newExpOrder.Type = types.OrderTypeMarket + newExpOrder.Quantity = fixedpoint.NewFromFloat(100) + newExpOrder.Status = types.OrderStatusPartiallyFilled + newExpOrder.OriginalStatus = string(okexapi.OrderStatePartiallyFilled) + order, err := orderDetailToGlobal(&newOrder) + assert.NoError(err) + assert.Equal(&newExpOrder, order) + }) + t.Run("unexpected order status", func(t *testing.T) { newOrder := *openOrder newOrder.State = "xxx" @@ -172,3 +192,54 @@ func Test_tradeToGlobal(t *testing.T) { }) }) } + +func Test_processMarketBuyQuantity(t *testing.T) { + var ( + assert = assert.New(t) + ) + + t.Run("zero", func(t *testing.T) { + size, err := processMarketBuySize(&okexapi.OrderDetail{State: okexapi.OrderStateLive}) + assert.NoError(err) + assert.Equal(fixedpoint.Zero, size) + + size, err = processMarketBuySize(&okexapi.OrderDetail{State: okexapi.OrderStateCanceled}) + assert.NoError(err) + assert.Equal(fixedpoint.Zero, size) + }) + + t.Run("estimated size", func(t *testing.T) { + size, err := processMarketBuySize(&okexapi.OrderDetail{ + FillPrice: fixedpoint.NewFromFloat(2), + Size: fixedpoint.NewFromFloat(4), + State: okexapi.OrderStatePartiallyFilled, + }) + assert.NoError(err) + assert.Equal(fixedpoint.NewFromFloat(2), size) + }) + + t.Run("unexpected fill price", func(t *testing.T) { + _, err := processMarketBuySize(&okexapi.OrderDetail{ + FillPrice: fixedpoint.Zero, + Size: fixedpoint.NewFromFloat(4), + State: okexapi.OrderStatePartiallyFilled, + }) + assert.ErrorContains(err, "fillPrice") + }) + + t.Run("accumulatedFillsize", func(t *testing.T) { + size, err := processMarketBuySize(&okexapi.OrderDetail{ + AccumulatedFillSize: fixedpoint.NewFromFloat(1000), + State: okexapi.OrderStateFilled, + }) + assert.NoError(err) + assert.Equal(fixedpoint.NewFromFloat(1000), size) + }) + + t.Run("unexpected status", func(t *testing.T) { + _, err := processMarketBuySize(&okexapi.OrderDetail{ + State: "XXXXXXX", + }) + assert.ErrorContains(err, "unexpected") + }) +} diff --git a/pkg/exchange/okex/okexapi/get_order_history_request.go b/pkg/exchange/okex/okexapi/get_order_history_request.go index 197410ee3..6fa318b35 100644 --- a/pkg/exchange/okex/okexapi/get_order_history_request.go +++ b/pkg/exchange/okex/okexapi/get_order_history_request.go @@ -30,7 +30,7 @@ type OrderDetail struct { Side SideType `json:"side"` State OrderState `json:"state"` Size fixedpoint.Value `json:"sz"` - TargetCurrency string `json:"tgtCcy"` + TargetCurrency TargetCurrency `json:"tgtCcy"` UpdatedTime types.MillisecondTimestamp `json:"uTime"` // Margin currency @@ -69,7 +69,7 @@ type OrderDetail struct { // Self trade prevention mode. Return "" if self trade prevention is not applicable StpMode string `json:"stpMode"` Tag string `json:"tag"` - TradeMode string `json:"tdMode"` + TradeMode TradeMode `json:"tdMode"` TpOrdPx fixedpoint.Value `json:"tpOrdPx"` TpTriggerPx fixedpoint.Value `json:"tpTriggerPx"` TpTriggerPxType string `json:"tpTriggerPxType"` diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index c09e21c15..008d700dd 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -4,11 +4,10 @@ import ( "encoding/json" "errors" "fmt" + "strconv" "strings" "time" - "github.com/valyala/fastjson" - "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -22,7 +21,7 @@ const ( ChannelCandlePrefix Channel = "candle" ChannelAccount Channel = "account" ChannelMarketTrades Channel = "trades" - ChannelOrders Channel = "orders" + ChannelOrderTrades Channel = "orders" ) type ActionType string @@ -33,13 +32,8 @@ const ( ) func parseWebSocketEvent(in []byte) (interface{}, error) { - v, err := fastjson.ParseBytes(in) - if err != nil { - return nil, err - } - var event WebSocketEvent - err = json.Unmarshal(in, &event) + err := json.Unmarshal(in, &event) if err != nil { return nil, err } @@ -73,9 +67,14 @@ func parseWebSocketEvent(in []byte) (interface{}, error) { } return trade, nil - case ChannelOrders: - // TODO: remove fastjson - return parseOrder(v) + case ChannelOrderTrades: + var orderTrade []OrderTradeEvent + err := json.Unmarshal(event.Data, &orderTrade) + if err != nil { + return nil, err + } + + return orderTrade, nil default: if strings.HasPrefix(string(event.Arg.Channel), string(ChannelCandlePrefix)) { @@ -391,16 +390,67 @@ func parseAccount(v []byte) (*okexapi.Account, error) { return &accounts[0], nil } -func parseOrder(v *fastjson.Value) ([]okexapi.OrderDetails, error) { - data := v.Get("data").MarshalTo(nil) +type OrderTradeEvent struct { + okexapi.OrderDetail - var orderDetails []okexapi.OrderDetails - err := json.Unmarshal(data, &orderDetails) + Code types.StrInt64 `json:"code"` + Msg string `json:"msg"` + AmendResult string `json:"amendResult"` + ExecutionType okexapi.LiquidityType `json:"execType"` + // FillFee last filled fee amount or rebate amount: + // Negative number represents the user transaction fee charged by the platform; + // Positive number represents rebate + FillFee fixedpoint.Value `json:"fillFee"` + // FillFeeCurrency last filled fee currency or rebate currency. + // It is fee currency when fillFee is less than 0; It is rebate currency when fillFee>=0. + FillFeeCurrency string `json:"fillFeeCcy"` + // FillNotionalUsd Filled notional value in USD of order + FillNotionalUsd fixedpoint.Value `json:"fillNotionalUsd"` + FillPnl fixedpoint.Value `json:"fillPnl"` + // NotionalUsd Estimated national value in USD of order + NotionalUsd fixedpoint.Value `json:"notionalUsd"` + // ReqId Client Request ID as assigned by the client for order amendment. "" will be returned if there is no order amendment. + ReqId string `json:"reqId"` + LastPrice fixedpoint.Value `json:"lastPx"` + // QuickMgnType Quick Margin type, Only applicable to Quick Margin Mode of isolated margin + // manual, auto_borrow, auto_repay + QuickMgnType string `json:"quickMgnType"` + // AmendSource Source of the order amendation. + AmendSource string `json:"amendSource"` + // CancelSource Source of the order cancellation. + CancelSource string `json:"cancelSource"` + + // Only applicable to options; return "" for other instrument types + FillPriceVolume string `json:"fillPxVol"` + FillPriceUsd string `json:"fillPxUsd"` + FillMarkVolume string `json:"fillMarkVol"` + FillFwdPrice string `json:"fillFwdPx"` + FillMarkPrice string `json:"fillMarkPx"` +} + +func (o *OrderTradeEvent) toGlobalTrade() (types.Trade, error) { + side := toGlobalSide(o.Side) + tradeId, err := strconv.ParseUint(o.TradeId, 10, 64) if err != nil { - return nil, err + return types.Trade{}, fmt.Errorf("unexpected trade id [%s] format: %w", o.TradeId, err) } - - return orderDetails, nil + return types.Trade{ + ID: tradeId, + OrderID: uint64(o.OrderId), + Exchange: types.ExchangeOKEx, + Price: o.FillPrice, + Quantity: o.FillSize, + QuoteQuantity: o.FillPrice.Mul(o.FillSize), + Symbol: toGlobalSymbol(o.InstrumentID), + Side: side, + IsBuyer: side == types.SideTypeBuy, + IsMaker: o.ExecutionType == okexapi.LiquidityTypeMaker, + Time: types.Time(o.FillTime.Time()), + // charged by the platform is positive in our design, so added the `Neg()`. + Fee: o.FillFee.Neg(), + FeeCurrency: o.FeeCurrency, + FeeDiscounted: false, + }, nil } func toGlobalSideType(side okexapi.SideType) (types.SideType, error) { diff --git a/pkg/exchange/okex/parse_test.go b/pkg/exchange/okex/parse_test.go index 84395b58a..f95934df1 100644 --- a/pkg/exchange/okex/parse_test.go +++ b/pkg/exchange/okex/parse_test.go @@ -850,3 +850,292 @@ func TestWebSocketEvent_IsValid(t *testing.T) { assert.ErrorContains(t, opEvent.IsValid(), "unexpected event type") }) } + +func TestOrderTradeEvent(t *testing.T) { + //{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"0","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"0","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"","fee":"0","feeCcy":"OKB","fillFee":"0","fillFeeCcy":"","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"","fillPnl":"0","fillPx":"","fillPxUsd":"","fillPxVol":"","fillSz":"0","fillTime":"","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"live","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"","uTime":"1705384184502"}]} + //{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"1.84","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"54","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"T","fee":"-0.001844337","feeCcy":"OKB","fillFee":"-0.001844337","fillFeeCcy":"OKB","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"99.9","fillPnl":"0","fillPx":"54","fillPxUsd":"","fillPxVol":"","fillSz":"1.844337","fillTime":"1705384184503","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"partially_filled","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"590957341","uTime":"1705384184503"}]} + //{"arg":{"channel":"orders","instType":"SPOT","uid":"530315546680502420"},"data":[{"accFillSz":"1.84","algoClOrdId":"","algoId":"","amendResult":"","amendSource":"","attachAlgoClOrdId":"","attachAlgoOrds":[],"avgPx":"54","cTime":"1705384184502","cancelSource":"","category":"normal","ccy":"","clOrdId":"","code":"0","execType":"","fee":"-0.001844337","feeCcy":"OKB","fillFee":"0","fillFeeCcy":"","fillFwdPx":"","fillMarkPx":"","fillMarkVol":"","fillNotionalUsd":"99.9","fillPnl":"0","fillPx":"","fillPxUsd":"","fillPxVol":"","fillSz":"0","fillTime":"","instId":"OKB-USDT","instType":"SPOT","lastPx":"54","lever":"0","msg":"","notionalUsd":"99.929","ordId":"667364871905857536","ordType":"market","pnl":"0","posSide":"","px":"","pxType":"","pxUsd":"","pxVol":"","quickMgnType":"","rebate":"0","rebateCcy":"USDT","reduceOnly":"false","reqId":"","side":"buy","slOrdPx":"","slTriggerPx":"","slTriggerPxType":"","source":"","state":"filled","stpId":"","stpMode":"","sz":"100","tag":"","tdMode":"cash","tgtCcy":"quote_ccy","tpOrdPx":"","tpTriggerPx":"","tpTriggerPxType":"","tradeId":"","uTime":"1705384184504"}]} + t.Run("succeeds", func(t *testing.T) { + in := ` +{ + "arg":{ + "channel":"orders", + "instType":"SPOT", + "uid":"530315546680502420" + }, + "data":[ + { + "accFillSz":"1.84", + "algoClOrdId":"", + "algoId":"", + "amendResult":"", + "amendSource":"", + "attachAlgoClOrdId":"", + "attachAlgoOrds":[ + + ], + "avgPx":"54", + "cTime":"1705384184502", + "cancelSource":"", + "category":"normal", + "ccy":"", + "clOrdId":"", + "code":"0", + "execType":"T", + "fee":"-0.001844337", + "feeCcy":"OKB", + "fillFee":"-0.001844337", + "fillFeeCcy":"OKB", + "fillFwdPx":"", + "fillMarkPx":"", + "fillMarkVol":"", + "fillNotionalUsd":"99.9", + "fillPnl":"0", + "fillPx":"54", + "fillPxUsd":"", + "fillPxVol":"", + "fillSz":"1.84", + "fillTime":"1705384184503", + "instId":"OKB-USDT", + "instType":"SPOT", + "lastPx":"54", + "lever":"0", + "msg":"", + "notionalUsd":"99.929", + "ordId":"667364871905857536", + "ordType":"market", + "pnl":"0", + "posSide":"", + "px":"", + "pxType":"", + "pxUsd":"", + "pxVol":"", + "quickMgnType":"", + "rebate":"0", + "rebateCcy":"USDT", + "reduceOnly":"false", + "reqId":"", + "side":"buy", + "slOrdPx":"", + "slTriggerPx":"", + "slTriggerPxType":"", + "source":"", + "state":"partially_filled", + "stpId":"", + "stpMode":"", + "sz":"100", + "tag":"", + "tdMode":"cash", + "tgtCcy":"quote_ccy", + "tpOrdPx":"", + "tpTriggerPx":"", + "tpTriggerPxType":"", + "tradeId":"590957341", + "uTime":"1705384184503" + } + ] +} +` + + exp := []OrderTradeEvent{ + { + OrderDetail: okexapi.OrderDetail{ + AccumulatedFillSize: fixedpoint.NewFromFloat(1.84), + AvgPrice: fixedpoint.NewFromFloat(54), + CreatedTime: types.NewMillisecondTimestampFromInt(1705384184502), + Category: "normal", + ClientOrderId: "", + Fee: fixedpoint.NewFromFloat(-0.001844337), + FeeCurrency: "OKB", + FillTime: types.NewMillisecondTimestampFromInt(1705384184503), + InstrumentID: "OKB-USDT", + InstrumentType: okexapi.InstrumentTypeSpot, + OrderId: types.StrInt64(667364871905857536), + OrderType: okexapi.OrderTypeMarket, + Price: fixedpoint.Zero, + Side: okexapi.SideTypeBuy, + State: okexapi.OrderStatePartiallyFilled, + Size: fixedpoint.NewFromFloat(100), + TargetCurrency: okexapi.TargetCurrencyQuote, + UpdatedTime: types.NewMillisecondTimestampFromInt(1705384184503), + Currency: "", + TradeId: "590957341", + FillPrice: fixedpoint.NewFromFloat(54), + FillSize: fixedpoint.NewFromFloat(1.84), + Lever: "0", + Pnl: fixedpoint.Zero, + PositionSide: "", + PriceUsd: fixedpoint.Zero, + PriceVol: fixedpoint.Zero, + PriceType: "", + Rebate: fixedpoint.Zero, + RebateCcy: "USDT", + AttachAlgoClOrdId: "", + SlOrdPx: fixedpoint.Zero, + SlTriggerPx: fixedpoint.Zero, + SlTriggerPxType: "", + AttachAlgoOrds: []interface{}{}, + Source: "", + StpId: "", + StpMode: "", + Tag: "", + TradeMode: okexapi.TradeModeCash, + TpOrdPx: fixedpoint.Zero, + TpTriggerPx: fixedpoint.Zero, + TpTriggerPxType: "", + ReduceOnly: "false", + AlgoClOrdId: "", + AlgoId: "", + }, + Code: types.StrInt64(0), + Msg: "", + AmendResult: "", + ExecutionType: okexapi.LiquidityTypeTaker, + FillFee: fixedpoint.NewFromFloat(-0.001844337), + FillFeeCurrency: "OKB", + FillNotionalUsd: fixedpoint.NewFromFloat(99.9), + FillPnl: fixedpoint.Zero, + NotionalUsd: fixedpoint.NewFromFloat(99.929), + ReqId: "", + LastPrice: fixedpoint.NewFromFloat(54), + QuickMgnType: "", + AmendSource: "", + CancelSource: "", + FillPriceVolume: "", + FillPriceUsd: "", + FillMarkVolume: "", + FillFwdPrice: "", + FillMarkPrice: "", + }, + } + + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.([]OrderTradeEvent) + assert.True(t, ok) + assert.Equal(t, exp, event) + }) +} + +func TestOrderTradeEvent_toGlobalTrade1(t *testing.T) { + var ( + in = ` +{ + "arg":{ + "channel":"orders", + "instType":"SPOT", + "uid":"530315546680502420" + }, + "data":[ + { + "accFillSz":"1.84", + "algoClOrdId":"", + "algoId":"", + "amendResult":"", + "amendSource":"", + "attachAlgoClOrdId":"", + "attachAlgoOrds":[ + + ], + "avgPx":"54", + "cTime":"1705384184502", + "cancelSource":"", + "category":"normal", + "ccy":"", + "clOrdId":"", + "code":"0", + "execType":"T", + "fee":"-0.001844337", + "feeCcy":"OKB", + "fillFee":"-0.001844337", + "fillFeeCcy":"OKB", + "fillFwdPx":"", + "fillMarkPx":"", + "fillMarkVol":"", + "fillNotionalUsd":"99.9", + "fillPnl":"0", + "fillPx":"54", + "fillPxUsd":"", + "fillPxVol":"", + "fillSz":"1.84", + "fillTime":"1705384184503", + "instId":"OKB-USDT", + "instType":"SPOT", + "lastPx":"54", + "lever":"0", + "msg":"", + "notionalUsd":"99.929", + "ordId":"667364871905857536", + "ordType":"market", + "pnl":"0", + "posSide":"", + "px":"", + "pxType":"", + "pxUsd":"", + "pxVol":"", + "quickMgnType":"", + "rebate":"0", + "rebateCcy":"USDT", + "reduceOnly":"false", + "reqId":"", + "side":"buy", + "slOrdPx":"", + "slTriggerPx":"", + "slTriggerPxType":"", + "source":"", + "state":"partially_filled", + "stpId":"", + "stpMode":"", + "sz":"100", + "tag":"", + "tdMode":"cash", + "tgtCcy":"quote_ccy", + "tpOrdPx":"", + "tpTriggerPx":"", + "tpTriggerPxType":"", + "tradeId":"590957341", + "uTime":"1705384184503" + } + ] +} +` + expTrade = types.Trade{ + ID: uint64(590957341), + OrderID: uint64(667364871905857536), + Exchange: types.ExchangeOKEx, + Price: fixedpoint.NewFromFloat(54), + Quantity: fixedpoint.NewFromFloat(1.84), + QuoteQuantity: fixedpoint.NewFromFloat(54).Mul(fixedpoint.NewFromFloat(1.84)), + Symbol: "OKBUSDT", + Side: types.SideTypeBuy, + IsBuyer: true, + IsMaker: false, + Time: types.Time(types.NewMillisecondTimestampFromInt(1705384184503).Time()), + Fee: fixedpoint.NewFromFloat(0.001844337), + FeeCurrency: "OKB", + } + ) + + t.Run("succeeds", func(t *testing.T) { + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.([]OrderTradeEvent) + assert.True(t, ok) + + trade, err := event[0].toGlobalTrade() + assert.NoError(t, err) + assert.Equal(t, expTrade, trade) + }) + + t.Run("unexpected trade id", func(t *testing.T) { + res, err := parseWebSocketEvent([]byte(in)) + assert.NoError(t, err) + event, ok := res.([]OrderTradeEvent) + assert.True(t, ok) + + event[0].TradeId = "XXXX" + _, err = event[0].toGlobalTrade() + assert.ErrorContains(t, err, "trade id") + }) + +} diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 96b955c73..e7dcc7f8e 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -12,7 +12,8 @@ import ( ) var ( - tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) + marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) + tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) ) type WebsocketOp struct { @@ -34,11 +35,11 @@ type Stream struct { client *okexapi.RestClient // public callbacks - kLineEventCallbacks []func(candle KLineEvent) - bookEventCallbacks []func(book BookEvent) - accountEventCallbacks []func(account okexapi.Account) - orderDetailsEventCallbacks []func(orderDetails []okexapi.OrderDetails) - marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent) + kLineEventCallbacks []func(candle KLineEvent) + bookEventCallbacks []func(book BookEvent) + accountEventCallbacks []func(account okexapi.Account) + orderTradesEventCallbacks []func(orderTrades []OrderTradeEvent) + marketTradeEventCallbacks []func(tradeDetail []MarketTradeEvent) } func NewStream(client *okexapi.RestClient) *Stream { @@ -55,7 +56,7 @@ func NewStream(client *okexapi.RestClient) *Stream { stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountEvent(stream.handleAccountEvent) stream.OnMarketTradeEvent(stream.handleMarketTradeEvent) - stream.OnOrderDetailsEvent(stream.handleOrderDetailsEvent) + stream.OnOrderTradesEvent(stream.handleOrderDetailsEvent) stream.OnConnect(stream.handleConnect) stream.OnAuth(stream.handleAuth) return stream @@ -167,24 +168,26 @@ func (s *Stream) handleAuth() { } } -func (s *Stream) handleOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { - detailTrades, detailOrders := segmentOrderDetails(orderDetails) - - trades, err := toGlobalTrades(detailTrades) - if err != nil { - log.WithError(err).Errorf("error converting order details into trades") - } else { - for _, trade := range trades { - s.EmitTradeUpdate(trade) +func (s *Stream) handleOrderDetailsEvent(orderTrades []OrderTradeEvent) { + for _, evt := range orderTrades { + if evt.TradeId != "" { + trade, err := evt.toGlobalTrade() + if err != nil { + if tradeLogLimiter.Allow() { + log.WithError(err).Errorf("failed to convert global trade") + } + } else { + s.EmitTradeUpdate(trade) + } } - } - orders, err := toGlobalOrders(detailOrders) - if err != nil { - log.WithError(err).Errorf("error converting order details into orders") - } else { - for _, order := range orders { - s.EmitOrderUpdate(order) + order, err := orderDetailToGlobal(&evt.OrderDetail) + if err != nil { + if tradeLogLimiter.Allow() { + log.WithError(err).Errorf("failed to convert global order") + } + } else { + s.EmitOrderUpdate(*order) } } } @@ -208,7 +211,7 @@ func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) { for _, event := range data { trade, err := event.toGlobalTrade() if err != nil { - if tradeLogLimiter.Allow() { + if marketTradeLogLimiter.Allow() { log.WithError(err).Error("failed to convert to market trade") } continue @@ -262,8 +265,8 @@ func (s *Stream) dispatchEvent(e interface{}) { case *okexapi.Account: s.EmitAccountEvent(*et) - case []okexapi.OrderDetails: - s.EmitOrderDetailsEvent(et) + case []OrderTradeEvent: + s.EmitOrderTradesEvent(et) case []MarketTradeEvent: s.EmitMarketTradeEvent(et) diff --git a/pkg/exchange/okex/stream_callbacks.go b/pkg/exchange/okex/stream_callbacks.go index 089da09aa..79ae916e9 100644 --- a/pkg/exchange/okex/stream_callbacks.go +++ b/pkg/exchange/okex/stream_callbacks.go @@ -36,13 +36,13 @@ func (s *Stream) EmitAccountEvent(account okexapi.Account) { } } -func (s *Stream) OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) { - s.orderDetailsEventCallbacks = append(s.orderDetailsEventCallbacks, cb) +func (s *Stream) OnOrderTradesEvent(cb func(orderTrades []OrderTradeEvent)) { + s.orderTradesEventCallbacks = append(s.orderTradesEventCallbacks, cb) } -func (s *Stream) EmitOrderDetailsEvent(orderDetails []okexapi.OrderDetails) { - for _, cb := range s.orderDetailsEventCallbacks { - cb(orderDetails) +func (s *Stream) EmitOrderTradesEvent(orderTrades []OrderTradeEvent) { + for _, cb := range s.orderTradesEventCallbacks { + cb(orderTrades) } } @@ -63,7 +63,7 @@ type StreamEventHub interface { OnAccountEvent(cb func(account okexapi.Account)) - OnOrderDetailsEvent(cb func(orderDetails []okexapi.OrderDetails)) + OnOrderTradesEvent(cb func(orderTrades []OrderTradeEvent)) OnMarketTradeEvent(cb func(tradeDetail []MarketTradeEvent)) } diff --git a/pkg/exchange/okex/stream_test.go b/pkg/exchange/okex/stream_test.go index 7f85973ad..0d63c4809 100644 --- a/pkg/exchange/okex/stream_test.go +++ b/pkg/exchange/okex/stream_test.go @@ -140,4 +140,18 @@ func TestStream(t *testing.T) { c := make(chan struct{}) <-c }) + + t.Run("order trade test", func(t *testing.T) { + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnOrderUpdate(func(order types.Order) { + t.Log("order update", order) + }) + s.OnTradeUpdate(func(trade types.Trade) { + t.Log("trade update", trade) + }) + c := make(chan struct{}) + <-c + }) }