mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-25 00:05:15 +00:00
Merge pull request #1370 from bailantaotao/edwin/update-low-level-ping-pong
REFACTOR: [stream] move ping into stream level
This commit is contained in:
commit
32c75ea3b8
|
@ -15,10 +15,6 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
|
|
||||||
// to maintain the WebSocket connection.
|
|
||||||
pingInterval = 20 * time.Second
|
|
||||||
|
|
||||||
// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
|
// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
|
||||||
spotArgsLimit = 10
|
spotArgsLimit = 10
|
||||||
)
|
)
|
||||||
|
@ -244,40 +240,18 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ping implements the Bybit text message of WebSocket PingPong.
|
// ping implements the Bybit text message of WebSocket PingPong.
|
||||||
func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc context.CancelFunc) {
|
func (s *Stream) ping(conn *websocket.Conn) error {
|
||||||
defer func() {
|
err := conn.WriteJSON(struct {
|
||||||
log.Debug("[bybit] ping worker stopped")
|
Op WsOpType `json:"op"`
|
||||||
cancelFunc()
|
}{
|
||||||
}()
|
Op: WsOpTypePing,
|
||||||
|
})
|
||||||
var pingTicker = time.NewTicker(pingInterval)
|
if err != nil {
|
||||||
defer pingTicker.Stop()
|
log.WithError(err).Error("ping error")
|
||||||
|
return err
|
||||||
for {
|
|
||||||
select {
|
|
||||||
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-s.CloseC:
|
|
||||||
return
|
|
||||||
|
|
||||||
case <-pingTicker.C:
|
|
||||||
// it's just for maintaining the liveliness of the connection, so comment out ReqId.
|
|
||||||
err := conn.WriteJSON(struct {
|
|
||||||
//ReqId string `json:"req_id"`
|
|
||||||
Op WsOpType `json:"op"`
|
|
||||||
}{
|
|
||||||
//ReqId: uuid.NewString(),
|
|
||||||
Op: WsOpTypePing,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.WithError(err).Error("ping error", err)
|
|
||||||
s.Reconnect()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Stream) handlerConnect() {
|
func (s *Stream) handlerConnect() {
|
||||||
|
|
|
@ -57,8 +57,8 @@ type Parser func(message []byte) (interface{}, error)
|
||||||
|
|
||||||
type Dispatcher func(e interface{})
|
type Dispatcher func(e interface{})
|
||||||
|
|
||||||
// HeartBeat keeps connection alive by sending the heartbeat packet.
|
// HeartBeat keeps connection alive by sending the ping packet.
|
||||||
type HeartBeat func(ctxConn context.Context, conn *websocket.Conn, cancelConn context.CancelFunc)
|
type HeartBeat func(conn *websocket.Conn) error
|
||||||
|
|
||||||
type BeforeConnect func(ctx context.Context) error
|
type BeforeConnect func(ctx context.Context) error
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ type StandardStream struct {
|
||||||
|
|
||||||
// sg is used to wait until the previous routines are closed.
|
// sg is used to wait until the previous routines are closed.
|
||||||
// only handle routines used internally, avoid including external callback func to prevent issues if they have
|
// only handle routines used internally, avoid including external callback func to prevent issues if they have
|
||||||
// bugs and cannot terminate. e.q. heartBeat
|
// bugs and cannot terminate.
|
||||||
sg SyncGroup
|
sg SyncGroup
|
||||||
|
|
||||||
// ReconnectC is a signal channel for reconnecting
|
// ReconnectC is a signal channel for reconnecting
|
||||||
|
@ -319,6 +319,14 @@ func (s *StandardStream) ping(
|
||||||
return
|
return
|
||||||
|
|
||||||
case <-pingTicker.C:
|
case <-pingTicker.C:
|
||||||
|
if s.heartBeat != nil {
|
||||||
|
if err := s.heartBeat(conn); err != nil {
|
||||||
|
// log errors at the concrete class so that we can identify which exchange encountered an error
|
||||||
|
s.Reconnect()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil {
|
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil {
|
||||||
log.WithError(err).Error("ping error", err)
|
log.WithError(err).Error("ping error", err)
|
||||||
s.Reconnect()
|
s.Reconnect()
|
||||||
|
@ -432,11 +440,6 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error {
|
||||||
s.ping(connCtx, conn, connCancel, pingInterval)
|
s.ping(connCtx, conn, connCancel, pingInterval)
|
||||||
})
|
})
|
||||||
s.sg.Run()
|
s.sg.Run()
|
||||||
|
|
||||||
if s.heartBeat != nil {
|
|
||||||
// not included in wg, as it is an external callback func.
|
|
||||||
go s.heartBeat(connCtx, conn, connCancel)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user