refactory sync command and upgrade db automatically

This commit is contained in:
c9s 2021-01-14 15:10:11 +08:00
parent ad567dc360
commit d04e1e7816
11 changed files with 87 additions and 81 deletions

2
go.mod
View File

@ -8,7 +8,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/adshao/go-binance/v2 v2.2.1-0.20210108025425-9a582c63144e
github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b
github.com/c9s/rockhopper v1.2.1-0.20210114064926-84f8d06c527b
github.com/c9s/rockhopper v1.2.1-0.20210114070642-bde97ed28999
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-redis/redis/v8 v8.4.0

2
go.sum
View File

@ -41,6 +41,8 @@ github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b h1:4qsZTw8wHHTzFnwrfs3zL
github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b/go.mod h1:RaBe6PIVbQRqwrnjjSoHhlLM601JWdT7KZ0p6rhgI7I=
github.com/c9s/rockhopper v1.2.1-0.20210114064926-84f8d06c527b h1:D4TpmOWK8GVV7bvazJiW17cwTp1ke8aADrr7BalCB4E=
github.com/c9s/rockhopper v1.2.1-0.20210114064926-84f8d06c527b/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/c9s/rockhopper v1.2.1-0.20210114070642-bde97ed28999 h1:++sXjheN0ZuQtOvzqnCx/jCs4wDSQoj2GMEbE1UvsGE=
github.com/c9s/rockhopper v1.2.1-0.20210114070642-bde97ed28999/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=

50
pkg/bbgo/db.go Normal file
View File

@ -0,0 +1,50 @@
package bbgo
import (
"context"
"database/sql"
// register the go migrations
_ "github.com/c9s/bbgo/pkg/migrations"
"github.com/c9s/rockhopper"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
func ConnectMySQL(dsn string) (*sqlx.DB, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
config.ParseTime = true
dsn = config.FormatDSN()
return sqlx.Connect("mysql", dsn)
}
func upgradeDB(ctx context.Context, driver string, db *sql.DB) error {
dialect, err := rockhopper.LoadDialect(driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.Load()
if err != nil {
return err
}
rh := rockhopper.New(driver, dialect, db)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}

View File

@ -24,12 +24,12 @@ var LoadedCrossExchangeStrategies = make(map[string]CrossExchangeStrategy)
func RegisterStrategy(key string, s interface{}) {
loaded := 0
if d, ok := s.(SingleExchangeStrategy) ; ok {
if d, ok := s.(SingleExchangeStrategy); ok {
LoadedExchangeStrategies[key] = d
loaded++
}
if d, ok := s.(CrossExchangeStrategy) ; ok {
if d, ok := s.(CrossExchangeStrategy); ok {
LoadedCrossExchangeStrategies[key] = d
loaded++
}
@ -49,6 +49,7 @@ type Environment struct {
PersistenceServiceFacade *PersistenceServiceFacade
OrderService *service.OrderService
TradeService *service.TradeService
TradeSync *service.SyncService
@ -70,10 +71,30 @@ func (environ *Environment) Sessions() map[string]*ExchangeSession {
return environ.sessions
}
func (environ *Environment) ConfigureDatabase(ctx context.Context) error {
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
db, err := ConnectMySQL(dsn)
if err != nil {
return err
}
if err := upgradeDB(ctx, "mysql", db.DB); err != nil {
return err
}
environ.SetDB(db)
}
return nil
}
func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
environ.OrderService = &service.OrderService{DB: db}
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.SyncService{
TradeService: environ.TradeService,
OrderService: environ.OrderService,
}
return environ

View File

@ -96,7 +96,7 @@ var BacktestCmd = &cobra.Command{
return err
}
db, err := cmdutil.ConnectMySQL(viper.GetString("mysql-url"))
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}

View File

@ -10,7 +10,6 @@ import (
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/types"
)
@ -66,7 +65,7 @@ var CancelCmd = &cobra.Command{
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
db, err := cmdutil.ConnectMySQL(viper.GetString("mysql-url"))
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}

View File

@ -1,17 +0,0 @@
package cmdutil
import (
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
func ConnectMySQL(dsn string) (*sqlx.DB, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
config.ParseTime = true
dsn = config.FormatDSN()
return sqlx.Connect("mysql", dsn)
}

View File

@ -1,34 +1,2 @@
package cmd
import (
"context"
"database/sql"
"github.com/c9s/rockhopper"
)
func upgradeDB(ctx context.Context, driver string, db *sql.DB) error {
dialect, err := rockhopper.LoadDialect(driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.Load()
if err != nil {
return err
}
rh := rockhopper.New(driver, dialect, db)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}

View File

@ -12,6 +12,7 @@ import (
"github.com/c9s/bbgo/pkg/accounting"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
@ -51,7 +52,7 @@ var PnLCmd = &cobra.Command{
return err
}
db, err := cmdutil.ConnectMySQL(viper.GetString("mysql-url"))
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}

View File

@ -84,18 +84,8 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config) error {
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
db, err := cmdutil.ConnectMySQL(dsn)
if err != nil {
return err
}
if err := upgradeDB(ctx, "mysql", db.DB); err != nil {
return err
}
environ.SetDB(db)
if err := environ.ConfigureDatabase(ctx) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {

View File

@ -6,10 +6,9 @@ import (
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
)
@ -47,8 +46,8 @@ var SyncCmd = &cobra.Command{
return err
}
db, err := cmdutil.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
environ := bbgo.NewEnvironment()
if err := environ.ConfigureDatabase(ctx); err != nil {
return err
}
@ -74,20 +73,13 @@ var SyncCmd = &cobra.Command{
}
}
tradeService := &service.TradeService{DB: db}
orderService := &service.OrderService{DB: db}
syncService := &service.SyncService{
TradeService: tradeService,
OrderService: orderService,
}
log.Info("syncing trades from exchange...")
if err := syncService.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
if err := environ.TradeSync.SyncTrades(ctx, exchange, symbol, startTime); err != nil {
return err
}
log.Info("syncing orders from exchange...")
if err := syncService.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
if err := environ.TradeSync.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}