Merge pull request #1504 from c9s/edwin/okx/implement-ping-interval

FEATURE: [okx] set ping interval
This commit is contained in:
bailantaotao 2024-01-18 09:17:08 +08:00 committed by GitHub
commit 8ceadd80f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 18 additions and 8 deletions

View File

@ -15,6 +15,9 @@ import (
var (
marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
// pingInterval the connection will break automatically if the subscription is not established or data has not been
// pushed for more than 30 seconds. Therefore, we set it to 20 seconds.
pingInterval = 20 * time.Second
)
type WebsocketOp struct {
@ -54,6 +57,7 @@ func NewStream(client *okexapi.RestClient, balanceProvider types.ExchangeAccount
stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.SetEndpointCreator(stream.createEndpoint)
stream.SetPingInterval(pingInterval)
stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnBookEvent(stream.handleBookEvent)

View File

@ -72,6 +72,7 @@ type WebsocketPongEvent struct{}
type StandardStream struct {
parser Parser
dispatcher Dispatcher
pingInterval time.Duration
endpointCreator EndpointCreator
@ -181,6 +182,7 @@ func NewStandardStream() StandardStream {
ReconnectC: make(chan struct{}, 1),
CloseC: make(chan struct{}),
sg: NewSyncGroup(),
pingInterval: pingInterval,
}
}
@ -315,15 +317,19 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel
}
}
func (s *StandardStream) SetPingInterval(interval time.Duration) {
s.pingInterval = interval
}
func (s *StandardStream) ping(
ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc, interval time.Duration,
ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc,
) {
defer func() {
cancel()
log.Debug("[websocket] ping worker stopped")
}()
var pingTicker = time.NewTicker(interval)
var pingTicker = time.NewTicker(s.pingInterval)
defer pingTicker.Stop()
for {
@ -454,7 +460,7 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error {
s.Read(connCtx, conn, connCancel)
})
s.sg.Add(func() {
s.ping(connCtx, conn, connCancel, pingInterval)
s.ping(connCtx, conn, connCancel)
})
s.sg.Run()
return nil