Merge pull request #1417 from c9s/edwin/skip-ping-pong-event

REFACTOR: [stream] skip pong event on emitting raw message
This commit is contained in:
bailantaotao 2023-11-14 20:49:13 +08:00 committed by GitHub
commit 580c6d2030
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 50 additions and 70 deletions

View File

@ -129,11 +129,6 @@ func (s *Stream) dispatchEvent(event interface{}) {
case *AccountEvent: case *AccountEvent:
s.EmitAccountEvent(*e) s.EmitAccountEvent(*e)
case []byte:
// We only handle the 'pong' case. Others are unexpected.
if !bytes.Equal(e, pongBytes) {
log.Errorf("invalid event: %q", e)
}
} }
} }
@ -240,10 +235,9 @@ func convertSubscription(sub types.Subscription) (WsArg, error) {
func parseWebSocketEvent(in []byte) (interface{}, error) { func parseWebSocketEvent(in []byte) (interface{}, error) {
switch { switch {
case bytes.Equal(in, pongBytes): case bytes.Equal(in, pongBytes):
// Return the original raw data may seem redundant because we can validate the string and return nil, // return global pong event to avoid emit raw message
// but we cannot return nil to a lower level handler. This can cause confusion in the next handler, such as return types.WebsocketPongEvent{}, nil
// the dispatch handler. Therefore, I return the original raw data.
return in, nil
default: default:
return parseEvent(in) return parseEvent(in)
} }

View File

@ -159,9 +159,6 @@ func (s *Stream) createEndpoint(_ context.Context) (string, error) {
func (s *Stream) dispatchEvent(event interface{}) { func (s *Stream) dispatchEvent(event interface{}) {
switch e := event.(type) { switch e := event.(type) {
case *WebSocketOpEvent: case *WebSocketOpEvent:
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
}
if e.IsAuthenticated() { if e.IsAuthenticated() {
s.EmitAuth() s.EmitAuth()
} }
@ -197,6 +194,15 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
switch { switch {
case e.IsOp(): case e.IsOp():
if err = e.IsValid(); err != nil {
log.Errorf("invalid event: %+v, err: %s", e, err)
return nil, err
}
// return global pong event to avoid emit raw message
if ok, pongEvent := e.toGlobalPongEventIfValid(); ok {
return pongEvent, nil
}
return e.WebSocketOpEvent, nil return e.WebSocketOpEvent, nil
case e.IsTopic(): case e.IsTopic():

View File

@ -88,6 +88,13 @@ func (w *WebSocketOpEvent) IsValid() error {
} }
} }
func (w *WebSocketOpEvent) toGlobalPongEventIfValid() (bool, *types.WebsocketPongEvent) {
if w.Op == WsOpTypePing || w.Op == WsOpTypePong {
return true, &types.WebsocketPongEvent{}
}
return false, nil
}
func (w *WebSocketOpEvent) IsAuthenticated() bool { func (w *WebSocketOpEvent) IsAuthenticated() bool {
return w.Op == WsOpTypeAuth && w.Success return w.Op == WsOpTypeAuth && w.Success
} }

View File

@ -20,19 +20,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
raw, err := s.parseWebSocketEvent([]byte(msg)) raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err) assert.NoError(t, err)
expRetMsg := string(WsOpTypePong) e, ok := raw.(*types.WebsocketPongEvent)
e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketOpEvent{ assert.Equal(t, &types.WebsocketPongEvent{}, e)
Success: true,
RetMsg: expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
ReqId: "",
Op: WsOpTypePing,
Args: nil,
}, e)
assert.NoError(t, e.IsValid())
}) })
t.Run("[public] PingEvent with req id", func(t *testing.T) { t.Run("[public] PingEvent with req id", func(t *testing.T) {
@ -41,20 +31,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
raw, err := s.parseWebSocketEvent([]byte(msg)) raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err) assert.NoError(t, err)
expRetMsg := string(WsOpTypePong) e, ok := raw.(*types.WebsocketPongEvent)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketOpEvent{ assert.Equal(t, &types.WebsocketPongEvent{}, e)
Success: true,
RetMsg: expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
ReqId: expReqId,
Op: WsOpTypePing,
Args: nil,
}, e)
assert.NoError(t, e.IsValid())
}) })
t.Run("[private] PingEvent without req id", func(t *testing.T) { t.Run("[private] PingEvent without req id", func(t *testing.T) {
@ -63,18 +42,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
raw, err := s.parseWebSocketEvent([]byte(msg)) raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err) assert.NoError(t, err)
e, ok := raw.(*WebSocketOpEvent) e, ok := raw.(*types.WebsocketPongEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketOpEvent{ assert.Equal(t, &types.WebsocketPongEvent{}, e)
Success: false,
RetMsg: "",
ConnId: "civn4p1dcjmtvb69ome0-yrt1",
ReqId: "",
Op: WsOpTypePong,
Args: []string{"1690884539181"},
}, e)
assert.NoError(t, e.IsValid())
}) })
t.Run("[private] PingEvent with req id", func(t *testing.T) { t.Run("[private] PingEvent with req id", func(t *testing.T) {
@ -83,19 +53,9 @@ func Test_parseWebSocketEvent(t *testing.T) {
raw, err := s.parseWebSocketEvent([]byte(msg)) raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err) assert.NoError(t, err)
expReqId := "78d36b57-a142-47b7-9143-5843df77d44d" e, ok := raw.(*types.WebsocketPongEvent)
e, ok := raw.(*WebSocketOpEvent)
assert.True(t, ok) assert.True(t, ok)
assert.Equal(t, &WebSocketOpEvent{ assert.Equal(t, &types.WebsocketPongEvent{}, e)
Success: false,
RetMsg: "",
ConnId: "civn4p1dcjmtvb69ome0-yrt1",
ReqId: expReqId,
Op: WsOpTypePong,
Args: []string{"1690884539181"},
}, e)
assert.NoError(t, e.IsValid())
}) })
} }

View File

@ -62,6 +62,8 @@ type HeartBeat func(conn *websocket.Conn) error
type BeforeConnect func(ctx context.Context) error type BeforeConnect func(ctx context.Context) error
type WebsocketPongEvent struct{}
//go:generate callbackgen -type StandardStream -interface //go:generate callbackgen -type StandardStream -interface
type StandardStream struct { type StandardStream struct {
parser Parser parser Parser
@ -226,6 +228,9 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel
// flag format: debug-{component}-{message type} // flag format: debug-{component}-{message type}
debugRawMessage := viper.GetBool("debug-websocket-raw-message") debugRawMessage := viper.GetBool("debug-websocket-raw-message")
hasParser := s.parser != nil
hasDispatcher := s.dispatcher != nil
for { for {
select { select {
@ -276,22 +281,30 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel
continue continue
} }
s.EmitRawMessage(message)
if debugRawMessage { if debugRawMessage {
log.Info(string(message)) log.Info(string(message))
} }
if !hasParser {
s.EmitRawMessage(message)
continue
}
var e interface{} var e interface{}
if s.parser != nil {
e, err = s.parser(message) e, err = s.parser(message)
if err != nil { if err != nil {
log.WithError(err).Errorf("websocket event parse error, message: %s", message) log.WithError(err).Errorf("websocket event parse error, message: %s", message)
// emit raw message even if occurs error, because we want anything can be detected
s.EmitRawMessage(message)
continue continue
} }
// skip pong event to avoid the message like spam
if _, ok := e.(*WebsocketPongEvent); !ok {
s.EmitRawMessage(message)
} }
if s.dispatcher != nil { if hasDispatcher {
s.dispatcher(e) s.dispatcher(e)
} }
} }