diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index b90aa9217..ab8951a61 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -318,6 +318,10 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) } } + session.MarketDataStream.OnKLine(func(kline types.KLine) { + log.WithField("marketData", "kline").Infof("kline : %+v", kline) + }) + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { log.WithField("marketData", "kline").Infof("kline closed: %+v", kline) }) diff --git a/pkg/exchange/ftx/stream.go b/pkg/exchange/ftx/stream.go index dec0f5d33..250714879 100644 --- a/pkg/exchange/ftx/stream.go +++ b/pkg/exchange/ftx/stream.go @@ -135,20 +135,22 @@ func (s *Stream) Subscribe(channel types.Channel, symbol string, option types.Su } func (s *Stream) pollKLines(ctx context.Context) { + + lastClosed := time.Time{} // get current kline candle for _, sub := range s.klineSubscriptions { - klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) - + klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval) if len(klines) > 0 { // handle mutiple klines, get the latest one - kline := klines[len(klines)-1] - s.EmitKLine(kline) - s.EmitKLineClosed(kline) + s.EmitKLine(klines[0]) + s.EmitKLineClosed(klines[0]) + s.EmitKLine(klines[1]) + lastClosed = klines[0].StartTime } } // the highest resolution of kline is 1min - ticker := time.NewTicker(time.Minute) + ticker := time.NewTicker(time.Second * 30) defer ticker.Stop() for { @@ -166,20 +168,23 @@ func (s *Stream) pollKLines(ctx context.Context) { // not in the checking time slot, check next subscription continue } - klines := getLastClosedKLine(s.exchange, ctx, sub.symbol, sub.interval) + klines := getLast2KLine(s.exchange, ctx, sub.symbol, sub.interval) if len(klines) > 0 { // handle mutiple klines, get the latest one - kline := klines[len(klines)-1] - s.EmitKLine(kline) - s.EmitKLineClosed(kline) + if lastClosed.Unix() < klines[0].StartTime.Unix() { + s.EmitKLine(klines[0]) + s.EmitKLineClosed(klines[0]) + lastClosed = klines[0].StartTime + } + s.EmitKLine(klines[1]) } } } } } -func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { +func getLast2KLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { // set since to more 30s ago to avoid getting no kline candle since := time.Now().Add(time.Duration(interval.Minutes()*-3) * time.Minute) klines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ @@ -191,6 +196,12 @@ func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interva return klines } + return klines +} + +func getLastClosedKLine(e *Exchange, ctx context.Context, symbol string, interval types.Interval) []types.KLine { + // set since to more 30s ago to avoid getting no kline candle + klines := getLast2KLine(e, ctx, symbol, interval) return []types.KLine{klines[0]} }