exchange: make ftx kline event more reliable

This commit is contained in:
TonyQ 2021-12-15 11:23:07 +08:00
parent 05323f211f
commit c0b9cc0f0b
2 changed files with 26 additions and 11 deletions

View File

@ -318,6 +318,10 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
} }
} }
session.MarketDataStream.OnKLine(func(kline types.KLine) {
log.WithField("marketData", "kline").Infof("kline : %+v", kline)
})
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
log.WithField("marketData", "kline").Infof("kline closed: %+v", kline) log.WithField("marketData", "kline").Infof("kline closed: %+v", kline)
}) })

View File

@ -135,20 +135,22 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su
} }
func (s *Stream) pollKLines(ctx context.Context) { func (s *Stream) pollKLines(ctx context.Context) {
lastClosed := time.Time{}
// get current kline candle // get current kline candle
for _, sub := range s.klineSubscriptions { for _, sub := range s.klineSubscriptions {
klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) > 0 { if len(klines) > 0 {
// handle mutiple klines, get the latest one // handle mutiple klines, get the latest one
kline := klines[len(klines)-1] s.EmitKLine(klines[0])
s.EmitKLine(kline) s.EmitKLineClosed(klines[0])
s.EmitKLineClosed(kline) s.EmitKLine(klines[1])
lastClosed = klines[0].StartTime
} }
} }
// the highest resolution of kline is 1min // the highest resolution of kline is 1min
ticker := time.NewTicker(time.Minute) ticker := time.NewTicker(time.Second * 30)
defer ticker.Stop() defer ticker.Stop()
for { for {
@ -166,20 +168,23 @@ func (s *Stream) pollKLines(ctx context.Context) {
// not in the checking time slot, check next subscription // not in the checking time slot, check next subscription
continue continue
} }
klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval)
if len(klines) > 0 { if len(klines) > 0 {
// handle mutiple klines, get the latest one // handle mutiple klines, get the latest one
kline := klines[len(klines)-1] if lastClosed.Unix() < klines[0].StartTime.Unix() {
s.EmitKLine(kline) s.EmitKLine(klines[0])
s.EmitKLineClosed(kline) s.EmitKLineClosed(klines[0])
lastClosed = klines[0].StartTime
}
s.EmitKLine(klines[1])
} }
} }
} }
} }
} }
func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { func getLast2KLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
// set since to more 30s ago to avoid getting no kline candle // set since to more 30s ago to avoid getting no kline candle
since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute) since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute)
klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
@ -191,6 +196,12 @@ func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interva
return klines return klines
} }
return klines
}
func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine {
// set since to more 30s ago to avoid getting no kline candle
klines := getLast2KLine(e, ctx, symbol, interval)
return []types.KLine{klines[0]} return []types.KLine{klines[0]}
} }