diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index f70294481..5537df904 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -7,7 +7,6 @@ on: branches: [ main ] jobs: - build: runs-on: ubuntu-latest @@ -68,3 +67,9 @@ jobs: - name: TestDnum run: go test -tags dnum -v ./pkg/... + + - name: Create dotenv file + run: | + echo "DB_DRIVER=mysql" >> .env.local + echo "DB_DSN=root:root@/bbgo" >> .env.local + diff --git a/config/backtest.yaml b/config/backtest.yaml new file mode 100644 index 000000000..d41388b29 --- /dev/null +++ b/config/backtest.yaml @@ -0,0 +1,23 @@ +--- +backtest: + startTime: "2022-01-01" + endTime: "2022-01-02" + symbols: + - BTCUSDT + sessions: + - binance + - ftx + - max + - kucoin + - okex + +exchangeStrategies: +- on: binance + grid: + symbol: BTCUSDT + quantity: 0.001 + gridNumber: 100 + profitSpread: 1000.0 # The profit price spread that you want to add to your sell order when your buy order is executed + upperPrice: 40_000.0 + lowerPrice: 20_000.0 + diff --git a/migrations/mysql/20220520140707_kline_unique_idx.sql b/migrations/mysql/20220520140707_kline_unique_idx.sql new file mode 100644 index 000000000..43ba99db6 --- /dev/null +++ b/migrations/mysql/20220520140707_kline_unique_idx.sql @@ -0,0 +1,68 @@ +-- +up + +-- +begin +TRUNCATE TABLE `binance_klines`; +-- +end + +-- +begin +TRUNCATE TABLE `max_klines`; +-- +end + +-- +begin +TRUNCATE TABLE `ftx_klines`; +-- +end + +-- +begin +TRUNCATE TABLE `kucoin_klines`; +-- +end + +-- +begin +TRUNCATE TABLE `okex_klines`; +-- +end + +-- +begin +CREATE UNIQUE INDEX idx_kline_binance_unique + ON binance_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX idx_kline_max_unique + ON max_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_ftx_unique` + ON ftx_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_kucoin_unique` + ON kucoin_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_okex_unique` + ON okex_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +down + +-- +begin +DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_max_unique` ON `max_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_binance_unique` ON `binance_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_okex_unique` ON `okex_klines`; +-- +end diff --git a/migrations/sqlite3/20220520140707_kline_unique_idx.sql b/migrations/sqlite3/20220520140707_kline_unique_idx.sql new file mode 100644 index 000000000..e45bde5f4 --- /dev/null +++ b/migrations/sqlite3/20220520140707_kline_unique_idx.sql @@ -0,0 +1,47 @@ +-- +up +-- +begin +CREATE UNIQUE INDEX idx_kline_binance_unique + ON binance_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX idx_kline_max_unique + ON max_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_ftx_unique` + ON ftx_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_kucoin_unique` + ON kucoin_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +begin +CREATE UNIQUE INDEX `idx_kline_okex_unique` + ON okex_klines (`symbol`, `interval`, `start_time`); +-- +end + +-- +down + +-- +begin +DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_max_unique` ON `max_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_binance_unique` ON `binance_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`; +-- +end + +-- +begin +DROP INDEX `idx_kline_okex_unique` ON `okex_klines`; +-- +end diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index e15f16645..3379b3aac 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -391,13 +391,7 @@ var BacktestCmd = &cobra.Command{ var numOfExchangeSources = len(exchangeSources) if numOfExchangeSources == 1 { exSource := exchangeSources[0] - var lastk types.KLine for k := range exSource.C { - // avoid duplicated klines - if k == lastk { - continue - } - exSource.Exchange.ConsumeKLine(k) for _, h := range kLineHandlers { diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 325c508b2..dafa90500 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -13,6 +13,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +var log = logrus.WithField("component", "batch") + type KLineBatchQuery struct { types.Exchange } @@ -25,12 +27,12 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type defer close(c) defer close(errC) - tryQueryKlineTimes := 0 + var tryQueryKlineTimes = 0 + for startTime.Before(endTime) { + log.Debugf("batch query klines %s %s %s <=> %s", symbol, interval, startTime, endTime) - var currentTime = startTime - for currentTime.Before(endTime) { kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ - StartTime: ¤tTime, + StartTime: &startTime, EndTime: &endTime, }) @@ -39,12 +41,13 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type return } - sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() }) + // ensure the kline is in the right order + sort.Slice(kLines, func(i, j int) bool { + return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() + }) if len(kLines) == 0 { return - } else if len(kLines) == 1 && kLines[0].StartTime.Unix() == currentTime.Unix() { - return } tryQueryKlineTimes++ @@ -53,7 +56,7 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type var batchKLines = make([]types.KLine, 0, BatchSize) for _, kline := range kLines { // ignore any kline before the given start time of the batch query - if currentTime.Unix() != startTime.Unix() && kline.StartTime.Before(currentTime) { + if kline.StartTime.Before(startTime) { continue } @@ -74,7 +77,8 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type } // The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever. - currentTime = kline.StartTime.Time() + // (above comment was written by @tony1223) + startTime = kline.EndTime.Time() tryQueryKlineTimes = 0 } @@ -113,10 +117,10 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti for startTime.Before(endTime) { if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") + log.WithError(err).Error("rate limit error") } - logrus.Infof("batch querying rewards %s <=> %s", startTime, endTime) + log.Infof("batch querying rewards %s <=> %s", startTime, endTime) rewards, err := q.Service.QueryRewards(ctx, startTime) if err != nil { diff --git a/pkg/exchange/ftx/exchange.go b/pkg/exchange/ftx/exchange.go index 341b60fb4..61f763b9e 100644 --- a/pkg/exchange/ftx/exchange.go +++ b/pkg/exchange/ftx/exchange.go @@ -231,85 +231,23 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { var klines []types.KLine - var since, until, currentEnd time.Time - if options.StartTime != nil { - since = *options.StartTime - } - if options.EndTime != nil { - until = *options.EndTime - } else { - until = time.Now() + + // the fetch result is from newest to oldest + // currentEnd = until + // endTime := currentEnd.Add(interval.Duration()) + klines, err := e._queryKLines(ctx, symbol, interval, options) + if err != nil { + return nil, err } - currentEnd = until - - for { - - // the fetch result is from newest to oldest - endTime := currentEnd.Add(interval.Duration()) - options.EndTime = &endTime - lines, err := e._queryKLines(ctx, symbol, interval, types.KLineQueryOptions{ - StartTime: &since, - EndTime: ¤tEnd, - }) - - if err != nil { - return nil, err - } - - if len(lines) == 0 { - break - } - - for _, line := range lines { - - if line.StartTime.Unix() < currentEnd.Unix() { - currentEnd = line.StartTime.Time() - } - - if line.StartTime.Unix() > since.Unix() { - klines = append(klines, line) - } - } - - if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() { - break - } - - outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix() - if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound { - break - } - - if options.Limit != 0 && options.Limit <= len(lines) { - break - } - } - sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() }) - - if options.Limit != 0 { - limitedItems := len(klines) - options.Limit - if limitedItems > 0 { - return klines[limitedItems:], nil - } - } + sort.Slice(klines, func(i, j int) bool { + return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() + }) return klines, nil } func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { - var since, until time.Time - if options.StartTime != nil { - since = *options.StartTime - } - if options.EndTime != nil { - until = *options.EndTime - } else { - until = time.Now() - } - if since.After(until) { - return nil, fmt.Errorf("invalid query klines time range, since: %+v, until: %+v", since, until) - } if !isIntervalSupportedInKLine(interval) { return nil, fmt.Errorf("interval %s is not supported", interval.String()) } @@ -318,7 +256,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ return nil, err } - resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, 0, since, until) + resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), options.StartTime, options.EndTime) if err != nil { return nil, err } @@ -621,7 +559,10 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri } // ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time - prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, time.Now().Add(time.Duration(-1)*time.Hour), time.Now()) + now := time.Now() + since := now.Add(time.Duration(-1) * time.Hour) + until := now + prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, &since, &until) if err != nil || !prices.Success || len(prices.Result) == 0 { continue } diff --git a/pkg/exchange/ftx/rest_market_request.go b/pkg/exchange/ftx/rest_market_request.go index 039bdaf51..aeb41e17a 100644 --- a/pkg/exchange/ftx/rest_market_request.go +++ b/pkg/exchange/ftx/rest_market_request.go @@ -18,7 +18,7 @@ type marketRequest struct { supported resolutions: window length in seconds. options: 15, 60, 300, 900, 3600, 14400, 86400 doc: https://docs.ftx.com/?javascript#get-historical-prices */ -func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time) (HistoricalPricesResponse, error) { +func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end *time.Time) (HistoricalPricesResponse, error) { q := map[string]string{ "resolution": strconv.FormatInt(int64(interval.Minutes())*60, 10), } @@ -27,11 +27,11 @@ func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, int q["limit"] = strconv.FormatInt(limit, 10) } - if start != (time.Time{}) { + if start != nil { q["start_time"] = strconv.FormatInt(start.Unix(), 10) } - if end != (time.Time{}) { + if end != nil { q["end_time"] = strconv.FormatInt(end.Unix(), 10) } diff --git a/pkg/exchange/kucoin/exchange.go b/pkg/exchange/kucoin/exchange.go index c6451e9bd..84e8a8d1b 100644 --- a/pkg/exchange/kucoin/exchange.go +++ b/pkg/exchange/kucoin/exchange.go @@ -17,9 +17,9 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -var marketDataLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) -var queryTradeLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) -var queryOrderLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1) +var marketDataLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1) +var queryTradeLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1) +var queryOrderLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1) var ErrMissingSequence = errors.New("sequence is missing") diff --git a/pkg/migrations/mysql/20220520140707_kline_unique_idx.go b/pkg/migrations/mysql/20220520140707_kline_unique_idx.go new file mode 100644 index 000000000..78c2f8b95 --- /dev/null +++ b/pkg/migrations/mysql/20220520140707_kline_unique_idx.go @@ -0,0 +1,99 @@ +package mysql + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upKlineUniqueIdx, downKlineUniqueIdx) + +} + +func upKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "TRUNCATE TABLE `binance_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "TRUNCATE TABLE `max_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "TRUNCATE TABLE `ftx_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "TRUNCATE TABLE `kucoin_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "TRUNCATE TABLE `okex_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_binance_unique\n ON binance_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_max_unique\n ON max_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_ftx_unique`\n ON ftx_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_kucoin_unique`\n ON kucoin_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_okex_unique`\n ON okex_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + return err +} + +func downKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_max_unique` ON `max_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/sqlite3/20220520140707_kline_unique_idx.go b/pkg/migrations/sqlite3/20220520140707_kline_unique_idx.go new file mode 100644 index 000000000..605187154 --- /dev/null +++ b/pkg/migrations/sqlite3/20220520140707_kline_unique_idx.go @@ -0,0 +1,74 @@ +package sqlite3 + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upKlineUniqueIdx, downKlineUniqueIdx) + +} + +func upKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_binance_unique\n ON binance_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_max_unique\n ON max_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_ftx_unique`\n ON ftx_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_kucoin_unique`\n ON kucoin_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_okex_unique`\n ON okex_klines (`symbol`, `interval`, `start_time`);") + if err != nil { + return err + } + + return err +} + +func downKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_max_unique` ON `max_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index 1621b8f2c..a7fa4337d 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -34,9 +34,10 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type if err := s.BatchInsert(klines); err != nil { return err } + count += len(klines) } - log.Infof("found %s kline %s data count: %d", symbol, interval.String(), count) + log.Debugf("inserted klines %s %s data: %d", symbol, interval.String(), count) if err := <-errC; err != nil { return err @@ -344,13 +345,17 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange for k := range klineC { if nowStartTime.Unix() < k.StartTime.Unix() { log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime) - s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval) + if err := s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval); err != nil { + log.WithError(err).Errorf("sync error") + } } nowStartTime = k.StartTime.Time().Add(interval.Duration()) } if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() { - s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval) + if err := s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval); err != nil { + log.WithError(err).Errorf("sync error") + } } if err := <-errC; err != nil {