fix backtest (with review)

This commit is contained in:
TonyQ 2021-12-14 07:15:18 +08:00
parent 7fcbc1edcf
commit 8eb3eede82
12 changed files with 254 additions and 59 deletions

2
go.mod
View File

@ -19,7 +19,7 @@ require (
github.com/google/uuid v1.1.2 github.com/google/uuid v1.1.2
github.com/gorilla/websocket v1.4.2 github.com/gorilla/websocket v1.4.2
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect
github.com/jmoiron/sqlx v1.2.0 github.com/jmoiron/sqlx v1.3.4
github.com/joho/godotenv v1.3.0 github.com/joho/godotenv v1.3.0
github.com/json-iterator/go v1.1.10 // indirect github.com/json-iterator/go v1.1.10 // indirect
github.com/klauspost/compress v1.13.6 // indirect github.com/klauspost/compress v1.13.6 // indirect

5
go.sum
View File

@ -188,6 +188,10 @@ github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uc
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag= github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA= github.com/jmoiron/sqlx v1.2.0 h1:41Ip0zITnmWNR/vHV+S4m+VoUivnWY5E4OJfLZjCJMA=
github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks=
github.com/jmoiron/sqlx v1.3.0 h1:xOXsPZ1cwOn1bhi0p6HzHGkLZicSun/jBtY/YuUuQs8=
github.com/jmoiron/sqlx v1.3.0/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/jmoiron/sqlx v1.3.4 h1:wv+0IJZfL5z0uZoUjlpKgHkgaFSYD+r9CfrXjEXsO7w=
github.com/jmoiron/sqlx v1.3.4/go.mod h1:2BljVx/86SuTyjE+aPYlHCTNvZrnJXghYGpNiXLBMCQ=
github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc= github.com/joho/godotenv v1.3.0 h1:Zjp+RcGpHhGlrMbJzXTrZZPrWj+1vfm90La1wgB6Bhc=
github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg= github.com/joho/godotenv v1.3.0/go.mod h1:7hK45KPybAkOC6peb+G5yklZfMxEjkZhHbwpqxOKXbg=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
@ -230,6 +234,7 @@ github.com/lestrrat-go/file-rotatelogs v2.2.0+incompatible/go.mod h1:ZQnN8lSECae
github.com/lestrrat-go/strftime v1.0.0 h1:wZIfTHGdu7TeGu318uLJwuQvTMt9UpRyS+XV2Rc4wo4= github.com/lestrrat-go/strftime v1.0.0 h1:wZIfTHGdu7TeGu318uLJwuQvTMt9UpRyS+XV2Rc4wo4=
github.com/lestrrat-go/strftime v1.0.0/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g= github.com/lestrrat-go/strftime v1.0.0/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8= github.com/lib/pq v1.9.0 h1:L8nSXQQzAYByakOFMTwpjRoHsMJklur4Gi59b6VivR8=
github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lib/pq v1.9.0/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o=
github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A= github.com/magefile/mage v1.10.0/go.mod h1:z5UZb/iS3GoOSn0JgWuiw7dxlurVYTu+/jHXqQg881A=

View File

@ -109,6 +109,7 @@ type Backtest struct {
RecordTrades bool `json:"recordTrades,omitempty" yaml:"recordTrades,omitempty"` RecordTrades bool `json:"recordTrades,omitempty" yaml:"recordTrades,omitempty"`
Account BacktestAccount `json:"account" yaml:"account"` Account BacktestAccount `json:"account" yaml:"account"`
Symbols []string `json:"symbols" yaml:"symbols"` Symbols []string `json:"symbols" yaml:"symbols"`
Session string `json:"session" yaml:"session"`
} }
func parseTimeWithFormats(strTime string, formats []string) (time.Time, error) { func parseTimeWithFormats(strTime string, formats []string) (time.Time, error) {
@ -121,7 +122,6 @@ func parseTimeWithFormats(strTime string, formats []string) (time.Time, error) {
return time.Time{}, fmt.Errorf("failed to parse time %s, valid formats are %+v", strTime, formats) return time.Time{}, fmt.Errorf("failed to parse time %s, valid formats are %+v", strTime, formats)
} }
func (t Backtest) ParseEndTime() (time.Time, error) { func (t Backtest) ParseEndTime() (time.Time, error) {
if len(t.EndTime) == 0 { if len(t.EndTime) == 0 {
return time.Time{}, errors.New("backtest.endTime must be defined") return time.Time{}, errors.New("backtest.endTime must be defined")
@ -462,7 +462,6 @@ func loadExchangeStrategies(config *Config, stash Stash) (err error) {
return fmt.Errorf("unexpected mount type: %T value: %+v", val, val) return fmt.Errorf("unexpected mount type: %T value: %+v", val, val)
} }
} }
for id, conf := range configStash { for id, conf := range configStash {
// look up the real struct type // look up the real struct type
@ -476,6 +475,9 @@ func loadExchangeStrategies(config *Config, stash Stash) (err error) {
Mounts: mounts, Mounts: mounts,
Strategy: st, Strategy: st,
}) })
} else if id != "on" && id != "off" {
//Show error when we didn't find the Strategy
return fmt.Errorf("strategy %s in config not found", id)
} }
} }
} }

