From 1dedd32f4263284a9e9a292225d701fc6bd90791 Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 10 Jan 2024 13:56:17 +0800 Subject: [PATCH 1/2] pkg/exchange: support unsubscribe and resubscribe --- pkg/exchange/okex/parse.go | 9 ++++-- pkg/exchange/okex/stream.go | 41 +++++++++++++++++++++++++++- pkg/exchange/okex/stream_test.go | 47 ++++++++++++++++++++++++++++++++ 3 files changed, 94 insertions(+), 3 deletions(-) diff --git a/pkg/exchange/okex/parse.go b/pkg/exchange/okex/parse.go index 193518474..c09e21c15 100644 --- a/pkg/exchange/okex/parse.go +++ b/pkg/exchange/okex/parse.go @@ -101,8 +101,10 @@ func parseWebSocketEvent(in []byte) (interface{}, error) { type WsEventType string const ( - WsEventTypeLogin = "login" - WsEventTypeError = "error" + WsEventTypeLogin = "login" + WsEventTypeError = "error" + WsEventTypeSubscribe = "subscribe" + WsEventTypeUnsubscribe = "unsubscribe" ) type WebSocketEvent struct { @@ -122,6 +124,9 @@ func (w *WebSocketEvent) IsValid() error { case WsEventTypeError: return fmt.Errorf("websocket request error, code: %s, msg: %s", w.Code, w.Message) + case WsEventTypeSubscribe, WsEventTypeUnsubscribe: + return nil + case WsEventTypeLogin: // Actually, this code is unnecessary because the events are either `Subscribe` or `Unsubscribe`, But to avoid bugs // in the exchange, we still check. diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index c27fedbfa..96b955c73 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -2,6 +2,7 @@ package okex import ( "context" + "fmt" "golang.org/x/time/rate" "strconv" "time" @@ -15,7 +16,7 @@ var ( ) type WebsocketOp struct { - Op string `json:"op"` + Op WsEventType `json:"op"` Args interface{} `json:"args"` } @@ -60,6 +61,44 @@ func NewStream(client *okexapi.RestClient) *Stream { return stream } +func (s *Stream) syncSubscriptions(opType WsEventType) error { + if opType != WsEventTypeUnsubscribe && opType != WsEventTypeSubscribe { + return fmt.Errorf("unexpected subscription type: %v", opType) + } + + logger := log.WithField("opType", opType) + var topics []WebsocketSubscription + for _, subscription := range s.Subscriptions { + topic, err := convertSubscription(subscription) + if err != nil { + logger.WithError(err).Errorf("convert error, subscription: %+v", subscription) + return err + } + + topics = append(topics, topic) + } + + logger.Infof("%s channels: %+v", opType, topics) + if err := s.Conn.WriteJSON(WebsocketOp{ + Op: opType, + Args: topics, + }); err != nil { + logger.WithError(err).Error("failed to send request") + return err + } + + return nil +} + +func (s *Stream) Unsubscribe() { + // errors are handled in the syncSubscriptions, so they are skipped here. + _ = s.syncSubscriptions(WsEventTypeUnsubscribe) + s.Resubscribe(func(old []types.Subscription) (new []types.Subscription, err error) { + // clear the subscriptions + return []types.Subscription{}, nil + }) +} + func (s *Stream) handleConnect() { if s.PublicOnly { var subs []WebsocketSubscription diff --git a/pkg/exchange/okex/stream_test.go b/pkg/exchange/okex/stream_test.go index cf0125ded..a832767cf 100644 --- a/pkg/exchange/okex/stream_test.go +++ b/pkg/exchange/okex/stream_test.go @@ -5,6 +5,7 @@ import ( "os" "strconv" "testing" + "time" "github.com/stretchr/testify/assert" @@ -93,4 +94,50 @@ func TestStream(t *testing.T) { c := make(chan struct{}) <-c }) + + t.Run("Subscribe/Unsubscribe test", func(t *testing.T) { + s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{ + Depth: types.DepthLevel50, + }) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnBookSnapshot(func(book types.SliceOrderBook) { + t.Log("got snapshot", book) + }) + s.OnBookUpdate(func(book types.SliceOrderBook) { + t.Log("got update", book) + }) + + <-time.After(5 * time.Second) + + s.Unsubscribe() + c := make(chan struct{}) + <-c + }) + + t.Run("Resubscribe test", func(t *testing.T) { + s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{ + Depth: types.DepthLevel50, + }) + s.SetPublicOnly() + err := s.Connect(context.Background()) + assert.NoError(t, err) + + s.OnBookSnapshot(func(book types.SliceOrderBook) { + t.Log("got snapshot", book) + }) + s.OnBookUpdate(func(book types.SliceOrderBook) { + t.Log("got update", book) + }) + + <-time.After(5 * time.Second) + + s.Resubscribe(func(old []types.Subscription) (new []types.Subscription, err error) { + return old, nil + }) + c := make(chan struct{}) + <-c + }) } From a7aa34c396d15c5ff0d47f305cf66b0e5ffff79e Mon Sep 17 00:00:00 2001 From: Edwin Date: Wed, 10 Jan 2024 14:02:03 +0800 Subject: [PATCH 2/2] pkg/exchange: add comment --- pkg/exchange/okex/convert.go | 4 ++++ pkg/exchange/okex/stream_test.go | 6 +++--- pkg/types/stream.go | 1 + 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/exchange/okex/convert.go b/pkg/exchange/okex/convert.go index f1889bb0a..0551ef684 100644 --- a/pkg/exchange/okex/convert.go +++ b/pkg/exchange/okex/convert.go @@ -97,6 +97,10 @@ func convertSubscription(s types.Subscription) (WebsocketSubscription, error) { }, nil case types.BookChannel: + if s.Options.Depth != types.DepthLevel400 { + return WebsocketSubscription{}, fmt.Errorf("%s depth not supported", s.Options.Depth) + } + return WebsocketSubscription{ Channel: ChannelBooks, InstrumentID: toLocalSymbol(s.Symbol), diff --git a/pkg/exchange/okex/stream_test.go b/pkg/exchange/okex/stream_test.go index a832767cf..7f85973ad 100644 --- a/pkg/exchange/okex/stream_test.go +++ b/pkg/exchange/okex/stream_test.go @@ -48,7 +48,7 @@ func TestStream(t *testing.T) { t.Run("book test", func(t *testing.T) { s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{ - Depth: types.DepthLevel50, + Depth: types.DepthLevel400, }) s.SetPublicOnly() err := s.Connect(context.Background()) @@ -97,7 +97,7 @@ func TestStream(t *testing.T) { t.Run("Subscribe/Unsubscribe test", func(t *testing.T) { s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{ - Depth: types.DepthLevel50, + Depth: types.DepthLevel400, }) s.SetPublicOnly() err := s.Connect(context.Background()) @@ -119,7 +119,7 @@ func TestStream(t *testing.T) { t.Run("Resubscribe test", func(t *testing.T) { s.Subscribe(types.BookChannel, "BTCUSDT", types.SubscribeOptions{ - Depth: types.DepthLevel50, + Depth: types.DepthLevel400, }) s.SetPublicOnly() err := s.Connect(context.Background()) diff --git a/pkg/types/stream.go b/pkg/types/stream.go index f8aa209e1..aaa9efab5 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -547,6 +547,7 @@ const ( DepthLevel20 Depth = "20" DepthLevel50 Depth = "50" DepthLevel200 Depth = "200" + DepthLevel400 Depth = "400" ) type Speed string