diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 05986208e..1baa3700a 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -41,6 +41,9 @@ type Parser func(message []byte) (interface{}, error) type Dispatcher func(e interface{}) +// HeartBeat keeps connection alive by sending the heartbeat packet. +type HeartBeat func(ctxConn context.Context, conn *websocket.Conn, cancelConn context.CancelFunc) + //go:generate callbackgen -type StandardStream -interface type StandardStream struct { parser Parser @@ -106,6 +109,8 @@ type StandardStream struct { FuturesPositionUpdateCallbacks []func(futuresPositions FuturesPositionMap) FuturesPositionSnapshotCallbacks []func(futuresPositions FuturesPositionMap) + + heartBeat HeartBeat } type StandardStreamEmitter interface { @@ -350,6 +355,9 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error { go s.Read(connCtx, conn, connCancel) go s.ping(connCtx, conn, connCancel, pingInterval) + if s.heartBeat != nil { + go s.heartBeat(connCtx, conn, connCancel) + } return nil } @@ -419,6 +427,11 @@ func (s *StandardStream) Close() error { return nil } +// SetHeartBeat sets the custom heart beat implementation if needed +func (s *StandardStream) SetHeartBeat(fn HeartBeat) { + s.heartBeat = fn +} + type Depth string const (