implement kline sync function from command

This commit is contained in:
c9s 2020-11-06 21:40:48 +08:00
parent f78fefb3b0
commit 555fe57341
8 changed files with 179 additions and 130 deletions

View File

@ -20,7 +20,7 @@ CREATE TABLE `klines`
) ENGINE = InnoDB; ) ENGINE = InnoDB;
CREATE INDEX `klines_start_time_symbol_interval` ON klines (`start_time`, `symbol`, `interval`); CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);
CREATE TABLE `okex_klines` LIKE `klines`; CREATE TABLE `okex_klines` LIKE `klines`;
CREATE TABLE `binance_klines` LIKE `klines`; CREATE TABLE `binance_klines` LIKE `klines`;
CREATE TABLE `max_klines` LIKE `klines`; CREATE TABLE `max_klines` LIKE `klines`;

View File

@ -133,7 +133,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
} }
for interval := range types.SupportedIntervals { for interval := range types.SupportedIntervals {
kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval.String(), types.KLineQueryOptions{ kLines, err := session.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
EndTime: &now, EndTime: &now,
Limit: 500, // indicators need at least 100 Limit: 500, // indicators need at least 100
}) })
@ -356,9 +356,9 @@ func (environ *Environment) Connect(ctx context.Context) error {
return nil return nil
} }
func BatchQueryKLineWindows(ctx context.Context, e types.Exchange, symbol string, intervals []string, startTime, endTime time.Time) (map[string]types.KLineWindow, error) { func BatchQueryKLineWindows(ctx context.Context, e types.Exchange, symbol string, intervals []types.Interval, startTime, endTime time.Time) (map[types.Interval]types.KLineWindow, error) {
batch := &types.ExchangeBatchProcessor{Exchange: e} batch := &types.ExchangeBatchProcessor{Exchange: e}
klineWindows := map[string]types.KLineWindow{} klineWindows := map[types.Interval]types.KLineWindow{}
for _, interval := range intervals { for _, interval := range intervals {
kLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime) kLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime)
if err != nil { if err != nil {

View File

@ -16,12 +16,13 @@ func init() {
SyncCmd.Flags().String("exchange", "", "target exchange") SyncCmd.Flags().String("exchange", "", "target exchange")
SyncCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") SyncCmd.Flags().String("symbol", "BTCUSDT", "trading symbol")
SyncCmd.Flags().String("since", "", "sync from time") SyncCmd.Flags().String("since", "", "sync from time")
SyncCmd.Flags().Bool("backtest", true, "sync backtest data")
RootCmd.AddCommand(SyncCmd) RootCmd.AddCommand(SyncCmd)
} }
var SyncCmd = &cobra.Command{ var SyncCmd = &cobra.Command{
Use: "sync", Use: "sync",
Short: "sync trades and orders", Short: "sync data. trades, orders and market data",
SilenceUsage: true, SilenceUsage: true,
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
@ -90,6 +91,18 @@ var SyncCmd = &cobra.Command{
return err return err
} }
backtest, err := cmd.Flags().GetBool("backtest")
if err != nil {
return err
}
if backtest {
backtestService := &service.BacktestService{DB: db}
if err := backtestService.Sync(ctx, exchange, symbol, startTime) ; err != nil {
return err
}
}
logrus.Info("synchronization done") logrus.Info("synchronization done")
return nil return nil
}, },

View File

@ -418,7 +418,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
return createdOrders, err return createdOrders, err
} }
func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, options types.KLineQueryOptions) ([]types.KLine, error) { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var limit = 500 var limit = 500
if options.Limit > 0 { if options.Limit > 0 {
@ -432,7 +432,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, opt
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
req := e.Client.NewKlinesService(). req := e.Client.NewKlinesService().
Symbol(symbol). Symbol(symbol).
Interval(interval). Interval(string(interval)).
Limit(limit) Limit(limit)
if options.StartTime != nil { if options.StartTime != nil {
@ -451,8 +451,9 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, opt
var kLines []types.KLine var kLines []types.KLine
for _, k := range resp { for _, k := range resp {
kLines = append(kLines, types.KLine{ kLines = append(kLines, types.KLine{
Exchange: "binance",
Symbol: symbol, Symbol: symbol,
Interval: types.Interval(interval), Interval: interval,
StartTime: time.Unix(0, k.OpenTime*int64(time.Millisecond)), StartTime: time.Unix(0, k.OpenTime*int64(time.Millisecond)),
EndTime: time.Unix(0, k.CloseTime*int64(time.Millisecond)), EndTime: time.Unix(0, k.CloseTime*int64(time.Millisecond)),
Open: util.MustParseFloat(k.Open), Open: util.MustParseFloat(k.Open),
@ -507,7 +508,7 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
return trades, nil return trades, nil
} }
func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol, interval string, startTime, endTime time.Time) ([]types.KLine, error) { func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) ([]types.KLine, error) {
var allKLines []types.KLine var allKLines []types.KLine
for startTime.Before(endTime) { for startTime.Before(endTime) {
@ -536,9 +537,9 @@ func (e *Exchange) BatchQueryKLines(ctx context.Context, symbol, interval string
return allKLines, nil return allKLines, nil
} }
func (e *Exchange) BatchQueryKLineWindows(ctx context.Context, symbol string, intervals []string, startTime, endTime time.Time) (map[string]types.KLineWindow, error) { func (e *Exchange) BatchQueryKLineWindows(ctx context.Context, symbol string, intervals []types.Interval, startTime, endTime time.Time) (map[types.Interval]types.KLineWindow, error) {
batch := &types.ExchangeBatchProcessor{Exchange: e} batch := &types.ExchangeBatchProcessor{Exchange: e}
klineWindows := map[string]types.KLineWindow{} klineWindows := map[types.Interval]types.KLineWindow{}
for _, interval := range intervals { for _, interval := range intervals {
klines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime) klines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, endTime)
if err != nil { if err != nil {

View File

@ -403,14 +403,14 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
return trades, nil return trades, nil
} }
func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, options types.KLineQueryOptions) ([]types.KLine, error) { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var limit = 5000 var limit = 5000
if options.Limit > 0 { if options.Limit > 0 {
// default limit == 500 // default limit == 500
limit = options.Limit limit = options.Limit
} }
i, err := maxapi.ParseInterval(interval) i, err := maxapi.ParseInterval(string(interval))
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -430,7 +430,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol, interval string, opt
// avoid rate limit // avoid rate limit
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
localKLines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), interval, *options.StartTime, limit) localKLines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), string(interval), *options.StartTime, limit)
if err != nil { if err != nil {
return nil, err return nil, err
} }

149
pkg/service/backtest.go Normal file
View File

@ -0,0 +1,149 @@
package service
import (
"context"
"strings"
"time"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
type BacktestService struct {
DB *sqlx.DB
}
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
lastKLine, err := s.QueryLast(exchange.Name(), symbol, "1m")
if err != nil {
return err
}
if lastKLine != nil {
startTime = lastKLine.EndTime
}
for interval := range types.SupportedIntervals {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
batch := &types.ExchangeBatchProcessor{Exchange: exchange}
// should use channel here
allKLines, err := batch.BatchQueryKLines(ctx, symbol, interval, startTime, time.Now())
if err != nil {
return err
}
for _, k := range allKLines {
if err := s.Insert(k); err != nil {
return err
}
}
}
return nil
}
// QueryLast queries the last order from the database
func (s *BacktestService) QueryLast(ex types.ExchangeName, symbol, interval string) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
// make the SQL syntax IDE friendly, so that it can analyze it.
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY gid DESC LIMIT 1"
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"interval": interval,
"symbol": symbol,
})
if err != nil {
return nil, errors.Wrap(err, "query last order error")
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
if rows.Next() {
var kline types.KLine
err = rows.StructScan(&kline)
return &kline, err
}
return nil, rows.Err()
}
func (s *BacktestService) QueryKLinesCh(since time.Time, ex types.ExchangeName, symbol string, intervals ...string) (chan types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :since AND `symbol` = :symbol AND `interval` IN (:intervals) ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"since": since,
"exchange": ex,
"symbol": symbol,
"intervals": intervals,
})
if err != nil {
return nil, err
}
return s.scanRowsCh(rows), nil
}
// scanRowsCh scan rows into channel
func (s *BacktestService) scanRowsCh(rows *sqlx.Rows) chan types.KLine {
ch := make(chan types.KLine, 100)
go func() {
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
log.WithError(err).Error("kline scan error")
continue
}
ch <- kline
}
if err := rows.Err(); err != nil {
log.WithError(err).Error("kline scan error")
}
}()
return ch
}
func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
return nil, err
}
klines = append(klines, kline)
}
return klines, rows.Err()
}
func (s *BacktestService) Insert(kline types.KLine) error {
if len(kline.Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty")
}
sql := "INSERT INTO `binance_klines` (`start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`)" +
"VALUES (:start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume)"
sql = strings.ReplaceAll(sql, "binance_klines", kline.Exchange+"_klines")
_, err := s.DB.NamedExec(sql, kline)
return err
}

