Merge branch 'main' into fix/pnl-amount

This commit is contained in:
Jui-Nan Lin 2021-02-06 17:22:43 +08:00
commit 7e1825d991
79 changed files with 1631 additions and 206 deletions

View File

@ -3,9 +3,21 @@ language: go
go:
- 1.14
- 1.15
services:
- redis-server
- mysql
before_install:
- mysql -e 'CREATE DATABASE bbgo;'
install:
- go get github.com/c9s/rockhopper/cmd/rockhopper
before_script:
- go mod download
- make migrations
script:
- bash scripts/test-sqlite3-migrations.sh
- go test -v ./pkg/...

View File

@ -28,9 +28,9 @@ dist: bin-dir bbgo-linux bbgo-darwin
tar -C $(BUILD_DIR) -cvzf $(DIST_DIR)/bbgo-$$(git describe --tags).tar.gz .
migrations:
rockhopper compile --config rockhopper.yaml --output pkg/migrations
git add -v pkg/migrations
git commit -m "Update migration package" pkg/migrations
rockhopper compile --config rockhopper_mysql.yaml --output pkg/migrations/mysql
rockhopper compile --config rockhopper_sqlite.yaml --output pkg/migrations/sqlite3
git add -v pkg/migrations && git commit -m "compile and update migration package" pkg/migrations || true
docker:
GOPATH=$(PWD)/_mod go mod download

View File

@ -282,6 +282,21 @@ You may register your exchange account with my referral ID to support this proje
- BTC address `3J6XQJNWT56amqz9Hz2BEVQ7W4aNmb5kiU`
- USDT ERC20 address `0x63E5805e027548A384c57E20141f6778591Bac6F`
## Community
You can join our telegram channel <https://t.me/bbgocrypto>, it's in Chinese, but English is fine as well.
## Contribution
BBGO has a token BBG for the ecosystem (contract address: <https://etherscan.io/address/0x3afe98235d680e8d7a52e1458a59d60f45f935c0>).
Each issue has its BBG label, by completing the issue with a pull request, you can get correspond amount of BBG.
If you have feature request, you can offer your BBG for contributors.
For further request, please contact us: <https://t.me/c123456789s>
## License
MIT License

4
go.mod
View File

@ -7,7 +7,7 @@ go 1.13
require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/adshao/go-binance/v2 v2.2.1-0.20210119141603-20ceb26d876b
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/gin-contrib/cors v1.3.1
@ -53,7 +53,7 @@ require (
github.com/x-cray/logrus-prefixed-formatter v0.5.2
github.com/zserge/lorca v0.1.9
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad // indirect
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 // indirect
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect
golang.org/x/text v0.3.5 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
gonum.org/v1/gonum v0.8.1

5
go.sum
View File

@ -37,6 +37,8 @@ github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8
github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f h1:n1Ly7178MJj+GQB38q4dV66QktUvzEi2rA7xCtTy6Ck=
github.com/c9s/rockhopper v1.2.1-0.20210115022144-cc77e66fc34f/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9 h1:umJ5T1aKfA6zmHTJffe5axqM9mtr/tscllnX2wnZzBA=
github.com/c9s/rockhopper v1.2.1-0.20210206025705-bbb1e34bd7a9/go.mod h1:KJnQjZSrWA83jjwGF/+O7Y96VCVirYTYEvXJJOc6kMU=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -240,6 +242,7 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/mattn/go-sqlite3 v1.14.6 h1:dNPt6NO46WmLVt2DLNpwczCmdV5boIZ6g/tlDrlRUbg=
github.com/mattn/go-sqlite3 v1.14.6/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU=
github.com/mattn/go-sqlite3 v2.0.3+incompatible h1:gXHsfypPkaMZrKbD5209QV9jbUTJKjyR5WD3HYQSd+U=
github.com/mattn/go-sqlite3 v2.0.3+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
@ -466,6 +469,8 @@ golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4 h1:myAQVi0cGEoqQVR5POX+8RR2mrocKqNN1hmeMqhX27k=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c h1:VwygUrnw9jn88c4u8GD3rZQbqrP/tgas88tPUbBxQrk=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221 h1:/ZHdbVpdR/jk3g30/d4yUL0JU9kksj8+F/bnQUVLGDM=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=

View File

@ -19,5 +19,6 @@ CREATE TABLE `trades`
PRIMARY KEY (`gid`),
UNIQUE KEY `id` (`id`)
);
-- +down
DROP TABLE `trades`;
DROP TABLE IF EXISTS `trades`;

View File

@ -0,0 +1,20 @@
-- +up
CREATE TABLE `trades`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`id` INTEGER,
`exchange` TEXT NOT NULL DEFAULT '',
`symbol` TEXT NOT NULL,
`price` DECIMAL(16, 8) NOT NULL,
`quantity` DECIMAL(16, 8) NOT NULL,
`quote_quantity` DECIMAL(16, 8) NOT NULL,
`fee` DECIMAL(16, 8) NOT NULL,
`fee_currency` VARCHAR(4) NOT NULL,
`is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,
`is_maker` BOOLEAN NOT NULL DEFAULT FALSE,
`side` VARCHAR(4) NOT NULL DEFAULT '',
`traded_at` DATETIME(3) NOT NULL
);
-- +down
DROP TABLE IF EXISTS `trades`;

View File

@ -0,0 +1,9 @@
-- +up
CREATE INDEX trades_symbol ON trades(symbol);
CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);
-- +down
DROP INDEX trades_symbol;
DROP INDEX trades_symbol_fee_currency;
DROP INDEX trades_traded_at_symbol;

View File

@ -0,0 +1,25 @@
-- +up
CREATE TABLE `orders`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR NOT NULL DEFAULT '',
-- order_id is the order id returned from the exchange
`order_id` INTEGER NOT NULL,
`client_order_id` VARCHAR NOT NULL DEFAULT '',
`order_type` VARCHAR NOT NULL,
`symbol` VARCHAR NOT NULL,
`status` VARCHAR NOT NULL,
`time_in_force` VARCHAR NOT NULL,
`price` DECIMAL(16, 8) NOT NULL,
`stop_price` DECIMAL(16, 8) NOT NULL,
`quantity` DECIMAL(16, 8) NOT NULL,
`executed_quantity` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`side` VARCHAR NOT NULL DEFAULT '',
`is_working` BOOLEAN NOT NULL DEFAULT FALSE,
`created_at` DATETIME(3) NOT NULL,
`updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- +down
DROP TABLE IF EXISTS `orders`;

View File

@ -0,0 +1,5 @@
-- +up
ALTER TABLE `trades` ADD COLUMN `order_id` INTEGER;
-- +down
ALTER TABLE `trades` RENAME COLUMN `order_id` TO `order_id_deleted`;

View File

@ -0,0 +1,19 @@
-- +up
DROP INDEX IF EXISTS trades_symbol;
DROP INDEX IF EXISTS trades_symbol_fee_currency;
DROP INDEX IF EXISTS trades_traded_at_symbol;
CREATE INDEX trades_symbol ON trades (exchange, symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);
-- +down
DROP INDEX IF EXISTS trades_symbol;
DROP INDEX IF EXISTS trades_symbol_fee_currency;
DROP INDEX IF EXISTS trades_traded_at_symbol;
CREATE INDEX trades_symbol ON trades (symbol);
CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);
CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);

