diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index 51c40dbd1..038c673c3 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -129,7 +129,7 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su // FTX does not support kline channel, do polling go s.subscribeKLine(symbol, option) } else { - panic("only support book channel now") + panic("only support book/kline channel now") } } @@ -153,35 +153,39 @@ func (s *Stream) subscribeKLine(symbol string, option types.SubscribeOptions) { return } + ticker := time.NewTicker(interval.Duration()) + defer ticker.Stop() + for { - if !s.isConnected { - time.Sleep(time.Second * 2) - continue + select { + case <-s.ctx.Done(): + if err := s.ctx.Err(); err != nil { + logger.WithError(err).Errorf("subscribeKLine goroutine is terminated") + } + case <-ticker.C: + klines := getLastKLine(s.exchange, s.ctx, symbol, interval) + if len(klines) >= 0 { + // handle mutiple klines, get the latest one + s.klineMessage <- klines[len(klines)-1] + } } - - // get the last kline - since := time.Now().Add(time.Duration(-1*(interval.Minutes())) * time.Minute) - kline, err := s.exchange.QueryKLines(s.ctx, symbol, interval, types.KLineQueryOptions{ - StartTime: &since, - }) - if err != nil { - logger.WithError(err).Errorf("failed to get kline data") - time.Sleep(time.Second * 5) - continue - } - if len(kline) <= 0 { - time.Sleep(time.Second * 5) - continue - } - - s.klineMessage <- kline[0] - - intervalSec := int64(interval.Minutes() * 60) - nextReq := intervalSec - time.Now().Unix()%intervalSec + 1 - time.Sleep(time.Second * time.Duration(nextReq)) } } +func getLastKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { + // set since to more 30s ago to avoid getting no kline candle + since := time.Now().Add(time.Duration(-1*(interval.Minutes()*60+30)) * time.Second) + klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ + StartTime: &since, + }) + if err != nil { + logger.WithError(err).Errorf("failed to get kline data") + return klines + } + + return klines +} + func (s *Stream) Close() error { s.subscriptions = nil if s.ws != nil {