fix(ftx): use timer.ticker()

This commit is contained in:
Jui-Nan Lin 2021-05-24 09:45:33 +08:00
parent 2394aab32e
commit 301ed621e6

View File

@ -129,7 +129,7 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
// FTX does not support kline channel, do polling
go s.subscribeKLine(symbol, option)
} else {
panic("only support book channel now")
panic("only support book/kline channel now")
}
}
@ -153,33 +153,37 @@ func (s *Stream) subscribeKLine(symbol string, option types.SubscribeOptions) {
return
}
for {
if !s.isConnected {
time.Sleep(time.Second * 2)
continue
}
ticker := time.NewTicker(interval.Duration())
defer ticker.Stop()
// get the last kline
since := time.Now().Add(time.Duration(-1*(interval.Minutes())) * time.Minute)
kline, err := s.exchange.QueryKLines(s.ctx, symbol, interval, types.KLineQueryOptions{
for {
select {
case <-s.ctx.Done():
if err := s.ctx.Err(); err != nil {
logger.WithError(err).Errorf("subscribeKLine goroutine is terminated")
}
case <-ticker.C:
klines := getLastKLine(s.exchange, s.ctx, symbol, interval)
if len(klines) >= 0 {
// handle mutiple klines, get the latest one
s.klineMessage <- klines[len(klines)-1]
}
}
}
}
func getLastKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
// set since to more 30s ago to avoid getting no kline candle
since := time.Now().Add(time.Duration(-1*(interval.Minutes()*60+30)) * time.Second)
klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &since,
})
if err != nil {
logger.WithError(err).Errorf("failed to get kline data")
time.Sleep(time.Second * 5)
continue
}
if len(kline) <= 0 {
time.Sleep(time.Second * 5)
continue
return klines
}
s.klineMessage <- kline[0]
intervalSec := int64(interval.Minutes() * 60)
nextReq := intervalSec - time.Now().Unix()%intervalSec + 1
time.Sleep(time.Second * time.Duration(nextReq))
}
return klines
}
func (s *Stream) Close() error {