From 84fa19afee4e72190b9e0ca6b9cf87d4cb3fc3bc Mon Sep 17 00:00:00 2001 From: Edwin Date: Mon, 7 Aug 2023 14:55:09 +0800 Subject: [PATCH] pkg/exchange: add auth function for ws --- pkg/exchange/bybit/bybitapi/client.go | 4 ++-- pkg/exchange/bybit/bybitapi/v3/client.go | 4 ++++ pkg/exchange/bybit/exchange.go | 7 +++--- pkg/exchange/bybit/stream.go | 27 +++++++++++++++++++++-- pkg/exchange/bybit/stream_test.go | 28 ++++++++++++++++++++++++ pkg/exchange/bybit/types.go | 14 ++++++------ pkg/exchange/bybit/types_test.go | 8 +++---- 7 files changed, 74 insertions(+), 18 deletions(-) diff --git a/pkg/exchange/bybit/bybitapi/client.go b/pkg/exchange/bybit/bybitapi/client.go index fb3458d8d..034d29750 100644 --- a/pkg/exchange/bybit/bybitapi/client.go +++ b/pkg/exchange/bybit/bybitapi/client.go @@ -107,7 +107,7 @@ func (c *RestClient) NewAuthenticatedRequest(ctx context.Context, method, refURL // 2. Use the HMAC_SHA256 or RSA_SHA256 algorithm to sign the string in step 1, and convert it to a hex // string (HMAC_SHA256) / base64 (RSA_SHA256) to obtain the sign parameter. // 3. Append the sign parameter to request header, and send the HTTP request. - signature := sign(signKey, c.secret) + signature := Sign(signKey, c.secret) req, err := http.NewRequestWithContext(ctx, method, pathURL.String(), bytes.NewReader(body)) if err != nil { @@ -122,7 +122,7 @@ func (c *RestClient) NewAuthenticatedRequest(ctx context.Context, method, refURL return req, nil } -func sign(payload string, secret string) string { +func Sign(payload string, secret string) string { var sig = hmac.New(sha256.New, []byte(secret)) _, err := sig.Write([]byte(payload)) if err != nil { diff --git a/pkg/exchange/bybit/bybitapi/v3/client.go b/pkg/exchange/bybit/bybitapi/v3/client.go index 9fdb889fb..d9dd3c2e0 100644 --- a/pkg/exchange/bybit/bybitapi/v3/client.go +++ b/pkg/exchange/bybit/bybitapi/v3/client.go @@ -11,3 +11,7 @@ type APIResponse = bybitapi.APIResponse type Client struct { Client requestgen.AuthenticatedAPIClient } + +func NewClient(client *bybitapi.RestClient) *Client { + return &Client{Client: client} +} diff --git a/pkg/exchange/bybit/exchange.go b/pkg/exchange/bybit/exchange.go index 915d0d2ea..76c0c2edf 100644 --- a/pkg/exchange/bybit/exchange.go +++ b/pkg/exchange/bybit/exchange.go @@ -57,8 +57,9 @@ func New(key, secret string) (*Exchange, error) { return &Exchange{ key: key, // pragma: allowlist nextline secret - secret: secret, - client: client, + secret: secret, + client: client, + v3client: v3.NewClient(client), }, nil } @@ -389,5 +390,5 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type } func (e *Exchange) NewStream() types.Stream { - return NewStream() + return NewStream(e.key, e.secret) } diff --git a/pkg/exchange/bybit/stream.go b/pkg/exchange/bybit/stream.go index 2f07181b2..687d9452d 100644 --- a/pkg/exchange/bybit/stream.go +++ b/pkg/exchange/bybit/stream.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strconv" "time" "github.com/gorilla/websocket" @@ -21,16 +22,25 @@ const ( spotArgsLimit = 10 ) +var ( + // wsAuthRequest specifies the duration for which a websocket request's authentication is valid. + wsAuthRequest = 10 * time.Second +) + //go:generate callbackgen -type Stream type Stream struct { + key, secret string types.StandardStream bookEventCallbacks []func(e BookEvent) } -func NewStream() *Stream { +func NewStream(key, secret string) *Stream { stream := &Stream{ StandardStream: types.NewStandardStream(), + // pragma: allowlist nextline secret + key: key, + secret: secret, } stream.SetEndpointCreator(stream.createEndpoint) @@ -150,11 +160,24 @@ func (s *Stream) handlerConnect() { } log.Infof("subscribing channels: %+v", topics) if err := s.Conn.WriteJSON(WebsocketOp{ - Op: "subscribe", + Op: WsOpTypeSubscribe, Args: topics, }); err != nil { log.WithError(err).Error("failed to send subscription request") } + } else { + expires := strconv.FormatInt(time.Now().Add(wsAuthRequest).In(time.UTC).UnixMilli(), 10) + + if err := s.Conn.WriteJSON(WebsocketOp{ + Op: WsOpTypeAuth, + Args: []string{ + s.key, + expires, + bybitapi.Sign(fmt.Sprintf("GET/realtime%s", expires), s.secret), + }, + }); err != nil { + log.WithError(err).Error("failed to auth request") + } } } diff --git a/pkg/exchange/bybit/stream_test.go b/pkg/exchange/bybit/stream_test.go index ea4183e04..6ff3c3fcb 100644 --- a/pkg/exchange/bybit/stream_test.go +++ b/pkg/exchange/bybit/stream_test.go @@ -1,15 +1,43 @@ package bybit import ( + "context" "fmt" + "os" + "strconv" "testing" "github.com/stretchr/testify/assert" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/testutil" "github.com/c9s/bbgo/pkg/types" ) +func getTestClientOrSkip(t *testing.T) *Stream { + if b, _ := strconv.ParseBool(os.Getenv("CI")); b { + t.Skip("skip test for CI") + } + + key, secret, ok := testutil.IntegrationTestConfigured(t, "BYBIT") + if !ok { + t.Skip("BYBIT_* env vars are not configured") + return nil + } + + return NewStream(key, secret) +} + +func TestStream(t *testing.T) { + s := getTestClientOrSkip(t) + + t.Run("Auth test", func(t *testing.T) { + s.Connect(context.Background()) + c := make(chan struct{}) + <-c + }) +} + func TestStream_parseWebSocketEvent(t *testing.T) { s := Stream{} diff --git a/pkg/exchange/bybit/types.go b/pkg/exchange/bybit/types.go index 8986451a2..6adbafcd1 100644 --- a/pkg/exchange/bybit/types.go +++ b/pkg/exchange/bybit/types.go @@ -28,11 +28,12 @@ type WsOpType string const ( WsOpTypePing WsOpType = "ping" WsOpTypePong WsOpType = "pong" + WsOpTypeAuth WsOpType = "auth" WsOpTypeSubscribe WsOpType = "subscribe" ) type WebsocketOp struct { - Op string `json:"op"` + Op WsOpType `json:"op"` Args []string `json:"args"` } @@ -58,6 +59,11 @@ func (w *WebSocketOpEvent) IsValid() error { case WsOpTypePong: // private event return nil + case WsOpTypeAuth: + if w.Success != nil && !*w.Success { + return fmt.Errorf("unexpected response of auth: %#v", w) + } + return nil case WsOpTypeSubscribe: if w.Success != nil && !*w.Success { return fmt.Errorf("unexpected subscribe result: %+v", w) @@ -89,12 +95,6 @@ type WebSocketTopicEvent struct { Data json.RawMessage `json:"data"` } -// PriceVolumeSlice represents a slice of price and value. -// -// index 0 is Bid/Ask price. -// index 1 is Bid/Ask size. The *delta data* has size=0, which means that all quotations for this price have been filled or cancelled -type PriceVolumeSlice [2]fixedpoint.Value - type BookEvent struct { // Symbol name Symbol string `json:"s"` diff --git a/pkg/exchange/bybit/types_test.go b/pkg/exchange/bybit/types_test.go index 27dd31054..20f0527de 100644 --- a/pkg/exchange/bybit/types_test.go +++ b/pkg/exchange/bybit/types_test.go @@ -12,7 +12,7 @@ import ( func Test_parseWebSocketEvent(t *testing.T) { t.Run("[public] PingEvent without req id", func(t *testing.T) { - s := NewStream() + s := NewStream("", "") msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -34,7 +34,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[public] PingEvent with req id", func(t *testing.T) { - s := NewStream() + s := NewStream("", "") msg := `{"success":true,"ret_msg":"pong","conn_id":"a806f6c4-3608-4b6d-a225-9f5da975bc44","req_id":"b26704da-f5af-44c2-bdf7-935d6739e1a0","op":"ping"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -57,7 +57,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent without req id", func(t *testing.T) { - s := NewStream() + s := NewStream("", "") msg := `{"op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err) @@ -77,7 +77,7 @@ func Test_parseWebSocketEvent(t *testing.T) { }) t.Run("[private] PingEvent with req id", func(t *testing.T) { - s := NewStream() + s := NewStream("", "") msg := `{"req_id":"78d36b57-a142-47b7-9143-5843df77d44d","op":"pong","args":["1690884539181"],"conn_id":"civn4p1dcjmtvb69ome0-yrt1"}` raw, err := s.parseWebSocketEvent([]byte(msg)) assert.NoError(t, err)