kucoin: connecting stream callbacks

This commit is contained in:
c9s 2021-12-23 11:28:59 +08:00
parent 449434da4c
commit 6330a1845d

View File

@ -39,11 +39,11 @@ type Stream struct {
bullet *kucoinapi.Bullet bullet *kucoinapi.Bullet
publicOnly bool publicOnly bool
candleEventCallbacks []func(c *kucoinapi.WebSocketCandle) candleEventCallbacks []func(e *kucoinapi.WebSocketCandle)
orderBookL2EventCallbacks []func(c *kucoinapi.WebSocketOrderBookL2) orderBookL2EventCallbacks []func(e *kucoinapi.WebSocketOrderBookL2)
tickerEventCallbacks []func(c *kucoinapi.WebSocketTicker) tickerEventCallbacks []func(e *kucoinapi.WebSocketTicker)
accountBalanceEventCallbacks []func(c *kucoinapi.WebSocketAccountBalance) accountBalanceEventCallbacks []func(e *kucoinapi.WebSocketAccountBalance)
privateOrderEventCallbacks []func(c *kucoinapi.WebSocketPrivateOrder) privateOrderEventCallbacks []func(e *kucoinapi.WebSocketPrivateOrder)
} }
func NewStream(client *kucoinapi.RestClient) *Stream { func NewStream(client *kucoinapi.RestClient) *Stream {
@ -55,23 +55,21 @@ func NewStream(client *kucoinapi.RestClient) *Stream {
} }
stream.OnConnect(stream.handleConnect) stream.OnConnect(stream.handleConnect)
stream.OnCandleEvent(stream.handleCandleEvent)
stream.OnOrderBookL2Event(stream.handleOrderBookL2Event)
stream.OnTickerEvent(stream.handleTickerEvent)
return stream return stream
} }
func (s *Stream) sendSubscriptions() error { func (s *Stream) handleCandleEvent(e *kucoinapi.WebSocketCandle) { }
cmds, err := convertSubscriptions(s.Subscriptions)
if err != nil {
return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions)
}
for _, cmd := range cmds { func (s *Stream) handleOrderBookL2Event(e *kucoinapi.WebSocketOrderBookL2) { }
if err := s.conn.WriteJSON(cmd); err != nil {
return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd)
}
}
return nil func (s *Stream) handleTickerEvent(e *kucoinapi.WebSocketTicker) { }
}
func (s *Stream) handleAccountBalanceEvent(e *kucoinapi.WebSocketAccountBalance) { }
func (s *Stream) handlePrivateOrderEvent(e *kucoinapi.WebSocketPrivateOrder) { }
func (s *Stream) handleConnect() { func (s *Stream) handleConnect() {
if s.publicOnly { if s.publicOnly {
@ -144,6 +142,22 @@ func (s *Stream) Reconnector(ctx context.Context) {
} }
} }
func (s *Stream) sendSubscriptions() error {
cmds, err := convertSubscriptions(s.Subscriptions)
if err != nil {
return errors.Wrapf(err, "subscription convert error, subscriptions: %+v", s.Subscriptions)
}
for _, cmd := range cmds {
if err := s.conn.WriteJSON(cmd); err != nil {
return errors.Wrapf(err, "subscribe write error, cmd: %+v", cmd)
}
}
return nil
}
// getEndpoint use the publicOnly flag to check whether we should allocate a public bullet or private bullet // getEndpoint use the publicOnly flag to check whether we should allocate a public bullet or private bullet
func (s *Stream) getEndpoint() (string, error) { func (s *Stream) getEndpoint() (string, error) {
var bullet *kucoinapi.Bullet var bullet *kucoinapi.Bullet