diff --git a/pkg/exchange/bitget/stream.go b/pkg/exchange/bitget/stream.go index 53e696490..00ecae426 100644 --- a/pkg/exchange/bitget/stream.go +++ b/pkg/exchange/bitget/stream.go @@ -129,11 +129,6 @@ func (s *Stream) dispatchEvent(event interface{}) { case *AccountEvent: s.EmitAccountEvent(*e) - case []byte: - // We only handle the 'pong' case. Others are unexpected. - if !bytes.Equal(e, pongBytes) { - log.Errorf("invalid event: %q", e) - } } } @@ -240,10 +235,9 @@ func convertSubscription(sub types.Subscription) (WsArg, error) { func parseWebSocketEvent(in []byte) (interface{}, error) { switch { case bytes.Equal(in, pongBytes): - // Return the original raw data may seem redundant because we can validate the string and return nil, - // but we cannot return nil to a lower level handler. This can cause confusion in the next handler, such as - // the dispatch handler. Therefore, I return the original raw data. - return in, nil + // return global pong event to avoid emit raw message + return types.WebsocketPongEvent{}, nil + default: return parseEvent(in) } diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 891112567..47581f2ae 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -159,9 +159,6 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) { func (s *Stream) dispatchEvent(event interface{}) { switch e := event.(type) { case *WebSocketOpEvent: - if err := e.IsValid(); err != nil { - log.Errorf("invalid event: %v", err) - } if e.IsAuthenticated() { s.EmitAuth() } @@ -197,6 +194,15 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) { switch { case e.IsOp(): + if err = e.IsValid(); err != nil { + log.Errorf("invalid event: %+v, err: %s", e, err) + return nil, err + } + + // return global pong event to avoid emit raw message + if ok, pongEvent := e.toGlobalPongEventIfValid(); ok { + return pongEvent, nil + } return e.WebSocketOpEvent, nil case e.IsTopic(): diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 7ae55bd85..1c95f638c 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -88,6 +88,13 @@ func (w *WebSocketOpEvent) IsValid() error { } } +func (w *WebSocketOpEvent) toGlobalPongEventIfValid() (bool, *types.WebsocketPongEvent) { + if w.Op == WsOpTypePing || w.Op == WsOpTypePong { + return true, &types.WebsocketPongEvent{} + } + return false, nil +} + func (w *WebSocketOpEvent) IsAuthenticated() bool { return w.Op == WsOpTypeAuth && w.Success } diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 5f32abd77..603c49f09 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -20,19 +20,9 @@ func Test_parseWebSocketEvent(t *testing.T) { raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) - expRetMsg := string(WsOpTypePong) - e, ok := raw.(*WebSocketOpEvent) + e, ok := raw.(*types.WebsocketPongEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketOpEvent{ - Success: true, - RetMsg: expRetMsg, - ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", - ReqId: "", - Op: WsOpTypePing, - Args: nil, - }, e) - - assert.NoError(t, e.IsValid()) + assert.Equal(t, &types.WebsocketPongEvent{}, e) }) t.Run("[public] PingEvent with req id", func(t *testing.T) { @@ -41,20 +31,9 @@ func Test_parseWebSocketEvent(t *testing.T) { raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) - expRetMsg := string(WsOpTypePong) - expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0" - e, ok := raw.(*WebSocketOpEvent) + e, ok := raw.(*types.WebsocketPongEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketOpEvent{ - Success: true, - RetMsg: expRetMsg, - ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44", - ReqId: expReqId, - Op: WsOpTypePing, - Args: nil, - }, e) - - assert.NoError(t, e.IsValid()) + assert.Equal(t, &types.WebsocketPongEvent{}, e) }) t.Run("[private] PingEvent without req id", func(t *testing.T) { @@ -63,18 +42,9 @@ func Test_parseWebSocketEvent(t *testing.T) { raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) - e, ok := raw.(*WebSocketOpEvent) + e, ok := raw.(*types.WebsocketPongEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketOpEvent{ - Success: false, - RetMsg: "", - ConnId: "civn4p1dcjmtvb69ome0-yrt1", - ReqId: "", - Op: WsOpTypePong, - Args: []string{"1690884539181"}, - }, e) - - assert.NoError(t, e.IsValid()) + assert.Equal(t, &types.WebsocketPongEvent{}, e) }) t.Run("[private] PingEvent with req id", func(t *testing.T) { @@ -83,19 +53,9 @@ func Test_parseWebSocketEvent(t *testing.T) { raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) - expReqId := "78d36b57-a142-47b7-9143-5843df77d44d" - e, ok := raw.(*WebSocketOpEvent) + e, ok := raw.(*types.WebsocketPongEvent) assert.True(t, ok) - assert.Equal(t, &WebSocketOpEvent{ - Success: false, - RetMsg: "", - ConnId: "civn4p1dcjmtvb69ome0-yrt1", - ReqId: expReqId, - Op: WsOpTypePong, - Args: []string{"1690884539181"}, - }, e) - - assert.NoError(t, e.IsValid()) + assert.Equal(t, &types.WebsocketPongEvent{}, e) }) } diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 4ce8c161f..96c66c4ea 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -62,6 +62,8 @@ type HeartBeat func(conn *websocket.Conn) error type BeforeConnect func(ctx context.Context) error +type WebsocketPongEvent struct{} + //go:generate callbackgen -type StandardStream -interface type StandardStream struct { parser Parser @@ -226,6 +228,9 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel // flag format: debug-{component}-{message type} debugRawMessage := viper.GetBool("debug-websocket-raw-message") + hasParser := s.parser != nil + hasDispatcher := s.dispatcher != nil + for { select { @@ -276,22 +281,30 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel continue } - s.EmitRawMessage(message) - if debugRawMessage { log.Info(string(message)) } - var e interface{} - if s.parser != nil { - e, err = s.parser(message) - if err != nil { - log.WithError(err).Errorf("websocket event parse error, message: %s", message) - continue - } + if !hasParser { + s.EmitRawMessage(message) + continue } - if s.dispatcher != nil { + var e interface{} + e, err = s.parser(message) + if err != nil { + log.WithError(err).Errorf("websocket event parse error, message: %s", message) + // emit raw message even if occurs error, because we want anything can be detected + s.EmitRawMessage(message) + continue + } + + // skip pong event to avoid the message like spam + if _, ok := e.(*WebsocketPongEvent); !ok { + s.EmitRawMessage(message) + } + + if hasDispatcher { s.dispatcher(e) } }