View File

@ -20,7 +20,7 @@ persistence:
strategies: strategies:
- on: max - on: max
swing: test:
symbolPosition: symbolPosition:
persistence: persistence:
type: json type: json

View File

@ -98,24 +98,45 @@ var BacktestCmd = &cobra.Command{
return err return err
} }
exchangeName, err := types.ValidExchangeName(exchangeNameStr)
if err != nil {
return err
}
sourceExchange, err := cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
userConfig, err := bbgo.Load(configFile, true) userConfig, err := bbgo.Load(configFile, true)
if err != nil { if err != nil {
return err return err
} }
//if it's declared in the cmd , use the cmd one first
if exchangeNameStr == "" {
exchangeNameStr = userConfig.Backtest.Session
}
var sourceExchange types.Exchange
var exchangeName types.ExchangeName
for key, session := range userConfig.Sessions {
if exchangeNameStr == key {
err := bbgo.InitExchangeSession(session.Name, session)
if err != nil {
return err
}
sourceExchange = session.Exchange
exchangeName = session.ExchangeName
}
}
if sourceExchange == nil {
exchangeName, err = types.ValidExchangeName(exchangeNameStr)
if err != nil {
return err
}
sourceExchange, err = cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if userConfig.Backtest == nil { if userConfig.Backtest == nil {
return errors.New("backtest config is not defined") return errors.New("backtest config is not defined")
} }
@ -267,7 +288,9 @@ var BacktestCmd = &cobra.Command{
} }
environ.SetStartTime(startTime) environ.SetStartTime(startTime)
environ.AddExchange(exchangeName.String(), backtestExchange)
//exchangeNameStr is the session name.
environ.AddExchange(exchangeNameStr, backtestExchange)
if err := environ.Init(ctx); err != nil { if err := environ.Init(ctx); err != nil {
return err return err

View File

@ -101,7 +101,7 @@ var tradesCmd = &cobra.Command{
log.Infof("%d trades", len(trades)) log.Infof("%d trades", len(trades))
for _, trade := range trades { for _, trade := range trades {
log.Infof("TRADE %s %s %4s %s @ %s orderID %d %s amount %f", log.Infof("TRADE %s %s %4s %s @ %s orderID %d %s amount %f , fee %f %s ",
trade.Exchange.String(), trade.Exchange.String(),
trade.Symbol, trade.Symbol,
trade.Side, trade.Side,
@ -109,7 +109,9 @@ var tradesCmd = &cobra.Command{
util.FormatFloat(trade.Price, 3), util.FormatFloat(trade.Price, 3),
trade.OrderID, trade.OrderID,
trade.Time.Time().Format(time.StampMilli), trade.Time.Time().Format(time.StampMilli),
trade.QuoteQuantity) trade.QuoteQuantity,
trade.Fee,
trade.FeeCurrency)
} }
return nil return nil
}, },

View File

@ -2,6 +2,8 @@ package batch
import ( import (
"context" "context"
"github.com/pkg/errors"
"sort"
"time" "time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -74,18 +76,21 @@ type KLineBatchQuery struct {
types.Exchange types.Exchange
} }
func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) { func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan []types.KLine, errC chan error) {
c = make(chan types.KLine, 1000) c = make(chan []types.KLine, 1000)
errC = make(chan error, 1) errC = make(chan error, 1)
go func() { go func() {
defer close(c) defer close(c)
defer close(errC) defer close(errC)
tryQueryKlineTimes := 0
for startTime.Before(endTime) { for startTime.Before(endTime) {
kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
StartTime: &startTime, StartTime: &startTime,
}) })
sort.Slice(kLines, func(i, j int) bool { return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() })
tryQueryKlineTimes++
if err != nil { if err != nil {
errC <- err errC <- err
@ -95,7 +100,9 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
if len(kLines) == 0 { if len(kLines) == 0 {
return return
} }
const BatchSize = 200
var batchKLines = make([]types.KLine, 0, BatchSize)
for _, kline := range kLines { for _, kline := range kLines {
// ignore any kline before the given start time // ignore any kline before the given start time
if kline.StartTime.Before(startTime) { if kline.StartTime.Before(startTime) {
@ -106,8 +113,23 @@ func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval type
return return
} }
c <- kline batchKLines = append(batchKLines, kline)
startTime = kline.EndTime.Add(time.Millisecond)
if len(batchKLines) == BatchSize {
c <- batchKLines
batchKLines = batchKLines[:0]
}
//The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
startTime = kline.EndTime // .Add(time.Millisecond)
tryQueryKlineTimes = 0
}
c <- batchKLines
if tryQueryKlineTimes > 10 { // it means loop 10 times
errC <- errors.Errorf("There's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime :%s ", symbol, interval, startTime.String())
return
} }
} }
}() }()

