Merge pull request #111 from c9s/feature/sqlite3

add sqlite3 migration support
This commit is contained in:
Yo-An Lin 2021-02-06 11:52:03 +08:00 committed by GitHub
commit 5a5e64cc8d
63 changed files with 1402 additions and 121 deletions

View File

@ -3,9 +3,21 @@ language: go
go:
- 1.14
- 1.15
services:
- redis-server
- mysql
before_install:
- mysql -e 'CREATE DATABASE bbgo;'
install:
- go get github.com/c9s/rockhopper/cmd/rockhopper
before_script:
- go mod download
- make migrations
script:
- bash scripts/test-sqlite3-migrations.sh
- go test -v ./pkg/...

View File

@ -28,9 +28,9 @@ dist: bin-dir bbgo-linux bbgo-darwin
tar -C $(BUILD_DIR) -cvzf $(DIST_DIR)/bbgo-$$(git describe --tags).tar.gz .
migrations:
rockhopper compile --config rockhopper.yaml --output pkg/migrations
git add -v pkg/migrations
git commit -m "Update migration package" pkg/migrations
rockhopper compile --config rockhopper_mysql.yaml --output pkg/migrations/mysql
rockhopper compile --config rockhopper_sqlite.yaml --output pkg/migrations/sqlite3
git add -v pkg/migrations && git commit -m "compile and update migration package" pkg/migrations || true
docker:
GOPATH=$(PWD)/_mod go mod download

4
go.mod
View File

@ -7,7 +7,7 @@ go 1.13
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/adshao/go-binance/v2 v2.2.1-0.20210119141603-20ceb26d876b
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/gin-contrib/cors v1.3.1
@ -53,7 +53,7 @@ require (
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/zserge/lorca v0.1.9
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
gonum.org/v1/gonum v0.8.1

5
go.sum
View File

@ -37,6 +37,8 @@ github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f h1:n1Ly7178MJj+GQB38q4dV66QktUvzEi2rA7xCtTy6Ck=
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9 h1:umJ5T1aKfA6zmHTJffe5axqM9mtr/tscllnX2wnZzBA=
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -240,6 +242,7 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
@ -466,6 +469,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -19,5 +19,6 @@ CREATE TABLE `trades`
PRIMARY KEY (`gid`),
UNIQUE KEY `id` (`id`)
);
-- +down
DROP TABLE `trades`;
DROP TABLE IF EXISTS `trades`;

View File

