support backtesting kline verification

This commit is contained in:
c9s 2020-11-07 00:49:17 +08:00
parent 555fe57341
commit 8823a39fc2
11 changed files with 172 additions and 118 deletions

View File

@ -13,8 +13,8 @@ CREATE TABLE `klines`
`close` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,
`volume` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,
`closed` BOOL NOT NULL DEFAULT TRUE,
`last_trade_id` INT UNSIGNED NULL,
`num_trades` INT UNSIGNED NULL DEFAULT 0,
`last_trade_id` INT UNSIGNED NOT NULL DEFAULT 0,
`num_trades` INT UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (`gid`)

View File

@ -356,17 +356,3 @@ func (environ *Environment) Connect(ctx context.Context) error {
return nil
}
func BatchQueryKLineWindows(ctx context.Context, e types.Exchange, symbol string, intervals []types.Interval, startTime, endTime time.Time) (map[types.Interval]types.KLineWindow, error) {
batch := &types.ExchangeBatchProcessor{Exchange: e}
klineWindows := map[types.Interval]types.KLineWindow{}
for _, interval := range intervals {
kLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime)
if err != nil {
return klineWindows, err
}
klineWindows[interval] = kLines
}
return klineWindows, nil
}

View File

@ -2,9 +2,10 @@ package cmd
import (
"context"
"fmt"
"time"
"github.com/sirupsen/logrus"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
@ -74,36 +75,63 @@ var SyncCmd = &cobra.Command{
}
}
tradeService := &service.TradeService{DB: db}
orderService := &service.OrderService{DB: db}
syncService := &service.SyncService{
TradeService: tradeService,
OrderService: orderService,
}
logrus.Info("syncing trades from exchange...")
if err := syncService.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
return err
}
logrus.Info("syncing orders from exchange...")
if err := syncService.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}
backtest, err := cmd.Flags().GetBool("backtest")
if err != nil {
return err
}
if backtest {
backtestService := &service.BacktestService{DB: db}
if err := backtestService.Sync(ctx, exchange, symbol, startTime) ; err != nil {
if err := backtestService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err
}
log.Info("synchronization done")
log.Infof("verifying backtesting data...")
for interval := range types.SupportedIntervals {
log.Infof("verifying %s kline data...", interval)
klineC, errC := backtestService.QueryKLinesCh(startTime, exchange, symbol, interval)
var emptyKLine types.KLine
var prevKLine types.KLine
for k := range klineC {
fmt.Print(".")
if prevKLine != emptyKLine {
if prevKLine.StartTime.Add(interval.Duration()) != k.StartTime {
log.Errorf("kline corrupted at %+v", k)
}
}
prevKLine = k
}
fmt.Println()
if err := <-errC; err != nil {
return err
}
}
} else {
tradeService := &service.TradeService{DB: db}
orderService := &service.OrderService{DB: db}
syncService := &service.SyncService{
TradeService: tradeService,
OrderService: orderService,
}
log.Info("syncing trades from exchange...")
if err := syncService.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
return err
}
log.Info("syncing orders from exchange...")
if err := syncService.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}
log.Info("synchronization done")
}
logrus.Info("synchronization done")
return nil
},
}

View File

@ -418,6 +418,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
return createdOrders, err
}
// QueryKLines queries the Kline/candlestick bars for a symbol. Klines are uniquely identified by their open time.
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var limit = 500
@ -429,7 +430,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
log.Infof("querying kline %s %s %v", symbol, interval, options)
// avoid rate limit
time.Sleep(100 * time.Millisecond)
time.Sleep(500 * time.Millisecond)
req := e.Client.NewKlinesService().
Symbol(symbol).
Interval(string(interval)).
@ -463,7 +464,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
Volume: util.MustParseFloat(k.Volume),
QuoteVolume: util.MustParseFloat(k.QuoteAssetVolume),
LastTradeID: 0,
NumberOfTrades: k.TradeNum,
NumberOfTrades: uint64(k.TradeNum),
Closed: true,
})
}
@ -537,16 +538,3 @@ func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol string, interval
return allKLines, nil
}
func (e *Exchange) BatchQueryKLineWindows(ctx context.Context, symbol string, intervals []types.Interval, startTime, endTime time.Time) (map[types.Interval]types.KLineWindow, error) {
batch := &types.ExchangeBatchProcessor{Exchange: e}
klineWindows := map[types.Interval]types.KLineWindow{}
for _, interval := range intervals {
klines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime)
if err != nil {
return klineWindows, err
}
klineWindows[interval] = klines
}
return klineWindows, nil
}

