pkg/exchange: support ping/pong

This commit is contained in:
Edwin 2023-10-24 22:17:08 +08:00
parent 32c75ea3b8
commit 39c3d23da3

View File

@ -1,12 +1,26 @@
package bitget
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
"github.com/c9s/bbgo/pkg/exchange/bitget/bitgetapi"
"github.com/c9s/bbgo/pkg/types"
"github.com/gorilla/websocket"
)
const (
// Client should keep ping the server in every 30 seconds. Server will close the connections which has no ping over
// 120 seconds(even when the client is still receiving data from the server)
pingInterval = 30 * time.Second
)
var (
pingBytes = []byte("ping")
pongBytes = []byte("pong")
)
//go:generate callbackgen -type Stream
@ -25,6 +39,7 @@ func NewStream() *Stream {
stream.SetEndpointCreator(stream.createEndpoint)
stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.SetHeartBeat(stream.ping)
stream.OnConnect(stream.handlerConnect)
stream.OnBookEvent(stream.handleBookEvent)
@ -92,6 +107,12 @@ func (s *Stream) dispatchEvent(event interface{}) {
case *MarketTradeEvent:
s.EmitMarketTradeEvent(*e)
case []byte:
// We only handle the 'pong' case. Others are unexpected.
if !bytes.Equal(e, pongBytes) {
log.Errorf("invalid event: %q", e)
}
}
}
@ -116,6 +137,16 @@ func (s *Stream) handleBookEvent(o BookEvent) {
}
}
// ping implements the bitget text message of WebSocket PingPong.
func (s *Stream) ping(conn *websocket.Conn) error {
err := conn.WriteMessage(websocket.TextMessage, pingBytes)
if err != nil {
log.WithError(err).Error("ping error", err)
return nil
}
return nil
}
func convertSubscription(sub types.Subscription) (WsArg, error) {
arg := WsArg{
// support spot only
@ -146,6 +177,18 @@ func convertSubscription(sub types.Subscription) (WsArg, error) {
}
func parseWebSocketEvent(in []byte) (interface{}, error) {
switch {
case bytes.Equal(in, pongBytes):
// Return the original raw data may seem redundant because we can validate the string and return nil,
// but we cannot return nil to a lower level handler. This can cause confusion in the next handler, such as
// the dispatch handler. Therefore, I return the original raw data.
return in, nil
default:
return parseEvent(in)
}
}
func parseEvent(in []byte) (interface{}, error) {
var event WsEvent
err := json.Unmarshal(in, &event)