Merge pull request #676 from c9s/fix/ftx-kline

fix: rewrite kline verifying function
This commit is contained in:
Yo-An Lin 2022-06-06 06:31:37 +08:00 committed by GitHub
commit aa0d3ba279
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 55 deletions

1
.gitignore vendored
View File

@ -43,3 +43,4 @@ otp*png
/.deploy
*.swp
/pkg/backtest/assets.go

View File

@ -148,17 +148,21 @@ func (m *SimplePriceMatching) PlaceOrder(o types.SubmitOrder) (closedOrders *typ
if o.Type == types.OrderTypeMarket {
m.EmitOrderUpdate(order)
// copy the order object to avoid side effect (for different callbacks)
var order2 = order
// emit trade before we publish order
trade := m.newTradeFromOrder(&order, false)
trade := m.newTradeFromOrder(&order2, false)
m.executeTrade(trade)
// update the order status
order.Status = types.OrderStatusFilled
order.ExecutedQuantity = order.Quantity
order.Price = price
order.IsWorking = false
m.EmitOrderUpdate(order)
return &order, &trade, nil
order2.Status = types.OrderStatusFilled
order2.ExecutedQuantity = order2.Quantity
order2.Price = price
order2.IsWorking = false
m.EmitOrderUpdate(order2)
return &order2, &trade, nil
}
// for limit maker orders

View File

@ -600,7 +600,7 @@ func createSymbolReport(userConfig *bbgo.Config, session *bbgo.ExchangeSession,
func verify(userConfig *bbgo.Config, backtestService *service.BacktestService, sourceExchanges map[types.ExchangeName]types.Exchange, startTime time.Time, verboseCnt int) error {
for _, sourceExchange := range sourceExchanges {
err := backtestService.Verify(userConfig.Backtest.Symbols, startTime, time.Now(), sourceExchange, verboseCnt)
err := backtestService.Verify(sourceExchange, userConfig.Backtest.Symbols, startTime, time.Now())
if err != nil {
return err
}

View File

@ -4,7 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"os"
"strconv"
"strings"
"time"
@ -63,45 +62,26 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
return nil
}
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {
func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string, startTime time.Time, endTime time.Time) error {
var corruptCnt = 0
for _, symbol := range symbols {
log.Infof("verifying backtesting data...")
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s kline data...", symbol, interval)
log.Infof("verifying %s %s backtesting data: %s to %s...", symbol, interval, startTime, endTime)
klineC, errC := s.QueryKLinesCh(startTime, endTime, sourceExchange, []string{symbol}, []types.Interval{interval})
var emptyKLine types.KLine
var prevKLine types.KLine
for k := range klineC {
if verboseCnt > 1 {
fmt.Fprint(os.Stderr, ".")
}
if prevKLine != emptyKLine {
if prevKLine.StartTime.Unix() == k.StartTime.Unix() {
s._deleteDuplicatedKLine(k)
log.Errorf("found kline data duplicated at time: %s kline: %+v , deleted it", k.StartTime, k)
} else if prevKLine.StartTime.Time().Add(interval.Duration()).Unix() != k.StartTime.Time().Unix() {
corruptCnt++
log.Errorf("found kline data corrupted at time: %s kline: %+v", k.StartTime, k)
log.Errorf("between %d and %d",
prevKLine.StartTime.Unix(),
k.StartTime.Unix())
}
}
prevKLine = k
}
if verboseCnt > 1 {
fmt.Fprintln(os.Stderr)
}
if err := <-errC; err != nil {
timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval, startTime, endTime)
if err != nil {
return err
}
if len(timeRanges) == 0 {
continue
}
log.Warnf("found missing time ranges:")
corruptCnt += len(timeRanges)
for _, timeRange := range timeRanges {
log.Warnf("symbol %s interval: %s %v", symbol, interval, timeRange)
}
}
}
@ -116,7 +96,6 @@ func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
}
@ -306,28 +285,25 @@ func (s *BacktestService) Insert(kline types.KLine) error {
return err
}
func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
if len(k.Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty")
}
tableName := targetKlineTable(k.Exchange)
sql := fmt.Sprintf("DELETE FROM `%s` WHERE gid = :gid ", tableName)
_, err := s.DB.NamedExec(sql, k)
return err
}
type TimeRange struct {
Start time.Time
End time.Time
}
func (t *TimeRange) String() string {
return t.Start.String() + " ~ " + t.End.String()
}
// SyncPartial
// find the existing data time range (t1, t2)
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
// truncate time point to minute
since = since.Truncate(time.Minute)
until = until.Truncate(time.Minute)
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows {
return err
@ -350,6 +326,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
// there are few cases:
// t1 == since && t2 == until
// [since] ------- [t1] data [t2] ------ [until]
if since.Before(t1.Time()) {
// shift slice
timeRanges = append([]TimeRange{
@ -365,7 +342,7 @@ func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, sy
}
for _, timeRange := range timeRanges {
err = s.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
if err != nil {
return err
}