fix(ftx): send ctx to handleChannelKlineMessage()

This commit is contained in:
Jui-Nan Lin 2021-05-24 10:16:17 +08:00
parent 64387ed2cb
commit ddcd0d3969

View File

@ -17,10 +17,9 @@ const endpoint = "wss://ftx.com/ws/"
type Stream struct { type Stream struct {
*types.StandardStream *types.StandardStream
ws *service.WebsocketClientBase ws *service.WebsocketClientBase
klineMessage chan types.KLine km chan klineMessage
exchange *Exchange exchange *Exchange
ctx context.Context
// publicOnly can only be configured before connecting // publicOnly can only be configured before connecting
publicOnly int32 publicOnly int32
@ -33,11 +32,16 @@ type Stream struct {
subscriptions []websocketRequest subscriptions []websocketRequest
} }
type klineMessage struct {
symbol string
interval types.Interval
}
func NewStream(key, secret string, subAccount string, e *Exchange) *Stream { func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
s := &Stream{ s := &Stream{
exchange: e, exchange: e,
key: key, key: key,
klineMessage: make(chan types.KLine), km: make(chan klineMessage),
secret: secret, secret: secret,
subAccount: subAccount, subAccount: subAccount,
StandardStream: &types.StandardStream{}, StandardStream: &types.StandardStream{},
@ -69,9 +73,8 @@ func (s *Stream) Connect(ctx context.Context) error {
if err := s.ws.Connect(ctx); err != nil { if err := s.ws.Connect(ctx); err != nil {
return err return err
} }
s.ctx = ctx
s.EmitStart() s.EmitStart()
go s.handleChannelKlineMessage() go s.handleChannelKlineMessage(ctx)
go func() { go func() {
// https://docs.ftx.com/?javascript#request-process // https://docs.ftx.com/?javascript#request-process
@ -130,12 +133,16 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
} }
} }
func (s *Stream) handleChannelKlineMessage() { func (s *Stream) handleChannelKlineMessage(ctx context.Context) {
for { for {
select { select {
case kline := <-s.klineMessage: case km := <-s.km:
// FTX only returns closed kline // FTX only returns closed kline
s.EmitKLineClosed(kline) klines := getLastKLine(s.exchange, ctx, km.symbol, km.interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.EmitKLineClosed(klines[len(klines)-1])
}
} }
} }
} }
@ -152,16 +159,8 @@ func (s *Stream) subscribeKLine(symbol string, option types.SubscribeOptions) {
for { for {
select { select {
case <-s.ctx.Done():
if err := s.ctx.Err(); err != nil {
logger.WithError(err).Errorf("subscribeKLine goroutine is terminated")
}
case <-ticker.C: case <-ticker.C:
klines := getLastKLine(s.exchange, s.ctx, symbol, interval) s.km <- klineMessage{symbol: symbol, interval: interval}
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.klineMessage <- klines[len(klines)-1]
}
} }
} }
} }