View File

@ -425,8 +425,8 @@ func (k *KLine) KLine() types.KLine {
Low: util.MustParseFloat(k.Low),
Volume: util.MustParseFloat(k.Volume),
QuoteVolume: util.MustParseFloat(k.QuoteVolume),
LastTradeID: k.LastTradeID,
NumberOfTrades: k.NumberOfTrades,
LastTradeID: uint64(k.LastTradeID),
NumberOfTrades: uint64(k.NumberOfTrades),
Closed: k.Closed,
}
}

View File

@ -116,19 +116,19 @@ type KLinePayload struct {
func (k KLinePayload) KLine() types.KLine {
return types.KLine{
StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)),
Symbol: k.Market,
Interval: types.Interval(k.Resolution),
Open: util.MustParseFloat(k.Open),
Close: util.MustParseFloat(k.Close),
High: util.MustParseFloat(k.High),
Low: util.MustParseFloat(k.Low),
Volume: util.MustParseFloat(k.Volume),
QuoteVolume: 0,
LastTradeID: k.LastTradeID,
// NumberOfTrades: 0,
Closed: k.Closed,
StartTime: time.Unix(0, k.StartTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.EndTime*int64(time.Millisecond)),
Symbol: k.Market,
Interval: types.Interval(k.Resolution),
Open: util.MustParseFloat(k.Open),
Close: util.MustParseFloat(k.Close),
High: util.MustParseFloat(k.High),
Low: util.MustParseFloat(k.Low),
Volume: util.MustParseFloat(k.Volume),
QuoteVolume: 0, // TODO: add this from kingfisher
LastTradeID: uint64(k.LastTradeID),
NumberOfTrades: 0, // TODO: add this from kingfisher
Closed: k.Closed,
}
}

View File

@ -17,42 +17,45 @@ type BacktestService struct {
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastKLine, err := s.QueryLast(exchange.Name(), symbol, "1m")
if err != nil {
return err
}
if lastKLine != nil {
startTime = lastKLine.EndTime
}
now := time.Now()
for interval := range types.SupportedIntervals {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
// should use channel here
allKLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, time.Now())
lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval)
if err != nil {
return err
}
for _, k := range allKLines {
if lastKLine != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime)
startTime = lastKLine.StartTime.Add(time.Minute)
}
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
// should use channel here
klineC, errC := batch.BatchQueryKLines(ctx, symbol, interval, startTime, now)
// var previousKLine types.KLine
for k := range klineC {
if err := s.Insert(k); err != nil {
return err
}
}
if err := <-errC; err != nil {
return err
}
}
return nil
}
// QueryLast queries the last order from the database
func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol, interval string) (*types.KLine, error) {
func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol string, interval types.Interval) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
// make the SQL syntax IDE friendly, so that it can analyze it.
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY gid DESC LIMIT 1"
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT 1"
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
@ -80,46 +83,62 @@ func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol, interval stri
return nil, rows.Err()
}
func (s *BacktestService) QueryKLinesCh(since time.Time, ex types.ExchangeName, symbol string, intervals ...string) (chan types.KLine, error) {
func (s *BacktestService) QueryKLinesCh(since time.Time, exchange types.Exchange, symbol string, intervals ...types.Interval) (chan types.KLine, chan error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :since AND `symbol` = :symbol AND `interval` IN (:intervals) ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
sql = strings.ReplaceAll(sql, "binance_klines", exchange.Name().String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
sql, args, err := sqlx.Named(sql, map[string]interface{}{
"since": since,
"exchange": ex,
"symbol": symbol,
"intervals": intervals,
"intervals": types.IntervalSlice(intervals),
})
sql, args, err = sqlx.In(sql, args...)
sql = s.DB.Rebind(sql)
rows, err := s.DB.Queryx(sql, args...)
if err != nil {
return nil, err
log.WithError(err).Error("query error")
errC := make(chan error, 1)
// avoid blocking
go func() {
errC <- err
close(errC)
}()
return nil, errC
}
return s.scanRowsCh(rows), nil
return s.scanRowsCh(rows)
}
// scanRowsCh scan rows into channel
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) chan types.KLine {
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) (chan types.KLine, chan error) {
ch := make(chan types.KLine, 100)
errC := make(chan error, 1)
go func() {
defer close(ch)
defer close(errC)
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
log.WithError(err).Error("kline scan error")
continue
errC <- err
return
}
ch <- kline
}
if err := rows.Err(); err != nil {
log.WithError(err).Error("kline scan error")
errC <- err
return
}
}()
return ch
return ch, errC
}
func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {

View File

@ -48,7 +48,7 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
}
}
return nil
return <-errC
}
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
@ -89,5 +89,5 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
}
return nil
return <-errC
}