View File

@ -0,0 +1,7 @@
-- +up
CREATE INDEX orders_symbol ON orders (exchange, symbol);
CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);
-- +down
DROP INDEX IF EXISTS orders_symbol;
DROP INDEX IF EXISTS orders_order_id;

View File

@ -0,0 +1,44 @@
-- +up
-- +begin
CREATE TABLE `klines`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR(10) NOT NULL,
`start_time` DATETIME(3) NOT NULL,
`end_time` DATETIME(3) NOT NULL,
`interval` VARCHAR(3) NOT NULL,
`symbol` VARCHAR(7) NOT NULL,
`open` DECIMAL(16, 8) NOT NULL,
`high` DECIMAL(16, 8) NOT NULL,
`low` DECIMAL(16, 8) NOT NULL,
`close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,
`closed` BOOLEAN NOT NULL DEFAULT TRUE,
`last_trade_id` INT NOT NULL DEFAULT 0,
`num_trades` INT NOT NULL DEFAULT 0
);
-- +end
-- +begin
CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);
-- +end
-- +begin
CREATE TABLE `okex_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +begin
CREATE TABLE `binance_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +begin
CREATE TABLE `max_klines` AS SELECT * FROM `klines` WHERE 0
-- +end
-- +down
DROP INDEX IF EXISTS `klines_end_time_symbol_interval`;
DROP TABLE IF EXISTS `binance_klines`;
DROP TABLE IF EXISTS `okex_klines`;
DROP TABLE IF EXISTS `max_klines`;
DROP TABLE IF EXISTS `klines`;

View File

@ -0,0 +1,5 @@
-- +up
SELECT 1;
-- +down
SELECT 1;

View File

@ -0,0 +1,9 @@
-- +up
-- +begin
CREATE UNIQUE INDEX `trade_unique_id` ON `trades` (`exchange`,`symbol`, `side`, `id`);
-- +end
-- +down
-- +begin
DROP INDEX IF EXISTS `trade_unique_id`;
-- +end

View File

@ -0,0 +1,33 @@
-- +up
-- +begin
ALTER TABLE `trades` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `trades` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `orders` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +begin
ALTER TABLE `orders` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;
-- +end
-- +down
-- +begin
ALTER TABLE `trades` RENAME COLUMN `is_margin` TO `is_margin_deleted`;
-- +end
-- +begin
ALTER TABLE `trades` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;
-- +end
-- +begin
ALTER TABLE `orders` RENAME COLUMN `is_margin` TO `is_margin_deleted`;
-- +end
-- +begin
ALTER TABLE `orders` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;
-- +end

View File

@ -0,0 +1,10 @@
-- +up
-- +begin
CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);
-- +end
-- +down
-- +begin
DROP INDEX trades_price_quantity;
-- +end

View File

@ -2,6 +2,7 @@ package pnl
import (
"strings"
"time"
"github.com/c9s/bbgo/pkg/types"
)
@ -96,7 +97,7 @@ func (c *AverageCostCalculator) Calculate(symbol string, trades []types.Trade, c
Symbol: symbol,
CurrentPrice: currentPrice,
NumTrades: len(trades),
StartTime: trades[0].Time,
StartTime: time.Time(trades[0].Time),
BuyVolume: bidVolume,
SellVolume: askVolume,

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -235,7 +236,7 @@ func (m *SimplePriceMatching) newTradeFromOrder(order types.Order, isMaker bool)
Side: order.Side,
IsBuyer: order.Side == types.SideTypeBuy,
IsMaker: isMaker,
Time: m.CurrentTime,
Time: datatype.Time(m.CurrentTime),
Fee: fee,
FeeCurrency: feeCurrency,
}
@ -436,7 +437,7 @@ func (m *SimplePriceMatching) newOrder(o types.SubmitOrder, orderID uint64) type
Status: types.OrderStatusNew,
ExecutedQuantity: 0,
IsWorking: true,
CreationTime: m.CurrentTime,
UpdateTime: m.CurrentTime,
CreationTime: datatype.Time(m.CurrentTime),
UpdateTime: datatype.Time(m.CurrentTime),
}
}

View File

@ -1,50 +1,6 @@
package bbgo
import (
"context"
"database/sql"
// register the go migrations
_ "github.com/c9s/bbgo/pkg/migrations"
"github.com/c9s/rockhopper"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
func ConnectMySQL(dsn string) (*sqlx.DB, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return nil, err
}
config.ParseTime = true
dsn = config.FormatDSN()
return sqlx.Connect("mysql", dsn)
}
func upgradeDB(ctx context.Context, driver string, db *sql.DB) error {
dialect, err := rockhopper.LoadDialect(driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.Load()
if err != nil {
return err
}
rh := rockhopper.New(driver, dialect, db)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/codingconcepts/env"
"github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
@ -48,6 +47,7 @@ type Environment struct {
PersistenceServiceFacade *PersistenceServiceFacade
DatabaseService *service.DatabaseService
OrderService *service.OrderService
TradeService *service.TradeService
TradeSync *service.SyncService
@ -56,8 +56,6 @@ type Environment struct {
startTime time.Time
tradeScanTime time.Time
sessions map[string]*ExchangeSession
MysqlURL string
}
func NewEnvironment() *Environment {
@ -78,23 +76,19 @@ func (environ *Environment) Sessions() map[string]*ExchangeSession {
return environ.sessions
}
func (environ *Environment) ConfigureDatabase(ctx context.Context, dsn string) error {
db, err := ConnectMySQL(dsn)
func (environ *Environment) ConfigureDatabase(ctx context.Context, driver string, dsn string) error {
environ.DatabaseService = service.NewDatabaseService(driver, dsn)
err := environ.DatabaseService.Connect()
if err != nil {
return err
}
environ.MysqlURL = dsn
if err := upgradeDB(ctx, "mysql", db.DB); err != nil {
if err := environ.DatabaseService.Upgrade(ctx) ; err != nil {
return err
}
environ.SetDB(db)
return nil
}
func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
// get the db connection pool object to create other services
db := environ.DatabaseService.DB
environ.OrderService = &service.OrderService{DB: db}
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.SyncService{
@ -102,7 +96,7 @@ func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
OrderService: environ.OrderService,
}
return environ
return nil
}
// AddExchangeSession adds the existing exchange session or pre-created exchange session

View File

@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/backtest"
@ -96,10 +95,6 @@ var BacktestCmd = &cobra.Command{
return err
}
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}
if userConfig.Backtest == nil {
return errors.New("backtest config is not defined")
@ -116,14 +111,12 @@ var BacktestCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
backtestService := &service.BacktestService{DB: db}
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if wantSync {
log.Info("starting synchronization...")
@ -273,8 +266,8 @@ var BacktestCmd = &cobra.Command{
finalBalances.Print()
if wantBaseAssetBaseline {
initBaseAsset := InBaseAsset(initBalances, market, startPrice)
finalBaseAsset := InBaseAsset(finalBalances, market, lastPrice)
initBaseAsset := inBaseAsset(initBalances, market, startPrice)
finalBaseAsset := inBaseAsset(finalBalances, market, lastPrice)
log.Infof("INITIAL ASSET ~= %s %s (1 %s = %f)", market.FormatQuantity(initBaseAsset), market.BaseCurrency, market.BaseCurrency, startPrice)
log.Infof("FINAL ASSET ~= %s %s (1 %s = %f)", market.FormatQuantity(finalBaseAsset), market.BaseCurrency, market.BaseCurrency, lastPrice)
@ -288,8 +281,3 @@ var BacktestCmd = &cobra.Command{
},
}
func InBaseAsset(balances types.BalanceMap, market types.Market, price float64) float64 {
quote := balances[market.QuoteCurrency]
base := balances[market.BaseCurrency]
return (base.Locked.Float64() + base.Available.Float64()) + ((quote.Locked.Float64() + quote.Available.Float64()) / price)
}

View File

@ -7,7 +7,6 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
@ -63,13 +62,8 @@ var CancelCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
return err
}
environ.SetDB(db)
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {

View File

@ -8,7 +8,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting"
"github.com/c9s/bbgo/pkg/accounting/pnl"
@ -21,6 +20,7 @@ import (
func init() {
PnLCmd.Flags().String("exchange", "", "target exchange")
PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol")
PnLCmd.Flags().Int("limit", 500, "number of trades")
RootCmd.AddCommand(PnLCmd)
}
@ -46,27 +46,32 @@ var PnLCmd = &cobra.Command{
return err
}
limit, err := cmd.Flags().GetInt("limit")
if err != nil {
return err
}
exchange, err := cmdutil.NewExchange(exchangeName)
if err != nil {
return err
}
db, err := bbgo.ConnectMySQL(viper.GetString("mysql-url"))
if err != nil {
environ := bbgo.NewEnvironment()
if err := configureDB(ctx, environ) ; err != nil {
return err
}
tradeService := &service.TradeService{DB: db}
var trades []types.Trade
tradingFeeCurrency := exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
log.Infof("loading all trading fee currency related trades: %s", symbol)
trades, err = tradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency)
trades, err = environ.TradeService.QueryForTradingFeeCurrency(exchange.Name(), symbol, tradingFeeCurrency)
} else {
trades, err = tradeService.Query(service.QueryTradesOptions{
trades, err = environ.TradeService.Query(service.QueryTradesOptions{
Exchange: exchange.Name(),
Symbol: symbol,
Limit: limit,
})
}

View File

@ -96,10 +96,8 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer
}
func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userConfig *bbgo.Config) error {
if dsn, ok := os.LookupEnv("MYSQL_URL"); ok {
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {
@ -331,7 +329,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
enableApiServer, err := cmd.Flags().GetBool("enable-web-server")
enableWebServer, err := cmd.Flags().GetBool("enable-web-server")
if err != nil {
return err
}
@ -348,10 +346,7 @@ func run(cmd *cobra.Command, args []string) error {
var userConfig = &bbgo.Config{}
if setup {
log.Infof("running in setup mode, skip reading config file")
enableApiServer = true
} else {
if !setup {
// if it's not setup, then the config file option is required.
if len(configFile) == 0 {
return errors.New("--config option is required")
@ -377,7 +372,7 @@ func run(cmd *cobra.Command, args []string) error {
}
if setup {
return runSetup(ctx, userConfig, enableApiServer)
return runSetup(ctx, userConfig, true)
}
userConfig, err = bbgo.Load(configFile, true)
@ -385,7 +380,7 @@ func run(cmd *cobra.Command, args []string) error {
return err
}
return runConfig(ctx, userConfig, enableApiServer)
return runConfig(ctx, userConfig, enableWebServer)
}
return runWrapperBinary(ctx, userConfig, cmd, args)

View File

@ -9,7 +9,6 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/bbgo"
)
@ -52,12 +51,8 @@ var SyncCmd = &cobra.Command{
}
environ := bbgo.NewEnvironment()
if viper.IsSet("mysql-url") {
dsn := viper.GetString("mysql-url")
if err := environ.ConfigureDatabase(ctx, dsn); err != nil {
return err
}
if err := configureDB(ctx, environ) ; err != nil {
return err
}
if err := environ.AddExchangesFromConfig(userConfig); err != nil {

36
pkg/cmd/utils.go Normal file
View File

@ -0,0 +1,36 @@
package cmd
import (
"context"
"os"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
)
func inBaseAsset(balances types.BalanceMap, market types.Market, price float64) float64 {
quote := balances[market.QuoteCurrency]
base := balances[market.BaseCurrency]
return (base.Locked.Float64() + base.Available.Float64()) + ((quote.Locked.Float64() + quote.Available.Float64()) / price)
}
// configureDB configures the database service based on the environment variable
func configureDB(ctx context.Context, environ *bbgo.Environment) error {
if driver, ok := os.LookupEnv("DB_DRIVER"); ok {
if dsn, ok := os.LookupEnv("DB_DSN"); ok {
return environ.ConfigureDatabase(ctx, driver, dsn)
}
} else if dsn, ok := os.LookupEnv("SQLITE3_DSN"); ok {
return environ.ConfigureDatabase(ctx, "sqlite3", dsn)
} else if dsn, ok := os.LookupEnv("MYSQL_URL"); ok {
return environ.ConfigureDatabase(ctx, "mysql", dsn)
}
return nil
}

71
pkg/datatype/time.go Normal file
View File

@ -0,0 +1,71 @@
package datatype
import (
"database/sql/driver"
"fmt"
"time"
)
type Time time.Time
var layout = "2006-01-02 15:04:05.999Z07:00"
func (t *Time) UnmarshalJSON(data []byte) error {
return (*time.Time)(t).UnmarshalJSON(data)
}
func (t Time) MarshalJSON() ([]byte, error) {
return time.Time(t).MarshalJSON()
}
func (t Time) String() string {
return time.Time(t).String()
}
func (t Time) Time() time.Time {
return time.Time(t)
}
// driver.Valuer interface
// see http://jmoiron.net/blog/built-in-interfaces/
func (t Time) Value() (driver.Value, error) {
return time.Time(t), nil
}
func (t *Time) Scan(src interface{}) error {
switch d := src.(type) {
case *time.Time:
*t = Time(*d)
return nil
case time.Time:
*t = Time(d)
return nil
case string:
// 2020-12-16 05:17:12.994+08:00
tt, err := time.Parse(layout, d)
if err != nil {
return err
}
*t = Time(tt)
return nil
case []byte:
// 2019-10-20 23:01:43.77+08:00
tt, err := time.Parse(layout, string(d))
if err != nil {
return err
}
*t = Time(tt)
return nil
default:
}
return fmt.Errorf("datatype.Time scan error, type: %T is not supported, value; %+v", src, src)
}

View File

@ -8,6 +8,7 @@ import (
"github.com/adshao/go-binance/v2"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
@ -59,8 +60,8 @@ func ToGlobalOrder(binanceOrder *binance.Order, isMargin bool) (*types.Order, er
OrderID: uint64(binanceOrder.OrderID),
Status: toGlobalOrderStatus(binanceOrder.Status),
ExecutedQuantity: util.MustParseFloat(binanceOrder.ExecutedQuantity),
CreationTime: millisecondTime(binanceOrder.Time),
UpdateTime: millisecondTime(binanceOrder.UpdateTime),
CreationTime: datatype.Time(millisecondTime(binanceOrder.Time)),
UpdateTime: datatype.Time(millisecondTime(binanceOrder.UpdateTime)),
IsMargin: isMargin,
IsIsolated: binanceOrder.IsIsolated,
}, nil
@ -115,7 +116,7 @@ func ToGlobalTrade(t binance.TradeV3, isMargin bool) (*types.Trade, error) {
Fee: fee,
FeeCurrency: t.CommissionAsset,
QuoteQuantity: quoteQuantity,
Time: millisecondTime(t.Time),
Time: datatype.Time(millisecondTime(t.Time)),
IsMargin: isMargin,
IsIsolated: t.IsIsolated,
}, nil

View File

@ -9,6 +9,7 @@ import (
"github.com/adshao/go-binance/v2"
"github.com/valyala/fastjson"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -112,7 +113,7 @@ func (e *ExecutionReportEvent) Order() (*types.Order, error) {
OrderID: uint64(e.OrderID),
Status: toGlobalOrderStatus(binance.OrderStatusType(e.CurrentOrderStatus)),
ExecutedQuantity: util.MustParseFloat(e.CumulativeFilledQuantity),
CreationTime: orderCreationTime,
CreationTime: datatype.Time(orderCreationTime),
}, nil
}
@ -132,7 +133,7 @@ func (e *ExecutionReportEvent) Trade() (*types.Trade, error) {
QuoteQuantity: util.MustParseFloat(e.LastQuoteAssetTransactedQuantity),
IsBuyer: e.Side == "BUY",
IsMaker: e.IsMaker,
Time: tt,
Time: datatype.Time(tt),
Fee: util.MustParseFloat(e.CommissionAmount),
FeeCurrency: e.CommissionAsset,
}, nil

View File

@ -6,6 +6,7 @@ import (
"strings"
"time"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
@ -166,8 +167,8 @@ func toGlobalOrder(maxOrder max.Order) (*types.Order, error) {
OrderID: maxOrder.ID,
Status: toGlobalOrderStatus(maxOrder.State, executedVolume, remainingVolume),
ExecutedQuantity: executedVolume.Float64(),
CreationTime: maxOrder.CreatedAt,
UpdateTime: maxOrder.CreatedAt,
CreationTime: datatype.Time(maxOrder.CreatedAt),
UpdateTime: datatype.Time(maxOrder.CreatedAt),
}, nil
}
@ -211,7 +212,7 @@ func toGlobalTrade(t max.Trade) (*types.Trade, error) {
Fee: fee,
FeeCurrency: toGlobalCurrency(t.FeeCurrency),
QuoteQuantity: quoteQuantity,
Time: mts,
Time: datatype.Time(mts),
}, nil
}

View File

@ -20,6 +20,8 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/util"
)
const (
@ -46,37 +48,6 @@ var serverTimestamp = time.Now().Unix()
// reqCount is used for nonce, this variable counts the API request count.
var reqCount int64 = 0
// Response is wrapper for standard http.Response and provides
// more methods.
type Response struct {
*http.Response
// Body overrides the composited Body field.
Body []byte
}
// newResponse is a wrapper of the http.Response instance, it reads the response body and close the file.
func newResponse(r *http.Response) (response *Response, err error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
err = r.Body.Close()
response = &Response{Response: r, Body: body}
return response, err
}
// String converts response body to string.
// An empty string will be returned if error.
func (r *Response) String() string {
return string(r.Body)
}
func (r *Response) DecodeJSON(o interface{}) error {
return json.Unmarshal(r.Body, o)
}
type RestClient struct {
client *http.Client
@ -290,14 +261,14 @@ func (c *RestClient) Do(req *http.Request) (resp *http.Response, err error) {
}
// sendRequest sends the request to the API server and handle the response
func (c *RestClient) sendRequest(req *http.Request) (*Response, error) {
func (c *RestClient) sendRequest(req *http.Request) (*util.Response, error) {
resp, err := c.Do(req)
if err != nil {
return nil, err
}
// newResponse reads the response body and return a new Response object
response, err := newResponse(resp)
response, err := util.NewResponse(resp)
if err != nil {
return response, err
}
@ -314,7 +285,7 @@ func (c *RestClient) sendRequest(req *http.Request) (*Response, error) {
return response, nil
}
func (c *RestClient) sendAuthenticatedRequest(m string, refURL string, data map[string]interface{}) (*Response, error) {
func (c *RestClient) sendAuthenticatedRequest(m string, refURL string, data map[string]interface{}) (*util.Response, error) {
req, err := c.newAuthenticatedRequest(m, refURL, data)
if err != nil {
return nil, err
@ -374,7 +345,7 @@ type ErrorField struct {
}
type ErrorResponse struct {
*Response
*util.Response
Err ErrorField `json:"error"`
}
@ -389,13 +360,13 @@ func (r *ErrorResponse) Error() string {
}
// isError check the response status code so see if a response is an error.
func isError(response *Response) bool {
func isError(response *util.Response) bool {
var c = response.StatusCode
return c < 200 || c > 299
}
// toErrorResponse tries to convert/parse the server response to the standard Error interface object
func toErrorResponse(response *Response) (errorResponse *ErrorResponse, err error) {
func toErrorResponse(response *util.Response) (errorResponse *ErrorResponse, err error) {
errorResponse = &ErrorResponse{Response: response}
contentType := response.Header.Get("content-type")

View File

@ -7,6 +7,7 @@ import (
"github.com/gorilla/websocket"
"github.com/c9s/bbgo/pkg/datatype"
max "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
@ -198,7 +199,7 @@ func convertWebSocketTrade(t max.TradeUpdate) (*types.Trade, error) {
Fee: fee,
FeeCurrency: toGlobalCurrency(t.FeeCurrency),
QuoteQuantity: quoteQuantity,
Time: mts,
Time: datatype.Time(mts),
}, nil
}
@ -228,6 +229,6 @@ func toGlobalOrderUpdate(u max.OrderUpdate) (*types.Order, error) {
OrderID: u.ID,
Status: toGlobalOrderStatus(u.State, executedVolume, remainingVolume),
ExecutedQuantity: executedVolume.Float64(),
CreationTime: time.Unix(0, u.CreatedAtMs*int64(time.Millisecond)),
CreationTime: datatype.Time(time.Unix(0, u.CreatedAtMs*int64(time.Millisecond))),
}, nil
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTrades, downTrades)
}
func upTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `id` BIGINT UNSIGNED,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n `symbol` VARCHAR(8) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quote_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `fee_currency` VARCHAR(4) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`id`)\n);")
if err != nil {
return err
}
return err
}
func downTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `trades`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,53 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradeIndex, downTradeIndex)
}
func upTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades(symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrders, downOrders)
}
func upOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` BIGINT UNSIGNED NOT NULL,\n `client_order_id` VARCHAR(42) NOT NULL DEFAULT '',\n `order_type` VARCHAR(16) NOT NULL,\n `symbol` VARCHAR(8) NOT NULL,\n `status` VARCHAR(12) NOT NULL,\n `time_in_force` VARCHAR(4) NOT NULL,\n `price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `stop_price` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `executed_quantity` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `is_working` BOOL NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),\n PRIMARY KEY (`gid`)\n);")
if err != nil {
return err
}
return err
}
func downOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE `orders`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesAddOrderId, downTradesAddOrderId)
}
func upTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n ADD COLUMN `order_id` BIGINT UNSIGNED NOT NULL;")
if err != nil {
return err
}
return err
}
func downTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n DROP COLUMN `order_id`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,83 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesIndexFix, downTradesIndexFix)
}
func upTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol ON trades;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrdersAddIndex, downOrdersAddIndex)
}
func upOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX orders_symbol ON orders (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);")
if err != nil {
return err
}
return err
}
func downOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX orders_symbol ON orders;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX orders_order_id ON orders;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,73 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upKlines, downKlines)
}
func upKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `klines`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `high` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `low` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `close` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) UNSIGNED NOT NULL DEFAULT 0.0,\n `closed` BOOL NOT NULL DEFAULT TRUE,\n `last_trade_id` INT UNSIGNED NOT NULL DEFAULT 0,\n `num_trades` INT UNSIGNED NOT NULL DEFAULT 0,\n PRIMARY KEY (`gid`)\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_klines` LIKE `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_klines` LIKE `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_klines` LIKE `klines`;")
if err != nil {
return err
}
return err
}
func downKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX `klines_end_time_symbol_interval` ON `klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `binance_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `okex_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `max_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE `klines`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixSymbolLength, downFixSymbolLength)
}
func upFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE trades MODIFY COLUMN symbol VARCHAR(9);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE orders MODIFY COLUMN symbol VARCHAR(9);")
if err != nil {
return err
}
return err
}
func downFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE trades MODIFY COLUMN symbol VARCHAR(8);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE orders MODIFY COLUMN symbol VARCHAR(8);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixUniqueIndex, downFixUniqueIndex)
}
func upFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` DROP INDEX `id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD UNIQUE INDEX `id` (`exchange`,`symbol`, `side`, `id`);")
if err != nil {
return err
}
return err
}
func downFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` DROP INDEX `id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD UNIQUE INDEX `id` (`id`);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upAddMarginColumns, downAddMarginColumns)
}
func upAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE\n ;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders`\n ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE,\n ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE\n ;")
if err != nil {
return err
}
return err
}
func downAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades`\n DROP COLUMN `is_margin`,\n DROP COLUMN `is_isolated`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders`\n DROP COLUMN `is_margin`,\n DROP COLUMN `is_isolated`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradePriceQuantityIndex, downTradePriceQuantityIndex)
}
func upTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);")
if err != nil {
return err
}
return err
}
func downTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_price_quantity ON trades")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTrades, downTrades)
}
func upTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `trades`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `id` INTEGER,\n `exchange` TEXT NOT NULL DEFAULT '',\n `symbol` TEXT NOT NULL,\n `price` DECIMAL(16, 8) NOT NULL,\n `quantity` DECIMAL(16, 8) NOT NULL,\n `quote_quantity` DECIMAL(16, 8) NOT NULL,\n `fee` DECIMAL(16, 8) NOT NULL,\n `fee_currency` VARCHAR(4) NOT NULL,\n `is_buyer` BOOLEAN NOT NULL DEFAULT FALSE,\n `is_maker` BOOLEAN NOT NULL DEFAULT FALSE,\n `side` VARCHAR(4) NOT NULL DEFAULT '',\n `traded_at` DATETIME(3) NOT NULL\n);")
if err != nil {
return err
}
return err
}
func downTrades(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `trades`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,53 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradeIndex, downTradeIndex)
}
func upTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades(symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades(symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades(traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradeIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX trades_traded_at_symbol;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrders, downOrders)
}
func upOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `orders`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR NOT NULL DEFAULT '',\n -- order_id is the order id returned from the exchange\n `order_id` INTEGER NOT NULL,\n `client_order_id` VARCHAR NOT NULL DEFAULT '',\n `order_type` VARCHAR NOT NULL,\n `symbol` VARCHAR NOT NULL,\n `status` VARCHAR NOT NULL,\n `time_in_force` VARCHAR NOT NULL,\n `price` DECIMAL(16, 8) NOT NULL,\n `stop_price` DECIMAL(16, 8) NOT NULL,\n `quantity` DECIMAL(16, 8) NOT NULL,\n `executed_quantity` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `side` VARCHAR NOT NULL DEFAULT '',\n `is_working` BOOLEAN NOT NULL DEFAULT FALSE,\n `created_at` DATETIME(3) NOT NULL,\n `updated_at` DATETIME(3) NOT NULL DEFAULT CURRENT_TIMESTAMP\n);")
if err != nil {
return err
}
return err
}
func downOrders(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `orders`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesAddOrderId, downTradesAddOrderId)
}
func upTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `order_id` INTEGER;")
if err != nil {
return err
}
return err
}
func downTradesAddOrderId(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `order_id` TO `order_id_deleted`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,83 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradesIndexFix, downTradesIndexFix)
}
func upTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (exchange, symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (exchange, traded_at, symbol);")
if err != nil {
return err
}
return err
}
func downTradesIndexFix(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_symbol_fee_currency;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS trades_traded_at_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol ON trades (symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_symbol_fee_currency ON trades (symbol, fee_currency, traded_at);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_traded_at_symbol ON trades (traded_at, symbol);")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,43 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upOrdersAddIndex, downOrdersAddIndex)
}
func upOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX orders_symbol ON orders (exchange, symbol);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX orders_order_id ON orders (order_id, exchange);")
if err != nil {
return err
}
return err
}
func downOrdersAddIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS orders_symbol;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS orders_order_id;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,73 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upKlines, downKlines)
}
func upKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `klines`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(10) NOT NULL,\n `start_time` DATETIME(3) NOT NULL,\n `end_time` DATETIME(3) NOT NULL,\n `interval` VARCHAR(3) NOT NULL,\n `symbol` VARCHAR(7) NOT NULL,\n `open` DECIMAL(16, 8) NOT NULL,\n `high` DECIMAL(16, 8) NOT NULL,\n `low` DECIMAL(16, 8) NOT NULL,\n `close` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `volume` DECIMAL(16, 8) NOT NULL DEFAULT 0.0,\n `closed` BOOLEAN NOT NULL DEFAULT TRUE,\n `last_trade_id` INT NOT NULL DEFAULT 0,\n `num_trades` INT NOT NULL DEFAULT 0\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE INDEX `klines_end_time_symbol_interval` ON klines (`end_time`, `symbol`, `interval`);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `okex_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `binance_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE TABLE `max_klines` AS SELECT * FROM `klines` WHERE 0")
if err != nil {
return err
}
return err
}
func downKlines(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `klines_end_time_symbol_interval`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `binance_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `okex_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `max_klines`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `klines`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixSymbolLength, downFixSymbolLength)
}
func upFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}
func downFixSymbolLength(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "SELECT 1;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upFixUniqueIndex, downFixUniqueIndex)
}
func upFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `trade_unique_id` ON `trades` (`exchange`,`symbol`, `side`, `id`);")
if err != nil {
return err
}
return err
}
func downFixUniqueIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `trade_unique_id`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,63 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upAddMarginColumns, downAddMarginColumns)
}
func upAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` ADD COLUMN `is_margin` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` ADD COLUMN `is_isolated` BOOLEAN NOT NULL DEFAULT FALSE;")
if err != nil {
return err
}
return err
}
func downAddMarginColumns(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `is_margin` TO `is_margin_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `trades` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` RENAME COLUMN `is_margin` TO `is_margin_deleted`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "ALTER TABLE `orders` RENAME COLUMN `is_isolated` TO `is_isolated_deleted`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,33 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
rockhopper.AddMigration(upTradePriceQuantityIndex, downTradePriceQuantityIndex)
}
func upTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE INDEX trades_price_quantity ON trades (order_id,price,quantity);")
if err != nil {
return err
}
return err
}
func downTradePriceQuantityIndex(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX trades_price_quantity;")
if err != nil {
return err
}
return err
}

View File

@ -421,8 +421,9 @@ func (s *Server) setupSaveConfig(c *gin.Context) {
return
}
if len(s.Environ.MysqlURL) > 0 {
envVars["MYSQL_URL"] = s.Environ.MysqlURL
if s.Environ.DatabaseService != nil {
envVars["DB_DRIVER"] = s.Environ.DatabaseService.Driver
envVars["DB_DSN"] = s.Environ.DatabaseService.DSN
}
dotenvFile := ".env.local"

View File

@ -2,6 +2,7 @@ package server
import (
"context"
"database/sql"
"net/http"
"os"
"syscall"
@ -29,7 +30,7 @@ func (s *Server) setupTestDB(c *gin.Context) {
return
}
db, err := bbgo.ConnectMySQL(dsn)
db, err := sql.Open("mysql", dsn)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
@ -44,7 +45,8 @@ func (s *Server) setupTestDB(c *gin.Context) {
func (s *Server) setupConfigureDB(c *gin.Context) {
payload := struct {
DSN string `json:"dsn"`
Driver string `json:"driver"`
DSN string `json:"dsn"`
}{}
if err := c.BindJSON(&payload); err != nil {
@ -52,18 +54,25 @@ func (s *Server) setupConfigureDB(c *gin.Context) {
return
}
dsn := payload.DSN
if len(dsn) == 0 {
if len(payload.DSN) == 0 {
c.JSON(http.StatusBadRequest, gin.H{"error": "missing dsn argument"})
return
}
if err := s.Environ.ConfigureDatabase(c, dsn); err != nil {
if payload.Driver == "" {
payload.Driver = "mysql"
}
if err := s.Environ.ConfigureDatabase(c, payload.Driver, payload.DSN); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{"success": true})
c.JSON(http.StatusOK, gin.H{
"success": true,
"driver": payload.Driver,
"dsn": payload.DSN,
})
}
func (s *Server) setupAddStrategy(c *gin.Context) {
@ -142,4 +151,3 @@ func (s *Server) setupRestart(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"success": true})
}

81
pkg/service/database.go Normal file
View File

@ -0,0 +1,81 @@
package service
import (
"context"
"github.com/c9s/rockhopper"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)
type DatabaseService struct {
Driver string
DSN string
DB *sqlx.DB
}
func NewDatabaseService(driver, dsn string) *DatabaseService {
if driver == "mysql" {
var err error
dsn, err = ReformatMysqlDSN(dsn)
if err != nil {
// incorrect mysql dsn is logical exception
panic(err)
}
}
return &DatabaseService{
Driver: driver,
DSN: dsn,
}
}
func (s *DatabaseService) Connect() error {
var err error
s.DB, err = sqlx.Connect(s.Driver, s.DSN)
return err
}
func (s *DatabaseService) Close() error {
return s.DB.Close()
}
func (s *DatabaseService) Upgrade(ctx context.Context) error {
dialect, err := rockhopper.LoadDialect(s.Driver)
if err != nil {
return err
}
loader := &rockhopper.GoMigrationLoader{}
migrations, err := loader.LoadByPackageSuffix(s.Driver)
if err != nil {
return err
}
// sqlx.DB is different from sql.DB
rh := rockhopper.New(s.Driver, dialect, s.DB.DB)
currentVersion, err := rh.CurrentVersion()
if err != nil {
return err
}
if err := rockhopper.Up(ctx, rh, migrations, currentVersion, 0); err != nil {
return err
}
return nil
}
func ReformatMysqlDSN(dsn string) (string, error) {
config, err := mysql.ParseDSN(dsn)
if err != nil {
return "", err
}
// we need timestamp and datetime fields to be parsed into time.Time struct
config.ParseTime = true
dsn = config.FormatDSN()
return dsn, nil
}

View File

@ -34,7 +34,7 @@ func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, s
var lastID uint64 = 0
if lastOrder != nil {
lastID = lastOrder.OrderID
startTime = lastOrder.CreationTime
startTime = lastOrder.CreationTime.Time()
logrus.Infof("found last order, start from lastID = %d since %s", lastID, startTime)
}
@ -84,7 +84,7 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
var lastID int64 = 0
if lastTrade != nil {
lastID = lastTrade.ID
startTime = lastTrade.Time
startTime = time.Time(lastTrade.Time)
logrus.Infof("found last trade, start from lastID = %d since %s", lastID, startTime)
}

View File

@ -163,13 +163,13 @@ func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol
return s.scanRows(rows)
}
// Only return 500 items.
type QueryTradesOptions struct {
Exchange types.ExchangeName
Symbol string
LastGID int64
// ASC or DESC
Ordering string
Limit int
}
func (s *TradeService) Query(options QueryTradesOptions) ([]types.Trade, error) {
@ -225,7 +225,7 @@ func queryTradesSQL(options QueryTradesOptions) string {
sql += ` ORDER BY gid ` + ordering
sql += ` LIMIT ` + strconv.Itoa(500)
sql += ` LIMIT ` + strconv.Itoa(options.Limit)
return sql
}
@ -244,7 +244,7 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(`
INSERT IGNORE INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated)
INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated)
VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at, :is_margin, :is_isolated)`,
trade)
return err

View File

@ -27,23 +27,23 @@ func Test_queryTradingVolumeSQL(t *testing.T) {
func Test_queryTradesSQL(t *testing.T) {
t.Run("generate order by clause by Ordering option", func(t *testing.T) {
assert.Equal(t, "SELECT * FROM trades ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{}))
assert.Equal(t, "SELECT * FROM trades ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Ordering: "ASC"}))
assert.Equal(t, "SELECT * FROM trades ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{Ordering: "DESC"}))
assert.Equal(t, "SELECT * FROM trades ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Limit: 500}))
assert.Equal(t, "SELECT * FROM trades ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Ordering: "ASC", Limit: 500}))
assert.Equal(t, "SELECT * FROM trades ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{Ordering: "DESC", Limit: 500}))
})
t.Run("filter by exchange name", func(t *testing.T) {
assert.Equal(t, "SELECT * FROM trades WHERE exchange = :exchange ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Exchange: "max"}))
assert.Equal(t, "SELECT * FROM trades WHERE exchange = :exchange ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Exchange: "max", Limit: 500}))
})
t.Run("filter by symbol", func(t *testing.T) {
assert.Equal(t, "SELECT * FROM trades WHERE symbol = :symbol ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Symbol: "eth"}))
assert.Equal(t, "SELECT * FROM trades WHERE symbol = :symbol ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{Symbol: "eth", Limit: 500}))
})
t.Run("GID ordering", func(t *testing.T) {
assert.Equal(t, "SELECT * FROM trades WHERE gid > :gid ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1}))
assert.Equal(t, "SELECT * FROM trades WHERE gid > :gid ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1, Ordering: "ASC"}))
assert.Equal(t, "SELECT * FROM trades WHERE gid < :gid ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1, Ordering: "DESC"}))
assert.Equal(t, "SELECT * FROM trades WHERE gid > :gid ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1, Limit: 500}))
assert.Equal(t, "SELECT * FROM trades WHERE gid > :gid ORDER BY gid ASC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1, Ordering: "ASC", Limit: 500}))
assert.Equal(t, "SELECT * FROM trades WHERE gid < :gid ORDER BY gid DESC LIMIT 500", queryTradesSQL(QueryTradesOptions{LastGID: 1, Ordering: "DESC", Limit: 500}))
})
t.Run("convert all options", func(t *testing.T) {
@ -52,6 +52,7 @@ func Test_queryTradesSQL(t *testing.T) {
Symbol: "btc",
LastGID: 123,
Ordering: "DESC",
Limit: 500,
}))
})
}

View File

@ -51,7 +51,7 @@ func (e ExchangeBatchProcessor) BatchQueryClosedOrders(ctx context.Context, symb
}
c <- o
startTime = o.CreationTime
startTime = o.CreationTime.Time()
lastOrderID = o.OrderID
orderIDs[o.OrderID] = struct{}{}
}
@ -155,7 +155,7 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
logrus.Infof("returned %d trades", len(trades))
startTime = trades[len(trades)-1].Time
startTime = time.Time(trades[len(trades)-1].Time)
for _, t := range trades {
// ignore the first trade if last TradeID is given
if t.ID == lastTradeID {

View File

@ -2,10 +2,10 @@ package types
import (
"fmt"
"time"
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/util"
)
@ -113,14 +113,14 @@ func (o *SubmitOrder) SlackAttachment() slack.Attachment {
type Order struct {
SubmitOrder
Exchange string `json:"exchange" db:"exchange"`
GID uint64 `json:"gid" db:"gid"`
OrderID uint64 `json:"orderID" db:"order_id"` // order id
Status OrderStatus `json:"status" db:"status"`
ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"`
IsWorking bool `json:"isWorking" db:"is_working"`
CreationTime time.Time `json:"creationTime" db:"created_at"`
UpdateTime time.Time `json:"updateTime" db:"updated_at"`
Exchange string `json:"exchange" db:"exchange"`
GID uint64 `json:"gid" db:"gid"`
OrderID uint64 `json:"orderID" db:"order_id"` // order id
Status OrderStatus `json:"status" db:"status"`
ExecutedQuantity float64 `json:"executedQuantity" db:"executed_quantity"`
IsWorking bool `json:"isWorking" db:"is_working"`
CreationTime datatype.Time `json:"creationTime" db:"created_at"`
UpdateTime datatype.Time `json:"updateTime" db:"updated_at"`
IsMargin bool `json:"isMargin" db:"is_margin"`
IsIsolated bool `json:"isIsolated" db:"is_isolated"`