View File

@ -179,7 +179,74 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap,
return balances, nil return balances, nil
} }
//resolution field in api
//window length in seconds. options: 15, 60, 300, 900, 3600, 14400, 86400, or any multiple of 86400 up to 30*86400
var supportedIntervals = map[types.Interval]int{
types.Interval1m: 1,
types.Interval5m: 5,
types.Interval15m: 15,
types.Interval1h: 60,
types.Interval1d: 60 * 24,
types.Interval3d: 60 * 24 * 3,
}
func (e *Exchange) SupportedInterval() map[types.Interval]int {
return supportedIntervals
}
func (e *Exchange) IsSupportedInterval(interval types.Interval) bool {
return isIntervalSupportedInKLine(interval)
}
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var klines []types.KLine
var since, until, current time.Time
if options.StartTime != nil {
since = *options.StartTime
}
if options.EndTime != nil {
until = *options.EndTime
} else {
until = time.Now()
}
current = until
for {
endTime := current.Add(interval.Duration())
options.EndTime = &endTime
lines, err := e._queryKLines(ctx, symbol, interval, options)
if err != nil {
return nil, err
}
if len(lines) == 0 {
break
}
for _, line := range lines {
if line.EndTime.Unix() < current.Unix() {
current = line.StartTime
}
if line.EndTime.Unix() > since.Unix() {
klines = append(klines, line)
}
}
if since.IsZero() || current.Unix() == since.Unix() {
break
}
}
sort.Slice(klines, func(i, j int) bool { return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() })
return klines, nil
}
func (e *Exchange) _queryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
var since, until time.Time var since, until time.Time
if options.StartTime != nil { if options.StartTime != nil {
since = *options.StartTime since = *options.StartTime
@ -195,6 +262,11 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
if !isIntervalSupportedInKLine(interval) { if !isIntervalSupportedInKLine(interval) {
return nil, fmt.Errorf("interval %s is not supported", interval.String()) return nil, fmt.Errorf("interval %s is not supported", interval.String())
} }
if err := requestLimit.Wait(ctx); err != nil {
return nil, err
}
resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), since, until) resp, err := e.newRest().HistoricalPrices(ctx, toLocalSymbol(symbol), interval, int64(options.Limit), since, until)
if err != nil { if err != nil {
return nil, err return nil, err
@ -203,29 +275,19 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
return nil, fmt.Errorf("ftx returns failure") return nil, fmt.Errorf("ftx returns failure")
} }
var kline []types.KLine var klines []types.KLine
for _, r := range resp.Result { for _, r := range resp.Result {
globalKline, err := toGlobalKLine(symbol, interval, r) globalKline, err := toGlobalKLine(symbol, interval, r)
if err != nil { if err != nil {
return nil, err return nil, err
} }
kline = append(kline, globalKline) klines = append(klines, globalKline)
} }
return kline, nil return klines, nil
}
var supportedInterval = map[int]struct{}{
15: {},
60: {},
300: {},
900: {},
3600: {},
14400: {},
86400: {},
} }
func isIntervalSupportedInKLine(interval types.Interval) bool { func isIntervalSupportedInKLine(interval types.Interval) bool {
_, ok := supportedInterval[interval.Minutes()*60] _, ok := supportedIntervals[interval]
return ok return ok
} }
@ -406,6 +468,11 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
s := since s := since
var lastOrder order var lastOrder order
for hasMoreData { for hasMoreData {
if err := requestLimit.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
resp, err := e.newRest().OrdersHistory(ctx, toLocalSymbol(symbol), s, until, limit) resp, err := e.newRest().OrdersHistory(ctx, toLocalSymbol(symbol), s, until, limit)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -627,12 +627,12 @@ func Test_isIntervalSupportedInKLine(t *testing.T) {
types.Interval5m, types.Interval5m,
types.Interval15m, types.Interval15m,
types.Interval1h, types.Interval1h,
types.Interval4h,
types.Interval1d, types.Interval1d,
} }
for _, i := range supportedIntervals { for _, i := range supportedIntervals {
assert.True(t, isIntervalSupportedInKLine(i)) assert.True(t, isIntervalSupportedInKLine(i))
} }
assert.False(t, isIntervalSupportedInKLine(types.Interval30m)) assert.False(t, isIntervalSupportedInKLine(types.Interval30m))
assert.False(t, isIntervalSupportedInKLine(types.Interval3d)) assert.False(t, isIntervalSupportedInKLine(types.Interval2h))
assert.True(t, isIntervalSupportedInKLine(types.Interval3d))
} }

