pkg/exchange: set ping interval

This commit is contained in:
Edwin 2024-01-17 15:58:54 +08:00
parent 91913f021c
commit 80d8c000bc
2 changed files with 18 additions and 8 deletions

View File

@ -14,6 +14,9 @@ import (
var ( var (
marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) marketTradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1) tradeLogLimiter = rate.NewLimiter(rate.Every(time.Minute), 1)
// pingInterval the connection will break automatically if the subscription is not established or data has not been
// pushed for more than 30 seconds. Therefore, we set it to 20 seconds.
pingInterval = 20 * time.Second
) )
type WebsocketOp struct { type WebsocketOp struct {
@ -51,6 +54,7 @@ func NewStream(client *okexapi.RestClient) *Stream {
stream.SetParser(parseWebSocketEvent) stream.SetParser(parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent) stream.SetDispatcher(stream.dispatchEvent)
stream.SetEndpointCreator(stream.createEndpoint) stream.SetEndpointCreator(stream.createEndpoint)
stream.SetPingInterval(pingInterval)
stream.OnKLineEvent(stream.handleKLineEvent) stream.OnKLineEvent(stream.handleKLineEvent)
stream.OnBookEvent(stream.handleBookEvent) stream.OnBookEvent(stream.handleBookEvent)

View File

@ -70,8 +70,9 @@ type WebsocketPongEvent struct{}
//go:generate callbackgen -type StandardStream -interface //go:generate callbackgen -type StandardStream -interface
type StandardStream struct { type StandardStream struct {
parser Parser parser Parser
dispatcher Dispatcher dispatcher Dispatcher
pingInterval time.Duration
endpointCreator EndpointCreator endpointCreator EndpointCreator
@ -178,9 +179,10 @@ type StandardStreamEmitter interface {
func NewStandardStream() StandardStream { func NewStandardStream() StandardStream {
return StandardStream{ return StandardStream{
ReconnectC: make(chan struct{}, 1), ReconnectC: make(chan struct{}, 1),
CloseC: make(chan struct{}), CloseC: make(chan struct{}),
sg: NewSyncGroup(), sg: NewSyncGroup(),
pingInterval: pingInterval,
} }
} }
@ -315,15 +317,19 @@ func (s *StandardStream) Read(ctx context.Context, conn *websocket.Conn, cancel
} }
} }
func (s *StandardStream) SetPingInterval(interval time.Duration) {
s.pingInterval = interval
}
func (s *StandardStream) ping( func (s *StandardStream) ping(
ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc, interval time.Duration, ctx context.Context, conn *websocket.Conn, cancel context.CancelFunc,
) { ) {
defer func() { defer func() {
cancel() cancel()
log.Debug("[websocket] ping worker stopped") log.Debug("[websocket] ping worker stopped")
}() }()
var pingTicker = time.NewTicker(interval) var pingTicker = time.NewTicker(s.pingInterval)
defer pingTicker.Stop() defer pingTicker.Stop()
for { for {
@ -454,7 +460,7 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error {
s.Read(connCtx, conn, connCancel) s.Read(connCtx, conn, connCancel)
}) })
s.sg.Add(func() { s.sg.Add(func() {
s.ping(connCtx, conn, connCancel, pingInterval) s.ping(connCtx, conn, connCancel)
}) })
s.sg.Run() s.sg.Run()
return nil return nil