From 6330a1845dbe6d18c619e461439dacdddf8d0329 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 23 Dec 2021 11:28:59 +0800 Subject: [PATCH] kucoin: connecting stream callbacks --- pkg/exchange/kucoin/stream.go | 48 ++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/pkg/exchange/kucoin/stream.go b/pkg/exchange/kucoin/stream.go index 8384ba894..dd3c0f603 100644 --- a/pkg/exchange/kucoin/stream.go +++ b/pkg/exchange/kucoin/stream.go @@ -39,11 +39,11 @@ type Stream struct { bullet *kucoinapi.Bullet publicOnly bool - candleEventCallbacks []func(c *kucoinapi.WebSocketCandle) - orderBookL2EventCallbacks []func(c *kucoinapi.WebSocketOrderBookL2) - tickerEventCallbacks []func(c *kucoinapi.WebSocketTicker) - accountBalanceEventCallbacks []func(c *kucoinapi.WebSocketAccountBalance) - privateOrderEventCallbacks []func(c *kucoinapi.WebSocketPrivateOrder) + candleEventCallbacks []func(e *kucoinapi.WebSocketCandle) + orderBookL2EventCallbacks []func(e *kucoinapi.WebSocketOrderBookL2) + tickerEventCallbacks []func(e *kucoinapi.WebSocketTicker) + accountBalanceEventCallbacks []func(e *kucoinapi.WebSocketAccountBalance) + privateOrderEventCallbacks []func(e *kucoinapi.WebSocketPrivateOrder) } func NewStream(client *kucoinapi.RestClient) *Stream { @@ -55,23 +55,21 @@ func NewStream(client *kucoinapi.RestClient) *Stream { } stream.OnConnect(stream.handleConnect) + stream.OnCandleEvent(stream.handleCandleEvent) + stream.OnOrderBookL2Event(stream.handleOrderBookL2Event) + stream.OnTickerEvent(stream.handleTickerEvent) return stream } -func (s *Stream) sendSubscriptions() error { - cmds, err := convertSubscriptions(s.Subscriptions) - if err != nil { - return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions) - } +func (s *Stream) handleCandleEvent(e *kucoinapi.WebSocketCandle) { } - for _, cmd := range cmds { - if err := s.conn.WriteJSON(cmd); err != nil { - return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd) - } - } +func (s *Stream) handleOrderBookL2Event(e *kucoinapi.WebSocketOrderBookL2) { } - return nil -} +func (s *Stream) handleTickerEvent(e *kucoinapi.WebSocketTicker) { } + +func (s *Stream) handleAccountBalanceEvent(e *kucoinapi.WebSocketAccountBalance) { } + +func (s *Stream) handlePrivateOrderEvent(e *kucoinapi.WebSocketPrivateOrder) { } func (s *Stream) handleConnect() { if s.publicOnly { @@ -144,6 +142,22 @@ func (s *Stream) Reconnector(ctx context.Context) { } } +func (s *Stream) sendSubscriptions() error { + cmds, err := convertSubscriptions(s.Subscriptions) + if err != nil { + return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions) + } + + for _, cmd := range cmds { + if err := s.conn.WriteJSON(cmd); err != nil { + return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd) + } + } + + return nil +} + + // getEndpoint use the publicOnly flag to check whether we should allocate a public bullet or private bullet func (s *Stream) getEndpoint() (string, error) { var bullet *kucoinapi.Bullet