View File

@ -2,6 +2,7 @@ package service
import ( import (
"context" "context"
"fmt"
"strconv" "strconv"
"strings" "strings"
"time" "time"
@ -35,12 +36,16 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
// should use channel here // should use channel here
klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime) klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime)
// var previousKLine types.KLine // var previousKLine types.KLine
for k := range klineC { count := 0
if err := s.Insert(k); err != nil { for klines := range klineC {
if err := s.BatchInsert(klines); err != nil {
return err return err
} }
count += len(klines)
} }
log.Infof("found %s kline %s data count: %d", symbol, interval.String(), count)
if err := <-errC; err != nil { if err := <-errC; err != nil {
return err return err
@ -51,7 +56,17 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
endTime := time.Now() endTime := time.Now()
for interval := range types.SupportedIntervals {
exCustom, ok := exchange.(types.CustomIntervalProvider)
var supportIntervals map[types.Interval]int
if ok {
supportIntervals = exCustom.SupportedInterval()
} else {
supportIntervals = types.SupportedIntervals
}
for interval := range supportIntervals {
if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil { if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
return err return err
} }
@ -73,12 +88,12 @@ func (s *BacktestService) QueryLastKLine(ex types.ExchangeName, symbol string, i
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) { func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval) log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
tableName := s._targetKlineTable(ex)
// make the SQL syntax IDE friendly, so that it can analyze it. // make the SQL syntax IDE friendly, so that it can analyze it.
sql := "SELECT * FROM binance_klines WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time " + orderBy + " LIMIT " + strconv.Itoa(limit) sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
sql = strings.ReplaceAll(sql, "binance_klines", ex.String()+"_klines")
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex, "exchange": ex.String(),
"interval": interval, "interval": interval,
"symbol": symbol, "symbol": symbol,
}) })
@ -103,14 +118,16 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter
} }
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) { func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time ASC LIMIT :limit" tableName := s._targetKlineTable(exchange)
sql = strings.ReplaceAll(sql, "binance_klines", exchange.String()+"_klines") sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"start_time": startTime, "start_time": startTime,
"limit": limit, "limit": limit,
"symbol": symbol, "symbol": symbol,
"interval": interval, "interval": interval,
"exchange": exchange.String(),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -120,8 +137,10 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
} }
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) { func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit" tableName := s._targetKlineTable(exchange)
sql = strings.ReplaceAll(sql, "binance_klines", exchange.String()+"_klines")
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC" sql = "SELECT t.* FROM (" + sql + ") AS t ORDER BY t.end_time ASC"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
@ -129,6 +148,7 @@ func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbo
"end_time": endTime, "end_time": endTime,
"symbol": symbol, "symbol": symbol,
"interval": interval, "interval": interval,
"exchange": exchange.String(),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@ -138,14 +158,28 @@ func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbo
} }
func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) { func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error) {
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", exchange.Name().String()+"_klines") if len(symbols) == 0 {
errC := make(chan error, 1)
// avoid blocking
go func() {
errC <- errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. ")
close(errC)
}()
return nil, errC
}
tableName := s._targetKlineTable(exchange.Name())
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
sql, args, err := sqlx.Named(sql, map[string]interface{}{ sql, args, err := sqlx.Named(sql, map[string]interface{}{
"since": since, "since": since,
"until": until, "until": until,
"symbols": symbols, "symbols": symbols,
"intervals": types.IntervalSlice(intervals), "intervals": types.IntervalSlice(intervals),
"exchange": exchange.Name().String(),
}) })
sql, args, err = sqlx.In(sql, args...) sql, args, err = sqlx.In(sql, args...)
@ -153,10 +187,9 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
rows, err := s.DB.Queryx(sql, args...) rows, err := s.DB.Queryx(sql, args...)
if err != nil { if err != nil {
log.WithError(err).Error("query error") log.WithError(err).Error("backtest query error")
errC := make(chan error, 1) errC := make(chan error, 1)
// avoid blocking // avoid blocking
go func() { go func() {
errC <- err errC <- err
@ -211,14 +244,48 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e
return klines, rows.Err() return klines, rows.Err()
} }
func (s *BacktestService) _targetKlineTable(exchangeName types.ExchangeName) string {
switch exchangeName {
case types.ExchangeBinance:
return "binance_klines"
case types.ExchangeFTX:
return "ftx_klines"
case types.ExchangeMax:
return "max_klines"
case types.ExchangeOKEx:
return "okex_klines"
default:
return "klines"
}
}
func (s *BacktestService) Insert(kline types.KLine) error { func (s *BacktestService) Insert(kline types.KLine) error {
if len(kline.Exchange) == 0 { if len(kline.Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty") return errors.New("kline.Exchange field should not be empty")
} }
sql := "INSERT INTO `binance_klines` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)" + tableName := s._targetKlineTable(kline.Exchange)
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)"
sql = strings.ReplaceAll(sql, "binance_klines", kline.Exchange.String()+"_klines") sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
_, err := s.DB.NamedExec(sql, kline)
return err
}
// BatchInsert Note: all kline should be same exchange, or it will cause issue.
func (s *BacktestService) BatchInsert(kline []types.KLine) error {
if len(kline) == 0 {
return nil
}
if len(kline[0].Exchange) == 0 {
return errors.New("kline.Exchange field should not be empty")
}
tableName := s._targetKlineTable(kline[0].Exchange)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
" values (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
_, err := s.DB.NamedExec(sql, kline) _, err := s.DB.NamedExec(sql, kline)
return err return err

View File

@ -103,6 +103,11 @@ type ExchangeMarketDataService interface {
QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error) QueryKLines(ctx context.Context, symbol string, interval Interval, options KLineQueryOptions) ([]KLine, error)
} }
type CustomIntervalProvider interface {
SupportedInterval() map[Interval]int
IsSupportedInterval(interval Interval) bool
}
type ExchangeTransferService interface { type ExchangeTransferService interface {
QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error)
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)

View File

@ -87,13 +87,15 @@ func (trade Trade) String() string {
// PlainText is used for telegram-styled messages // PlainText is used for telegram-styled messages
func (trade Trade) PlainText() string { func (trade Trade) PlainText() string {
return fmt.Sprintf("Trade %s %s %s %f @ %f, amount %f", return fmt.Sprintf("Trade %s %s %s %f @ %f, amount %f , fee %f %s ",
trade.Exchange.String(), trade.Exchange.String(),
trade.Symbol, trade.Symbol,
trade.Side, trade.Side,
trade.Quantity, trade.Quantity,
trade.Price, trade.Price,
trade.QuoteQuantity) trade.QuoteQuantity,
trade.Fee,
trade.FeeCurrency)
} }
var slackTradeTextTemplate = ":handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}" var slackTradeTextTemplate = ":handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}"