ftx: websocket keepalive

This commit is contained in:
ycdesu 2021-03-28 16:29:36 +08:00
parent 53c9b0a606
commit f526a937d1
3 changed files with 36 additions and 10 deletions

View File

@ -25,6 +25,7 @@ type Stream struct {
key string key string
secret string secret string
// subscriptions are only accessed in single goroutine environment, so I don't use mutex to protect them
subscriptions []websocketRequest subscriptions []websocketRequest
} }
@ -38,12 +39,15 @@ func NewStream(key, secret string) *Stream {
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage) s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
s.ws.OnConnected(func(conn *websocket.Conn) { s.ws.OnConnected(func(conn *websocket.Conn) {
for _, sub := range s.subscriptions { subs := []websocketRequest{newLoginRequest(s.key, s.secret, time.Now())}
subs = append(subs, s.subscriptions...)
for _, sub := range subs {
if err := conn.WriteJSON(sub); err != nil { if err := conn.WriteJSON(sub); err != nil {
s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub)) s.ws.EmitError(fmt.Errorf("failed to send subscription: %+v", sub))
} }
} }
}) })
return s return s
} }
@ -58,13 +62,29 @@ func (s *Stream) Connect(ctx context.Context) error {
return err return err
} }
go func() {
// https://docs.ftx.com/?javascript#request-process
tk := time.NewTicker(15 * time.Second)
defer tk.Stop()
for {
select {
case <-ctx.Done():
if err := ctx.Err(); err != nil {
logger.WithError(err).Errorf("websocket ping goroutine is terminated")
}
case <-tk.C:
if err := s.ws.Conn().WriteJSON(websocketRequest{
Operation: ping,
}); err != nil {
logger.WithError(err).Warnf("failed to ping, try in next tick")
}
}
}
}()
return nil return nil
} }
func (s *Stream) subscribePrivateEvents() { func (s *Stream) subscribePrivateEvents() {
s.addSubscription(
newLoginRequest(s.key, s.secret, time.Now()),
)
s.addSubscription(websocketRequest{ s.addSubscription(websocketRequest{
Operation: subscribe, Operation: subscribe,
Channel: privateOrdersChannel, Channel: privateOrdersChannel,

View File

@ -11,13 +11,21 @@ type messageHandler struct {
} }
func (h *messageHandler) handleMessage(message []byte) { func (h *messageHandler) handleMessage(message []byte) {
logger.Infof("raw: %+v", string(message))
var r websocketResponse var r websocketResponse
if err := json.Unmarshal(message, &r); err != nil { if err := json.Unmarshal(message, &r); err != nil {
logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message)) logger.WithError(err).Errorf("failed to unmarshal resp: %s", string(message))
return return
} }
if r.Type == errRespType {
logger.Errorf("receives err: %+v", r)
return
}
if r.Type == pongRespType {
return
}
switch r.Channel { switch r.Channel {
case orderBookChannel: case orderBookChannel:
h.handleOrderBook(r) h.handleOrderBook(r)
@ -26,11 +34,7 @@ func (h *messageHandler) handleMessage(message []byte) {
case privateTradesChannel: case privateTradesChannel:
h.handleTrades(r) h.handleTrades(r)
default: default:
if r.Type != errRespType { logger.Warnf("unsupported message type: %+v", r.Type)
logger.Errorf("unsupported message type: %+v", r.Type)
return
}
logger.Errorf("received err: %s", r.toErrResponse())
} }
} }

View File

@ -15,6 +15,7 @@ import (
type operation string type operation string
const ping operation = "ping"
const login operation = "login" const login operation = "login"
const subscribe operation = "subscribe" const subscribe operation = "subscribe"
const unsubscribe operation = "unsubscribe" const unsubscribe operation = "unsubscribe"
@ -77,6 +78,7 @@ func loginBody(millis int64) string {
type respType string type respType string
const pongRespType respType = "pong"
const errRespType respType = "error" const errRespType respType = "error"
const subscribedRespType respType = "subscribed" const subscribedRespType respType = "subscribed"
const unsubscribedRespType respType = "unsubscribed" const unsubscribedRespType respType = "unsubscribed"