View File

@ -3,10 +3,10 @@ package types
import (
"fmt"
"sync"
"time"
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/util"
)
@ -49,12 +49,12 @@ type Trade struct {
QuoteQuantity float64 `json:"quoteQuantity" db:"quote_quantity"`
Symbol string `json:"symbol" db:"symbol"`
Side SideType `json:"side" db:"side"`
IsBuyer bool `json:"isBuyer" db:"is_buyer"`
IsMaker bool `json:"isMaker" db:"is_maker"`
Time time.Time `json:"tradedAt" db:"traded_at"`
Fee float64 `json:"fee" db:"fee"`
FeeCurrency string `json:"feeCurrency" db:"fee_currency"`
Side SideType `json:"side" db:"side"`
IsBuyer bool `json:"isBuyer" db:"is_buyer"`
IsMaker bool `json:"isMaker" db:"is_maker"`
Time datatype.Time `json:"tradedAt" db:"traded_at"`
Fee float64 `json:"fee" db:"fee"`
FeeCurrency string `json:"feeCurrency" db:"fee_currency"`
IsMargin bool `json:"isMargin" db:"is_margin"`
IsIsolated bool `json:"isIsolated" db:"is_isolated"`

38
pkg/util/http_response.go Normal file
View File

@ -0,0 +1,38 @@
package util
import (
"encoding/json"
"io/ioutil"
"net/http"
)
// Response is wrapper for standard http.Response and provides
// more methods.
type Response struct {
*http.Response
// Body overrides the composited Body field.
Body []byte
}
// newResponse is a wrapper of the http.Response instance, it reads the response body and close the file.
func NewResponse(r *http.Response) (response *Response, err error) {
body, err := ioutil.ReadAll(r.Body)
if err != nil {
return nil, err
}
err = r.Body.Close()
response = &Response{Response: r, Body: body}
return response, err
}
// String converts response body to string.
// An empty string will be returned if error.
func (r *Response) String() string {
return string(r.Body)
}
func (r *Response) DecodeJSON(o interface{}) error {
return json.Unmarshal(r.Body, o)
}

View File

@ -0,0 +1,28 @@
package util
import (
"bytes"
"io/ioutil"
"net/http"
"testing"
"github.com/stretchr/testify/assert"
)
func TestResponse_DecodeJSON(t *testing.T) {
type temp struct {
Name string `json:"name"`
}
json := `{"name":"Test Name","a":"a"}`
reader := ioutil.NopCloser(bytes.NewReader([]byte(json)))
resp, err := NewResponse(&http.Response{
StatusCode: 200,
Body: reader,
})
assert.NoError(t, err)
assert.Equal(t, json, resp.String())
var result temp
assert.NoError(t, resp.DecodeJSON(&result))
assert.Equal(t, "Test Name", result.Name)
}

5
rockhopper_mysql.yaml Normal file
View File

@ -0,0 +1,5 @@
---
driver: mysql
dialect: mysql
dsn: "root@tcp(localhost:3306)/bbgo?parseTime=true"
migrationsDir: migrations/mysql

5
rockhopper_sqlite.yaml Normal file
View File

@ -0,0 +1,5 @@
---
driver: sqlite3
dialect: sqlite3
dsn: "bbgo.sqlite3"
migrationsDir: migrations/sqlite3

View File

@ -0,0 +1,2 @@
#!/bin/bash
rm -fv bbgo.sqlite3 && rockhopper --config rockhopper_sqlite.yaml up && rockhopper --config rockhopper_sqlite.yaml down --to 1