From a878f35ca1ce7c8db325bb1c46fc06753fdffe73 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 2 Jun 2022 16:40:24 +0800 Subject: [PATCH 1/4] improve and fix kline sync --- pkg/exchange/batch/kline.go | 6 ++--- pkg/service/backtest.go | 51 +++++++++++++++++++++++++++++++++---- pkg/service/sync_task.go | 13 +++++++--- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/pkg/exchange/batch/kline.go b/pkg/exchange/batch/kline.go index b680c8143..2e9c65981 100644 --- a/pkg/exchange/batch/kline.go +++ b/pkg/exchange/batch/kline.go @@ -4,8 +4,6 @@ import ( "context" "time" - "golang.org/x/time/rate" - "github.com/c9s/bbgo/pkg/types" ) @@ -16,7 +14,7 @@ type KLineBatchQuery struct { func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) { query := &AsyncTimeRangedBatchQuery{ Type: types.KLine{}, - Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Limiter: nil, // the rate limiter is handled in the exchange query method Q: func(startTime, endTime time.Time) (interface{}, error) { return e.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ StartTime: &startTime, @@ -32,7 +30,7 @@ func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval typ }, } - c = make(chan types.KLine, 100) + c = make(chan types.KLine, 3000) errC = query.Query(ctx, c, startTime, endTime) return c, errC } diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index af8248cb8..500ff1305 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -8,6 +8,7 @@ import ( "strings" "time" + sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -23,16 +24,43 @@ type BacktestService struct { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - q := &batch.KLineBatchQuery{Exchange: exchange} + // TODO: use isFutures here + _, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange) + // override symbol if isolatedSymbol is not empty + if isIsolated && len(isolatedSymbol) > 0 { + symbol = isolatedSymbol + } - klineC, errC := q.Query(ctx, symbol, interval, startTime, endTime) - for kline := range klineC { - if err := s.Insert(kline); err != nil { + tasks := []SyncTask{ + { + Type: types.KLine{}, + Time: func(obj interface{}) time.Time { + return obj.(types.KLine).StartTime.Time() + }, + ID: func(obj interface{}) string { + kline := obj.(types.KLine) + return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10) + }, + Select: SelectLastKLines(exchange.Name(), symbol, interval, 100), + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + q := &batch.KLineBatchQuery{Exchange: exchange} + return q.Query(ctx, symbol, interval, startTime, endTime) + }, + Insert: func(obj interface{}) error { + kline := obj.(types.KLine) + return s.Insert(kline) + }, + }, + } + + for _, sel := range tasks { + if err := sel.execute(ctx, s.DB, startTime); err != nil { return err } } - return <-errC + return nil + } func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error { @@ -334,3 +362,16 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange } return nil } + +// TODO: add is_futures column since the klines data is different +func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From(strings.ToLower(ex.String()) + "_klines"). + Where(sq.And{ + sq.Eq{"symbol": symbol}, + sq.Eq{"exchange": ex}, + sq.Eq{"`interval`": interval.String()}, + }). + OrderBy("start_time DESC"). + Limit(limit) +} diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index 9a62dced2..676dcf3d3 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -40,7 +40,7 @@ type SyncTask struct { BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) } -func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error { +func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time, args ...time.Time) error { // query from db recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type) if err != nil { @@ -65,10 +65,15 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim } // default since time point - since := lastRecordTime(sel, recordSliceRef, startTime) + startTime = lastRecordTime(sel, recordSliceRef, startTime) + + endTime := time.Now() + if len(args) > 0 { + endTime = args[0] + } // asset "" means all assets - dataC, errC := sel.BatchQuery(ctx, since, time.Now()) + dataC, errC := sel.BatchQuery(ctx, startTime, endTime) dataCRef := reflect.ValueOf(dataC) for { @@ -148,3 +153,5 @@ func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} return ids } + + From 02a8bf4c8cda012b6c77780a123947f9aaedc00d Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 2 Jun 2022 16:42:18 +0800 Subject: [PATCH 2/4] remove general rate limiter from batch query since it's already handled in the exchange --- pkg/exchange/batch/closedorders.go | 5 +---- pkg/exchange/batch/trade.go | 1 - 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/pkg/exchange/batch/closedorders.go b/pkg/exchange/batch/closedorders.go index b648acc7d..3af58e2d5 100644 --- a/pkg/exchange/batch/closedorders.go +++ b/pkg/exchange/batch/closedorders.go @@ -5,8 +5,6 @@ import ( "strconv" "time" - "golang.org/x/time/rate" - "github.com/c9s/bbgo/pkg/types" ) @@ -16,8 +14,7 @@ type ClosedOrderBatchQuery struct { func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) { query := &AsyncTimeRangedBatchQuery{ - Type: types.Order{}, - Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Type: types.Order{}, Q: func(startTime, endTime time.Time) (interface{}, error) { orders, err := q.ExchangeTradeHistoryService.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID) return orders, err diff --git a/pkg/exchange/batch/trade.go b/pkg/exchange/batch/trade.go index bfec9c1a4..7dd8b861b 100644 --- a/pkg/exchange/batch/trade.go +++ b/pkg/exchange/batch/trade.go @@ -24,7 +24,6 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type endTime := *options.EndTime query := &AsyncTimeRangedBatchQuery{ Type: types.Trade{}, - Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), Q: func(startTime, endTime time.Time) (interface{}, error) { return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, options) }, From 824951c3d50bb695e35ce5a2b9dfdf8e63504bcf Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 2 Jun 2022 16:51:44 +0800 Subject: [PATCH 3/4] batch: add remote query profiler --- pkg/exchange/batch/time_range_query.go | 7 ++++++- pkg/exchange/batch/trade.go | 2 -- pkg/service/sync_task.go | 2 -- pkg/util/profile.go | 26 +++++++++++++++++++++++--- 4 files changed, 29 insertions(+), 8 deletions(-) diff --git a/pkg/exchange/batch/time_range_query.go b/pkg/exchange/batch/time_range_query.go index 88ef0b755..03535951e 100644 --- a/pkg/exchange/batch/time_range_query.go +++ b/pkg/exchange/batch/time_range_query.go @@ -8,6 +8,8 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/util" ) var log = logrus.WithField("component", "batch") @@ -52,6 +54,8 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s log.Debugf("batch querying %T: %v <=> %v", q.Type, startTime, endTime) + queryProfiler := util.StartTimeProfile("remoteQuery") + sliceInf, err := q.Q(startTime, endTime) if err != nil { errC <- err @@ -60,9 +64,10 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s listRef := reflect.ValueOf(sliceInf) listLen := listRef.Len() - log.Debugf("batch querying %T: %d remote records", q.Type, listLen) + queryProfiler.StopAndLog(log.Debugf) + if listLen == 0 { if q.JumpIfEmpty > 0 { startTime = startTime.Add(q.JumpIfEmpty) diff --git a/pkg/exchange/batch/trade.go b/pkg/exchange/batch/trade.go index 7dd8b861b..351800b06 100644 --- a/pkg/exchange/batch/trade.go +++ b/pkg/exchange/batch/trade.go @@ -4,8 +4,6 @@ import ( "context" "time" - "golang.org/x/time/rate" - "github.com/c9s/bbgo/pkg/types" ) diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index 676dcf3d3..fcd19f1a0 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -153,5 +153,3 @@ func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} return ids } - - diff --git a/pkg/util/profile.go b/pkg/util/profile.go index 830e58592..7ca5f5009 100644 --- a/pkg/util/profile.go +++ b/pkg/util/profile.go @@ -1,14 +1,21 @@ package util -import "time" +import ( + "time" +) type TimeProfile struct { + Name string StartTime, EndTime time.Time Duration time.Duration } -func StartTimeProfile() TimeProfile { - return TimeProfile{StartTime: time.Now()} +func StartTimeProfile(args ...string) TimeProfile { + name := "" + if len(args) > 0 { + name = args[0] + } + return TimeProfile{StartTime: time.Now(), Name: name} } func (p *TimeProfile) TilNow() time.Duration { @@ -20,3 +27,16 @@ func (p *TimeProfile) Stop() time.Duration { p.Duration = p.EndTime.Sub(p.StartTime) return p.Duration } + +type logFunction func(format string, args ...interface{}) + +func (p *TimeProfile) StopAndLog(f logFunction) { + duration := p.Stop() + s := "[profile] " + if len(p.Name) > 0 { + s += p.Name + } + + s += " " + duration.String() + f(s) +} From 16322e19fe386ec5a88e4c6b4f47acca20263522 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 2 Jun 2022 16:53:17 +0800 Subject: [PATCH 4/4] service: set kline time to UTC --- pkg/service/backtest.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index 500ff1305..a218e53aa 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -35,7 +35,7 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type { Type: types.KLine{}, Time: func(obj interface{}) time.Time { - return obj.(types.KLine).StartTime.Time() + return obj.(types.KLine).StartTime.Time().UTC() }, ID: func(obj interface{}) string { kline := obj.(types.KLine)