From 39c3d23da323e3222df5d610695a8a776b92f5f0 Mon Sep 17 00:00:00 2001 From: Edwin Date: Tue, 24 Oct 2023 22:17:08 +0800 Subject: [PATCH] pkg/exchange: support ping/pong --- pkg/exchange/bitget/stream.go | 43 +++++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/pkg/exchange/bitget/stream.go b/pkg/exchange/bitget/stream.go index eadf25b48..5a2f6f23d 100644 --- a/pkg/exchange/bitget/stream.go +++ b/pkg/exchange/bitget/stream.go @@ -1,12 +1,26 @@ package bitget import ( + "bytes" "context" "encoding/json" "fmt" + "time" "github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi" "github.com/c9s/bbgo/pkg/types" + "github.com/gorilla/websocket" +) + +const ( + // Client should keep ping the server in every 30 seconds. Server will close the connections which has no ping over + // 120 seconds(even when the client is still receiving data from the server) + pingInterval = 30 * time.Second +) + +var ( + pingBytes = []byte("ping") + pongBytes = []byte("pong") ) //go:generate callbackgen -type Stream @@ -25,6 +39,7 @@ func NewStream() *Stream { stream.SetEndpointCreator(stream.createEndpoint) stream.SetParser(parseWebSocketEvent) stream.SetDispatcher(stream.dispatchEvent) + stream.SetHeartBeat(stream.ping) stream.OnConnect(stream.handlerConnect) stream.OnBookEvent(stream.handleBookEvent) @@ -92,6 +107,12 @@ func (s *Stream) dispatchEvent(event interface{}) { case *MarketTradeEvent: s.EmitMarketTradeEvent(*e) + + case []byte: + // We only handle the 'pong' case. Others are unexpected. + if !bytes.Equal(e, pongBytes) { + log.Errorf("invalid event: %q", e) + } } } @@ -116,6 +137,16 @@ func (s *Stream) handleBookEvent(o BookEvent) { } } +// ping implements the bitget text message of WebSocket PingPong. +func (s *Stream) ping(conn *websocket.Conn) error { + err := conn.WriteMessage(websocket.TextMessage, pingBytes) + if err != nil { + log.WithError(err).Error("ping error", err) + return nil + } + return nil +} + func convertSubscription(sub types.Subscription) (WsArg, error) { arg := WsArg{ // support spot only @@ -146,6 +177,18 @@ func convertSubscription(sub types.Subscription) (WsArg, error) { } func parseWebSocketEvent(in []byte) (interface{}, error) { + switch { + case bytes.Equal(in, pongBytes): + // Return the original raw data may seem redundant because we can validate the string and return nil, + // but we cannot return nil to a lower level handler. This can cause confusion in the next handler, such as + // the dispatch handler. Therefore, I return the original raw data. + return in, nil + default: + return parseEvent(in) + } +} + +func parseEvent(in []byte) (interface{}, error) { var event WsEvent err := json.Unmarshal(in, &event)