mirror of
https://github.com/c9s/bbgo.git
synced 2024-11-25 16:25:16 +00:00
Merge pull request #669 from c9s/fix/kline-partial-sync
fix: fix partial kline sync
This commit is contained in:
commit
9f5575f1ef
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
||||||
|
exchange2 "github.com/c9s/bbgo/pkg/exchange"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -52,7 +53,7 @@ var rootCmd = &cobra.Command{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
exchange, err := cmdutil.NewExchange(exchangeName)
|
exchange, err := exchange2.New(exchangeName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,7 @@ import (
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
"gopkg.in/tucnak/telebot.v2"
|
"gopkg.in/tucnak/telebot.v2"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
exchange2 "github.com/c9s/bbgo/pkg/exchange"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/interact"
|
"github.com/c9s/bbgo/pkg/interact"
|
||||||
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
|
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
|
||||||
|
@ -221,7 +221,7 @@ func (environ *Environment) ConfigureExchangeSessions(userConfig *Config) error
|
||||||
func (environ *Environment) AddExchangesByViperKeys() error {
|
func (environ *Environment) AddExchangesByViperKeys() error {
|
||||||
for _, n := range types.SupportedExchanges {
|
for _, n := range types.SupportedExchanges {
|
||||||
if viper.IsSet(string(n) + "-api-key") {
|
if viper.IsSet(string(n) + "-api-key") {
|
||||||
exchange, err := cmdutil.NewExchangeWithEnvVarPrefix(n, "")
|
exchange, err := exchange2.NewWithEnvVarPrefix(n, "")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
package bbgo
|
package bbgo
|
||||||
|
|
||||||
|
import "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
type Notifier interface {
|
type Notifier interface {
|
||||||
NotifyTo(channel string, obj interface{}, args ...interface{})
|
NotifyTo(channel string, obj interface{}, args ...interface{})
|
||||||
Notify(obj interface{}, args ...interface{})
|
Notify(obj interface{}, args ...interface{})
|
||||||
|
@ -48,6 +50,10 @@ func (m *Notifiability) AddNotifier(notifier Notifier) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Notifiability) Notify(obj interface{}, args ...interface{}) {
|
func (m *Notifiability) Notify(obj interface{}, args ...interface{}) {
|
||||||
|
if str, ok := obj.(string); ok {
|
||||||
|
logrus.Infof(str, args...)
|
||||||
|
}
|
||||||
|
|
||||||
for _, n := range m.notifiers {
|
for _, n := range m.notifiers {
|
||||||
n.Notify(obj, args...)
|
n.Notify(obj, args...)
|
||||||
}
|
}
|
||||||
|
|
|
@ -107,7 +107,7 @@ func loadPersistenceFields(obj interface{}, id string, persistence service.Persi
|
||||||
newValue = newValue.Elem()
|
newValue = newValue.Elem()
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("[loadPersistenceFields] %v = %v (%s) -> %v (%s)\n", field, value, value.Type(), newValue, newValue.Type())
|
log.Debugf("[loadPersistenceFields] %v = %v -> %v\n", field, value, newValue)
|
||||||
|
|
||||||
value.Set(newValue)
|
value.Set(newValue)
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -72,9 +72,6 @@ func iterateFieldsByTag(obj interface{}, tagName string, cb StructFieldIterator)
|
||||||
fv := sv.Field(i)
|
fv := sv.Field(i)
|
||||||
ft := st.Field(i)
|
ft := st.Field(i)
|
||||||
|
|
||||||
fvt := fv.Type()
|
|
||||||
_ = fvt
|
|
||||||
|
|
||||||
// skip unexported fields
|
// skip unexported fields
|
||||||
if !st.Field(i).IsExported() {
|
if !st.Field(i).IsExported() {
|
||||||
continue
|
continue
|
||||||
|
|
|
@ -9,13 +9,13 @@ import (
|
||||||
|
|
||||||
"github.com/slack-go/slack"
|
"github.com/slack-go/slack"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/cache"
|
|
||||||
|
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/viper"
|
"github.com/spf13/viper"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
"github.com/c9s/bbgo/pkg/cache"
|
||||||
|
|
||||||
|
exchange2 "github.com/c9s/bbgo/pkg/exchange"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/indicator"
|
"github.com/c9s/bbgo/pkg/indicator"
|
||||||
"github.com/c9s/bbgo/pkg/service"
|
"github.com/c9s/bbgo/pkg/service"
|
||||||
|
@ -740,17 +740,17 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err
|
||||||
// InitExchange initialize the exchange instance and allocate memory for fields
|
// InitExchange initialize the exchange instance and allocate memory for fields
|
||||||
// In this stage, the session var could be loaded from the JSON config, so the pointer fields are still nil
|
// In this stage, the session var could be loaded from the JSON config, so the pointer fields are still nil
|
||||||
// The Init method will be called after this stage, environment.Init will call the session.Init method later.
|
// The Init method will be called after this stage, environment.Init will call the session.Init method later.
|
||||||
func (session *ExchangeSession) InitExchange(name string, exchange types.Exchange) error {
|
func (session *ExchangeSession) InitExchange(name string, ex types.Exchange) error {
|
||||||
var err error
|
var err error
|
||||||
var exchangeName = session.ExchangeName
|
var exchangeName = session.ExchangeName
|
||||||
if exchange == nil {
|
if ex == nil {
|
||||||
if session.PublicOnly {
|
if session.PublicOnly {
|
||||||
exchange, err = cmdutil.NewExchangePublic(exchangeName)
|
ex, err = exchange2.NewPublic(exchangeName)
|
||||||
} else {
|
} else {
|
||||||
if session.Key != "" && session.Secret != "" {
|
if session.Key != "" && session.Secret != "" {
|
||||||
exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount)
|
ex, err = exchange2.NewStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount)
|
||||||
} else {
|
} else {
|
||||||
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
|
ex, err = exchange2.NewWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -761,7 +761,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
|
||||||
|
|
||||||
// configure exchange
|
// configure exchange
|
||||||
if session.Margin {
|
if session.Margin {
|
||||||
marginExchange, ok := exchange.(types.MarginExchange)
|
marginExchange, ok := ex.(types.MarginExchange)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("exchange %s does not support margin", exchangeName)
|
return fmt.Errorf("exchange %s does not support margin", exchangeName)
|
||||||
}
|
}
|
||||||
|
@ -774,7 +774,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
|
||||||
}
|
}
|
||||||
|
|
||||||
if session.Futures {
|
if session.Futures {
|
||||||
futuresExchange, ok := exchange.(types.FuturesExchange)
|
futuresExchange, ok := ex.(types.FuturesExchange)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("exchange %s does not support futures", exchangeName)
|
return fmt.Errorf("exchange %s does not support futures", exchangeName)
|
||||||
}
|
}
|
||||||
|
@ -792,9 +792,9 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
|
||||||
SessionChannelRouter: NewPatternChannelRouter(nil),
|
SessionChannelRouter: NewPatternChannelRouter(nil),
|
||||||
ObjectChannelRouter: NewObjectChannelRouter(),
|
ObjectChannelRouter: NewObjectChannelRouter(),
|
||||||
}
|
}
|
||||||
session.Exchange = exchange
|
session.Exchange = ex
|
||||||
session.UserDataStream = exchange.NewStream()
|
session.UserDataStream = ex.NewStream()
|
||||||
session.MarketDataStream = exchange.NewStream()
|
session.MarketDataStream = ex.NewStream()
|
||||||
session.MarketDataStream.SetPublicOnly()
|
session.MarketDataStream.SetPublicOnly()
|
||||||
|
|
||||||
// pointer fields
|
// pointer fields
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
|
||||||
"github.com/c9s/bbgo/pkg/data/tsv"
|
"github.com/c9s/bbgo/pkg/data/tsv"
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange"
|
||||||
"github.com/c9s/bbgo/pkg/service"
|
"github.com/c9s/bbgo/pkg/service"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
"github.com/c9s/bbgo/pkg/util"
|
"github.com/c9s/bbgo/pkg/util"
|
||||||
|
@ -186,17 +187,16 @@ var BacktestCmd = &cobra.Command{
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
publicExchange, err := cmdutil.NewExchangePublic(exName)
|
publicExchange, err := exchange.NewPublic(exName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
sourceExchanges[exName] = publicExchange
|
sourceExchanges[exName] = publicExchange
|
||||||
}
|
}
|
||||||
|
|
||||||
if wantSync {
|
|
||||||
var syncFromTime time.Time
|
var syncFromTime time.Time
|
||||||
|
|
||||||
// override the sync from time if the option is given
|
// user can override the sync from time if the option is given
|
||||||
if len(syncFromDateStr) > 0 {
|
if len(syncFromDateStr) > 0 {
|
||||||
syncFromTime, err = time.Parse(types.DateFormat, syncFromDateStr)
|
syncFromTime, err = time.Parse(types.DateFormat, syncFromDateStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -212,8 +212,9 @@ var BacktestCmd = &cobra.Command{
|
||||||
log.Infof("adjusted sync start time %s to %s for backward market data", startTime, syncFromTime)
|
log.Infof("adjusted sync start time %s to %s for backward market data", startTime, syncFromTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if wantSync {
|
||||||
log.Infof("starting synchronization: %v", userConfig.Backtest.Symbols)
|
log.Infof("starting synchronization: %v", userConfig.Backtest.Symbols)
|
||||||
if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime); err != nil {
|
if err := sync(ctx, userConfig, backtestService, sourceExchanges, syncFromTime, endTime); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
log.Info("synchronization done")
|
log.Info("synchronization done")
|
||||||
|
@ -649,9 +650,8 @@ func toExchangeSources(sessions map[string]*bbgo.ExchangeSession, extraIntervals
|
||||||
return exchangeSources, nil
|
return exchangeSources, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service.BacktestService, sourceExchanges map[types.ExchangeName]types.Exchange, syncFromTime time.Time) error {
|
func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service.BacktestService, sourceExchanges map[types.ExchangeName]types.Exchange, syncFrom, syncTo time.Time) error {
|
||||||
for _, symbol := range userConfig.Backtest.Symbols {
|
for _, symbol := range userConfig.Backtest.Symbols {
|
||||||
|
|
||||||
for _, sourceExchange := range sourceExchanges {
|
for _, sourceExchange := range sourceExchanges {
|
||||||
exCustom, ok := sourceExchange.(types.CustomIntervalProvider)
|
exCustom, ok := sourceExchange.(types.CustomIntervalProvider)
|
||||||
|
|
||||||
|
@ -662,11 +662,7 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service
|
||||||
supportIntervals = types.SupportedIntervals
|
supportIntervals = types.SupportedIntervals
|
||||||
}
|
}
|
||||||
|
|
||||||
now := time.Now()
|
|
||||||
for interval := range supportIntervals {
|
for interval := range supportIntervals {
|
||||||
// if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime); err != nil {
|
|
||||||
// return err
|
|
||||||
// }
|
|
||||||
firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
|
firstKLine, err := backtestService.QueryFirstKLine(sourceExchange.Name(), symbol, interval)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return errors.Wrapf(err, "failed to query backtest kline")
|
return errors.Wrapf(err, "failed to query backtest kline")
|
||||||
|
@ -675,13 +671,13 @@ func sync(ctx context.Context, userConfig *bbgo.Config, backtestService *service
|
||||||
// if we don't have klines before the start time endpoint, the back-test will fail.
|
// if we don't have klines before the start time endpoint, the back-test will fail.
|
||||||
// because the last price will be missing.
|
// because the last price will be missing.
|
||||||
if firstKLine != nil {
|
if firstKLine != nil {
|
||||||
log.Debugf("found existing kline data using partial sync...")
|
|
||||||
if err := backtestService.SyncExist(ctx, sourceExchange, symbol, syncFromTime, now, interval); err != nil {
|
if err := backtestService.SyncPartial(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.Debugf("starting a fresh kline data sync...")
|
log.Debugf("starting a fresh kline data sync...")
|
||||||
if err := backtestService.Sync(ctx, sourceExchange, symbol, syncFromTime, now, interval); err != nil {
|
if err := backtestService.Sync(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,65 +1,2 @@
|
||||||
package cmdutil
|
package cmdutil
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
"os"
|
|
||||||
"strings"
|
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/exchange/binance"
|
|
||||||
"github.com/c9s/bbgo/pkg/exchange/ftx"
|
|
||||||
"github.com/c9s/bbgo/pkg/exchange/kucoin"
|
|
||||||
"github.com/c9s/bbgo/pkg/exchange/max"
|
|
||||||
"github.com/c9s/bbgo/pkg/exchange/okex"
|
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
func NewExchangePublic(exchangeName types.ExchangeName) (types.Exchange, error) {
|
|
||||||
return NewExchangeStandard(exchangeName, "", "", "", "")
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExchangeStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) {
|
|
||||||
switch n {
|
|
||||||
|
|
||||||
case types.ExchangeFTX:
|
|
||||||
return ftx.NewExchange(key, secret, subAccount), nil
|
|
||||||
|
|
||||||
case types.ExchangeBinance:
|
|
||||||
return binance.New(key, secret), nil
|
|
||||||
|
|
||||||
case types.ExchangeMax:
|
|
||||||
return max.New(key, secret), nil
|
|
||||||
|
|
||||||
case types.ExchangeOKEx:
|
|
||||||
return okex.New(key, secret, passphrase), nil
|
|
||||||
|
|
||||||
case types.ExchangeKucoin:
|
|
||||||
return kucoin.New(key, secret, passphrase), nil
|
|
||||||
|
|
||||||
default:
|
|
||||||
return nil, fmt.Errorf("unsupported exchange: %v", n)
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewExchangeWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) {
|
|
||||||
if len(varPrefix) == 0 {
|
|
||||||
varPrefix = n.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
varPrefix = strings.ToUpper(varPrefix)
|
|
||||||
|
|
||||||
key := os.Getenv(varPrefix + "_API_KEY")
|
|
||||||
secret := os.Getenv(varPrefix + "_API_SECRET")
|
|
||||||
if len(key) == 0 || len(secret) == 0 {
|
|
||||||
return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE")
|
|
||||||
subAccount := os.Getenv(varPrefix + "_SUBACCOUNT")
|
|
||||||
return NewExchangeStandard(n, key, secret, passphrase, subAccount)
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewExchange constructor exchange object from viper config.
|
|
||||||
func NewExchange(n types.ExchangeName) (types.Exchange, error) {
|
|
||||||
return NewExchangeWithEnvVarPrefix(n, "")
|
|
||||||
}
|
|
||||||
|
|
65
pkg/exchange/factory.go
Normal file
65
pkg/exchange/factory.go
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
package exchange
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange/binance"
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange/ftx"
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange/kucoin"
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange/max"
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange/okex"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func NewPublic(exchangeName types.ExchangeName) (types.Exchange, error) {
|
||||||
|
return NewStandard(exchangeName, "", "", "", "")
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) {
|
||||||
|
switch n {
|
||||||
|
|
||||||
|
case types.ExchangeFTX:
|
||||||
|
return ftx.NewExchange(key, secret, subAccount), nil
|
||||||
|
|
||||||
|
case types.ExchangeBinance:
|
||||||
|
return binance.New(key, secret), nil
|
||||||
|
|
||||||
|
case types.ExchangeMax:
|
||||||
|
return max.New(key, secret), nil
|
||||||
|
|
||||||
|
case types.ExchangeOKEx:
|
||||||
|
return okex.New(key, secret, passphrase), nil
|
||||||
|
|
||||||
|
case types.ExchangeKucoin:
|
||||||
|
return kucoin.New(key, secret, passphrase), nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unsupported exchange: %v", n)
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) {
|
||||||
|
if len(varPrefix) == 0 {
|
||||||
|
varPrefix = n.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
varPrefix = strings.ToUpper(varPrefix)
|
||||||
|
|
||||||
|
key := os.Getenv(varPrefix + "_API_KEY")
|
||||||
|
secret := os.Getenv(varPrefix + "_API_SECRET")
|
||||||
|
if len(key) == 0 || len(secret) == 0 {
|
||||||
|
return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix)
|
||||||
|
}
|
||||||
|
|
||||||
|
passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE")
|
||||||
|
subAccount := os.Getenv(varPrefix + "_SUBACCOUNT")
|
||||||
|
return NewStandard(n, key, secret, passphrase, subAccount)
|
||||||
|
}
|
||||||
|
|
||||||
|
// New constructor exchange object from viper config.
|
||||||
|
func New(n types.ExchangeName) (types.Exchange, error) {
|
||||||
|
return NewWithEnvVarPrefix(n, "")
|
||||||
|
}
|
|
@ -9,7 +9,7 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/service"
|
"github.com/c9s/bbgo/pkg/net/websocketbase"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ const endpoint = "wss://ftx.com/ws/"
|
||||||
type Stream struct {
|
type Stream struct {
|
||||||
*types.StandardStream
|
*types.StandardStream
|
||||||
|
|
||||||
ws *service.WebsocketClientBase
|
ws *websocketbase.WebsocketClientBase
|
||||||
exchange *Exchange
|
exchange *Exchange
|
||||||
|
|
||||||
key string
|
key string
|
||||||
|
@ -42,7 +42,7 @@ func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
|
||||||
secret: secret,
|
secret: secret,
|
||||||
subAccount: subAccount,
|
subAccount: subAccount,
|
||||||
StandardStream: &types.StandardStream{},
|
StandardStream: &types.StandardStream{},
|
||||||
ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
|
ws: websocketbase.NewWebsocketClientBase(endpoint, 3*time.Second),
|
||||||
}
|
}
|
||||||
|
|
||||||
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
|
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
package service
|
package websocketbase
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
@ -8,6 +8,8 @@ import (
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// WebsocketClientBase is a legacy base client
|
||||||
|
// Deprecated: please use standard stream instead.
|
||||||
//go:generate callbackgen -type WebsocketClientBase
|
//go:generate callbackgen -type WebsocketClientBase
|
||||||
type WebsocketClientBase struct {
|
type WebsocketClientBase struct {
|
||||||
baseURL string
|
baseURL string
|
|
@ -1,6 +1,6 @@
|
||||||
// Code generated by "callbackgen -type WebsocketClientBase"; DO NOT EDIT.
|
// Code generated by "callbackgen -type WebsocketClientBase"; DO NOT EDIT.
|
||||||
|
|
||||||
package service
|
package websocketbase
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
|
@ -2,6 +2,7 @@ package service
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -22,7 +23,7 @@ type BacktestService struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
|
func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
|
||||||
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
|
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
|
||||||
|
|
||||||
// TODO: use isFutures here
|
// TODO: use isFutures here
|
||||||
_, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
|
_, _, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
|
||||||
|
@ -34,6 +35,7 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
tasks := []SyncTask{
|
tasks := []SyncTask{
|
||||||
{
|
{
|
||||||
Type: types.KLine{},
|
Type: types.KLine{},
|
||||||
|
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, endTime, 100),
|
||||||
Time: func(obj interface{}) time.Time {
|
Time: func(obj interface{}) time.Time {
|
||||||
return obj.(types.KLine).StartTime.Time().UTC()
|
return obj.(types.KLine).StartTime.Time().UTC()
|
||||||
},
|
},
|
||||||
|
@ -41,7 +43,6 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
kline := obj.(types.KLine)
|
kline := obj.(types.KLine)
|
||||||
return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
|
return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
|
||||||
},
|
},
|
||||||
Select: SelectLastKLines(exchange.Name(), symbol, interval, 100),
|
|
||||||
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
|
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
|
||||||
q := &batch.KLineBatchQuery{Exchange: exchange}
|
q := &batch.KLineBatchQuery{Exchange: exchange}
|
||||||
return q.Query(ctx, symbol, interval, startTime, endTime)
|
return q.Query(ctx, symbol, interval, startTime, endTime)
|
||||||
|
@ -54,13 +55,12 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, sel := range tasks {
|
for _, sel := range tasks {
|
||||||
if err := sel.execute(ctx, s.DB, startTime); err != nil {
|
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {
|
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {
|
||||||
|
@ -115,8 +115,7 @@ func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string,
|
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
|
||||||
startTime time.Time, endTime time.Time, interval types.Interval) error {
|
|
||||||
|
|
||||||
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
|
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
|
||||||
}
|
}
|
||||||
|
@ -129,12 +128,11 @@ func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string,
|
||||||
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
|
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
|
||||||
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
|
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
|
||||||
|
|
||||||
tableName := s._targetKlineTable(ex)
|
tableName := targetKlineTable(ex)
|
||||||
// make the SQL syntax IDE friendly, so that it can analyze it.
|
// make the SQL syntax IDE friendly, so that it can analyze it.
|
||||||
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
|
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
|
||||||
|
|
||||||
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
|
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
|
||||||
"exchange": ex.String(),
|
|
||||||
"interval": interval,
|
"interval": interval,
|
||||||
"symbol": symbol,
|
"symbol": symbol,
|
||||||
})
|
})
|
||||||
|
@ -160,7 +158,7 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter
|
||||||
|
|
||||||
// QueryKLinesForward is used for querying klines to back-testing
|
// QueryKLinesForward is used for querying klines to back-testing
|
||||||
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
|
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
|
||||||
tableName := s._targetKlineTable(exchange)
|
tableName := targetKlineTable(exchange)
|
||||||
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
|
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
|
||||||
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
||||||
|
|
||||||
|
@ -179,7 +177,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
|
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
|
||||||
tableName := s._targetKlineTable(exchange)
|
tableName := targetKlineTable(exchange)
|
||||||
|
|
||||||
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
|
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
|
||||||
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
||||||
|
@ -205,7 +203,7 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
|
||||||
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
|
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := s._targetKlineTable(exchange.Name())
|
tableName := targetKlineTable(exchange.Name())
|
||||||
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC"
|
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC"
|
||||||
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
|
||||||
|
|
||||||
|
@ -288,7 +286,7 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e
|
||||||
return klines, rows.Err()
|
return klines, rows.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) _targetKlineTable(exchangeName types.ExchangeName) string {
|
func targetKlineTable(exchangeName types.ExchangeName) string {
|
||||||
return strings.ToLower(exchangeName.String()) + "_klines"
|
return strings.ToLower(exchangeName.String()) + "_klines"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -299,7 +297,7 @@ func (s *BacktestService) Insert(kline types.KLine) error {
|
||||||
return errExchangeFieldIsUnset
|
return errExchangeFieldIsUnset
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := s._targetKlineTable(kline.Exchange)
|
tableName := targetKlineTable(kline.Exchange)
|
||||||
|
|
||||||
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
|
||||||
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
|
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
|
||||||
|
@ -313,47 +311,177 @@ func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
|
||||||
return errors.New("kline.Exchange field should not be empty")
|
return errors.New("kline.Exchange field should not be empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
tableName := s._targetKlineTable(k.Exchange)
|
tableName := targetKlineTable(k.Exchange)
|
||||||
sql := fmt.Sprintf("DELETE FROM `%s` WHERE gid = :gid ", tableName)
|
sql := fmt.Sprintf("DELETE FROM `%s` WHERE gid = :gid ", tableName)
|
||||||
_, err := s.DB.NamedExec(sql, k)
|
_, err := s.DB.NamedExec(sql, k)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange, symbol string,
|
type TimeRange struct {
|
||||||
fromTime time.Time, endTime time.Time, interval types.Interval) error {
|
Start time.Time
|
||||||
klineC, errC := s.QueryKLinesCh(fromTime, endTime, exchange, []string{symbol}, []types.Interval{interval})
|
End time.Time
|
||||||
|
|
||||||
nowStartTime := fromTime
|
|
||||||
for k := range klineC {
|
|
||||||
if nowStartTime.Unix() < k.StartTime.Unix() {
|
|
||||||
log.Infof("syncing %s interval %s syncing %s ~ %s ", symbol, interval, nowStartTime, k.EndTime)
|
|
||||||
if err := s.Sync(ctx, exchange, symbol, nowStartTime, k.EndTime.Time().Add(-1*interval.Duration()), interval); err != nil {
|
|
||||||
log.WithError(err).Errorf("sync error")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
nowStartTime = k.StartTime.Time().Add(interval.Duration())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if nowStartTime.Unix() < endTime.Unix() && nowStartTime.Unix() < time.Now().Unix() {
|
// SyncPartial
|
||||||
if err := s.Sync(ctx, exchange, symbol, nowStartTime, endTime, interval); err != nil {
|
// find the existing data time range (t1, t2)
|
||||||
log.WithError(err).Errorf("sync error")
|
// scan if there is a missing part
|
||||||
}
|
// create a time range slice []TimeRange
|
||||||
}
|
// iterate the []TimeRange slice to sync data.
|
||||||
|
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
|
||||||
if err := <-errC; err != nil {
|
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
|
||||||
|
if err != nil && err != sql.ErrNoRows {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if err == sql.ErrNoRows {
|
||||||
|
// fallback to fresh sync
|
||||||
|
return s.Sync(ctx, ex, symbol, interval, since, until)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("found existing kline data, now using partial sync...")
|
||||||
|
timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(timeRanges) > 0 {
|
||||||
|
log.Infof("found missing time ranges: %v", timeRanges)
|
||||||
|
}
|
||||||
|
|
||||||
|
// there are few cases:
|
||||||
|
// t1 == since && t2 == until
|
||||||
|
if since.Before(t1.Time()) {
|
||||||
|
// shift slice
|
||||||
|
timeRanges = append([]TimeRange{
|
||||||
|
{Start: since.Add(-2 * time.Second), End: t1.Time()}, // include since
|
||||||
|
}, timeRanges...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if t2.Time().Before(until) {
|
||||||
|
timeRanges = append(timeRanges, TimeRange{
|
||||||
|
Start: t2.Time(),
|
||||||
|
End: until.Add(2 * time.Second), // include until
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, timeRange := range timeRanges {
|
||||||
|
err = s.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
|
||||||
|
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
|
||||||
|
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
|
||||||
|
query := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until)
|
||||||
|
sql, args, err := query.ToSql()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
rows, err := s.DB.QueryContext(ctx, sql, args...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var timeRanges []TimeRange
|
||||||
|
var lastTime time.Time
|
||||||
|
var intervalDuration = interval.Duration()
|
||||||
|
for rows.Next() {
|
||||||
|
var tt types.Time
|
||||||
|
if err := rows.Scan(&tt); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var t = time.Time(tt)
|
||||||
|
if lastTime != (time.Time{}) && t.Sub(lastTime) > intervalDuration {
|
||||||
|
timeRanges = append(timeRanges, TimeRange{
|
||||||
|
Start: lastTime,
|
||||||
|
End: t,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
lastTime = t
|
||||||
|
}
|
||||||
|
|
||||||
|
return timeRanges, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
|
||||||
|
sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
|
||||||
|
sql, args, err := sel.ToSql()
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var t1, t2 types.Time
|
||||||
|
|
||||||
|
row := s.DB.QueryRowContext(ctx, sql, args...)
|
||||||
|
|
||||||
|
if err := row.Scan(&t1, &t2); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := row.Err(); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &t1, &t2, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
||||||
|
conditions := sq.And{
|
||||||
|
sq.Eq{"symbol": symbol},
|
||||||
|
sq.Eq{"`interval`": interval.String()},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(args) == 2 {
|
||||||
|
since := args[0]
|
||||||
|
until := args[1]
|
||||||
|
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
||||||
|
}
|
||||||
|
|
||||||
|
tableName := targetKlineTable(ex)
|
||||||
|
|
||||||
|
return sq.Select("start_time").
|
||||||
|
From(tableName).
|
||||||
|
Where(conditions).
|
||||||
|
OrderBy("start_time ASC")
|
||||||
|
}
|
||||||
|
|
||||||
|
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
|
||||||
|
func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
|
||||||
|
conditions := sq.And{
|
||||||
|
sq.Eq{"symbol": symbol},
|
||||||
|
sq.Eq{"`interval`": interval.String()},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(args) == 2 {
|
||||||
|
since := args[0]
|
||||||
|
until := args[1]
|
||||||
|
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
|
||||||
|
}
|
||||||
|
|
||||||
|
tableName := targetKlineTable(ex)
|
||||||
|
|
||||||
|
return sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2").
|
||||||
|
From(tableName).
|
||||||
|
Where(conditions)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: add is_futures column since the klines data is different
|
// TODO: add is_futures column since the klines data is different
|
||||||
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, limit uint64) sq.SelectBuilder {
|
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime, endTime time.Time, limit uint64) sq.SelectBuilder {
|
||||||
|
tableName := targetKlineTable(ex)
|
||||||
return sq.Select("*").
|
return sq.Select("*").
|
||||||
From(strings.ToLower(ex.String()) + "_klines").
|
From(tableName).
|
||||||
Where(sq.And{
|
Where(sq.And{
|
||||||
sq.Eq{"symbol": symbol},
|
sq.Eq{"symbol": symbol},
|
||||||
sq.Eq{"exchange": ex},
|
|
||||||
sq.Eq{"`interval`": interval.String()},
|
sq.Eq{"`interval`": interval.String()},
|
||||||
|
sq.Expr("start_time BETWEEN ? AND ?", startTime, endTime),
|
||||||
}).
|
}).
|
||||||
OrderBy("start_time DESC").
|
OrderBy("start_time DESC").
|
||||||
Limit(limit)
|
Limit(limit)
|
||||||
|
|
156
pkg/service/backtest_test.go
Normal file
156
pkg/service/backtest_test.go
Normal file
|
@ -0,0 +1,156 @@
|
||||||
|
package service
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"database/sql"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/jmoiron/sqlx"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
|
"github.com/c9s/bbgo/pkg/exchange"
|
||||||
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestBacktestService_QueryExistingDataRange(t *testing.T) {
|
||||||
|
db, err := prepareDB(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
dbx := sqlx.NewDb(db.DB, "sqlite3")
|
||||||
|
|
||||||
|
ex, err := exchange.NewPublic(types.ExchangeBinance)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
service := &BacktestService{DB: dbx}
|
||||||
|
|
||||||
|
symbol := "BTCUSDT"
|
||||||
|
now := time.Now()
|
||||||
|
startTime1 := now.AddDate(0, 0, -7).Truncate(time.Hour)
|
||||||
|
endTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
|
||||||
|
// empty range
|
||||||
|
t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h, startTime1, endTime1)
|
||||||
|
assert.Error(t, sql.ErrNoRows, err)
|
||||||
|
assert.Nil(t, t1)
|
||||||
|
assert.Nil(t, t2)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBacktestService_SyncPartial(t *testing.T) {
|
||||||
|
db, err := prepareDB(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
dbx := sqlx.NewDb(db.DB, "sqlite3")
|
||||||
|
|
||||||
|
ex, err := exchange.NewPublic(types.ExchangeBinance)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
service := &BacktestService{DB: dbx}
|
||||||
|
|
||||||
|
symbol := "BTCUSDT"
|
||||||
|
now := time.Now()
|
||||||
|
startTime1 := now.AddDate(0, 0, -7).Truncate(time.Hour)
|
||||||
|
endTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
|
||||||
|
|
||||||
|
startTime2 := now.AddDate(0, 0, -5).Truncate(time.Hour)
|
||||||
|
endTime2 := now.AddDate(0, 0, -4).Truncate(time.Hour)
|
||||||
|
|
||||||
|
// kline query is exclusive
|
||||||
|
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.NotEmpty(t, timeRanges)
|
||||||
|
|
||||||
|
t.Run("fill missing time ranges", func(t *testing.T) {
|
||||||
|
err = service.SyncPartial(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
|
||||||
|
assert.NoError(t, err, "sync partial should not return error")
|
||||||
|
|
||||||
|
timeRanges2, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, timeRanges2)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("extend time ranges", func(t *testing.T) {
|
||||||
|
startTime3 := startTime1.AddDate(0, 0, -3)
|
||||||
|
endTime3 := endTime2.AddDate(0, 0, 3)
|
||||||
|
|
||||||
|
err = service.SyncPartial(ctx, ex, symbol, types.Interval1h, startTime3, endTime3)
|
||||||
|
assert.NoError(t, err, "sync partial should not return error")
|
||||||
|
|
||||||
|
timeRanges3, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime3, endTime3)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, timeRanges3)
|
||||||
|
})
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBacktestService_FindMissingTimeRanges(t *testing.T) {
|
||||||
|
db, err := prepareDB(t)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer db.Close()
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
dbx := sqlx.NewDb(db.DB, "sqlite3")
|
||||||
|
|
||||||
|
ex, err := exchange.NewPublic(types.ExchangeBinance)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
service := &BacktestService{DB: dbx}
|
||||||
|
|
||||||
|
symbol := "BTCUSDT"
|
||||||
|
now := time.Now()
|
||||||
|
startTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
|
||||||
|
endTime1 := now.AddDate(0, 0, -5).Truncate(time.Hour)
|
||||||
|
|
||||||
|
startTime2 := now.AddDate(0, 0, -4).Truncate(time.Hour)
|
||||||
|
endTime2 := now.AddDate(0, 0, -3).Truncate(time.Hour)
|
||||||
|
|
||||||
|
// kline query is exclusive
|
||||||
|
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
|
||||||
|
t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
assert.Equal(t, startTime1, t1.Time(), "start time point should match")
|
||||||
|
assert.Equal(t, endTime2, t2.Time(), "end time point should match")
|
||||||
|
}
|
||||||
|
|
||||||
|
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
|
||||||
|
if assert.NoError(t, err) {
|
||||||
|
assert.NotEmpty(t, timeRanges)
|
||||||
|
assert.Len(t, timeRanges, 1, "should find one missing time range")
|
||||||
|
t.Logf("found timeRanges: %+v", timeRanges)
|
||||||
|
|
||||||
|
log.SetLevel(log.DebugLevel)
|
||||||
|
|
||||||
|
for _, timeRange := range timeRanges {
|
||||||
|
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
|
||||||
|
assert.NoError(t, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeRanges, err = service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
|
||||||
|
assert.NoError(t, err)
|
||||||
|
assert.Empty(t, timeRanges, "after partial sync, missing time ranges should be back-filled")
|
||||||
|
}
|
||||||
|
}
|
|
@ -63,7 +63,8 @@ func (store *RedisStore) Load(val interface{}) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(data) == 0 {
|
// skip null data
|
||||||
|
if len(data) == 0 || data == "null" {
|
||||||
return ErrPersistenceNotExists
|
return ErrPersistenceNotExists
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4,11 +4,12 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/c9s/bbgo/pkg/bbgo"
|
"github.com/c9s/bbgo/pkg/bbgo"
|
||||||
"github.com/c9s/bbgo/pkg/fixedpoint"
|
"github.com/c9s/bbgo/pkg/fixedpoint"
|
||||||
"github.com/c9s/bbgo/pkg/indicator"
|
"github.com/c9s/bbgo/pkg/indicator"
|
||||||
"github.com/c9s/bbgo/pkg/types"
|
"github.com/c9s/bbgo/pkg/types"
|
||||||
"github.com/sirupsen/logrus"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const ID = "pivotshort"
|
const ID = "pivotshort"
|
||||||
|
@ -215,6 +216,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
|
||||||
s.Position = types.NewPositionFromMarket(s.Market)
|
s.Position = types.NewPositionFromMarket(s.Market)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.ProfitStats == nil {
|
||||||
|
s.ProfitStats = types.NewProfitStats(s.Market)
|
||||||
|
}
|
||||||
|
|
||||||
instanceID := s.InstanceID()
|
instanceID := s.InstanceID()
|
||||||
|
|
||||||
// Always update the position fields
|
// Always update the position fields
|
||||||
|
|
Loading…
Reference in New Issue
Block a user