pkg/exchange: Use the same conn to avoid concurrent write issues.

This commit is contained in:
Edwin 2023-10-27 16:03:03 +08:00
parent c4f1af00d7
commit d07b766939
2 changed files with 22 additions and 45 deletions

View File

@ -15,10 +15,6 @@ import (
)
const (
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
// to maintain the WebSocket connection.
pingInterval = 20 * time.Second
// spotArgsLimit can input up to 10 args for each subscription request sent to one connection.
spotArgsLimit = 10
)
@ -244,40 +240,18 @@ func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
}
// ping implements the Bybit text message of WebSocket PingPong.
func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc context.CancelFunc) {
defer func() {
log.Debug("[bybit] ping worker stopped")
cancelFunc()
}()
var pingTicker = time.NewTicker(pingInterval)
defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-s.CloseC:
return
case <-pingTicker.C:
// it's just for maintaining the liveliness of the connection, so comment out ReqId.
err := conn.WriteJSON(struct {
//ReqId string `json:"req_id"`
Op WsOpType `json:"op"`
}{
//ReqId: uuid.NewString(),
Op: WsOpTypePing,
})
if err != nil {
log.WithError(err).Error("ping error", err)
s.Reconnect()
return
}
}
func (s *Stream) ping(conn *websocket.Conn) error {
err := conn.WriteJSON(struct {
Op WsOpType `json:"op"`
}{
Op: WsOpTypePing,
})
if err != nil {
log.WithError(err).Error("ping error")
return err
}
return nil
}
func (s *Stream) handlerConnect() {

View File

@ -57,8 +57,8 @@ type Parser func(message []byte) (interface{}, error)
type Dispatcher func(e interface{})
// HeartBeat keeps connection alive by sending the heartbeat packet.
type HeartBeat func(ctxConn context.Context, conn *websocket.Conn, cancelConn context.CancelFunc)
// HeartBeat keeps connection alive by sending the ping packet.
type HeartBeat func(conn *websocket.Conn) error
type BeforeConnect func(ctx context.Context) error
@ -86,7 +86,7 @@ type StandardStream struct {
// sg is used to wait until the previous routines are closed.
// only handle routines used internally, avoid including external callback func to prevent issues if they have
// bugs and cannot terminate. e.q. heartBeat
// bugs and cannot terminate.
sg SyncGroup
// ReconnectC is a signal channel for reconnecting
@ -319,6 +319,14 @@ func (s *StandardStream) ping(
return
case <-pingTicker.C:
if s.heartBeat != nil {
if err := s.heartBeat(conn); err != nil {
// log errors at the concrete class so that we can identify which exchange encountered an error
s.Reconnect()
return
}
}
if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(writeTimeout)); err != nil {
log.WithError(err).Error("ping error", err)
s.Reconnect()
@ -432,11 +440,6 @@ func (s *StandardStream) DialAndConnect(ctx context.Context) error {
s.ping(connCtx, conn, connCancel, pingInterval)
})
s.sg.Run()
if s.heartBeat != nil {
// not included in wg, as it is an external callback func.
go s.heartBeat(connCtx, conn, connCancel)
}
return nil
}