diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index e1bfb7c31..3d2417758 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -25,6 +25,7 @@ type Stream struct { key string secret string + // subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them subscriptions []websocketRequest } @@ -38,12 +39,15 @@ func NewStream(key, secret string) *Stream { s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) s.ws.OnConnected(func(conn *websocket.Conn) { - for _, sub := range s.subscriptions { + subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now())} + subs = append(subs, s.subscriptions...) + for _, sub := range subs { if err := conn.WriteJSON(sub); err != nil { s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub)) } } }) + return s } @@ -58,13 +62,29 @@ func (s *Stream) Connect(ctx context.Context) error { return err } + go func() { + // https://docs.ftx.com/?javascript#request-process + tk := time.NewTicker(15 * time.Second) + defer tk.Stop() + for { + select { + case <-ctx.Done(): + if err := ctx.Err(); err != nil { + logger.WithError(err).Errorf("websocket ping goroutine is terminated") + } + case <-tk.C: + if err := s.ws.Conn().WriteJSON(websocketRequest{ + Operation: ping, + }); err != nil { + logger.WithError(err).Warnf("failed to ping, try in next tick") + } + } + } + }() return nil } func (s *Stream) subscribePrivateEvents() { - s.addSubscription( - newLoginRequest(s.key, s.secret, time.Now()), - ) s.addSubscription(websocketRequest{ Operation: subscribe, Channel: privateOrdersChannel, diff --git a/pkg/exchange/ftx/stream_message_handler.go b/pkg/exchange/ftx/stream_message_handler.go index 0fd931cee..894abce65 100644 --- a/pkg/exchange/ftx/stream_message_handler.go +++ b/pkg/exchange/ftx/stream_message_handler.go @@ -11,13 +11,21 @@ type messageHandler struct { } func (h *messageHandler) handleMessage(message []byte) { - logger.Infof("raw: %+v", string(message)) var r websocketResponse if err := json.Unmarshal(message, &r); err != nil { logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message)) return } + if r.Type == errRespType { + logger.Errorf("receives err: %+v", r) + return + } + + if r.Type == pongRespType { + return + } + switch r.Channel { case orderBookChannel: h.handleOrderBook(r) @@ -26,11 +34,7 @@ func (h *messageHandler) handleMessage(message []byte) { case privateTradesChannel: h.handleTrades(r) default: - if r.Type != errRespType { - logger.Errorf("unsupported message type: %+v", r.Type) - return - } - logger.Errorf("received err: %s", r.toErrResponse()) + logger.Warnf("unsupported message type: %+v", r.Type) } } diff --git a/pkg/exchange/ftx/websocket_messages.go b/pkg/exchange/ftx/websocket_messages.go index 0b99012a0..a10c6cfb9 100644 --- a/pkg/exchange/ftx/websocket_messages.go +++ b/pkg/exchange/ftx/websocket_messages.go @@ -15,6 +15,7 @@ import ( type operation string +const ping operation = "ping" const login operation = "login" const subscribe operation = "subscribe" const unsubscribe operation = "unsubscribe" @@ -77,6 +78,7 @@ func loginBody(millis int64) string { type respType string +const pongRespType respType = "pong" const errRespType respType = "error" const subscribedRespType respType = "subscribed" const unsubscribedRespType respType = "unsubscribed"