View File

@ -1,114 +0,0 @@
package service
import (
"strings"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
type KLineService struct {
DB *sqlx.DB
}
// QueryLast queries the last order from the database
func (s *KLineService) QueryLast(ex types.ExchangeName, symbol, interval string) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
table := ex.String() + "_klines"
// make the SQL syntax IDE friendly, so that it can analyze it.
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY gid DESC LIMIT 1"
sql = strings.ReplaceAll(sql, "binance_klines", table)
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"table": table,
"exchange": ex,
"interval": interval,
"symbol": symbol,
})
if err != nil {
return nil, errors.Wrap(err, "query last order error")
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
if rows.Next() {
var kline types.KLine
err = rows.StructScan(&kline)
return &kline, err
}
return nil, rows.Err()
}
func (s *KLineService) QueryCh(ex types.ExchangeName, symbol string, intervals ...string) (chan types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `symbol` = :symbol AND `interval` IN (:intervals) ORDER BY start_time ASC"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"symbol": symbol,
"intervals": intervals,
})
if err != nil {
return nil, err
}
c := s.scanRowsCh(rows)
return c, nil
}
// scanRowsCh scan rows into channel
func (s *KLineService) scanRowsCh(rows *sqlx.Rows) chan types.KLine {
ch := make(chan types.KLine, 100)
go func() {
defer rows.Close()
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
log.WithError(err).Error("kline scan error")
continue
}
ch <- kline
}
if err := rows.Err(); err != nil {
log.WithError(err).Error("kline scan error")
}
}()
return ch
}
func (s *KLineService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err error) {
for rows.Next() {
var kline types.KLine
if err := rows.StructScan(&kline); err != nil {
return nil, err
}
klines = append(klines, kline)
}
return klines, rows.Err()
}
func (s *KLineService) Insert(kline types.KLine) error {
table := kline.Exchange + "_klines"
sql := `INSERT INTO binance_klines (start_time, end_time, symbol, interval, open, high, low, close, closed, volume)
VALUES (:start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume)`
sql = strings.ReplaceAll(sql, "binance_klines", table)
_, err := s.DB.NamedExec(sql, kline)
return err
}

View File

@ -44,7 +44,7 @@ type Exchange interface {
QueryAccountBalances(ctx context.Context) (BalanceMap, error) QueryAccountBalances(ctx context.Context) (BalanceMap, error)
QueryKLines(ctx context.Context, symbol string, interval string, options KLineQueryOptions) ([]KLine, error) QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)
QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error)
@ -118,7 +118,7 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb
return c, errC return c, errC
} }
func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol, interval string, startTime, endTime time.Time) (allKLines []KLine, err error) { func (e ExchangeBatchProcessor) BatchQueryKLines(ctx context.Context, symbol string, interval Interval, startTime, endTime time.Time) (allKLines []KLine, err error) {
for startTime.Before(endTime) { for startTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{ kLines, err := e.QueryKLines(ctx, symbol, interval, KLineQueryOptions{
StartTime: &startTime, StartTime: &startTime,