@ -0,0 +1,20 @@
-- +up
CREATE TABLE `trades`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`id` INTEGER,
`exchange` TEXT NOT NULL DEFAULT '',
`symbol` TEXT NOT NULL,
`price` DECIMAL(16, 8) NOT NULL,
`quantity` DECIMAL(16, 8) NOT NULL,
`quote_quantity` DECIMAL(16, 8) NOT NULL,
`fee` DECIMAL(16, 8) NOT NULL,
`fee_currency` VARCHAR(4) NOT NULL,
`is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,
`is_maker` BOOLEAN NOT NULL DEFAULT FALSE,
`side` VARCHAR(4) NOT NULL DEFAULT '',
`traded_at` DATETIME(3) NOT NULL
);
-- +down
DROP TABLE IF EXISTS `trades`;

View File

@ -0,0 +1,9 @@
-- +up
CREATE INDEX trades_symbol ON trades(symbol);
CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);
-- +down
DROP INDEX trades_symbol;
DROP INDEX trades_symbol_fee_currency;
DROP INDEX trades_traded_at_symbol;

View File

@ -0,0 +1,25 @@
-- +up
CREATE TABLE `orders`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR NOT NULL DEFAULT '',
-- order_id is the order id returned from the exchange
`order_id` INTEGER NOT NULL,
`client_order_id` VARCHAR NOT NULL DEFAULT '',
`order_type` VARCHAR NOT NULL,
`symbol` VARCHAR NOT NULL,
`status` VARCHAR NOT NULL,
`time_in_force` VARCHAR NOT NULL,
`price` DECIMAL(16, 8) NOT NULL,
`stop_price` DECIMAL(16, 8) NOT NULL,
`quantity` DECIMAL(16, 8) NOT NULL,
`executed_quantity` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`side` VARCHAR NOT NULL DEFAULT '',
`is_working` BOOLEAN NOT NULL DEFAULT FALSE,
`created_at` DATETIME(3) NOT NULL,
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- +down
DROP TABLE IF EXISTS `orders`;

View File

@ -0,0 +1,5 @@
-- +up
ALTER TABLE `trades` ADD COLUMN `order_id` INTEGER;
-- +down
ALTER TABLE `trades` RENAME COLUMN `order_id` TO `order_id_deleted`;

View File

@ -0,0 +1,19 @@
-- +up
DROP INDEX IF EXISTS trades_symbol;
DROP INDEX IF EXISTS trades_symbol_fee_currency;
DROP INDEX IF EXISTS trades_traded_at_symbol;
CREATE INDEX trades_symbol ON trades (exchange, symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);
-- +down
DROP INDEX IF EXISTS trades_symbol;
DROP INDEX IF EXISTS trades_symbol_fee_currency;
DROP INDEX IF EXISTS trades_traded_at_symbol;
CREATE INDEX trades_symbol ON trades (symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);

View File

@ -0,0 +1,7 @@
-- +up
CREATE INDEX orders_symbol ON orders (exchange, symbol);
CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);
-- +down
DROP INDEX IF EXISTS orders_symbol;
DROP INDEX IF EXISTS orders_order_id;

View File

@ -0,0 +1,44 @@
-- +up
-- +begin
CREATE TABLE `klines`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR(10) NOT NULL,
`start_time` DATETIME(3) NOT NULL,
`end_time` DATETIME(3) NOT NULL,
`interval` VARCHAR(3) NOT NULL,
`symbol` VARCHAR(7) NOT NULL,
`open` DECIMAL(16, 8) NOT NULL,
`high` DECIMAL(16, 8) NOT NULL,
`low` DECIMAL(16, 8) NOT NULL,
`close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`closed` BOOLEAN NOT NULL DEFAULT TRUE,
`last_trade_id` INT NOT NULL DEFAULT 0,
`num_trades` INT NOT NULL DEFAULT 0
);
-- +end
-- +begin
CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);
-- +end
-- +begin
CREATE TABLE `okex_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +begin
CREATE TABLE `binance_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +begin
CREATE TABLE `max_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +down
DROP INDEX IF EXISTS `klines_end_time_symbol_interval`;
DROP TABLE IF EXISTS `binance_klines`;
DROP TABLE IF EXISTS `okex_klines`;
DROP TABLE IF EXISTS `max_klines`;
DROP TABLE IF EXISTS `klines`;

View File

@ -0,0 +1,5 @@
-- +up
SELECT 1;
-- +down
SELECT 1;

View File

@ -0,0 +1,9 @@
-- +up
-- +begin
CREATE UNIQUE INDEX `trade_unique_id` ON `trades` (`exchange`,`symbol`, `side`, `id`);
-- +end
-- +down
-- +begin
DROP INDEX IF EXISTS `trade_unique_id`;
-- +end

View File

@ -0,0 +1,33 @@
-- +up
-- +begin
ALTER TABLE `trades` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `trades` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `orders` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `orders` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +down
-- +begin
ALTER TABLE `trades` RENAME COLUMN `is_margin` TO `is_margin_deleted`;
-- +end
-- +begin
ALTER TABLE `trades` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;
-- +end
-- +begin
ALTER TABLE `orders` RENAME COLUMN `is_margin` TO `is_margin_deleted`;
-- +end
-- +begin
ALTER TABLE `orders` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;
-- +end

View File

@ -0,0 +1,10 @@
-- +up
-- +begin
CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);
-- +end
-- +down
-- +begin
DROP INDEX trades_price_quantity;
-- +end

View File

@ -1,50 +1,6 @@
package bbgo
import (
"context"
"database/sql"
// register the go migrations
_ "github.com/c9s/bbgo/pkg/migrations"
"github.com/c9s/rockhopper"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
func ConnectMySQL(dsn string) (*sqlx.DB, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
config.ParseTime = true
dsn = config.FormatDSN()
return sqlx.Connect("mysql", dsn)
}
func upgradeDB(ctx context.Context, driver string, db *sql.DB) error {
dialect, err := rockhopper.LoadDialect(driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.Load()
if err != nil {
return err
}
rh := rockhopper.New(driver, dialect, db)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/codingconcepts/env"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
@ -48,6 +47,7 @@ type Environment struct {
PersistenceServiceFacade *PersistenceServiceFacade
DatabaseService *service.DatabaseService
OrderService *service.OrderService
TradeService *service.TradeService
TradeSync *service.SyncService
@ -56,8 +56,6 @@ type Environment struct {
startTime time.Time
tradeScanTime time.Time
sessions map[string]*ExchangeSession
MysqlURL string
}
func NewEnvironment() *Environment {
@ -78,23 +76,19 @@ func (environ *Environment) Sessions() map[string]*ExchangeSession {
return environ.sessions
}
func (environ *Environment) ConfigureDatabase(ctx context.Context, dsn string) error {
db, err := ConnectMySQL(dsn)
func (environ *Environment) ConfigureDatabase(ctx context.Context, driver string, dsn string) error {
environ.DatabaseService = service.NewDatabaseService(driver, dsn)
err := environ.DatabaseService.Connect()
if err != nil {
return err
}
environ.MysqlURL = dsn
if err := upgradeDB(ctx, "mysql", db.DB); err != nil {
if err := environ.DatabaseService.Upgrade(ctx) ; err != nil {
return err
}
environ.SetDB(db)
return nil
}
func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
// get the db connection pool object to create other services
db := environ.DatabaseService.DB
environ.OrderService = &service.OrderService{DB: db}
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.SyncService{
@ -102,7 +96,7 @@ func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
OrderService: environ.OrderService,
}
return environ
return nil
}
// AddExchangeSession adds the existing exchange session or pre-created exchange session

View File

@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/backtest"
@ -96,10 +95,6 @@ var BacktestCmd = &cobra.Command{
return err
}
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}
if userConfig.Backtest == nil {
return errors.New("backtest config is not defined")
@ -116,14 +111,12 @@ var BacktestCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
backtestService := &service.BacktestService{DB: db}
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if wantSync {
log.Info("starting synchronization...")
@ -273,8 +266,8 @@ var BacktestCmd = &cobra.Command{
finalBalances.Print()
if wantBaseAssetBaseline {
initBaseAsset := InBaseAsset(initBalances, market, startPrice)
finalBaseAsset := InBaseAsset(finalBalances, market, lastPrice)
initBaseAsset := inBaseAsset(initBalances, market, startPrice)
finalBaseAsset := inBaseAsset(finalBalances, market, lastPrice)
log.Infof("INITIAL ASSET ~= %s %s (1 %s = %f)", market.FormatQuantity(initBaseAsset), market.BaseCurrency, market.BaseCurrency, startPrice)
log.Infof("FINAL ASSET ~= %s %s (1 %s = %f)", market.FormatQuantity(finalBaseAsset), market.BaseCurrency, market.BaseCurrency, lastPrice)
@ -288,8 +281,3 @@ var BacktestCmd = &cobra.Command{
},
}
func InBaseAsset(balances types.BalanceMap, market types.Market, price float64) float64 {
quote := balances[market.QuoteCurrency]
base := balances[market.BaseCurrency]
return (base.Locked.Float64() + base.Available.Float64()) + ((quote.Locked.Float64() + quote.Available.Float64()) / price)
}

View File

@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
@ -63,13 +62,8 @@ var CancelCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}
environ.SetDB(db)
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {

View File

@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting"
"github.com/c9s/bbgo/pkg/accounting/pnl"
@ -57,20 +56,19 @@ var PnLCmd = &cobra.Command{
return err
}
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
environ := bbgo.NewEnvironment()
if err := configureDB(ctx, environ) ; err != nil {
return err
}
tradeService := &service.TradeService{DB: db}
var trades []types.Trade
tradingFeeCurrency := exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
log.Infof("loading all trading fee currency related trades: %s", symbol)
trades, err = tradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency)
trades, err = environ.TradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency)
} else {
trades, err = tradeService.Query(service.QueryTradesOptions{
trades, err = environ.TradeService.Query(service.QueryTradesOptions{
Exchange: exchange.Name(),
Symbol: symbol,
Limit: limit,

View File

@ -96,10 +96,8 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer
}
func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userConfig *bbgo.Config) error {
if dsn, ok := os.LookupEnv("MYSQL_URL"); ok {
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {
@ -331,7 +329,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
enableApiServer, err := cmd.Flags().GetBool("enable-web-server")
enableWebServer, err := cmd.Flags().GetBool("enable-web-server")
if err != nil {
return err
}
@ -348,10 +346,7 @@ func run(cmd *cobra.Command, args []string) error {
var userConfig = &bbgo.Config{}
if setup {
log.Infof("running in setup mode, skip reading config file")
enableApiServer = true
} else {
if !setup {
// if it's not setup, then the config file option is required.
if len(configFile) == 0 {
return errors.New("--config option is required")
@ -377,7 +372,7 @@ func run(cmd *cobra.Command, args []string) error {
}
if setup {
return runSetup(ctx, userConfig, enableApiServer)
return runSetup(ctx, userConfig, true)
}
userConfig, err = bbgo.Load(configFile, true)
@ -385,7 +380,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
return runConfig(ctx, userConfig, enableApiServer)
return runConfig(ctx, userConfig, enableWebServer)
}
return runWrapperBinary(ctx, userConfig, cmd, args)

View File

@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
)
@ -52,12 +51,8 @@ var SyncCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {

36
pkg/cmd/utils.go Normal file
View File

@ -0,0 +1,36 @@
package cmd
import (
"context"
"os"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
)
func inBaseAsset(balances types.BalanceMap, market types.Market, price float64) float64 {
quote := balances[market.QuoteCurrency]
base := balances[market.BaseCurrency]
return (base.Locked.Float64() + base.Available.Float64()) + ((quote.Locked.Float64() + quote.Available.Float64()) / price)
}
// configureDB configures the database service based on the environment variable
func configureDB(ctx context.Context, environ *bbgo.Environment) error {
if driver, ok := os.LookupEnv("DB_DRIVER"); ok {
if dsn, ok := os.LookupEnv("DB_DSN"); ok {
return environ.ConfigureDatabase(ctx, driver, dsn)
}
} else if dsn, ok := os.LookupEnv("SQLITE3_DSN"); ok {
return environ.ConfigureDatabase(ctx, "sqlite3", dsn)
} else if dsn, ok := os.LookupEnv("MYSQL_URL"); ok {
return environ.ConfigureDatabase(ctx, "mysql", dsn)
}
return nil
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTrades, downTrades)
}
func upTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `id` BIGINT UNSIGNED,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `symbol` VARCHAR(8) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee_currency` VARCHAR(4) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`id`)\n);")
if err != nil {
return err
}
return err
}
func downTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `trades`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,53 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradeIndex, downTradeIndex)
}
func upTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades(symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrders, downOrders)
}
func upOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` BIGINT UNSIGNED NOT NULL,\n `client_order_id` VARCHAR(42) NOT NULL DEFAULT '',\n `order_type` VARCHAR(16) NOT NULL,\n `symbol` VARCHAR(8) NOT NULL,\n `status` VARCHAR(12) NOT NULL,\n `time_in_force` VARCHAR(4) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `stop_price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_working` BOOL NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),\n PRIMARY KEY (`gid`)\n);")
if err != nil {
return err
}
return err
}
func downOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE `orders`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesAddOrderId, downTradesAddOrderId)
}
func upTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n ADD COLUMN `order_id` BIGINT UNSIGNED NOT NULL;")
if err != nil {
return err
}
return err
}
func downTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n DROP COLUMN `order_id`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,83 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesIndexFix, downTradesIndexFix)
}
func upTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrdersAddIndex, downOrdersAddIndex)
}
func upOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX orders_symbol ON orders (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);")
if err != nil {
return err
}
return err
}
func downOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX orders_symbol ON orders;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX orders_order_id ON orders;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,73 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upKlines, downKlines)
}
func upKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `klines`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `high` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `low` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `close` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `closed` BOOL NOT NULL DEFAULT TRUE,\n `last_trade_id` INT UNSIGNED NOT NULL DEFAULT 0,\n `num_trades` INT UNSIGNED NOT NULL DEFAULT 0,\n PRIMARY KEY (`gid`)\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_klines` LIKE `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_klines` LIKE `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_klines` LIKE `klines`;")
if err != nil {
return err
}
return err
}
func downKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX `klines_end_time_symbol_interval` ON `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `binance_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `okex_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `max_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `klines`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixSymbolLength, downFixSymbolLength)
}
func upFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE trades MODIFY COLUMN symbol VARCHAR(9);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE orders MODIFY COLUMN symbol VARCHAR(9);")
if err != nil {
return err
}
return err
}
func downFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE trades MODIFY COLUMN symbol VARCHAR(8);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE orders MODIFY COLUMN symbol VARCHAR(8);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixUniqueIndex, downFixUniqueIndex)
}
func upFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` DROP INDEX `id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD UNIQUE INDEX `id` (`exchange`,`symbol`, `side`, `id`);")
if err != nil {
return err
}
return err
}
func downFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` DROP INDEX `id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD UNIQUE INDEX `id` (`id`);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upAddMarginColumns, downAddMarginColumns)
}
func upAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE\n ;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders`\n ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE\n ;")
if err != nil {
return err
}
return err
}
func downAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n DROP COLUMN `is_margin`,\n DROP COLUMN `is_isolated`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders`\n DROP COLUMN `is_margin`,\n DROP COLUMN `is_isolated`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradePriceQuantityIndex, downTradePriceQuantityIndex)
}
func upTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);")
if err != nil {
return err
}
return err
}
func downTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_price_quantity ON trades")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTrades, downTrades)
}
func upTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `id` INTEGER,\n `exchange` TEXT NOT NULL DEFAULT '',\n `symbol` TEXT NOT NULL,\n `price` DECIMAL(16, 8) NOT NULL,\n `quantity` DECIMAL(16, 8) NOT NULL,\n `quote_quantity` DECIMAL(16, 8) NOT NULL,\n `fee` DECIMAL(16, 8) NOT NULL,\n `fee_currency` VARCHAR(4) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL\n);")
if err != nil {
return err
}
return err
}
func downTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `trades`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,53 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradeIndex, downTradeIndex)
}
func upTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades(symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrders, downOrders)
}
func upOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` INTEGER NOT NULL,\n `client_order_id` VARCHAR NOT NULL DEFAULT '',\n `order_type` VARCHAR NOT NULL,\n `symbol` VARCHAR NOT NULL,\n `status` VARCHAR NOT NULL,\n `time_in_force` VARCHAR NOT NULL,\n `price` DECIMAL(16, 8) NOT NULL,\n `stop_price` DECIMAL(16, 8) NOT NULL,\n `quantity` DECIMAL(16, 8) NOT NULL,\n `executed_quantity` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `side` VARCHAR NOT NULL DEFAULT '',\n `is_working` BOOLEAN NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP\n);")
if err != nil {
return err
}
return err
}
func downOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `orders`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesAddOrderId, downTradesAddOrderId)
}
func upTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `order_id` INTEGER;")
if err != nil {
return err
}
return err
}
func downTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `order_id` TO `order_id_deleted`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,83 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesIndexFix, downTradesIndexFix)
}
func upTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrdersAddIndex, downOrdersAddIndex)
}
func upOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX orders_symbol ON orders (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);")
if err != nil {
return err
}
return err
}
func downOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS orders_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS orders_order_id;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,73 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upKlines, downKlines)
}
func upKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
return err
}
func downKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `klines_end_time_symbol_interval`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `binance_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `okex_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `max_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `klines`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixSymbolLength, downFixSymbolLength)
}
func upFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}
func downFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixUniqueIndex, downFixUniqueIndex)
}
func upFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `trade_unique_id` ON `trades` (`exchange`,`symbol`, `side`, `id`);")
if err != nil {
return err
}
return err
}
func downFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `trade_unique_id`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,63 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upAddMarginColumns, downAddMarginColumns)
}
func upAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
return err
}
func downAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `is_margin` TO `is_margin_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` RENAME COLUMN `is_margin` TO `is_margin_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradePriceQuantityIndex, downTradePriceQuantityIndex)
}
func upTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);")
if err != nil {
return err
}
return err
}
func downTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_price_quantity;")
if err != nil {
return err
}
return err
}

View File

@ -421,8 +421,9 @@ func (s *Server) setupSaveConfig(c *gin.Context) {
return
}
if len(s.Environ.MysqlURL) > 0 {
envVars["MYSQL_URL"] = s.Environ.MysqlURL
if s.Environ.DatabaseService != nil {
envVars["DB_DRIVER"] = s.Environ.DatabaseService.Driver
envVars["DB_DSN"] = s.Environ.DatabaseService.DSN
}
dotenvFile := ".env.local"

View File

@ -2,6 +2,7 @@ package server
import (
"context"
"database/sql"
"net/http"
"os"
"syscall"
@ -29,7 +30,7 @@ func (s *Server) setupTestDB(c *gin.Context) {
return
}
db, err := bbgo.ConnectMySQL(dsn)
db, err := sql.Open("mysql", dsn)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
@ -58,7 +59,8 @@ func (s *Server) setupConfigureDB(c *gin.Context) {
return
}
if err := s.Environ.ConfigureDatabase(c, dsn); err != nil {
driver := "mysql"
if err := s.Environ.ConfigureDatabase(c, driver, dsn); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
@ -142,4 +144,3 @@ func (s *Server) setupRestart(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"success": true})
}

81
pkg/service/database.go Normal file
View File

@ -0,0 +1,81 @@
package service
import (
"context"
"github.com/c9s/rockhopper"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
type DatabaseService struct {
Driver string
DSN string
DB *sqlx.DB
}
func NewDatabaseService(driver, dsn string) *DatabaseService {
if driver == "mysql" {
var err error
dsn, err = ReformatMysqlDSN(dsn)
if err != nil {
// incorrect mysql dsn is logical exception
panic(err)
}
}
return &DatabaseService{
Driver: driver,
DSN: dsn,
}
}
func (s *DatabaseService) Connect() error {
var err error
s.DB, err = sqlx.Connect(s.Driver, s.DSN)
return err
}
func (s *DatabaseService) Close() error {
return s.DB.Close()
}
func (s *DatabaseService) Upgrade(ctx context.Context) error {
dialect, err := rockhopper.LoadDialect(s.Driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.LoadByPackageSuffix(s.Driver)
if err != nil {
return err
}
// sqlx.DB is different from sql.DB
rh := rockhopper.New(s.Driver, dialect, s.DB.DB)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}
func ReformatMysqlDSN(dsn string) (string, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return "", err
}
// we need timestamp and datetime fields to be parsed into time.Time struct
config.ParseTime = true
dsn = config.FormatDSN()
return dsn, nil
}

View File

@ -244,7 +244,7 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(`
INSERT IGNORE INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated)
INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated)
VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at, :is_margin, :is_isolated)`,
trade)
return err

5
rockhopper_mysql.yaml Normal file
View File

@ -0,0 +1,5 @@
---
driver: mysql
dialect: mysql
dsn: "root@tcp(localhost:3306)/bbgo?parseTime=true"
migrationsDir: migrations/mysql

5
rockhopper_sqlite.yaml Normal file
View File

@ -0,0 +1,5 @@
---
driver: sqlite3
dialect: sqlite3
dsn: "bbgo.sqlite3"
migrationsDir: migrations/sqlite3

View File

@ -0,0 +1,2 @@
#!/bin/bash
rm -fv bbgo.sqlite3 && rockhopper --config rockhopper_sqlite.yaml up && rockhopper --config rockhopper_sqlite.yaml down --to 1