View File

@ -118,28 +118,41 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (allKLines []KLine, err error) {
for startTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (c chan KLine, errC chan error) {
c = make(chan KLine, 1000)
errC = make(chan error, 1)
if err != nil {
return allKLines, err
}
go func() {
defer close(c)
defer close(errC)
for _, kline := range kLines {
if kline.EndTime.After(endTime) {
return allKLines, nil
for startTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime,
Limit: 1000,
})
if err != nil {
errC <- err
return
}
allKLines = append(allKLines, kline)
startTime = kline.EndTime
}
}
if len(kLines) == 0 {
return
}
return allKLines, err
for _, kline := range kLines {
if kline.EndTime.After(endTime) {
return
}
c <- kline
startTime = kline.EndTime
}
}
}()
return c, errC
}
func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) (c chan Trade, errC chan error) {

View File

@ -1,9 +1,20 @@
package types
import "encoding/json"
import (
"encoding/json"
"time"
)
type Interval string
func (i Interval) Minutes() int {
return SupportedIntervals[i]
}
func (i Interval) Duration() time.Duration {
return time.Duration(i.Minutes()) * time.Minute
}
func (i *Interval) UnmarshalJSON(b []byte) (err error) {
var a string
err = json.Unmarshal(b, &a)
@ -19,6 +30,15 @@ func (i Interval) String() string {
return string(i)
}
type IntervalSlice []Interval
func (s IntervalSlice) StringSlice() (slice []string) {
for _, interval := range s {
slice = append(slice, `"` + interval.String() + `"`)
}
return slice
}
var Interval1m = Interval("1m")
var Interval5m = Interval("5m")
var Interval15m = Interval("15m")
@ -53,4 +73,3 @@ type IntervalWindow struct {
// The windows size of the indicator (EWMA and SMA)
Window int
}

View File

@ -45,6 +45,7 @@ type KLineQueryOptions struct {
// KLine uses binance's kline as the standard structure
type KLine struct {
GID uint64 `json:"gid" db:"gid"`
Exchange string `json:"exchange"`
Symbol string `json:"symbol" db:"symbol"`
@ -61,8 +62,8 @@ type KLine struct {
Volume float64 `json:"volume" db:"volume"`
QuoteVolume float64 `json:"quoteVolume" db:"quote_volume"`
LastTradeID int `json:"lastTradeID" db:"last_trade_id"`
NumberOfTrades int64 `json:"numberOfTrades" db:"num_trades"`
LastTradeID uint64 `json:"lastTradeID" db:"last_trade_id"`
NumberOfTrades uint64 `json:"numberOfTrades" db:"num_trades"`
Closed bool `json:"closed" db:"closed"`
}