From 80d8c000bc2bfdc1e685c29db8c799d6734fadf2 Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 17 Jan 2024 15:58:54 +0800 Subject: [PATCH] pkg/exchange: set ping interval --- pkg/exchange/okex/stream.go | 4 ++++ pkg/types/stream.go | 22 ++++++++++++++-------- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index e7dcc7f8e..5e9707a2c 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -14,6 +14,9 @@ import ( var ( marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) + // pingInterval the connection will break automatically if the subscription is not established or data has not been + // pushed for more than 30 seconds. Therefore, we set it to 20 seconds. + pingInterval = 20 * time.Second ) type WebsocketOp struct { @@ -51,6 +54,7 @@ func NewStream(client *okexapi.RestClient) *Stream { stream.SetParser(parseWebSocketEvent) stream.SetDispatcher(stream.dispatchEvent) stream.SetEndpointCreator(stream.createEndpoint) + stream.SetPingInterval(pingInterval) stream.OnKLineEvent(stream.handleKLineEvent) stream.OnBookEvent(stream.handleBookEvent) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index aaa9efab5..5a937e5fb 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -70,8 +70,9 @@ type WebsocketPongEvent struct{} //go:generate callbackgen -type StandardStream -interface type StandardStream struct { - parser Parser - dispatcher Dispatcher + parser Parser + dispatcher Dispatcher + pingInterval time.Duration endpointCreator EndpointCreator @@ -178,9 +179,10 @@ type StandardStreamEmitter interface { func NewStandardStream() StandardStream { return StandardStream{ - ReconnectC: make(chan struct{}, 1), - CloseC: make(chan struct{}), - sg: NewSyncGroup(), + ReconnectC: make(chan struct{}, 1), + CloseC: make(chan struct{}), + sg: NewSyncGroup(), + pingInterval: pingInterval, } } @@ -315,15 +317,19 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel } } +func (s *StandardStream) SetPingInterval(interval time.Duration) { + s.pingInterval = interval +} + func (s *StandardStream) ping( - ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc, interval time.Duration, + ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc, ) { defer func() { cancel() log.Debug("[websocket] ping worker stopped") }() - var pingTicker = time.NewTicker(interval) + var pingTicker = time.NewTicker(s.pingInterval) defer pingTicker.Stop() for { @@ -454,7 +460,7 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error { s.Read(connCtx, conn, connCancel) }) s.sg.Add(func() { - s.ping(connCtx, conn, connCancel, pingInterval) + s.ping(connCtx, conn, connCancel) }) s.sg.Run() return nil