From cb66f18b542e5a02c04ea98ac31b1c2b3a8fd430 Mon Sep 17 00:00:00 2001 From: zenix Date: Wed, 23 Mar 2022 15:51:19 +0900 Subject: [PATCH] feature: add ftx market trade implementation --- pkg/exchange/ftx/stream.go | 21 +++++++++--- pkg/exchange/ftx/stream_message_handler.go | 17 ++++++++++ pkg/exchange/ftx/websocket_messages.go | 39 +++++++++++++++++++++- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index 4d0b2e0f2..ba68934ba 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -111,25 +111,36 @@ func (s *Stream) addSubscription(request websocketRequest) { } func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.SubscribeOptions) { - if channel == types.BookChannel { + switch channel { + case types.BookChannel: s.addSubscription(websocketRequest{ Operation: subscribe, Channel: orderBookChannel, Market: toLocalSymbol(TrimUpperString(symbol)), }) - } else if channel == types.BookTickerChannel { + return + case types.BookTickerChannel: s.addSubscription(websocketRequest{ Operation: subscribe, Channel: bookTickerChannel, Market: toLocalSymbol(TrimUpperString(symbol)), }) - } else if channel == types.KLineChannel { + return + case types.KLineChannel: // FTX does not support kline channel, do polling interval := types.Interval(option.Interval) ks := klineSubscription{symbol: symbol, interval: interval} s.klineSubscriptions = append(s.klineSubscriptions, ks) - } else { - panic("only support book/kline channel now") + return + case types.MarketTradeChannel: + s.addSubscription(websocketRequest{ + Operation: subscribe, + Channel: marketTradeChannel, + Market: toLocalSymbol(TrimUpperString(symbol)), + }) + return + default: + panic("only support book/kline/trade channel now") } } diff --git a/pkg/exchange/ftx/stream_message_handler.go b/pkg/exchange/ftx/stream_message_handler.go index b88106142..98744622c 100644 --- a/pkg/exchange/ftx/stream_message_handler.go +++ b/pkg/exchange/ftx/stream_message_handler.go @@ -31,6 +31,8 @@ func (h *messageHandler) handleMessage(message []byte) { h.handleOrderBook(r) case bookTickerChannel: h.handleBookTicker(r) + case marketTradeChannel: + h.handleMarketTrade(r) case privateOrdersChannel: h.handlePrivateOrders(r) case privateTradesChannel: @@ -83,6 +85,21 @@ func (h *messageHandler) handleOrderBook(response websocketResponse) { } } +func (h *messageHandler) handleMarketTrade(response websocketResponse) { + if response.Type == subscribedRespType { + h.handleSubscribedMessage(response) + return + } + trades, err := response.toMarketTradeResponse() + if err != nil { + logger.WithError(err).Errorf("failed to generate market trade %v", response) + return + } + for _, trade := range trades { + h.EmitMarketTrade(trade) + } +} + func (h *messageHandler) handleBookTicker(response websocketResponse) { if response.Type == subscribedRespType { h.handleSubscribedMessage(response) diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index a45639946..f50be0b18 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -24,6 +24,7 @@ const unsubscribe operation = "unsubscribe" type channel string const orderBookChannel channel = "orderbook" +const marketTradeChannel channel = "trades" const bookTickerChannel channel = "ticker" const privateOrdersChannel channel = "orders" const privateTradesChannel channel = "fills" @@ -119,6 +120,42 @@ type orderUpdateResponse struct { Data ftxapi.Order `json:"data"` } +type trade struct { + Price fixedpoint.Value `json:"price"` + Size fixedpoint.Value `json:"size"` + Side string `json:"side"` + Liquidation bool `json:"liquidation"` + Time time.Time `json:"time"` +} +type tradeResponse struct { + mandatoryFields + Data []trade `json:"data"` +} + +func (r websocketResponse) toMarketTradeResponse() (t []types.Trade, err error) { + if r.Channel != marketTradeChannel { + return t, fmt.Errorf("type %s, channel %s: channel incorrect", r.Type, r.Channel) + } + var tds []trade + if err = json.Unmarshal(r.Data, &tds); err != nil { + return t, err + } + t = make([]types.Trade, len(tds)) + for i, td := range tds { + tt := &t[i] + tt.Exchange = types.ExchangeFTX + tt.Price = td.Price + tt.Quantity = td.Size + tt.QuoteQuantity = td.Size + tt.Symbol = r.Market + tt.Side = types.SideType(TrimUpperString(string(td.Side))) + tt.IsBuyer = true + tt.IsMaker = false + tt.Time = types.Time(td.Time) + } + return t, nil +} + func (r websocketResponse) toOrderUpdateResponse() (orderUpdateResponse, error) { if r.Channel != privateOrdersChannel { return orderUpdateResponse{}, fmt.Errorf("type %s, channel %s: %w", r.Type, r.Channel, errUnsupportedConversion) @@ -150,7 +187,7 @@ func (r websocketResponse) toTradeUpdateResponse() (tradeUpdateResponse, error) } /* -Private: + Private: order: {"type": "subscribed", "channel": "orders"} Public