From a9bff7701cf6bf07751c1fe15872cec5b0dee8d8 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 24 Jun 2022 18:14:52 +0800 Subject: [PATCH] sync: avoid adding the millisecond one to the start time --- pkg/exchange/batch/time_range_query.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/pkg/exchange/batch/time_range_query.go b/pkg/exchange/batch/time_range_query.go index 76fd04971..f78afbcac 100644 --- a/pkg/exchange/batch/time_range_query.go +++ b/pkg/exchange/batch/time_range_query.go @@ -34,10 +34,12 @@ type AsyncTimeRangedBatchQuery struct { JumpIfEmpty time.Duration } -func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, startTime, endTime time.Time) chan error { +func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, since, until time.Time) chan error { errC := make(chan error, 1) cRef := reflect.ValueOf(ch) // cRef := reflect.MakeChan(reflect.TypeOf(q.Type), 100) + startTime := since + endTime := until go func() { defer cRef.Close() @@ -93,7 +95,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s for i := 0; i < listLen; i++ { item := listRef.Index(i) entryTime := q.T(item.Interface()) - if entryTime.Before(startTime) || entryTime.After(endTime) { + if entryTime.Before(since) || entryTime.After(until) { continue } @@ -108,7 +110,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s cRef.Send(item) sentAny = true - startTime = entryTime.Add(time.Millisecond) + startTime = entryTime } if !sentAny {