diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index 8b99c1099..804271ed3 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -17,10 +17,9 @@ const endpoint = "wss://ftx.com/ws/" type Stream struct { *types.StandardStream - ws *service.WebsocketClientBase - klineMessage chan types.KLine - exchange *Exchange - ctx context.Context + ws *service.WebsocketClientBase + km chan klineMessage + exchange *Exchange // publicOnly can only be configured before connecting publicOnly int32 @@ -33,11 +32,16 @@ type Stream struct { subscriptions []websocketRequest } +type klineMessage struct { + symbol string + interval types.Interval +} + func NewStream(key, secret string, subAccount string, e *Exchange) *Stream { s := &Stream{ exchange: e, key: key, - klineMessage: make(chan types.KLine), + km: make(chan klineMessage), secret: secret, subAccount: subAccount, StandardStream: &types.StandardStream{}, @@ -69,9 +73,8 @@ func (s *Stream) Connect(ctx context.Context) error { if err := s.ws.Connect(ctx); err != nil { return err } - s.ctx = ctx s.EmitStart() - go s.handleChannelKlineMessage() + go s.handleChannelKlineMessage(ctx) go func() { // https://docs.ftx.com/?javascript#request-process @@ -130,12 +133,16 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su } } -func (s *Stream) handleChannelKlineMessage() { +func (s *Stream) handleChannelKlineMessage(ctx context.Context) { for { select { - case kline := <-s.klineMessage: + case km := <-s.km: // FTX only returns closed kline - s.EmitKLineClosed(kline) + klines := getLastKLine(s.exchange, ctx, km.symbol, km.interval) + if len(klines) >= 0 { + // handle mutiple klines, get the latest one + s.EmitKLineClosed(klines[len(klines)-1]) + } } } } @@ -152,16 +159,8 @@ func (s *Stream) subscribeKLine(symbol string, option types.SubscribeOptions) { for { select { - case <-s.ctx.Done(): - if err := s.ctx.Err(); err != nil { - logger.WithError(err).Errorf("subscribeKLine goroutine is terminated") - } case <-ticker.C: - klines := getLastKLine(s.exchange, s.ctx, symbol, interval) - if len(klines) >= 0 { - // handle mutiple klines, get the latest one - s.klineMessage <- klines[len(klines)-1] - } + s.km <- klineMessage{symbol: symbol, interval: interval} } } }