Merge pull request #1265 from bailantaotao/edwin/add-stream

FEATURE: [bybit] implement stream ping
This commit is contained in:
bailantaotao 2023-08-03 12:33:20 +08:00 committed by GitHub
commit 8118e55b72
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 337 additions and 2 deletions

View File

@ -19,8 +19,13 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
const defaultHTTPTimeout = time.Second * 15 const (
const RestBaseURL = "https://api.bybit.com" defaultHTTPTimeout = time.Second * 15
RestBaseURL = "https://api.bybit.com"
WsSpotPublicSpotUrl = "wss://stream.bybit.com/v5/public/spot"
WsSpotPrivateUrl = "wss://stream.bybit.com/v5/private"
)
// defaultRequestWindowMilliseconds specify how long an HTTP request is valid. It is also used to prevent replay attacks. // defaultRequestWindowMilliseconds specify how long an HTTP request is valid. It is also used to prevent replay attacks.
var defaultRequestWindowMilliseconds = fmt.Sprintf("%d", 5*time.Second.Milliseconds()) var defaultRequestWindowMilliseconds = fmt.Sprintf("%d", 5*time.Second.Milliseconds())

View File

@ -387,3 +387,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
return trades, nil return trades, nil
} }
func (e *Exchange) NewStream() types.Stream {
return NewStream()
}

View File

@ -0,0 +1,96 @@
package bybit
import (
"context"
"encoding/json"
"time"
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/exchange/bybit/bybitapi"
"github.com/c9s/bbgo/pkg/types"
)
const (
// Bybit: To avoid network or program issues, we recommend that you send the ping heartbeat packet every 20 seconds
// to maintain the WebSocket connection.
pingInterval = 20 * time.Second
)
type Stream struct {
types.StandardStream
}
func NewStream() *Stream {
stream := &Stream{
StandardStream: types.NewStandardStream(),
}
stream.SetEndpointCreator(stream.createEndpoint)
stream.SetParser(stream.parseWebSocketEvent)
stream.SetDispatcher(stream.dispatchEvent)
stream.SetHeartBeat(stream.ping)
return stream
}
func (s *Stream) createEndpoint(_ context.Context) (string, error) {
var url string
if s.PublicOnly {
url = bybitapi.WsSpotPublicSpotUrl
} else {
url = bybitapi.WsSpotPrivateUrl
}
return url, nil
}
func (s *Stream) dispatchEvent(event interface{}) {
switch e := event.(type) {
case *WebSocketEvent:
if err := e.IsValid(); err != nil {
log.Errorf("invalid event: %v", err)
}
}
}
func (s *Stream) parseWebSocketEvent(in []byte) (interface{}, error) {
var resp WebSocketEvent
return &resp, json.Unmarshal(in, &resp)
}
// ping implements the Bybit text message of WebSocket PingPong.
func (s *Stream) ping(ctx context.Context, conn *websocket.Conn, cancelFunc context.CancelFunc) {
defer func() {
log.Debug("[bybit] ping worker stopped")
cancelFunc()
}()
var pingTicker = time.NewTicker(pingInterval)
defer pingTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-s.CloseC:
return
case <-pingTicker.C:
// it's just for maintaining the liveliness of the connection, so comment out ReqId.
err := conn.WriteJSON(struct {
//ReqId string `json:"req_id"`
Op WsOpType `json:"op"`
}{
//ReqId: uuid.NewString(),
Op: WsOpTypePing,
})
if err != nil {
log.WithError(err).Error("ping error", err)
s.Reconnect()
return
}
}
}
}

View File

@ -0,0 +1,39 @@
package bybit
import (
"fmt"
)
type WsOpType string
const (
WsOpTypePing WsOpType = "ping"
WsOpTypePong WsOpType = "pong"
)
type WebSocketEvent struct {
Success *bool `json:"success,omitempty"`
RetMsg *string `json:"ret_msg,omitempty"`
ReqId *string `json:"req_id,omitempty"`
ConnId string `json:"conn_id"`
Op WsOpType `json:"op"`
Args []string `json:"args"`
}
func (w *WebSocketEvent) IsValid() error {
switch w.Op {
case WsOpTypePing:
// public event
if (w.Success != nil && !*w.Success) ||
(w.RetMsg != nil && WsOpType(*w.RetMsg) != WsOpTypePong) {
return fmt.Errorf("unexpeted response of pong: %#v", w)
}
return nil
case WsOpTypePong:
// private event
return nil
default:
return fmt.Errorf("unexpected op type: %#v", w)
}
}

