mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-21 22:43:52 +00:00
Merge pull request #630 from c9s/fix/kline-sync
fix: fix duplicated kline sync issue and add unique index for kline tables
This commit is contained in:
commit
7c98fdfb31
7
.github/workflows/go.yml
vendored
7
.github/workflows/go.yml
vendored
|
@ -7,7 +7,6 @@ on:
|
||||||
branches: [ main ]
|
branches: [ main ]
|
||||||
|
|
||||||
jobs:
|
jobs:
|
||||||
|
|
||||||
build:
|
build:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
|
@ -68,3 +67,9 @@ jobs:
|
||||||
|
|
||||||
- name: TestDnum
|
- name: TestDnum
|
||||||
run: go test -tags dnum -v ./pkg/...
|
run: go test -tags dnum -v ./pkg/...
|
||||||
|
|
||||||
|
- name: Create dotenv file
|
||||||
|
run: |
|
||||||
|
echo "DB_DRIVER=mysql" >> .env.local
|
||||||
|
echo "DB_DSN=root:root@/bbgo" >> .env.local
|
||||||
|
|
||||||
|
|
23
config/backtest.yaml
Normal file
23
config/backtest.yaml
Normal file
|
@ -0,0 +1,23 @@
|
||||||
|
---
|
||||||
|
backtest:
|
||||||
|
startTime: "2022-01-01"
|
||||||
|
endTime: "2022-01-02"
|
||||||
|
symbols:
|
||||||
|
- BTCUSDT
|
||||||
|
sessions:
|
||||||
|
- binance
|
||||||
|
- ftx
|
||||||
|
- max
|
||||||
|
- kucoin
|
||||||
|
- okex
|
||||||
|
|
||||||
|
exchangeStrategies:
|
||||||
|
- on: binance
|
||||||
|
grid:
|
||||||
|
symbol: BTCUSDT
|
||||||
|
quantity: 0.001
|
||||||
|
gridNumber: 100
|
||||||
|
profitSpread: 1000.0 # The profit price spread that you want to add to your sell order when your buy order is executed
|
||||||
|
upperPrice: 40_000.0
|
||||||
|
lowerPrice: 20_000.0
|
||||||
|
|
68
migrations/mysql/20220520140707_kline_unique_idx.sql
Normal file
68
migrations/mysql/20220520140707_kline_unique_idx.sql
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
-- +up
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
TRUNCATE TABLE `binance_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
TRUNCATE TABLE `max_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
TRUNCATE TABLE `ftx_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
TRUNCATE TABLE `kucoin_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
TRUNCATE TABLE `okex_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX idx_kline_binance_unique
|
||||||
|
ON binance_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX idx_kline_max_unique
|
||||||
|
ON max_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_ftx_unique`
|
||||||
|
ON ftx_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_kucoin_unique`
|
||||||
|
ON kucoin_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_okex_unique`
|
||||||
|
ON okex_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +down
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_max_unique` ON `max_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;
|
||||||
|
-- +end
|
47
migrations/sqlite3/20220520140707_kline_unique_idx.sql
Normal file
47
migrations/sqlite3/20220520140707_kline_unique_idx.sql
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
-- +up
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX idx_kline_binance_unique
|
||||||
|
ON binance_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX idx_kline_max_unique
|
||||||
|
ON max_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_ftx_unique`
|
||||||
|
ON ftx_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_kucoin_unique`
|
||||||
|
ON kucoin_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
CREATE UNIQUE INDEX `idx_kline_okex_unique`
|
||||||
|
ON okex_klines (`symbol`, `interval`, `start_time`);
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +down
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_max_unique` ON `max_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;
|
||||||
|
-- +end
|
||||||
|
|
||||||
|
-- +begin
|
||||||
|
DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;
|
||||||
|
-- +end
|
|
@ -391,13 +391,7 @@ var BacktestCmd = &cobra.Command{
|
||||||
var numOfExchangeSources = len(exchangeSources)
|
var numOfExchangeSources = len(exchangeSources)
|
||||||
if numOfExchangeSources == 1 {
|
if numOfExchangeSources == 1 {
|
||||||
exSource := exchangeSources[0]
|
exSource := exchangeSources[0]
|
||||||
var lastk types.KLine
|
|
||||||
for k := range exSource.C {
|
for k := range exSource.C {
|
||||||
// avoid duplicated klines
|
|
||||||
if k == lastk {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
exSource.Exchange.ConsumeKLine(k)
|
exSource.Exchange.ConsumeKLine(k)
|
||||||
|
|
||||||
for _, h := range kLineHandlers {
|
for _, h := range kLineHandlers {
|
||||||
|
|
|
@ -13,6 +13,8 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var log = logrus.WithField("component", "batch")
|
||||||
|
|
||||||
type KLineBatchQuery struct {
|
type KLineBatchQuery struct {
|
||||||
types.Exchange
|
types.Exchange
|
||||||
}
|
}
|
||||||
|
@ -25,12 +27,12 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
||||||
defer close(c)
|
defer close(c)
|
||||||
defer close(errC)
|
defer close(errC)
|
||||||
|
|
||||||
tryQueryKlineTimes := 0
|
var tryQueryKlineTimes = 0
|
||||||
|
for startTime.Before(endTime) {
|
||||||
|
log.Debugf("batch query klines %s %s %s <=> %s", symbol, interval, startTime, endTime)
|
||||||
|
|
||||||
var currentTime = startTime
|
|
||||||
for currentTime.Before(endTime) {
|
|
||||||
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
|
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
|
||||||
StartTime: ¤tTime,
|
StartTime: &startTime,
|
||||||
EndTime: &endTime,
|
EndTime: &endTime,
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -39,12 +41,13 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
|
// ensure the kline is in the right order
|
||||||
|
sort.Slice(kLines, func(i, j int) bool {
|
||||||
|
return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix()
|
||||||
|
})
|
||||||
|
|
||||||
if len(kLines) == 0 {
|
if len(kLines) == 0 {
|
||||||
return
|
return
|
||||||
} else if len(kLines) == 1 && kLines[0].StartTime.Unix() == currentTime.Unix() {
|
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tryQueryKlineTimes++
|
tryQueryKlineTimes++
|
||||||
|
@ -53,7 +56,7 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
||||||
var batchKLines = make([]types.KLine, 0, BatchSize)
|
var batchKLines = make([]types.KLine, 0, BatchSize)
|
||||||
for _, kline := range kLines {
|
for _, kline := range kLines {
|
||||||
// ignore any kline before the given start time of the batch query
|
// ignore any kline before the given start time of the batch query
|
||||||
if currentTime.Unix() != startTime.Unix() && kline.StartTime.Before(currentTime) {
|
if kline.StartTime.Before(startTime) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +77,8 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
|
||||||
}
|
}
|
||||||
|
|
||||||
// The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
|
// The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
|
||||||
currentTime = kline.StartTime.Time()
|
// (above comment was written by @tony1223)
|
||||||
|
startTime = kline.EndTime.Time()
|
||||||
tryQueryKlineTimes = 0
|
tryQueryKlineTimes = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,10 +117,10 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti
|
||||||
|
|
||||||
for startTime.Before(endTime) {
|
for startTime.Before(endTime) {
|
||||||
if err := limiter.Wait(ctx); err != nil {
|
if err := limiter.Wait(ctx); err != nil {
|
||||||
logrus.WithError(err).Error("rate limit error")
|
log.WithError(err).Error("rate limit error")
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Infof("batch querying rewards %s <=> %s", startTime, endTime)
|
log.Infof("batch querying rewards %s <=> %s", startTime, endTime)
|
||||||
|
|
||||||
rewards, err := q.Service.QueryRewards(ctx, startTime)
|
rewards, err := q.Service.QueryRewards(ctx, startTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -231,85 +231,23 @@ func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
|
||||||
|
|
||||||
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
||||||
var klines []types.KLine
|
var klines []types.KLine
|
||||||
var since, until, currentEnd time.Time
|
|
||||||
if options.StartTime != nil {
|
// the fetch result is from newest to oldest
|
||||||
since = *options.StartTime
|
// currentEnd = until
|
||||||
}
|
// endTime := currentEnd.Add(interval.Duration())
|
||||||
if options.EndTime != nil {
|
klines, err := e._queryKLines(ctx, symbol, interval, options)
|
||||||
until = *options.EndTime
|
if err != nil {
|
||||||
} else {
|
return nil, err
|
||||||
until = time.Now()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
currentEnd = until
|
sort.Slice(klines, func(i, j int) bool {
|
||||||
|
return klines[i].StartTime.Unix() < klines[j].StartTime.Unix()
|
||||||
for {
|
})
|
||||||
|
|
||||||
// the fetch result is from newest to oldest
|
|
||||||
endTime := currentEnd.Add(interval.Duration())
|
|
||||||
options.EndTime = &endTime
|
|
||||||
lines, err := e._queryKLines(ctx, symbol, interval, types.KLineQueryOptions{
|
|
||||||
StartTime: &since,
|
|
||||||
EndTime: ¤tEnd,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(lines) == 0 {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, line := range lines {
|
|
||||||
|
|
||||||
if line.StartTime.Unix() < currentEnd.Unix() {
|
|
||||||
currentEnd = line.StartTime.Time()
|
|
||||||
}
|
|
||||||
|
|
||||||
if line.StartTime.Unix() > since.Unix() {
|
|
||||||
klines = append(klines, line)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(lines) == 1 && lines[0].StartTime.Unix() == currentEnd.Unix() {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
outBound := currentEnd.Add(interval.Duration()*-1).Unix() <= since.Unix()
|
|
||||||
if since.IsZero() || currentEnd.Unix() == since.Unix() || outBound {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
if options.Limit != 0 && options.Limit <= len(lines) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() })
|
|
||||||
|
|
||||||
if options.Limit != 0 {
|
|
||||||
limitedItems := len(klines) - options.Limit
|
|
||||||
if limitedItems > 0 {
|
|
||||||
return klines[limitedItems:], nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return klines, nil
|
return klines, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
|
||||||
var since, until time.Time
|
|
||||||
if options.StartTime != nil {
|
|
||||||
since = *options.StartTime
|
|
||||||
}
|
|
||||||
if options.EndTime != nil {
|
|
||||||
until = *options.EndTime
|
|
||||||
} else {
|
|
||||||
until = time.Now()
|
|
||||||
}
|
|
||||||
if since.After(until) {
|
|
||||||
return nil, fmt.Errorf("invalid query klines time range, since: %+v, until: %+v", since, until)
|
|
||||||
}
|
|
||||||
if !isIntervalSupportedInKLine(interval) {
|
if !isIntervalSupportedInKLine(interval) {
|
||||||
return nil, fmt.Errorf("interval %s is not supported", interval.String())
|
return nil, fmt.Errorf("interval %s is not supported", interval.String())
|
||||||
}
|
}
|
||||||
|
@ -318,7 +256,7 @@ func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval typ
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, 0, since, until)
|
resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), options.StartTime, options.EndTime)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -621,7 +559,10 @@ func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[stri
|
||||||
}
|
}
|
||||||
|
|
||||||
// ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time
|
// ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time
|
||||||
prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, time.Now().Add(time.Duration(-1)*time.Hour), time.Now())
|
now := time.Now()
|
||||||
|
since := now.Add(time.Duration(-1) * time.Hour)
|
||||||
|
until := now
|
||||||
|
prices, err := rest.HistoricalPrices(ctx, v.Market.LocalSymbol, types.Interval1h, 1, &since, &until)
|
||||||
if err != nil || !prices.Success || len(prices.Result) == 0 {
|
if err != nil || !prices.Success || len(prices.Result) == 0 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,7 +18,7 @@ type marketRequest struct {
|
||||||
supported resolutions: window length in seconds. options: 15, 60, 300, 900, 3600, 14400, 86400
|
supported resolutions: window length in seconds. options: 15, 60, 300, 900, 3600, 14400, 86400
|
||||||
doc: https://docs.ftx.com/?javascript#get-historical-prices
|
doc: https://docs.ftx.com/?javascript#get-historical-prices
|
||||||
*/
|
*/
|
||||||
func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end time.Time) (HistoricalPricesResponse, error) {
|
func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, interval types.Interval, limit int64, start, end *time.Time) (HistoricalPricesResponse, error) {
|
||||||
q := map[string]string{
|
q := map[string]string{
|
||||||
"resolution": strconv.FormatInt(int64(interval.Minutes())*60, 10),
|
"resolution": strconv.FormatInt(int64(interval.Minutes())*60, 10),
|
||||||
}
|
}
|
||||||
|
@ -27,11 +27,11 @@ func (r *marketRequest) HistoricalPrices(ctx context.Context, market string, int
|
||||||
q["limit"] = strconv.FormatInt(limit, 10)
|
q["limit"] = strconv.FormatInt(limit, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
if start != (time.Time{}) {
|
if start != nil {
|
||||||
q["start_time"] = strconv.FormatInt(start.Unix(), 10)
|
q["start_time"] = strconv.FormatInt(start.Unix(), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
if end != (time.Time{}) {
|
if end != nil {
|
||||||
q["end_time"] = strconv.FormatInt(end.Unix(), 10)
|
q["end_time"] = strconv.FormatInt(end.Unix(), 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -17,9 +17,9 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
var marketDataLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
|
var marketDataLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
|
||||||
var queryTradeLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
|
var queryTradeLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
|
||||||
var queryOrderLimiter = rate.NewLimiter(rate.Every(5*time.Second), 1)
|
var queryOrderLimiter = rate.NewLimiter(rate.Every(6*time.Second), 1)
|
||||||
|
|
||||||
var ErrMissingSequence = errors.New("sequence is missing")
|
var ErrMissingSequence = errors.New("sequence is missing")
|
||||||
|
|
||||||
|
|
99
pkg/migrations/mysql/20220520140707_kline_unique_idx.go
Normal file
99
pkg/migrations/mysql/20220520140707_kline_unique_idx.go
Normal file
|
@ -0,0 +1,99 @@
|
||||||
|
package mysql
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/c9s/rockhopper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
AddMigration(upKlineUniqueIdx, downKlineUniqueIdx)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func upKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
|
||||||
|
// This code is executed when the migration is applied.
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "TRUNCATE TABLE `binance_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "TRUNCATE TABLE `max_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "TRUNCATE TABLE `ftx_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "TRUNCATE TABLE `kucoin_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "TRUNCATE TABLE `okex_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_binance_unique\n ON binance_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_max_unique\n ON max_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_ftx_unique`\n ON ftx_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_kucoin_unique`\n ON kucoin_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_okex_unique`\n ON okex_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func downKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
|
||||||
|
// This code is executed when the migration is rolled back.
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_max_unique` ON `max_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
74
pkg/migrations/sqlite3/20220520140707_kline_unique_idx.go
Normal file
74
pkg/migrations/sqlite3/20220520140707_kline_unique_idx.go
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
package sqlite3
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/c9s/rockhopper"
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
AddMigration(upKlineUniqueIdx, downKlineUniqueIdx)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func upKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
|
||||||
|
// This code is executed when the migration is applied.
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_binance_unique\n ON binance_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX idx_kline_max_unique\n ON max_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_ftx_unique`\n ON ftx_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_kucoin_unique`\n ON kucoin_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `idx_kline_okex_unique`\n ON okex_klines (`symbol`, `interval`, `start_time`);")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func downKlineUniqueIdx(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
|
||||||
|
// This code is executed when the migration is rolled back.
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_ftx_unique` ON `ftx_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_max_unique` ON `max_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_binance_unique` ON `binance_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_kucoin_unique` ON `kucoin_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err = tx.ExecContext(ctx, "DROP INDEX `idx_kline_okex_unique` ON `okex_klines`;")
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
|
@ -34,9 +34,10 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
if err := s.BatchInsert(klines); err != nil {
|
if err := s.BatchInsert(klines); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
count += len(klines)
|
count += len(klines)
|
||||||
}
|
}
|
||||||
log.Infof("found %s kline %s data count: %d", symbol, interval.String(), count)
|
log.Debugf("inserted klines %s %s data: %d", symbol, interval.String(), count)
|
||||||
|
|
||||||
if err := <-errC; err != nil {
|
if err := <-errC; err != nil {
|
||||||
return err
|
return err
|
||||||
|
@ -344,13 +345,17 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange
|
||||||
for k := range klineC {
|
for k := range klineC {
|
||||||
if nowStartTime.Unix() < k.StartTime.Unix() {
|
if nowStartTime.Unix() < k.StartTime.Unix() {
|
||||||
log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
|
log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
|
||||||
s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval)
|
if err := s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval); err != nil {
|
||||||
|
log.WithError(err).Errorf("sync error")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
nowStartTime = k.StartTime.Time().Add(interval.Duration())
|
nowStartTime = k.StartTime.Time().Add(interval.Duration())
|
||||||
}
|
}
|
||||||
|
|
||||||
if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() {
|
if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() {
|
||||||
s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval)
|
if err := s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval); err != nil {
|
||||||
|
log.WithError(err).Errorf("sync error")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := <-errC; err != nil {
|
if err := <-errC; err != nil {
|
||||||
|
|
Loading…
Reference in New Issue
Block a user