diff --git a/.travis.yml b/.travis.yml index 25604dbb5..0653603a7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ services: before_install: - mysql -e 'CREATE DATABASE bbgo;' +- mysql -e 'CREATE DATABASE bbgo_dev;' install: - go get github.com/c9s/rockhopper/cmd/rockhopper diff --git a/README.md b/README.md index debb3b2df..c61911f0b 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ for lorca make embed && go run -tags web ./cmd/bbgo-lorca ``` + ## Support ### By contributing pull requests diff --git a/migrations/mysql/20210301140656_add_withdraws_table.sql b/migrations/mysql/20210301140656_add_withdraws_table.sql new file mode 100644 index 000000000..73b1efea3 --- /dev/null +++ b/migrations/mysql/20210301140656_add_withdraws_table.sql @@ -0,0 +1,28 @@ +-- +up +-- +begin +CREATE TABLE `withdraws` +( + `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `exchange` VARCHAR(24) NOT NULL DEFAULT '', + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(128) NOT NULL, + `network` VARCHAR(32) NOT NULL DEFAULT '', + + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, + `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, + `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', + `time` DATETIME(3) NOT NULL, + + PRIMARY KEY (`gid`), + UNIQUE KEY `txn_id` (`exchange`, `txn_id`) +); +-- +end + +-- +down +-- +begin +DROP TABLE IF EXISTS `withdraws`; +-- +end diff --git a/migrations/mysql/20210307201830_add_deposits_table.sql b/migrations/mysql/20210307201830_add_deposits_table.sql new file mode 100644 index 000000000..dba549e52 --- /dev/null +++ b/migrations/mysql/20210307201830_add_deposits_table.sql @@ -0,0 +1,26 @@ +-- +up +-- +begin +CREATE TABLE `deposits` +( + `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `exchange` VARCHAR(24) NOT NULL, + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(128) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, + `time` DATETIME(3) NOT NULL, + + PRIMARY KEY (`gid`), + UNIQUE KEY `txn_id` (`exchange`, `txn_id`) +); +-- +end + + +-- +down + +-- +begin +DROP TABLE IF EXISTS `deposits`; +-- +end diff --git a/migrations/sqlite3/20210301140656_add_withdraws_table.sql b/migrations/sqlite3/20210301140656_add_withdraws_table.sql new file mode 100644 index 000000000..d1b2cb282 --- /dev/null +++ b/migrations/sqlite3/20210301140656_add_withdraws_table.sql @@ -0,0 +1,36 @@ +-- +up +-- +begin +CREATE TABLE `withdraws` +( + `gid` INTEGER PRIMARY KEY AUTOINCREMENT, + `exchange` VARCHAR(24) NOT NULL DEFAULT '', + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(128) NOT NULL, + `network` VARCHAR(32) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + + `txn_id` VARCHAR(256) NOT NULL, + `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, + `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', + `time` DATETIME(3) NOT NULL +); +-- +end + +-- +begin +CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`); +-- +end + + +-- +down + +-- +begin +DROP INDEX IF EXISTS `withdraws_txn_id`; +-- +end + +-- +begin +DROP TABLE IF EXISTS `withdraws`; +-- +end + diff --git a/migrations/sqlite3/20210307201830_add_deposits_table.sql b/migrations/sqlite3/20210307201830_add_deposits_table.sql new file mode 100644 index 000000000..2c8e0baf8 --- /dev/null +++ b/migrations/sqlite3/20210307201830_add_deposits_table.sql @@ -0,0 +1,31 @@ +-- +up +-- +begin +CREATE TABLE `deposits` +( + `gid` INTEGER PRIMARY KEY AUTOINCREMENT, + `exchange` VARCHAR(24) NOT NULL, + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(128) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, + `time` DATETIME(3) NOT NULL +); +-- +end +-- +begin +CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`); +-- +end + + +-- +down + +-- +begin +DROP INDEX IF EXISTS `deposits_txn_id`; +-- +end + +-- +begin +DROP TABLE IF EXISTS `deposits`; +-- +end + diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index c651d065c..63e9f2fb9 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -161,9 +161,11 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver environ.RewardService = &service.RewardService{DB: db} environ.SyncService = &service.SyncService{ - TradeService: environ.TradeService, - OrderService: environ.OrderService, - RewardService: environ.RewardService, + TradeService: environ.TradeService, + OrderService: environ.OrderService, + RewardService: environ.RewardService, + WithdrawService: &service.WithdrawService{DB: db}, + DepositService: &service.DepositService{DB: db}, } return nil diff --git a/pkg/cmd/pnl.go b/pkg/cmd/pnl.go index b42cfb4b4..37f7b2071 100644 --- a/pkg/cmd/pnl.go +++ b/pkg/cmd/pnl.go @@ -2,7 +2,10 @@ package cmd import ( "context" + "fmt" + "os" "strings" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -11,14 +14,14 @@ import ( "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) func init() { - PnLCmd.Flags().String("exchange", "", "target exchange") - PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") + PnLCmd.Flags().String("session", "", "target exchange") + PnLCmd.Flags().String("symbol", "", "trading symbol") + PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades") PnLCmd.Flags().Int("limit", 500, "number of trades") RootCmd.AddCommand(PnLCmd) } @@ -30,12 +33,26 @@ var PnLCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - exchangeNameStr, err := cmd.Flags().GetString("exchange") + configFile, err := cmd.Flags().GetString("config") if err != nil { return err } - exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if len(configFile) == 0 { + return errors.New("--config option is required") + } + + if _, err := os.Stat(configFile); os.IsNotExist(err) { + return err + } + + userConfig, err := bbgo.Load(configFile, false) + if err != nil { + return err + } + + + sessionName, err := cmd.Flags().GetString("session") if err != nil { return err } @@ -45,23 +62,73 @@ var PnLCmd = &cobra.Command{ return err } + if len(symbol) == 0 { + return errors.New("--symbol [SYMBOL] is required") + } + limit, err := cmd.Flags().GetInt("limit") if err != nil { return err } - exchange, err := cmdutil.NewExchange(exchangeName) + environ := bbgo.NewEnvironment() + if err := environ.ConfigureDatabase(ctx); err != nil { + return err + } + + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + if err := environ.Sync(ctx) ; err != nil { + return err + } + + exchange := session.Exchange + + market, ok := session.Market(symbol) + if !ok { + return fmt.Errorf("market config %s not found", symbol) + } + + since := time.Now().AddDate(-1, 0, 0) + until := time.Now() + + includeTransfer, err := cmd.Flags().GetBool("include-transfer") if err != nil { return err } + if includeTransfer { + transferService, ok := exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("session exchange %s does not implement transfer service", sessionName) + } - environ := bbgo.NewEnvironment() - if err := environ.ConfigureDatabase(ctx) ; err != nil { - return err + deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = deposits + + withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = withdrawals + + // we need the backtest klines for the daily prices + backtestService := &service.BacktestService{DB: environ.DatabaseService.DB} + if err := backtestService.SyncKLineByInterval(ctx, exchange, symbol, types.Interval1d, since, until); err != nil { + return err + } } - var trades []types.Trade tradingFeeCurrency := exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { @@ -71,7 +138,7 @@ var PnLCmd = &cobra.Command{ trades, err = environ.TradeService.Query(service.QueryTradesOptions{ Exchange: exchange.Name(), Symbol: symbol, - Limit: limit, + Limit: limit, }) } diff --git a/pkg/cmd/transfer.go b/pkg/cmd/transfer.go index 8f3bf5657..d8d39770b 100644 --- a/pkg/cmd/transfer.go +++ b/pkg/cmd/transfer.go @@ -101,7 +101,11 @@ var TransferHistoryCmd = &cobra.Command{ var records timeSlice - exchange := session.Exchange + exchange, ok := session.Exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("exchange session %s does not implement transfer service", sessionName) + } + deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until) if err != nil { return err diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 14bb48dde..e18c83347 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -48,7 +48,6 @@ func (e ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTi for _, o := range orders { if _, ok := orderIDs[o.OrderID]; ok { - logrus.Infof("skipping duplicated order id: %d", o.OrderID) continue } @@ -224,7 +223,6 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti for _, o := range rewards { if _, ok := rewardKeys[o.UUID]; ok { - logrus.Debugf("skipping duplicated order id: %s", o.UUID) continue } diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 7d483e0aa..43f16f44f 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/datatype" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -193,9 +194,27 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st return toGlobalIsolatedMarginAccount(account), nil } -func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { +func (e *Exchange) getLaunchDate() (time.Time, error) { + // binance launch date 12:00 July 14th, 2017 + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + return time.Time{}, err + } + return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil +} + +func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { startTime := since + + var emptyTime = time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + txIDs := map[string]struct{}{} for startTime.Before(until) { @@ -247,7 +266,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since txIDs[d.TxID] = struct{}{} allWithdraws = append(allWithdraws, types.Withdraw{ - ApplyTime: time.Unix(0, d.ApplyTime*int64(time.Millisecond)), + Exchange: types.ExchangeBinance, + ApplyTime: datatype.Time(time.Unix(0, d.ApplyTime*int64(time.Millisecond))), Asset: d.Asset, Amount: d.Amount, Address: d.Address, @@ -311,7 +331,8 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, txIDs[d.TxID] = struct{}{} allDeposits = append(allDeposits, types.Deposit{ - Time: time.Unix(0, d.InsertTime*int64(time.Millisecond)), + Exchange: types.ExchangeBinance, + Time: datatype.Time(time.Unix(0, d.InsertTime*int64(time.Millisecond))), Asset: d.Asset, Amount: d.Amount, Address: d.Address, diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index f5eff8158..6a8082c35 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/datatype" maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -376,6 +377,16 @@ func (e *Exchange) PlatformFeeCurrency() string { return toGlobalCurrency("max") } +func (e *Exchange) getLaunchDate() (time.Time, error) { + // MAX launch date June 21th, 2018 + loc, err := time.LoadLocation("Asia/Taipei") + if err != nil { + return time.Time{}, err + } + + return time.Date(2018, time.June, 21, 0, 0, 0, 0, loc), nil +} + func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { if err := accountQueryLimiter.Wait(ctx); err != nil { return nil, err @@ -406,10 +417,19 @@ func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { startTime := since + limit := 1000 txIDs := map[string]struct{}{} + emptyTime := time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + for startTime.Before(until) { - // startTime ~ endTime must be in 90 days + // startTime ~ endTime must be in 60 days endTime := startTime.AddDate(0, 0, 60) if endTime.After(until) { endTime = until @@ -424,13 +444,20 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since withdraws, err := req. From(startTime.Unix()). To(endTime.Unix()). + Limit(limit). Do(ctx) if err != nil { return allWithdraws, err } - for _, d := range withdraws { + if len(withdraws) == 0 { + startTime = endTime + continue + } + + for i := len(withdraws) - 1; i >= 0; i-- { + d := withdraws[i] if _, ok := txIDs[d.TxID]; ok { continue } @@ -455,21 +482,30 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since } txIDs[d.TxID] = struct{}{} - allWithdraws = append(allWithdraws, types.Withdraw{ - ApplyTime: time.Unix(d.CreatedAt, 0), - Asset: toGlobalCurrency(d.Currency), - Amount: util.MustParseFloat(d.Amount), - Address: "", - AddressTag: "", - TransactionID: d.TxID, - TransactionFee: util.MustParseFloat(d.Fee), + withdraw := types.Withdraw{ + Exchange: types.ExchangeMax, + ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)), + Asset: toGlobalCurrency(d.Currency), + Amount: util.MustParseFloat(d.Amount), + Address: "", + AddressTag: "", + TransactionID: d.TxID, + TransactionFee: util.MustParseFloat(d.Fee), + TransactionFeeCurrency: d.FeeCurrency, // WithdrawOrderID: d.WithdrawOrderID, // Network: d.Network, Status: status, - }) + } + allWithdraws = append(allWithdraws, withdraw) } - startTime = endTime + // go next time frame + if len(withdraws) < limit { + startTime = endTime + } else { + // its in descending order, so we get the first record + startTime = time.Unix(withdraws[0].CreatedAt, 0) + } } return allWithdraws, nil @@ -477,7 +513,17 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) { startTime := since + limit := 1000 txIDs := map[string]struct{}{} + + emptyTime := time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + for startTime.Before(until) { // startTime ~ endTime must be in 90 days endTime := startTime.AddDate(0, 0, 60) @@ -486,6 +532,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } log.Infof("querying deposit history %s: %s <=> %s", asset, startTime, endTime) + req := e.client.AccountService.NewGetDepositHistoryRequest() if len(asset) > 0 { req.Currency(toLocalCurrency(asset)) @@ -493,19 +540,23 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, deposits, err := req. From(startTime.Unix()). - To(endTime.Unix()).Do(ctx) + To(endTime.Unix()). + Limit(limit). + Do(ctx) if err != nil { return nil, err } - for _, d := range deposits { + for i := len(deposits) - 1; i >= 0; i-- { + d := deposits[i] if _, ok := txIDs[d.TxID]; ok { continue } allDeposits = append(allDeposits, types.Deposit{ - Time: time.Unix(d.CreatedAt, 0), + Exchange: types.ExchangeMax, + Time: datatype.Time(time.Unix(d.CreatedAt, 0)), Amount: util.MustParseFloat(d.Amount), Asset: toGlobalCurrency(d.Currency), Address: "", // not supported @@ -515,7 +566,11 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, }) } - startTime = endTime + if len(deposits) < limit { + startTime = endTime + } else { + startTime = time.Unix(deposits[0].CreatedAt, 0) + } } return allDeposits, err diff --git a/pkg/exchange/max/maxapi/account.go b/pkg/exchange/max/maxapi/account.go index 2a43f2fbb..cd6582167 100644 --- a/pkg/exchange/max/maxapi/account.go +++ b/pkg/exchange/max/maxapi/account.go @@ -118,7 +118,7 @@ type Deposit struct { Fee string `json:"fee"` TxID string `json:"txid"` State string `json:"state"` - Confirmations string `json:"confirmations"` + Confirmations string `json:"confirmations"` CreatedAt int64 `json:"created_at"` UpdatedAt int64 `json:"updated_at"` } @@ -187,14 +187,13 @@ func (s *AccountService) NewGetDepositHistoryRequest() *GetDepositHistoryRequest } } - - type Withdraw struct { - UUID string `json:"uuid"` + UUID string `json:"uuid"` Currency string `json:"currency"` CurrencyVersion string `json:"currency_version"` // "eth" Amount string `json:"amount"` Fee string `json:"fee"` + FeeCurrency string `json:"fee_currency"` TxID string `json:"txid"` // State can be "submitting", "submitted", @@ -203,10 +202,10 @@ type Withdraw struct { // "failed", "pending", "confirmed", // "kgi_manually_processing", "kgi_manually_confirmed", "kgi_possible_failed", // "sygna_verifying" - State string `json:"state"` - Confirmations int `json:"confirmations"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + State string `json:"state"` + Confirmations int `json:"confirmations"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } type GetWithdrawHistoryRequestParams struct { diff --git a/pkg/migrations/mysql/20210301140656_add_withdraws_table.go b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go new file mode 100644 index 000000000..3f6c296a4 --- /dev/null +++ b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go @@ -0,0 +1,34 @@ +package mysql + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddWithdrawsTable, downAddWithdrawsTable) + +} + +func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + if err != nil { + return err + } + + return err +} + +func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/mysql/20210307201830_add_deposits_table.go b/pkg/migrations/mysql/20210307201830_add_deposits_table.go new file mode 100644 index 000000000..753658b03 --- /dev/null +++ b/pkg/migrations/mysql/20210307201830_add_deposits_table.go @@ -0,0 +1,34 @@ +package mysql + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddDepositsTable, downAddDepositsTable) + +} + +func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + if err != nil { + return err + } + + return err +} + +func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go new file mode 100644 index 000000000..d6e4d0a5f --- /dev/null +++ b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go @@ -0,0 +1,44 @@ +package sqlite3 + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddWithdrawsTable, downAddWithdrawsTable) + +} + +func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL\n);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`);") + if err != nil { + return err + } + + return err +} + +func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `withdraws_txn_id`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go new file mode 100644 index 000000000..0416c5439 --- /dev/null +++ b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go @@ -0,0 +1,44 @@ +package sqlite3 + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddDepositsTable, downAddDepositsTable) + +} + +func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL\n);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`);") + if err != nil { + return err + } + + return err +} + +func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `deposits_txn_id`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index f29af189b..d41e0933c 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -17,33 +17,41 @@ type BacktestService struct { DB *sqlx.DB } -func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - now := time.Now() - for interval := range types.SupportedIntervals { - log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) +func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { + log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) - if err != nil { + lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) + if err != nil { + return err + } + + if lastKLine != nil { + log.Infof("found last checkpoint %s", lastKLine.EndTime) + startTime = lastKLine.StartTime.Add(time.Minute) + } + + batch := &batch2.KLineBatchQuery{Exchange: exchange} + + // should use channel here + klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime) + // var previousKLine types.KLine + for k := range klineC { + if err := s.Insert(k); err != nil { return err } + } - if lastKLine != nil { - log.Infof("found last checkpoint %s", lastKLine.EndTime) - startTime = lastKLine.StartTime.Add(time.Minute) - } + if err := <-errC; err != nil { + return err + } - batch := &batch2.KLineBatchQuery{Exchange: exchange} + return nil +} - // should use channel here - klineC, errC := batch.Query(ctx, symbol, interval, startTime, now) - // var previousKLine types.KLine - for k := range klineC { - if err := s.Insert(k); err != nil { - return err - } - } - - if err := <-errC; err != nil { +func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { + endTime := time.Now() + for interval := range types.SupportedIntervals { + if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) ; err != nil { return err } } diff --git a/pkg/service/deposit.go b/pkg/service/deposit.go new file mode 100644 index 000000000..2a066daf0 --- /dev/null +++ b/pkg/service/deposit.go @@ -0,0 +1,107 @@ +package service + +import ( + "context" + "time" + + "github.com/jmoiron/sqlx" + + "github.com/c9s/bbgo/pkg/types" +) + +type DepositService struct { + DB *sqlx.DB +} + +// Sync syncs the withdraw records into db +func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { + txnIDs := map[string]struct{}{} + + // query descending + records, err := s.QueryLast(ex.Name(), 100) + if err != nil { + return err + } + + for _, record := range records { + txnIDs[record.TransactionID] = struct{}{} + } + + transferApi, ok := ex.(types.ExchangeTransferService) + if !ok { + return ErrNotImplemented + } + + since := time.Time{} + if len(records) > 0 { + since = records[len(records)-1].Time.Time() + } + + // asset "" means all assets + deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now()) + if err != nil { + return err + } + + for _, deposit := range deposits { + if _, exists := txnIDs[deposit.TransactionID]; exists { + continue + } + + if err := s.Insert(deposit); err != nil { + return err + } + } + + return nil +} + + +func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) { + sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" + rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ + "exchange": ex, + "limit": limit, + }) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows) +} + +func (s *DepositService) Query(exchangeName types.ExchangeName) ([]types.Deposit, error) { + args := map[string]interface{}{ + "exchange": exchangeName, + } + sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` ASC" + rows, err := s.DB.NamedQuery(sql, args) + if err != nil { + return nil, err + } + + defer rows.Close() + + return s.scanRows(rows) +} + +func (s *DepositService) scanRows(rows *sqlx.Rows) (deposits []types.Deposit, err error) { + for rows.Next() { + var deposit types.Deposit + if err := rows.StructScan(&deposit); err != nil { + return deposits, err + } + + deposits = append(deposits, deposit) + } + + return deposits, rows.Err() +} + +func (s *DepositService) Insert(deposit types.Deposit) error { + sql := `INSERT INTO deposits (exchange, asset, address, amount, txn_id, time) + VALUES (:exchange, :asset, :address, :amount, :txn_id, :time)` + _, err := s.DB.NamedExec(sql, deposit) + return err +} diff --git a/pkg/service/deposit_test.go b/pkg/service/deposit_test.go new file mode 100644 index 000000000..8519098dc --- /dev/null +++ b/pkg/service/deposit_test.go @@ -0,0 +1,39 @@ +package service + +import ( + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/datatype" + "github.com/c9s/bbgo/pkg/types" +) + +func TestDepositService(t *testing.T) { + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + xdb := sqlx.NewDb(db.DB, "sqlite3") + service := &DepositService{DB: xdb} + + err = service.Insert(types.Deposit{ + Exchange: types.ExchangeMax, + Time: datatype.Time(time.Now()), + Amount: 0.001, + Asset: "BTC", + Address: "test", + TransactionID: "02", + Status: types.DepositSuccess, + }) + assert.NoError(t, err) + + deposits, err := service.Query(types.ExchangeMax) + assert.NoError(t, err) + assert.NotEmpty(t, deposits) +} diff --git a/pkg/service/order.go b/pkg/service/order.go index 769db5ce5..04753d304 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -1,13 +1,16 @@ package service import ( + "context" "strconv" "strings" + "time" "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -15,6 +18,65 @@ type OrderService struct { DB *sqlx.DB } +func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { + isMargin := false + isIsolated := false + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + symbol = marginSettings.IsolatedMarginSymbol + } + } + + records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) + if err != nil { + return err + } + + orderKeys := make(map[uint64]struct{}) + + var lastID uint64 = 0 + if len(records) > 0 { + for _, record := range records { + orderKeys[record.OrderID] = struct{}{} + } + + lastID = records[0].OrderID + startTime = records[0].CreationTime.Time() + } + + b := &batch.ClosedOrderBatchQuery{Exchange: exchange} + ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) + for order := range ordersC { + select { + + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + + } + + if _, exists := orderKeys[order.OrderID]; exists { + continue + } + + if err := s.Insert(order); err != nil { + return err + } + } + + return <-errC +} + + // QueryLast queries the last order from the database func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isIsolated bool, limit int) ([]types.Order, error) { log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated) diff --git a/pkg/service/reward.go b/pkg/service/reward.go index 760fb82f0..c90f73abb 100644 --- a/pkg/service/reward.go +++ b/pkg/service/reward.go @@ -8,11 +8,17 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) +// RewardService collects the reward records from the exchange, +// currently it's only available for MAX exchange. +// TODO: add summary query for calculating the reward amounts +// CREATE VIEW reward_summary_by_years AS SELECT YEAR(created_at) as year, reward_type, currency, SUM(quantity) FROM rewards WHERE reward_type != 'airdrop' GROUP BY YEAR(created_at), reward_type, currency ORDER BY year DESC; type RewardService struct { DB *sqlx.DB } @@ -31,6 +37,64 @@ func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Rew return s.scanRows(rows) } +func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange) error { + service, ok := exchange.(types.ExchangeRewardService) + if !ok { + return ErrExchangeRewardServiceNotImplemented + } + + var rewardKeys = map[string]struct{}{} + + var startTime time.Time + + records, err := s.QueryLast(exchange.Name(), 50) + if err != nil { + return err + } + + if len(records) > 0 { + lastRecord := records[0] + startTime = lastRecord.CreatedAt.Time() + + for _, record := range records { + rewardKeys[record.UUID] = struct{}{} + } + } + + batchQuery := &batch.RewardBatchQuery{Service: service} + rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) + + for reward := range rewardsC { + select { + + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + + } + + if _, ok := rewardKeys[reward.UUID]; ok { + continue + } + + logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt) + + if err := s.Insert(reward); err != nil { + return err + } + } + + return <-errC +} + + + type CurrencyPositionMap map[string]fixedpoint.Value func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) { diff --git a/pkg/service/sync.go b/pkg/service/sync.go index ab1d578ab..69f68ed39 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -5,221 +5,46 @@ import ( "errors" "time" - "github.com/sirupsen/logrus" - - "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) -var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") +var ErrNotImplemented = errors.New("not implemented") +var ErrExchangeRewardServiceNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") type SyncService struct { - TradeService *TradeService - OrderService *OrderService - RewardService *RewardService -} - -func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error { - service, ok := exchange.(types.ExchangeRewardService) - if !ok { - return ErrNotImplemented - } - - var rewardKeys = map[string]struct{}{} - - var startTime time.Time - - records, err := s.RewardService.QueryLast(exchange.Name(), 50) - if err != nil { - return err - } - - if len(records) > 0 { - lastRecord := records[0] - startTime = lastRecord.CreatedAt.Time() - - for _, record := range records { - rewardKeys[record.UUID] = struct{}{} - } - } - - batchQuery := &batch.RewardBatchQuery{Service: service} - rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) - - for reward := range rewardsC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, ok := rewardKeys[reward.UUID]; ok { - continue - } - - logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt) - - if err := s.RewardService.Insert(reward); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - records, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - orderKeys := make(map[uint64]struct{}) - - var lastID uint64 = 0 - if len(records) > 0 { - for _, record := range records { - orderKeys[record.OrderID] = struct{}{} - } - - lastID = records[0].OrderID - startTime = records[0].CreationTime.Time() - } - - b := &batch.ClosedOrderBatchQuery{Exchange: exchange} - ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) - for order := range ordersC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, exists := orderKeys[order.OrderID]; exists { - continue - } - - if err := s.OrderService.Insert(order); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - // records descending ordered - records, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - var tradeKeys = map[types.TradeKey]struct{}{} - var lastTradeID int64 = 1 - if len(records) > 0 { - for _, record := range records { - tradeKeys[record.Key()] = struct{}{} - } - - lastTradeID = records[0].ID - } - - b := &batch.TradeBatchQuery{Exchange: exchange} - tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ - LastTradeID: lastTradeID, - }) - - for trade := range tradeC { - select { - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - } - - key := trade.Key() - if _, exists := tradeKeys[key]; exists { - continue - } - - tradeKeys[key] = struct{}{} - - logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", - trade.Exchange, - trade.ID, - trade.Symbol, - trade.Side, - trade.Price, - trade.Quantity, - trade.MakerOrTakerLabel(), - trade.Time.String()) - - if err := s.TradeService.Insert(trade); err != nil { - return err - } - } - - return <-errC + TradeService *TradeService + OrderService *OrderService + RewardService *RewardService + WithdrawService *WithdrawService + DepositService *DepositService } // SyncSessionSymbols syncs the trades from the given exchange session func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { for _, symbol := range symbols { - if err := s.SyncTrades(ctx, exchange, symbol); err != nil { + if err := s.TradeService.Sync(ctx, exchange, symbol); err != nil { return err } - if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { + if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil { return err } + } - if err := s.SyncRewards(ctx, exchange); err != nil { - if err == ErrNotImplemented { - continue - } + if err := s.DepositService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { + return err + } + } + if err := s.WithdrawService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { + return err + } + } + + if err := s.RewardService.Sync(ctx, exchange); err != nil { + if err != ErrExchangeRewardServiceNotImplemented { return err } } diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 525cc892b..26601ac70 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -49,6 +50,78 @@ func NewTradeService(db *sqlx.DB) *TradeService { return &TradeService{db} } +func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string) error { + isMargin := false + isIsolated := false + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + symbol = marginSettings.IsolatedMarginSymbol + } + } + + // records descending ordered + records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) + if err != nil { + return err + } + + var tradeKeys = map[types.TradeKey]struct{}{} + var lastTradeID int64 = 1 + if len(records) > 0 { + for _, record := range records { + tradeKeys[record.Key()] = struct{}{} + } + + lastTradeID = records[0].ID + } + + b := &batch.TradeBatchQuery{Exchange: exchange} + tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ + LastTradeID: lastTradeID, + }) + + for trade := range tradeC { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + } + + key := trade.Key() + if _, exists := tradeKeys[key]; exists { + continue + } + + tradeKeys[key] = struct{}{} + + log.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", + trade.Exchange, + trade.ID, + trade.Symbol, + trade.Side, + trade.Price, + trade.Quantity, + trade.MakerOrTakerLabel(), + trade.Time.String()) + + if err := s.Insert(trade); err != nil { + return err + } + } + + return <-errC +} + + func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) { args := map[string]interface{}{ // "symbol": symbol, diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go new file mode 100644 index 000000000..4a84a80dc --- /dev/null +++ b/pkg/service/withdraw.go @@ -0,0 +1,106 @@ +package service + +import ( + "context" + "time" + + "github.com/jmoiron/sqlx" + + "github.com/c9s/bbgo/pkg/types" +) + +type WithdrawService struct { + DB *sqlx.DB +} + +// Sync syncs the withdraw records into db +func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { + txnIDs := map[string]struct{}{} + + // query descending + records, err := s.QueryLast(ex.Name(), 100) + if err != nil { + return err + } + + for _, record := range records { + txnIDs[record.TransactionID] = struct{}{} + } + + transferApi, ok := ex.(types.ExchangeTransferService) + if !ok { + return ErrNotImplemented + } + + since := time.Time{} + if len(records) > 0 { + since = records[len(records)-1].ApplyTime.Time() + } + + // asset "" means all assets + withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now()) + if err != nil { + return err + } + + for _, withdraw := range withdraws { + if _, exists := txnIDs[withdraw.TransactionID]; exists { + continue + } + + if err := s.Insert(withdraw); err != nil { + return err + } + } + + return nil +} + +func (s *WithdrawService) QueryLast(ex types.ExchangeName, limit int) ([]types.Withdraw, error) { + sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" + rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ + "exchange": ex, + "limit": limit, + }) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows) +} + +func (s *WithdrawService) Query(exchangeName types.ExchangeName) ([]types.Withdraw, error) { + args := map[string]interface{}{ + "exchange": exchangeName, + } + sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` ASC" + rows, err := s.DB.NamedQuery(sql, args) + if err != nil { + return nil, err + } + + defer rows.Close() + + return s.scanRows(rows) +} + +func (s *WithdrawService) scanRows(rows *sqlx.Rows) (withdraws []types.Withdraw, err error) { + for rows.Next() { + var withdraw types.Withdraw + if err := rows.StructScan(&withdraw); err != nil { + return withdraws, err + } + + withdraws = append(withdraws, withdraw) + } + + return withdraws, rows.Err() +} + +func (s *WithdrawService) Insert(withdrawal types.Withdraw) error { + sql := `INSERT INTO withdraws (exchange, asset, network, address, amount, txn_id, txn_fee, time) + VALUES (:exchange, :asset, :network, :address, :amount, :txn_id, :txn_fee, :time)` + _, err := s.DB.NamedExec(sql, withdrawal) + return err +} diff --git a/pkg/service/withdraw_test.go b/pkg/service/withdraw_test.go new file mode 100644 index 000000000..68ececc18 --- /dev/null +++ b/pkg/service/withdraw_test.go @@ -0,0 +1,41 @@ +package service + +import ( + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/datatype" + "github.com/c9s/bbgo/pkg/types" +) + +func TestWithdrawService(t *testing.T) { + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + xdb := sqlx.NewDb(db.DB, "sqlite3") + service := &WithdrawService{DB: xdb} + + err = service.Insert(types.Withdraw{ + Exchange: types.ExchangeMax, + Asset: "BTC", + Amount: 0.0001, + Address: "test", + TransactionID: "01", + TransactionFee: 0.0001, + Network: "omni", + ApplyTime: datatype.Time(time.Now()), + }) + assert.NoError(t, err) + + withdraws, err := service.Query(types.ExchangeMax) + assert.NoError(t, err) + assert.NotEmpty(t, withdraws) + assert.Equal(t, types.ExchangeMax, withdraws[0].Exchange) +} diff --git a/pkg/types/deposit.go b/pkg/types/deposit.go index e75117f99..1f78c85e8 100644 --- a/pkg/types/deposit.go +++ b/pkg/types/deposit.go @@ -1,6 +1,10 @@ package types -import "time" +import ( + "time" + + "github.com/c9s/bbgo/pkg/datatype" +) type DepositStatus string @@ -20,15 +24,17 @@ const ( ) type Deposit struct { - Time time.Time `json:"time"` - Amount float64 `json:"amount"` - Asset string `json:"asset"` - Address string `json:"address"` + GID int64 `json:"gid" db:"gid"` + Exchange ExchangeName `json:"exchange" db:"exchange"` + Time datatype.Time `json:"time" db:"time"` + Amount float64 `json:"amount" db:"amount"` + Asset string `json:"asset" db:"asset"` + Address string `json:"address" db:"address"` AddressTag string `json:"addressTag"` - TransactionID string `json:"txId"` + TransactionID string `json:"transactionID" db:"txn_id"` Status DepositStatus `json:"status"` } func (d Deposit) EffectiveTime() time.Time { - return d.Time + return d.Time.Time() } diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 1e36346ad..018f66484 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -2,6 +2,7 @@ package types import ( "context" + "database/sql/driver" "encoding/json" "fmt" "strings" @@ -14,9 +15,13 @@ const DateFormat = "2006-01-02" type ExchangeName string +func (n *ExchangeName) Value() (driver.Value, error) { + return n.String(), nil +} + func (n *ExchangeName) UnmarshalJSON(data []byte) error { var s string - if err := json.Unmarshal(data, &s) ; err != nil { + if err := json.Unmarshal(data, &s); err != nil { return err } @@ -72,10 +77,6 @@ type Exchange interface { QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) - QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) - - QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) - SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error) QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error) @@ -85,6 +86,11 @@ type Exchange interface { CancelOrders(ctx context.Context, orders ...Order) error } +type ExchangeTransferService interface { + QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) + QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) +} + type ExchangeRewardService interface { QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error) } diff --git a/pkg/types/withdraw.go b/pkg/types/withdraw.go index 803d585a6..d3ea2501d 100644 --- a/pkg/types/withdraw.go +++ b/pkg/types/withdraw.go @@ -1,22 +1,33 @@ package types -import "time" +import ( + "fmt" + "time" + + "github.com/c9s/bbgo/pkg/datatype" +) type Withdraw struct { - ID string `json:"id"` - Asset string `json:"asset"` - Amount float64 `json:"amount"` - Address string `json:"address"` - AddressTag string `json:"addressTag"` - Status string `json:"status"` + GID int64 `json:"gid" db:"gid"` + Exchange ExchangeName `json:"exchange" db:"exchange"` + Asset string `json:"asset" db:"asset"` + Amount float64 `json:"amount" db:"amount"` + Address string `json:"address" db:"address"` + AddressTag string `json:"addressTag"` + Status string `json:"status"` - TransactionID string `json:"txId"` - TransactionFee float64 `json:"transactionFee"` - WithdrawOrderID string `json:"withdrawOrderId"` - ApplyTime time.Time `json:"applyTime"` - Network string `json:"network"` + TransactionID string `json:"transactionID" db:"txn_id"` + TransactionFee float64 `json:"transactionFee" db:"txn_fee"` + TransactionFeeCurrency string `json:"transactionFeeCurrency" db:"txn_fee_currency"` + WithdrawOrderID string `json:"withdrawOrderId"` + ApplyTime datatype.Time `json:"applyTime" db:"time"` + Network string `json:"network" db:"network"` +} + +func (w Withdraw) String() string { + return fmt.Sprintf("withdraw %s %f to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time()) } func (w Withdraw) EffectiveTime() time.Time { - return w.ApplyTime + return w.ApplyTime.Time() } diff --git a/rockhopper_mysql.yaml b/rockhopper_mysql.yaml index 6dde3ff23..af9636351 100644 --- a/rockhopper_mysql.yaml +++ b/rockhopper_mysql.yaml @@ -1,5 +1,5 @@ --- driver: mysql dialect: mysql -dsn: "root@tcp(localhost:3306)/bbgo?parseTime=true" +dsn: "root@tcp(localhost:3306)/bbgo_dev?parseTime=true" migrationsDir: migrations/mysql diff --git a/scripts/test-mysql-migrations.sh b/scripts/test-mysql-migrations.sh old mode 100644 new mode 100755