View File

@ -0,0 +1,191 @@
package bybit
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_parseWebSocketEvent(t *testing.T) {
t.Run("[public] PingEvent without req id", func(t *testing.T) {
s := NewStream()
msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","op":"ping"}`
raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err)
expSucceeds := true
expRetMsg := string(WsOpTypePong)
e, ok := raw.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
ReqId: nil,
Op: WsOpTypePing,
Args: nil,
}, e)
assert.NoError(t, e.IsValid())
})
t.Run("[public] PingEvent with req id", func(t *testing.T) {
s := NewStream()
msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","req_id":"b26704da-f5af-44c2-bdf7-935d6739e1a0","op":"ping"}`
raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err)
expSucceeds := true
expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
e, ok := raw.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ConnId: "a806f6c4-3608-4b6d-a225-9f5da975bc44",
ReqId: &expReqId,
Op: WsOpTypePing,
Args: nil,
}, e)
assert.NoError(t, e.IsValid())
})
t.Run("[private] PingEvent without req id", func(t *testing.T) {
s := NewStream()
msg := `{"op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}`
raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err)
e, ok := raw.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{
Success: nil,
RetMsg: nil,
ConnId: "civn4p1dcjmtvb69ome0-yrt1",
ReqId: nil,
Op: WsOpTypePong,
Args: []string{"1690884539181"},
}, e)
assert.NoError(t, e.IsValid())
})
t.Run("[private] PingEvent with req id", func(t *testing.T) {
s := NewStream()
msg := `{"req_id":"78d36b57-a142-47b7-9143-5843df77d44d","op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}`
raw, err := s.parseWebSocketEvent([]byte(msg))
assert.NoError(t, err)
expReqId := "78d36b57-a142-47b7-9143-5843df77d44d"
e, ok := raw.(*WebSocketEvent)
assert.True(t, ok)
assert.Equal(t, &WebSocketEvent{
Success: nil,
RetMsg: nil,
ConnId: "civn4p1dcjmtvb69ome0-yrt1",
ReqId: &expReqId,
Op: WsOpTypePong,
Args: []string{"1690884539181"},
}, e)
assert.NoError(t, e.IsValid())
})
}
func Test_WebSocketEventIsValid(t *testing.T) {
t.Run("[public] valid op ping", func(t *testing.T) {
expSucceeds := true
expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: &expReqId,
ConnId: "test-conndid",
Op: WsOpTypePing,
Args: nil,
}
assert.NoError(t, w.IsValid())
})
t.Run("[private] valid op ping", func(t *testing.T) {
w := &WebSocketEvent{
Success: nil,
RetMsg: nil,
ReqId: nil,
ConnId: "test-conndid",
Op: WsOpTypePong,
Args: nil,
}
assert.NoError(t, w.IsValid())
})
t.Run("[public] un-Success", func(t *testing.T) {
expSucceeds := false
expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: &expReqId,
ConnId: "test-conndid",
Op: WsOpTypePing,
Args: nil,
}
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid())
})
t.Run("[public] missing Success field", func(t *testing.T) {
expRetMsg := string(WsOpTypePong)
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{
RetMsg: &expRetMsg,
ReqId: &expReqId,
ConnId: "test-conndid",
Op: WsOpTypePing,
Args: nil,
}
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid())
})
t.Run("[public] invalid ret msg", func(t *testing.T) {
expSucceeds := false
expRetMsg := "PINGPONGPINGPONG"
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{
Success: &expSucceeds,
RetMsg: &expRetMsg,
ReqId: &expReqId,
ConnId: "test-conndid",
Op: WsOpTypePing,
Args: nil,
}
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid())
})
t.Run("[public] missing RetMsg field", func(t *testing.T) {
expReqId := "b26704da-f5af-44c2-bdf7-935d6739e1a0"
w := &WebSocketEvent{
ReqId: &expReqId,
ConnId: "test-conndid",
Op: WsOpTypePing,
Args: nil,
}
assert.Error(t, fmt.Errorf("unexpeted response of pong: %#v", w), w.IsValid())
})
t.Run("unexpected op type", func(t *testing.T) {
w := &WebSocketEvent{
Op: WsOpType("unexpected"),
}
assert.Error(t, fmt.Errorf("unexpected op type: %#v", w), w.IsValid())
})
}