From 429036985c56423e00cafc63a341afb83e5fe7fb Mon Sep 17 00:00:00 2001 From: Edwin Date: Fri, 26 Jan 2024 15:21:31 +0800 Subject: [PATCH] pkg/exchange: add new kline stream --- pkg/exchange/okex/kline_stream.go | 87 ++++++++++++++++++++++ pkg/exchange/okex/klinestream_callbacks.go | 19 +++++ pkg/exchange/okex/okexapi/client.go | 1 + pkg/exchange/okex/stream.go | 17 +---- 4 files changed, 110 insertions(+), 14 deletions(-) create mode 100644 pkg/exchange/okex/kline_stream.go create mode 100644 pkg/exchange/okex/klinestream_callbacks.go diff --git a/pkg/exchange/okex/kline_stream.go b/pkg/exchange/okex/kline_stream.go new file mode 100644 index 000000000..e9b2ef356 --- /dev/null +++ b/pkg/exchange/okex/kline_stream.go @@ -0,0 +1,87 @@ +package okex + +import ( + "context" + + "github.com/c9s/bbgo/pkg/exchange/okex/okexapi" + "github.com/c9s/bbgo/pkg/types" +) + +//go:generate callbackgen -type KLineStream -interface +type KLineStream struct { + types.StandardStream + + kLineEventCallbacks []func(candle KLineEvent) +} + +func NewKLineStream() *KLineStream { + k := &KLineStream{ + StandardStream: types.NewStandardStream(), + } + + k.SetParser(parseWebSocketEvent) + k.SetDispatcher(k.dispatchEvent) + k.SetEndpointCreator(func(_ context.Context) (string, error) { return okexapi.PublicBusinessWebSocketURL, nil }) + k.SetPingInterval(pingInterval) + + // K line channel is public only + k.SetPublicOnly() + k.OnConnect(k.handleConnect) + k.OnKLineEvent(k.handleKLineEvent) + + return k +} + +func (s *KLineStream) handleConnect() { + var subs []WebsocketSubscription + for _, subscription := range s.Subscriptions { + if subscription.Channel != types.KLineChannel { + continue + } + + sub, err := convertSubscription(subscription) + if err != nil { + log.WithError(err).Errorf("subscription convert error") + continue + } + + subs = append(subs, sub) + } + if len(subs) == 0 { + return + } + + log.Infof("subscribing channels: %+v", subs) + err := s.Conn.WriteJSON(WebsocketOp{ + Op: "subscribe", + Args: subs, + }) + + if err != nil { + log.WithError(err).Error("subscribe error") + } +} + +func (s *KLineStream) handleKLineEvent(k KLineEvent) { + for _, event := range k.Events { + kline := kLineToGlobal(event, types.Interval(k.Interval), k.Symbol) + if kline.Closed { + s.EmitKLineClosed(kline) + } else { + s.EmitKLine(kline) + } + } +} + +func (s *KLineStream) dispatchEvent(e interface{}) { + switch et := e.(type) { + case *WebSocketEvent: + if err := et.IsValid(); err != nil { + log.Errorf("invalid event: %v", err) + return + } + + case *KLineEvent: + s.EmitKLineEvent(*et) + } +} diff --git a/pkg/exchange/okex/klinestream_callbacks.go b/pkg/exchange/okex/klinestream_callbacks.go new file mode 100644 index 000000000..fc7e91ef0 --- /dev/null +++ b/pkg/exchange/okex/klinestream_callbacks.go @@ -0,0 +1,19 @@ +// Code generated by "callbackgen -type KLineStream -interface"; DO NOT EDIT. + +package okex + +import () + +func (K *KLineStream) OnKLineEvent(cb func(candle KLineEvent)) { + K.kLineEventCallbacks = append(K.kLineEventCallbacks, cb) +} + +func (K *KLineStream) EmitKLineEvent(candle KLineEvent) { + for _, cb := range K.kLineEventCallbacks { + cb(candle) + } +} + +type KLineStreamEventHub interface { + OnKLineEvent(cb func(candle KLineEvent)) +} diff --git a/pkg/exchange/okex/okexapi/client.go b/pkg/exchange/okex/okexapi/client.go index 425339b0f..9fe036471 100644 --- a/pkg/exchange/okex/okexapi/client.go +++ b/pkg/exchange/okex/okexapi/client.go @@ -21,6 +21,7 @@ const defaultHTTPTimeout = time.Second * 15 const RestBaseURL = "https://www.okex.com/" const PublicWebSocketURL = "wss://ws.okex.com:8443/ws/v5/public" const PrivateWebSocketURL = "wss://ws.okex.com:8443/ws/v5/private" +const PublicBusinessWebSocketURL = "wss://wsaws.okx.com:8443/ws/v5/business" type SideType string diff --git a/pkg/exchange/okex/stream.go b/pkg/exchange/okex/stream.go index 6c64ee188..7d602c668 100644 --- a/pkg/exchange/okex/stream.go +++ b/pkg/exchange/okex/stream.go @@ -35,6 +35,7 @@ type WebsocketLogin struct { //go:generate callbackgen -type Stream -interface type Stream struct { types.StandardStream + kLineStream *KLineStream client *okexapi.RestClient balanceProvider types.ExchangeAccountService @@ -52,6 +53,7 @@ func NewStream(client *okexapi.RestClient, balanceProvider types.ExchangeAccount client: client, balanceProvider: balanceProvider, StandardStream: types.NewStandardStream(), + kLineStream: NewKLineStream(), } stream.SetParser(parseWebSocketEvent) @@ -59,13 +61,13 @@ func NewStream(client *okexapi.RestClient, balanceProvider types.ExchangeAccount stream.SetEndpointCreator(stream.createEndpoint) stream.SetPingInterval(pingInterval) - stream.OnKLineEvent(stream.handleKLineEvent) stream.OnBookEvent(stream.handleBookEvent) stream.OnAccountEvent(stream.handleAccountEvent) stream.OnMarketTradeEvent(stream.handleMarketTradeEvent) stream.OnOrderTradesEvent(stream.handleOrderDetailsEvent) stream.OnConnect(stream.handleConnect) stream.OnAuth(stream.subscribePrivateChannels(stream.emitBalanceSnapshot)) + return stream } @@ -250,17 +252,6 @@ func (s *Stream) handleMarketTradeEvent(data []MarketTradeEvent) { } } -func (s *Stream) handleKLineEvent(k KLineEvent) { - for _, event := range k.Events { - kline := kLineToGlobal(event, types.Interval(k.Interval), k.Symbol) - if kline.Closed { - s.EmitKLineClosed(kline) - } else { - s.EmitKLine(kline) - } - } -} - func (s *Stream) createEndpoint(ctx context.Context) (string, error) { var url string if s.PublicOnly { @@ -288,8 +279,6 @@ func (s *Stream) dispatchEvent(e interface{}) { s.EmitBookEvent(*et) } s.EmitBookTickerUpdate(et.BookTicker()) - case *KLineEvent: - s.EmitKLineEvent(*et) case *okexapi.Account: s.EmitAccountEvent(*et)