From 4e9973681a8e21b9bed20795487873c4df3a89fa Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:58:26 +0800 Subject: [PATCH 01/26] add migration files --- .../20210301140656_add_withdraws_table.sql | 28 ++++++++++++ .../20210307201830_add_deposits_table.sql | 26 +++++++++++ .../20210301140656_add_withdraws_table.sql | 36 +++++++++++++++ .../20210307201830_add_deposits_table.sql | 31 +++++++++++++ .../20210301140656_add_withdraws_table.go | 34 ++++++++++++++ .../20210307201830_add_deposits_table.go | 34 ++++++++++++++ .../20210301140656_add_withdraws_table.go | 44 +++++++++++++++++++ .../20210307201830_add_deposits_table.go | 44 +++++++++++++++++++ 8 files changed, 277 insertions(+) create mode 100644 migrations/mysql/20210301140656_add_withdraws_table.sql create mode 100644 migrations/mysql/20210307201830_add_deposits_table.sql create mode 100644 migrations/sqlite3/20210301140656_add_withdraws_table.sql create mode 100644 migrations/sqlite3/20210307201830_add_deposits_table.sql create mode 100644 pkg/migrations/mysql/20210301140656_add_withdraws_table.go create mode 100644 pkg/migrations/mysql/20210307201830_add_deposits_table.go create mode 100644 pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go create mode 100644 pkg/migrations/sqlite3/20210307201830_add_deposits_table.go diff --git a/migrations/mysql/20210301140656_add_withdraws_table.sql b/migrations/mysql/20210301140656_add_withdraws_table.sql new file mode 100644 index 000000000..b86d45ef6 --- /dev/null +++ b/migrations/mysql/20210301140656_add_withdraws_table.sql @@ -0,0 +1,28 @@ +-- +up +-- +begin +CREATE TABLE `withdraws` +( + `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `exchange` VARCHAR(24) NOT NULL DEFAULT '', + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(64) NOT NULL, + `network` VARCHAR(32) NOT NULL DEFAULT '', + + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(64) NOT NULL, + `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, + `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', + `time` DATETIME(3) NOT NULL, + + PRIMARY KEY (`gid`), + UNIQUE KEY `txn_id` (`exchange`, `txn_id`) +); +-- +end + +-- +down +-- +begin +DROP TABLE IF EXISTS `withdraws`; +-- +end diff --git a/migrations/mysql/20210307201830_add_deposits_table.sql b/migrations/mysql/20210307201830_add_deposits_table.sql new file mode 100644 index 000000000..bf9b7e6af --- /dev/null +++ b/migrations/mysql/20210307201830_add_deposits_table.sql @@ -0,0 +1,26 @@ +-- +up +-- +begin +CREATE TABLE `deposits` +( + `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, + `exchange` VARCHAR(24) NOT NULL, + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(64) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(64) NOT NULL, + `time` DATETIME(3) NOT NULL, + + PRIMARY KEY (`gid`), + UNIQUE KEY `txn_id` (`exchange`, `txn_id`) +); +-- +end + + +-- +down + +-- +begin +DROP TABLE IF EXISTS `deposits`; +-- +end diff --git a/migrations/sqlite3/20210301140656_add_withdraws_table.sql b/migrations/sqlite3/20210301140656_add_withdraws_table.sql new file mode 100644 index 000000000..aa4649b7a --- /dev/null +++ b/migrations/sqlite3/20210301140656_add_withdraws_table.sql @@ -0,0 +1,36 @@ +-- +up +-- +begin +CREATE TABLE `withdraws` +( + `gid` INTEGER PRIMARY KEY AUTOINCREMENT, + `exchange` VARCHAR(24) NOT NULL DEFAULT '', + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(64) NOT NULL, + `network` VARCHAR(32) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + + `txn_id` VARCHAR(64) NOT NULL, + `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, + `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', + `time` DATETIME(3) NOT NULL +); +-- +end + +-- +begin +CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`); +-- +end + + +-- +down + +-- +begin +DROP INDEX IF EXISTS `withdraws_txn_id`; +-- +end + +-- +begin +DROP TABLE IF EXISTS `withdraws`; +-- +end + diff --git a/migrations/sqlite3/20210307201830_add_deposits_table.sql b/migrations/sqlite3/20210307201830_add_deposits_table.sql new file mode 100644 index 000000000..012e1960d --- /dev/null +++ b/migrations/sqlite3/20210307201830_add_deposits_table.sql @@ -0,0 +1,31 @@ +-- +up +-- +begin +CREATE TABLE `deposits` +( + `gid` INTEGER PRIMARY KEY AUTOINCREMENT, + `exchange` VARCHAR(24) NOT NULL, + + -- asset is the asset name (currency) + `asset` VARCHAR(10) NOT NULL, + + `address` VARCHAR(64) NOT NULL DEFAULT '', + `amount` DECIMAL(16, 8) NOT NULL, + `txn_id` VARCHAR(64) NOT NULL, + `time` DATETIME(3) NOT NULL +); +-- +end +-- +begin +CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`); +-- +end + + +-- +down + +-- +begin +DROP INDEX IF EXISTS `deposits_txn_id`; +-- +end + +-- +begin +DROP TABLE IF EXISTS `deposits`; +-- +end + diff --git a/pkg/migrations/mysql/20210301140656_add_withdraws_table.go b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go new file mode 100644 index 000000000..2c8030d3d --- /dev/null +++ b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go @@ -0,0 +1,34 @@ +package mysql + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddWithdrawsTable, downAddWithdrawsTable) + +} + +func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + if err != nil { + return err + } + + return err +} + +func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/mysql/20210307201830_add_deposits_table.go b/pkg/migrations/mysql/20210307201830_add_deposits_table.go new file mode 100644 index 000000000..2ab5a164a --- /dev/null +++ b/pkg/migrations/mysql/20210307201830_add_deposits_table.go @@ -0,0 +1,34 @@ +package mysql + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddDepositsTable, downAddDepositsTable) + +} + +func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + if err != nil { + return err + } + + return err +} + +func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go new file mode 100644 index 000000000..722a26d21 --- /dev/null +++ b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go @@ -0,0 +1,44 @@ +package sqlite3 + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddWithdrawsTable, downAddWithdrawsTable) + +} + +func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL\n);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`);") + if err != nil { + return err + } + + return err +} + +func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `withdraws_txn_id`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;") + if err != nil { + return err + } + + return err +} diff --git a/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go new file mode 100644 index 000000000..350c0f200 --- /dev/null +++ b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go @@ -0,0 +1,44 @@ +package sqlite3 + +import ( + "context" + + "github.com/c9s/rockhopper" +) + +func init() { + AddMigration(upAddDepositsTable, downAddDepositsTable) + +} + +func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is applied. + + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `time` DATETIME(3) NOT NULL\n);") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`);") + if err != nil { + return err + } + + return err +} + +func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { + // This code is executed when the migration is rolled back. + + _, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `deposits_txn_id`;") + if err != nil { + return err + } + + _, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;") + if err != nil { + return err + } + + return err +} From e9fd6e542df4936672d975b79a5f7052adb31199 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:58:45 +0800 Subject: [PATCH 02/26] add exec mode to scripts/test-mysql-migrations.sh --- scripts/test-mysql-migrations.sh | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 scripts/test-mysql-migrations.sh diff --git a/scripts/test-mysql-migrations.sh b/scripts/test-mysql-migrations.sh old mode 100644 new mode 100755 From 5b5f083e925a8b0e9f93d2b4e56832b10245b9ee Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:59:03 +0800 Subject: [PATCH 03/26] use bbgo_dev for mysql rockhopper --- .travis.yml | 1 + rockhopper_mysql.yaml | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 25604dbb5..0653603a7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,6 +10,7 @@ services: before_install: - mysql -e 'CREATE DATABASE bbgo;' +- mysql -e 'CREATE DATABASE bbgo_dev;' install: - go get github.com/c9s/rockhopper/cmd/rockhopper diff --git a/rockhopper_mysql.yaml b/rockhopper_mysql.yaml index 6dde3ff23..af9636351 100644 --- a/rockhopper_mysql.yaml +++ b/rockhopper_mysql.yaml @@ -1,5 +1,5 @@ --- driver: mysql dialect: mysql -dsn: "root@tcp(localhost:3306)/bbgo?parseTime=true" +dsn: "root@tcp(localhost:3306)/bbgo_dev?parseTime=true" migrationsDir: migrations/mysql From b0ea2bfe14c8180b494a5686401c5926e104ed4c Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:59:58 +0800 Subject: [PATCH 04/26] types: add exchange name sql value and unmarshalling --- pkg/types/exchange.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 1e36346ad..1060392de 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -2,6 +2,7 @@ package types import ( "context" + "database/sql/driver" "encoding/json" "fmt" "strings" @@ -14,9 +15,13 @@ const DateFormat = "2006-01-02" type ExchangeName string +func (n *ExchangeName) Value() (driver.Value, error) { + return n.String(), nil +} + func (n *ExchangeName) UnmarshalJSON(data []byte) error { var s string - if err := json.Unmarshal(data, &s) ; err != nil { + if err := json.Unmarshal(data, &s); err != nil { return err } From 2b485602ad3915aa3fdf4fca638f257a750dcd63 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:00:36 +0800 Subject: [PATCH 05/26] split ExchangeTransferService --- pkg/types/exchange.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index 1060392de..018f66484 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -77,10 +77,6 @@ type Exchange interface { QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) - QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) - - QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) - SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error) QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error) @@ -90,6 +86,11 @@ type Exchange interface { CancelOrders(ctx context.Context, orders ...Order) error } +type ExchangeTransferService interface { + QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error) + QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error) +} + type ExchangeRewardService interface { QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error) } From ac45bb306a953ec8d8dc78a30b83bd075714abac Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:01:05 +0800 Subject: [PATCH 06/26] types: update deposit fields and withdraw fields --- pkg/types/deposit.go | 20 +++++++++++++------- pkg/types/withdraw.go | 32 +++++++++++++++++++------------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/pkg/types/deposit.go b/pkg/types/deposit.go index e75117f99..1f78c85e8 100644 --- a/pkg/types/deposit.go +++ b/pkg/types/deposit.go @@ -1,6 +1,10 @@ package types -import "time" +import ( + "time" + + "github.com/c9s/bbgo/pkg/datatype" +) type DepositStatus string @@ -20,15 +24,17 @@ const ( ) type Deposit struct { - Time time.Time `json:"time"` - Amount float64 `json:"amount"` - Asset string `json:"asset"` - Address string `json:"address"` + GID int64 `json:"gid" db:"gid"` + Exchange ExchangeName `json:"exchange" db:"exchange"` + Time datatype.Time `json:"time" db:"time"` + Amount float64 `json:"amount" db:"amount"` + Asset string `json:"asset" db:"asset"` + Address string `json:"address" db:"address"` AddressTag string `json:"addressTag"` - TransactionID string `json:"txId"` + TransactionID string `json:"transactionID" db:"txn_id"` Status DepositStatus `json:"status"` } func (d Deposit) EffectiveTime() time.Time { - return d.Time + return d.Time.Time() } diff --git a/pkg/types/withdraw.go b/pkg/types/withdraw.go index 803d585a6..02748fe9f 100644 --- a/pkg/types/withdraw.go +++ b/pkg/types/withdraw.go @@ -1,22 +1,28 @@ package types -import "time" +import ( + "time" + + "github.com/c9s/bbgo/pkg/datatype" +) type Withdraw struct { - ID string `json:"id"` - Asset string `json:"asset"` - Amount float64 `json:"amount"` - Address string `json:"address"` - AddressTag string `json:"addressTag"` - Status string `json:"status"` + GID int64 `json:"gid" db:"gid"` + Exchange ExchangeName `json:"exchange" db:"exchange"` + Asset string `json:"asset" db:"asset"` + Amount float64 `json:"amount" db:"amount"` + Address string `json:"address" db:"address"` + AddressTag string `json:"addressTag"` + Status string `json:"status"` - TransactionID string `json:"txId"` - TransactionFee float64 `json:"transactionFee"` - WithdrawOrderID string `json:"withdrawOrderId"` - ApplyTime time.Time `json:"applyTime"` - Network string `json:"network"` + TransactionID string `json:"transactionID" db:"txn_id"` + TransactionFee float64 `json:"transactionFee" db:"txn_fee"` + TransactionFeeCurrency string `json:"transactionFeeCurrency" db:"txn_fee_currency"` + WithdrawOrderID string `json:"withdrawOrderId"` + ApplyTime datatype.Time `json:"applyTime" db:"time"` + Network string `json:"network" db:"network"` } func (w Withdraw) EffectiveTime() time.Time { - return w.ApplyTime + return w.ApplyTime.Time() } From 3f0290479b3db3d75ea3b6567c7abe758730c578 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:01:40 +0800 Subject: [PATCH 07/26] binance: update withdraw and deposit types --- pkg/exchange/binance/exchange.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 7d483e0aa..f6fc5d480 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/datatype" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" @@ -247,7 +248,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since txIDs[d.TxID] = struct{}{} allWithdraws = append(allWithdraws, types.Withdraw{ - ApplyTime: time.Unix(0, d.ApplyTime*int64(time.Millisecond)), + Exchange: types.ExchangeBinance, + ApplyTime: datatype.Time(time.Unix(0, d.ApplyTime*int64(time.Millisecond))), Asset: d.Asset, Amount: d.Amount, Address: d.Address, @@ -311,7 +313,8 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, txIDs[d.TxID] = struct{}{} allDeposits = append(allDeposits, types.Deposit{ - Time: time.Unix(0, d.InsertTime*int64(time.Millisecond)), + Exchange: types.ExchangeBinance, + Time: datatype.Time(time.Unix(0, d.InsertTime*int64(time.Millisecond))), Asset: d.Asset, Amount: d.Amount, Address: d.Address, From be672c89e60b2a5de8dc9cd9b4695ac363415834 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:01:59 +0800 Subject: [PATCH 08/26] max: update deposit and withdraw types --- pkg/exchange/max/exchange.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index f5eff8158..129f7d454 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -14,6 +14,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/datatype" maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" @@ -456,7 +457,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since txIDs[d.TxID] = struct{}{} allWithdraws = append(allWithdraws, types.Withdraw{ - ApplyTime: time.Unix(d.CreatedAt, 0), + Exchange: types.ExchangeMax, + ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)), Asset: toGlobalCurrency(d.Currency), Amount: util.MustParseFloat(d.Amount), Address: "", @@ -505,13 +507,14 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } allDeposits = append(allDeposits, types.Deposit{ - Time: time.Unix(d.CreatedAt, 0), - Amount: util.MustParseFloat(d.Amount), - Asset: toGlobalCurrency(d.Currency), - Address: "", // not supported - AddressTag: "", // not supported - TransactionID: d.TxID, - Status: toGlobalDepositStatus(d.State), + Exchange: types.ExchangeMax, + Time: datatype.Time(time.Unix(d.CreatedAt, 0)), + Amount: util.MustParseFloat(d.Amount), + Asset: toGlobalCurrency(d.Currency), + Address: "", // not supported + AddressTag: "", // not supported + TransactionID: d.TxID, + Status: toGlobalDepositStatus(d.State), }) } From 877ea73435545db66a77944f73eeeb0844ade236 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:02:33 +0800 Subject: [PATCH 09/26] maxapi: align fields --- pkg/exchange/max/maxapi/account.go | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/exchange/max/maxapi/account.go b/pkg/exchange/max/maxapi/account.go index 2a43f2fbb..078883ced 100644 --- a/pkg/exchange/max/maxapi/account.go +++ b/pkg/exchange/max/maxapi/account.go @@ -118,7 +118,7 @@ type Deposit struct { Fee string `json:"fee"` TxID string `json:"txid"` State string `json:"state"` - Confirmations string `json:"confirmations"` + Confirmations string `json:"confirmations"` CreatedAt int64 `json:"created_at"` UpdatedAt int64 `json:"updated_at"` } @@ -187,10 +187,8 @@ func (s *AccountService) NewGetDepositHistoryRequest() *GetDepositHistoryRequest } } - - type Withdraw struct { - UUID string `json:"uuid"` + UUID string `json:"uuid"` Currency string `json:"currency"` CurrencyVersion string `json:"currency_version"` // "eth" Amount string `json:"amount"` @@ -203,10 +201,10 @@ type Withdraw struct { // "failed", "pending", "confirmed", // "kgi_manually_processing", "kgi_manually_confirmed", "kgi_possible_failed", // "sygna_verifying" - State string `json:"state"` - Confirmations int `json:"confirmations"` - CreatedAt int64 `json:"created_at"` - UpdatedAt int64 `json:"updated_at"` + State string `json:"state"` + Confirmations int `json:"confirmations"` + CreatedAt int64 `json:"created_at"` + UpdatedAt int64 `json:"updated_at"` } type GetWithdrawHistoryRequestParams struct { From 5a02cdbda3b6d3aaa6b82af8a289836f82ce9fc9 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:03:02 +0800 Subject: [PATCH 10/26] implement sync method on the order service --- pkg/service/order.go | 62 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/pkg/service/order.go b/pkg/service/order.go index 769db5ce5..04753d304 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -1,13 +1,16 @@ package service import ( + "context" "strconv" "strings" + "time" "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -15,6 +18,65 @@ type OrderService struct { DB *sqlx.DB } +func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { + isMargin := false + isIsolated := false + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + symbol = marginSettings.IsolatedMarginSymbol + } + } + + records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) + if err != nil { + return err + } + + orderKeys := make(map[uint64]struct{}) + + var lastID uint64 = 0 + if len(records) > 0 { + for _, record := range records { + orderKeys[record.OrderID] = struct{}{} + } + + lastID = records[0].OrderID + startTime = records[0].CreationTime.Time() + } + + b := &batch.ClosedOrderBatchQuery{Exchange: exchange} + ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) + for order := range ordersC { + select { + + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + + } + + if _, exists := orderKeys[order.OrderID]; exists { + continue + } + + if err := s.Insert(order); err != nil { + return err + } + } + + return <-errC +} + + // QueryLast queries the last order from the database func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isIsolated bool, limit int) ([]types.Order, error) { log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated) From 8fc7c4798e1fd14c3d63b133b0b6d975fba7248f Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:03:22 +0800 Subject: [PATCH 11/26] implement sync method on reward service --- pkg/service/reward.go | 60 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/pkg/service/reward.go b/pkg/service/reward.go index 760fb82f0..fee94e911 100644 --- a/pkg/service/reward.go +++ b/pkg/service/reward.go @@ -8,7 +8,9 @@ import ( "time" "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) @@ -31,6 +33,64 @@ func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Rew return s.scanRows(rows) } +func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange) error { + service, ok := exchange.(types.ExchangeRewardService) + if !ok { + return ErrNotImplemented + } + + var rewardKeys = map[string]struct{}{} + + var startTime time.Time + + records, err := s.QueryLast(exchange.Name(), 50) + if err != nil { + return err + } + + if len(records) > 0 { + lastRecord := records[0] + startTime = lastRecord.CreatedAt.Time() + + for _, record := range records { + rewardKeys[record.UUID] = struct{}{} + } + } + + batchQuery := &batch.RewardBatchQuery{Service: service} + rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) + + for reward := range rewardsC { + select { + + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + + } + + if _, ok := rewardKeys[reward.UUID]; ok { + continue + } + + logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt) + + if err := s.Insert(reward); err != nil { + return err + } + } + + return <-errC +} + + + type CurrencyPositionMap map[string]fixedpoint.Value func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) { From f22a6ee69703c0a1788326f9c1bbc56388637349 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:03:45 +0800 Subject: [PATCH 12/26] implement sync method on the trade service --- pkg/service/trade.go | 73 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 525cc892b..26601ac70 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -11,6 +11,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -49,6 +50,78 @@ func NewTradeService(db *sqlx.DB) *TradeService { return &TradeService{db} } +func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string) error { + isMargin := false + isIsolated := false + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + symbol = marginSettings.IsolatedMarginSymbol + } + } + + // records descending ordered + records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) + if err != nil { + return err + } + + var tradeKeys = map[types.TradeKey]struct{}{} + var lastTradeID int64 = 1 + if len(records) > 0 { + for _, record := range records { + tradeKeys[record.Key()] = struct{}{} + } + + lastTradeID = records[0].ID + } + + b := &batch.TradeBatchQuery{Exchange: exchange} + tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ + LastTradeID: lastTradeID, + }) + + for trade := range tradeC { + select { + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + } + + key := trade.Key() + if _, exists := tradeKeys[key]; exists { + continue + } + + tradeKeys[key] = struct{}{} + + log.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", + trade.Exchange, + trade.ID, + trade.Symbol, + trade.Side, + trade.Price, + trade.Quantity, + trade.MakerOrTakerLabel(), + trade.Time.String()) + + if err := s.Insert(trade); err != nil { + return err + } + } + + return <-errC +} + + func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) { args := map[string]interface{}{ // "symbol": symbol, From 3c90aa515dcfbada0c57e806f1226be60c475395 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:04:18 +0800 Subject: [PATCH 13/26] add deposit service and withdraw service --- pkg/service/deposit.go | 60 ++++++++++++++++++++++++++++++++++++ pkg/service/deposit_test.go | 39 +++++++++++++++++++++++ pkg/service/withdraw.go | 60 ++++++++++++++++++++++++++++++++++++ pkg/service/withdraw_test.go | 41 ++++++++++++++++++++++++ 4 files changed, 200 insertions(+) create mode 100644 pkg/service/deposit.go create mode 100644 pkg/service/deposit_test.go create mode 100644 pkg/service/withdraw.go create mode 100644 pkg/service/withdraw_test.go diff --git a/pkg/service/deposit.go b/pkg/service/deposit.go new file mode 100644 index 000000000..1df6a95a5 --- /dev/null +++ b/pkg/service/deposit.go @@ -0,0 +1,60 @@ +package service + +import ( + "github.com/jmoiron/sqlx" + + "github.com/c9s/bbgo/pkg/types" +) + +type DepositService struct { + DB *sqlx.DB +} + +func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) { + sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" + rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ + "exchange": ex, + "limit": limit, + }) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows) +} + +func (s *DepositService) Query(exchangeName types.ExchangeName) ([]types.Deposit, error) { + args := map[string]interface{}{ + "exchange": exchangeName, + } + sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` ASC" + rows, err := s.DB.NamedQuery(sql, args) + if err != nil { + return nil, err + } + + defer rows.Close() + + return s.scanRows(rows) +} + +func (s *DepositService) scanRows(rows *sqlx.Rows) (deposits []types.Deposit, err error) { + for rows.Next() { + var deposit types.Deposit + if err := rows.StructScan(&deposit); err != nil { + return deposits, err + } + + deposits = append(deposits, deposit) + } + + return deposits, rows.Err() +} + +func (s *DepositService) Insert(deposit types.Deposit) error { + sql := `INSERT INTO deposits (exchange, asset, address, amount, txn_id, time) + VALUES (:exchange, :asset, :address, :amount, :txn_id, :time)` + _, err := s.DB.NamedExec(sql, deposit) + return err +} diff --git a/pkg/service/deposit_test.go b/pkg/service/deposit_test.go new file mode 100644 index 000000000..8519098dc --- /dev/null +++ b/pkg/service/deposit_test.go @@ -0,0 +1,39 @@ +package service + +import ( + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/datatype" + "github.com/c9s/bbgo/pkg/types" +) + +func TestDepositService(t *testing.T) { + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + xdb := sqlx.NewDb(db.DB, "sqlite3") + service := &DepositService{DB: xdb} + + err = service.Insert(types.Deposit{ + Exchange: types.ExchangeMax, + Time: datatype.Time(time.Now()), + Amount: 0.001, + Asset: "BTC", + Address: "test", + TransactionID: "02", + Status: types.DepositSuccess, + }) + assert.NoError(t, err) + + deposits, err := service.Query(types.ExchangeMax) + assert.NoError(t, err) + assert.NotEmpty(t, deposits) +} diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go new file mode 100644 index 000000000..3091cc645 --- /dev/null +++ b/pkg/service/withdraw.go @@ -0,0 +1,60 @@ +package service + +import ( + "github.com/jmoiron/sqlx" + + "github.com/c9s/bbgo/pkg/types" +) + +type WithdrawService struct { + DB *sqlx.DB +} + +func (s *WithdrawService) QueryLast(ex types.ExchangeName, limit int) ([]types.Withdraw, error) { + sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" + rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ + "exchange": ex, + "limit": limit, + }) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows) +} + +func (s *WithdrawService) Query(exchangeName types.ExchangeName) ([]types.Withdraw, error) { + args := map[string]interface{}{ + "exchange": exchangeName, + } + sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` ASC" + rows, err := s.DB.NamedQuery(sql, args) + if err != nil { + return nil, err + } + + defer rows.Close() + + return s.scanRows(rows) +} + +func (s *WithdrawService) scanRows(rows *sqlx.Rows) (withdraws []types.Withdraw, err error) { + for rows.Next() { + var withdraw types.Withdraw + if err := rows.StructScan(&withdraw); err != nil { + return withdraws, err + } + + withdraws = append(withdraws, withdraw) + } + + return withdraws, rows.Err() +} + +func (s *WithdrawService) Insert(withdrawal types.Withdraw) error { + sql := `INSERT INTO withdraws (exchange, asset, network, address, amount, txn_id, txn_fee, time) + VALUES (:exchange, :asset, :network, :address, :amount, :txn_id, :txn_fee, :time)` + _, err := s.DB.NamedExec(sql, withdrawal) + return err +} diff --git a/pkg/service/withdraw_test.go b/pkg/service/withdraw_test.go new file mode 100644 index 000000000..68ececc18 --- /dev/null +++ b/pkg/service/withdraw_test.go @@ -0,0 +1,41 @@ +package service + +import ( + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/datatype" + "github.com/c9s/bbgo/pkg/types" +) + +func TestWithdrawService(t *testing.T) { + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + xdb := sqlx.NewDb(db.DB, "sqlite3") + service := &WithdrawService{DB: xdb} + + err = service.Insert(types.Withdraw{ + Exchange: types.ExchangeMax, + Asset: "BTC", + Amount: 0.0001, + Address: "test", + TransactionID: "01", + TransactionFee: 0.0001, + Network: "omni", + ApplyTime: datatype.Time(time.Now()), + }) + assert.NoError(t, err) + + withdraws, err := service.Query(types.ExchangeMax) + assert.NoError(t, err) + assert.NotEmpty(t, withdraws) + assert.Equal(t, types.ExchangeMax, withdraws[0].Exchange) +} From 4b49fda4636fd267c03c5bdc35684f82a0f34420 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 11:04:56 +0800 Subject: [PATCH 14/26] refactor sync service --- README.md | 1 + pkg/bbgo/environment.go | 8 +- pkg/cmd/pnl.go | 89 ++++++++++++++--- pkg/cmd/transfer.go | 6 +- pkg/service/backtest.go | 50 ++++++---- pkg/service/sync.go | 208 ++-------------------------------------- 6 files changed, 128 insertions(+), 234 deletions(-) diff --git a/README.md b/README.md index debb3b2df..c61911f0b 100644 --- a/README.md +++ b/README.md @@ -341,6 +341,7 @@ for lorca make embed && go run -tags web ./cmd/bbgo-lorca ``` + ## Support ### By contributing pull requests diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index c651d065c..63e9f2fb9 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -161,9 +161,11 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver environ.RewardService = &service.RewardService{DB: db} environ.SyncService = &service.SyncService{ - TradeService: environ.TradeService, - OrderService: environ.OrderService, - RewardService: environ.RewardService, + TradeService: environ.TradeService, + OrderService: environ.OrderService, + RewardService: environ.RewardService, + WithdrawService: &service.WithdrawService{DB: db}, + DepositService: &service.DepositService{DB: db}, } return nil diff --git a/pkg/cmd/pnl.go b/pkg/cmd/pnl.go index b42cfb4b4..37f7b2071 100644 --- a/pkg/cmd/pnl.go +++ b/pkg/cmd/pnl.go @@ -2,7 +2,10 @@ package cmd import ( "context" + "fmt" + "os" "strings" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -11,14 +14,14 @@ import ( "github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/bbgo" - "github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" ) func init() { - PnLCmd.Flags().String("exchange", "", "target exchange") - PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") + PnLCmd.Flags().String("session", "", "target exchange") + PnLCmd.Flags().String("symbol", "", "trading symbol") + PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades") PnLCmd.Flags().Int("limit", 500, "number of trades") RootCmd.AddCommand(PnLCmd) } @@ -30,12 +33,26 @@ var PnLCmd = &cobra.Command{ RunE: func(cmd *cobra.Command, args []string) error { ctx := context.Background() - exchangeNameStr, err := cmd.Flags().GetString("exchange") + configFile, err := cmd.Flags().GetString("config") if err != nil { return err } - exchangeName, err := types.ValidExchangeName(exchangeNameStr) + if len(configFile) == 0 { + return errors.New("--config option is required") + } + + if _, err := os.Stat(configFile); os.IsNotExist(err) { + return err + } + + userConfig, err := bbgo.Load(configFile, false) + if err != nil { + return err + } + + + sessionName, err := cmd.Flags().GetString("session") if err != nil { return err } @@ -45,23 +62,73 @@ var PnLCmd = &cobra.Command{ return err } + if len(symbol) == 0 { + return errors.New("--symbol [SYMBOL] is required") + } + limit, err := cmd.Flags().GetInt("limit") if err != nil { return err } - exchange, err := cmdutil.NewExchange(exchangeName) + environ := bbgo.NewEnvironment() + if err := environ.ConfigureDatabase(ctx); err != nil { + return err + } + + if err := environ.ConfigureExchangeSessions(userConfig); err != nil { + return err + } + + session, ok := environ.Session(sessionName) + if !ok { + return fmt.Errorf("session %s not found", sessionName) + } + + if err := environ.Sync(ctx) ; err != nil { + return err + } + + exchange := session.Exchange + + market, ok := session.Market(symbol) + if !ok { + return fmt.Errorf("market config %s not found", symbol) + } + + since := time.Now().AddDate(-1, 0, 0) + until := time.Now() + + includeTransfer, err := cmd.Flags().GetBool("include-transfer") if err != nil { return err } + if includeTransfer { + transferService, ok := exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("session exchange %s does not implement transfer service", sessionName) + } - environ := bbgo.NewEnvironment() - if err := environ.ConfigureDatabase(ctx) ; err != nil { - return err + deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = deposits + + withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until) + if err != nil { + return err + } + _ = withdrawals + + // we need the backtest klines for the daily prices + backtestService := &service.BacktestService{DB: environ.DatabaseService.DB} + if err := backtestService.SyncKLineByInterval(ctx, exchange, symbol, types.Interval1d, since, until); err != nil { + return err + } } - var trades []types.Trade tradingFeeCurrency := exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { @@ -71,7 +138,7 @@ var PnLCmd = &cobra.Command{ trades, err = environ.TradeService.Query(service.QueryTradesOptions{ Exchange: exchange.Name(), Symbol: symbol, - Limit: limit, + Limit: limit, }) } diff --git a/pkg/cmd/transfer.go b/pkg/cmd/transfer.go index 8f3bf5657..d8d39770b 100644 --- a/pkg/cmd/transfer.go +++ b/pkg/cmd/transfer.go @@ -101,7 +101,11 @@ var TransferHistoryCmd = &cobra.Command{ var records timeSlice - exchange := session.Exchange + exchange, ok := session.Exchange.(types.ExchangeTransferService) + if !ok { + return fmt.Errorf("exchange session %s does not implement transfer service", sessionName) + } + deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until) if err != nil { return err diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index f29af189b..d41e0933c 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -17,33 +17,41 @@ type BacktestService struct { DB *sqlx.DB } -func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - now := time.Now() - for interval := range types.SupportedIntervals { - log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) +func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { + log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) - if err != nil { + lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) + if err != nil { + return err + } + + if lastKLine != nil { + log.Infof("found last checkpoint %s", lastKLine.EndTime) + startTime = lastKLine.StartTime.Add(time.Minute) + } + + batch := &batch2.KLineBatchQuery{Exchange: exchange} + + // should use channel here + klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime) + // var previousKLine types.KLine + for k := range klineC { + if err := s.Insert(k); err != nil { return err } + } - if lastKLine != nil { - log.Infof("found last checkpoint %s", lastKLine.EndTime) - startTime = lastKLine.StartTime.Add(time.Minute) - } + if err := <-errC; err != nil { + return err + } - batch := &batch2.KLineBatchQuery{Exchange: exchange} + return nil +} - // should use channel here - klineC, errC := batch.Query(ctx, symbol, interval, startTime, now) - // var previousKLine types.KLine - for k := range klineC { - if err := s.Insert(k); err != nil { - return err - } - } - - if err := <-errC; err != nil { +func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { + endTime := time.Now() + for interval := range types.SupportedIntervals { + if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) ; err != nil { return err } } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index ab1d578ab..23b2e7970 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -5,221 +5,33 @@ import ( "errors" "time" - "github.com/sirupsen/logrus" - - "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") type SyncService struct { - TradeService *TradeService - OrderService *OrderService - RewardService *RewardService -} - -func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error { - service, ok := exchange.(types.ExchangeRewardService) - if !ok { - return ErrNotImplemented - } - - var rewardKeys = map[string]struct{}{} - - var startTime time.Time - - records, err := s.RewardService.QueryLast(exchange.Name(), 50) - if err != nil { - return err - } - - if len(records) > 0 { - lastRecord := records[0] - startTime = lastRecord.CreatedAt.Time() - - for _, record := range records { - rewardKeys[record.UUID] = struct{}{} - } - } - - batchQuery := &batch.RewardBatchQuery{Service: service} - rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) - - for reward := range rewardsC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, ok := rewardKeys[reward.UUID]; ok { - continue - } - - logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt) - - if err := s.RewardService.Insert(reward); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - records, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - orderKeys := make(map[uint64]struct{}) - - var lastID uint64 = 0 - if len(records) > 0 { - for _, record := range records { - orderKeys[record.OrderID] = struct{}{} - } - - lastID = records[0].OrderID - startTime = records[0].CreationTime.Time() - } - - b := &batch.ClosedOrderBatchQuery{Exchange: exchange} - ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) - for order := range ordersC { - select { - - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - - } - - if _, exists := orderKeys[order.OrderID]; exists { - continue - } - - if err := s.OrderService.Insert(order); err != nil { - return err - } - } - - return <-errC -} - -func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error { - isMargin := false - isIsolated := false - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - // records descending ordered - records, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50) - if err != nil { - return err - } - - var tradeKeys = map[types.TradeKey]struct{}{} - var lastTradeID int64 = 1 - if len(records) > 0 { - for _, record := range records { - tradeKeys[record.Key()] = struct{}{} - } - - lastTradeID = records[0].ID - } - - b := &batch.TradeBatchQuery{Exchange: exchange} - tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ - LastTradeID: lastTradeID, - }) - - for trade := range tradeC { - select { - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: - } - - key := trade.Key() - if _, exists := tradeKeys[key]; exists { - continue - } - - tradeKeys[key] = struct{}{} - - logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s", - trade.Exchange, - trade.ID, - trade.Symbol, - trade.Side, - trade.Price, - trade.Quantity, - trade.MakerOrTakerLabel(), - trade.Time.String()) - - if err := s.TradeService.Insert(trade); err != nil { - return err - } - } - - return <-errC + TradeService *TradeService + OrderService *OrderService + RewardService *RewardService + WithdrawService *WithdrawService + DepositService *DepositService } // SyncSessionSymbols syncs the trades from the given exchange session func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { for _, symbol := range symbols { - if err := s.SyncTrades(ctx, exchange, symbol); err != nil { + if err := s.TradeService.Sync(ctx, exchange, symbol); err != nil { return err } - if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { + if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil { return err } + } - if err := s.SyncRewards(ctx, exchange); err != nil { - if err == ErrNotImplemented { - continue - } - + if err := s.RewardService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { return err } } From 4d3b1ec938eca61a03f7de55b98762452d930f74 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 13 Mar 2021 20:49:51 +0800 Subject: [PATCH 15/26] fix QueryWithdrawHistory and QueryDepositHistory --- pkg/exchange/binance/exchange.go | 20 +++++++++++- pkg/exchange/max/exchange.go | 56 +++++++++++++++++++++++++------- pkg/service/reward.go | 2 +- pkg/service/sync.go | 5 +-- pkg/service/withdraw.go | 34 +++++++++++++++++++ pkg/types/withdraw.go | 5 +++ 6 files changed, 106 insertions(+), 16 deletions(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index f6fc5d480..43f16f44f 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -194,9 +194,27 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st return toGlobalIsolatedMarginAccount(account), nil } -func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { +func (e *Exchange) getLaunchDate() (time.Time, error) { + // binance launch date 12:00 July 14th, 2017 + loc, err := time.LoadLocation("Asia/Shanghai") + if err != nil { + return time.Time{}, err + } + return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil +} + +func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { startTime := since + + var emptyTime = time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + txIDs := map[string]struct{}{} for startTime.Before(until) { diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 129f7d454..2eed16286 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -377,6 +377,16 @@ func (e *Exchange) PlatformFeeCurrency() string { return toGlobalCurrency("max") } +func (e *Exchange) getLaunchDate() (time.Time, error) { + // MAX launch date June 21th, 2018 + loc, err := time.LoadLocation("Asia/Taipei") + if err != nil { + return time.Time{}, err + } + + return time.Date(2018, time.June, 21, 0, 0, 0, 0, loc), nil +} + func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { if err := accountQueryLimiter.Wait(ctx); err != nil { return nil, err @@ -407,10 +417,19 @@ func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { startTime := since + limit := 1000 txIDs := map[string]struct{}{} + emptyTime := time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + for startTime.Before(until) { - // startTime ~ endTime must be in 90 days + // startTime ~ endTime must be in 60 days endTime := startTime.AddDate(0, 0, 60) if endTime.After(until) { endTime = until @@ -425,12 +444,18 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since withdraws, err := req. From(startTime.Unix()). To(endTime.Unix()). + Limit(limit). Do(ctx) if err != nil { return allWithdraws, err } + if len(withdraws) == 0 { + startTime = endTime + continue + } + for _, d := range withdraws { if _, ok := txIDs[d.TxID]; ok { continue @@ -456,7 +481,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since } txIDs[d.TxID] = struct{}{} - allWithdraws = append(allWithdraws, types.Withdraw{ + withdraw := types.Withdraw{ Exchange: types.ExchangeMax, ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)), Asset: toGlobalCurrency(d.Currency), @@ -468,10 +493,17 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since // WithdrawOrderID: d.WithdrawOrderID, // Network: d.Network, Status: status, - }) + } + allWithdraws = append(allWithdraws, withdraw) + } + + // go next time frame + if len(withdraws) < limit { + startTime = endTime + } else { + startTime = time.Unix(withdraws[len(withdraws)-1].UpdatedAt, 0) } - startTime = endTime } return allWithdraws, nil @@ -507,14 +539,14 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } allDeposits = append(allDeposits, types.Deposit{ - Exchange: types.ExchangeMax, - Time: datatype.Time(time.Unix(d.CreatedAt, 0)), - Amount: util.MustParseFloat(d.Amount), - Asset: toGlobalCurrency(d.Currency), - Address: "", // not supported - AddressTag: "", // not supported - TransactionID: d.TxID, - Status: toGlobalDepositStatus(d.State), + Exchange: types.ExchangeMax, + Time: datatype.Time(time.Unix(d.CreatedAt, 0)), + Amount: util.MustParseFloat(d.Amount), + Asset: toGlobalCurrency(d.Currency), + Address: "", // not supported + AddressTag: "", // not supported + TransactionID: d.TxID, + Status: toGlobalDepositStatus(d.State), }) } diff --git a/pkg/service/reward.go b/pkg/service/reward.go index fee94e911..8d09c8a16 100644 --- a/pkg/service/reward.go +++ b/pkg/service/reward.go @@ -36,7 +36,7 @@ func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Rew func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange) error { service, ok := exchange.(types.ExchangeRewardService) if !ok { - return ErrNotImplemented + return ErrExchangeRewardServiceNotImplemented } var rewardKeys = map[string]struct{}{} diff --git a/pkg/service/sync.go b/pkg/service/sync.go index 23b2e7970..74cae1181 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -8,7 +8,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") +var ErrNotImplemented = errors.New("not implemented") +var ErrExchangeRewardServiceNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") type SyncService struct { TradeService *TradeService @@ -31,7 +32,7 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc } if err := s.RewardService.Sync(ctx, exchange); err != nil { - if err != ErrNotImplemented { + if err != ErrExchangeRewardServiceNotImplemented { return err } } diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go index 3091cc645..7c2fe8f14 100644 --- a/pkg/service/withdraw.go +++ b/pkg/service/withdraw.go @@ -1,6 +1,9 @@ package service import ( + "context" + "time" + "github.com/jmoiron/sqlx" "github.com/c9s/bbgo/pkg/types" @@ -10,6 +13,37 @@ type WithdrawService struct { DB *sqlx.DB } +func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { + txnIDs := map[string]struct{}{} + + records, err := s.QueryLast(ex.Name(), 10) + if err != nil { + return err + } + + for _, record := range records { + txnIDs[record.TransactionID] = struct{}{} + } + + transferApi, ok := ex.(types.ExchangeTransferService) + if !ok { + return ErrNotImplemented + } + + withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", records[0].ApplyTime.Time(), time.Now()) + if err != nil { + return err + } + + for _, withdraw := range withdraws { + if _, exists := txnIDs[withdraw.TransactionID] ; exists { + continue + } + } + + return nil +} + func (s *WithdrawService) QueryLast(ex types.ExchangeName, limit int) ([]types.Withdraw, error) { sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ diff --git a/pkg/types/withdraw.go b/pkg/types/withdraw.go index 02748fe9f..d3ea2501d 100644 --- a/pkg/types/withdraw.go +++ b/pkg/types/withdraw.go @@ -1,6 +1,7 @@ package types import ( + "fmt" "time" "github.com/c9s/bbgo/pkg/datatype" @@ -23,6 +24,10 @@ type Withdraw struct { Network string `json:"network" db:"network"` } +func (w Withdraw) String() string { + return fmt.Sprintf("withdraw %s %f to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time()) +} + func (w Withdraw) EffectiveTime() time.Time { return w.ApplyTime.Time() } From 2d6b6e7427055b26c1778483bb3148afeb08a0f4 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 11 Mar 2021 11:22:01 +0800 Subject: [PATCH 16/26] fix withdrawal data ordering --- pkg/exchange/max/exchange.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 2eed16286..0d6f16364 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -456,7 +456,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since continue } - for _, d := range withdraws { + for i := len(withdraws) - 1 ; i >= 0 ; i-- { + d := withdraws[i] if _, ok := txIDs[d.TxID]; ok { continue } @@ -490,6 +491,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since AddressTag: "", TransactionID: d.TxID, TransactionFee: util.MustParseFloat(d.Fee), + TransactionFeeCurrency: d.FeeCurrency, // WithdrawOrderID: d.WithdrawOrderID, // Network: d.Network, Status: status, @@ -503,7 +505,6 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since } else { startTime = time.Unix(withdraws[len(withdraws)-1].UpdatedAt, 0) } - } return allWithdraws, nil From dbcf35e4a400f95bc119c7511cc2cf25453cd651 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 11 Mar 2021 11:22:13 +0800 Subject: [PATCH 17/26] add FeeCurrency field --- pkg/exchange/max/maxapi/account.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/exchange/max/maxapi/account.go b/pkg/exchange/max/maxapi/account.go index 078883ced..cd6582167 100644 --- a/pkg/exchange/max/maxapi/account.go +++ b/pkg/exchange/max/maxapi/account.go @@ -193,6 +193,7 @@ type Withdraw struct { CurrencyVersion string `json:"currency_version"` // "eth" Amount string `json:"amount"` Fee string `json:"fee"` + FeeCurrency string `json:"fee_currency"` TxID string `json:"txid"` // State can be "submitting", "submitted", From 75778675e3efa72760ac34cbb393eea7d1f7de67 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 11 Mar 2021 15:57:24 +0800 Subject: [PATCH 18/26] fix withdraw query order --- pkg/exchange/max/exchange.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 0d6f16364..62b23fc19 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -503,7 +503,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since if len(withdraws) < limit { startTime = endTime } else { - startTime = time.Unix(withdraws[len(withdraws)-1].UpdatedAt, 0) + // its in descending order, so we get the first record + startTime = time.Unix(withdraws[0].UpdatedAt, 0) } } From b25671c864e65bb43230e623de58cc6efa056ec1 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 11 Mar 2021 16:03:07 +0800 Subject: [PATCH 19/26] fix max deposits history ordering --- pkg/exchange/max/exchange.go | 33 +++++++++++++++++++++------------ 1 file changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 62b23fc19..748b454e2 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -456,7 +456,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since continue } - for i := len(withdraws) - 1 ; i >= 0 ; i-- { + for i := len(withdraws) - 1; i >= 0; i-- { d := withdraws[i] if _, ok := txIDs[d.TxID]; ok { continue @@ -483,14 +483,14 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since txIDs[d.TxID] = struct{}{} withdraw := types.Withdraw{ - Exchange: types.ExchangeMax, - ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)), - Asset: toGlobalCurrency(d.Currency), - Amount: util.MustParseFloat(d.Amount), - Address: "", - AddressTag: "", - TransactionID: d.TxID, - TransactionFee: util.MustParseFloat(d.Fee), + Exchange: types.ExchangeMax, + ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)), + Asset: toGlobalCurrency(d.Currency), + Amount: util.MustParseFloat(d.Amount), + Address: "", + AddressTag: "", + TransactionID: d.TxID, + TransactionFee: util.MustParseFloat(d.Fee), TransactionFeeCurrency: d.FeeCurrency, // WithdrawOrderID: d.WithdrawOrderID, // Network: d.Network, @@ -513,6 +513,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) { startTime := since + limit := 1000 txIDs := map[string]struct{}{} for startTime.Before(until) { // startTime ~ endTime must be in 90 days @@ -522,6 +523,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } log.Infof("querying deposit history %s: %s <=> %s", asset, startTime, endTime) + req := e.client.AccountService.NewGetDepositHistoryRequest() if len(asset) > 0 { req.Currency(toLocalCurrency(asset)) @@ -529,13 +531,16 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, deposits, err := req. From(startTime.Unix()). - To(endTime.Unix()).Do(ctx) + To(endTime.Unix()). + Limit(limit). + Do(ctx) if err != nil { return nil, err } - for _, d := range deposits { + for i := len(deposits) - 1; i >= 0; i-- { + d := deposits[i] if _, ok := txIDs[d.TxID]; ok { continue } @@ -552,7 +557,11 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, }) } - startTime = endTime + if len(deposits) < limit { + startTime = endTime + } else { + startTime = time.Unix(deposits[0].UpdatedAt, 0) + } } return allDeposits, err From 8e85274876b10b1ade23614efcabfc6d704e2fba Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 11 Mar 2021 16:44:43 +0800 Subject: [PATCH 20/26] fix used time field for withdraw --- pkg/exchange/max/exchange.go | 4 ++-- pkg/service/withdraw.go | 18 +++++++++++++++--- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index 748b454e2..bb03b7deb 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -504,7 +504,7 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since startTime = endTime } else { // its in descending order, so we get the first record - startTime = time.Unix(withdraws[0].UpdatedAt, 0) + startTime = time.Unix(withdraws[0].CreatedAt, 0) } } @@ -560,7 +560,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, if len(deposits) < limit { startTime = endTime } else { - startTime = time.Unix(deposits[0].UpdatedAt, 0) + startTime = time.Unix(deposits[0].CreatedAt, 0) } } diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go index 7c2fe8f14..4a84a80dc 100644 --- a/pkg/service/withdraw.go +++ b/pkg/service/withdraw.go @@ -13,10 +13,12 @@ type WithdrawService struct { DB *sqlx.DB } +// Sync syncs the withdraw records into db func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { txnIDs := map[string]struct{}{} - records, err := s.QueryLast(ex.Name(), 10) + // query descending + records, err := s.QueryLast(ex.Name(), 100) if err != nil { return err } @@ -30,15 +32,25 @@ func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { return ErrNotImplemented } - withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", records[0].ApplyTime.Time(), time.Now()) + since := time.Time{} + if len(records) > 0 { + since = records[len(records)-1].ApplyTime.Time() + } + + // asset "" means all assets + withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now()) if err != nil { return err } for _, withdraw := range withdraws { - if _, exists := txnIDs[withdraw.TransactionID] ; exists { + if _, exists := txnIDs[withdraw.TransactionID]; exists { continue } + + if err := s.Insert(withdraw); err != nil { + return err + } } return nil From ccbb78ce4d97fe63b5306cc942f0c7cce7f5b9d0 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:42:39 +0800 Subject: [PATCH 21/26] migration: extend tx id and address size --- migrations/mysql/20210301140656_add_withdraws_table.sql | 4 ++-- migrations/mysql/20210307201830_add_deposits_table.sql | 4 ++-- migrations/sqlite3/20210301140656_add_withdraws_table.sql | 4 ++-- migrations/sqlite3/20210307201830_add_deposits_table.sql | 4 ++-- pkg/migrations/mysql/20210301140656_add_withdraws_table.go | 2 +- pkg/migrations/mysql/20210307201830_add_deposits_table.go | 2 +- pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go | 2 +- pkg/migrations/sqlite3/20210307201830_add_deposits_table.go | 2 +- 8 files changed, 12 insertions(+), 12 deletions(-) diff --git a/migrations/mysql/20210301140656_add_withdraws_table.sql b/migrations/mysql/20210301140656_add_withdraws_table.sql index b86d45ef6..73b1efea3 100644 --- a/migrations/mysql/20210301140656_add_withdraws_table.sql +++ b/migrations/mysql/20210301140656_add_withdraws_table.sql @@ -8,11 +8,11 @@ CREATE TABLE `withdraws` -- asset is the asset name (currency) `asset` VARCHAR(10) NOT NULL, - `address` VARCHAR(64) NOT NULL, + `address` VARCHAR(128) NOT NULL, `network` VARCHAR(32) NOT NULL DEFAULT '', `amount` DECIMAL(16, 8) NOT NULL, - `txn_id` VARCHAR(64) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', `time` DATETIME(3) NOT NULL, diff --git a/migrations/mysql/20210307201830_add_deposits_table.sql b/migrations/mysql/20210307201830_add_deposits_table.sql index bf9b7e6af..dba549e52 100644 --- a/migrations/mysql/20210307201830_add_deposits_table.sql +++ b/migrations/mysql/20210307201830_add_deposits_table.sql @@ -8,9 +8,9 @@ CREATE TABLE `deposits` -- asset is the asset name (currency) `asset` VARCHAR(10) NOT NULL, - `address` VARCHAR(64) NOT NULL DEFAULT '', + `address` VARCHAR(128) NOT NULL DEFAULT '', `amount` DECIMAL(16, 8) NOT NULL, - `txn_id` VARCHAR(64) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, `time` DATETIME(3) NOT NULL, PRIMARY KEY (`gid`), diff --git a/migrations/sqlite3/20210301140656_add_withdraws_table.sql b/migrations/sqlite3/20210301140656_add_withdraws_table.sql index aa4649b7a..d1b2cb282 100644 --- a/migrations/sqlite3/20210301140656_add_withdraws_table.sql +++ b/migrations/sqlite3/20210301140656_add_withdraws_table.sql @@ -8,11 +8,11 @@ CREATE TABLE `withdraws` -- asset is the asset name (currency) `asset` VARCHAR(10) NOT NULL, - `address` VARCHAR(64) NOT NULL, + `address` VARCHAR(128) NOT NULL, `network` VARCHAR(32) NOT NULL DEFAULT '', `amount` DECIMAL(16, 8) NOT NULL, - `txn_id` VARCHAR(64) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0, `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '', `time` DATETIME(3) NOT NULL diff --git a/migrations/sqlite3/20210307201830_add_deposits_table.sql b/migrations/sqlite3/20210307201830_add_deposits_table.sql index 012e1960d..2c8e0baf8 100644 --- a/migrations/sqlite3/20210307201830_add_deposits_table.sql +++ b/migrations/sqlite3/20210307201830_add_deposits_table.sql @@ -8,9 +8,9 @@ CREATE TABLE `deposits` -- asset is the asset name (currency) `asset` VARCHAR(10) NOT NULL, - `address` VARCHAR(64) NOT NULL DEFAULT '', + `address` VARCHAR(128) NOT NULL DEFAULT '', `amount` DECIMAL(16, 8) NOT NULL, - `txn_id` VARCHAR(64) NOT NULL, + `txn_id` VARCHAR(256) NOT NULL, `time` DATETIME(3) NOT NULL ); -- +end diff --git a/pkg/migrations/mysql/20210301140656_add_withdraws_table.go b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go index 2c8030d3d..3f6c296a4 100644 --- a/pkg/migrations/mysql/20210301140656_add_withdraws_table.go +++ b/pkg/migrations/mysql/20210301140656_add_withdraws_table.go @@ -14,7 +14,7 @@ func init() { func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") if err != nil { return err } diff --git a/pkg/migrations/mysql/20210307201830_add_deposits_table.go b/pkg/migrations/mysql/20210307201830_add_deposits_table.go index 2ab5a164a..753658b03 100644 --- a/pkg/migrations/mysql/20210307201830_add_deposits_table.go +++ b/pkg/migrations/mysql/20210307201830_add_deposits_table.go @@ -14,7 +14,7 @@ func init() { func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);") if err != nil { return err } diff --git a/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go index 722a26d21..d6e4d0a5f 100644 --- a/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go +++ b/pkg/migrations/sqlite3/20210301140656_add_withdraws_table.go @@ -14,7 +14,7 @@ func init() { func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL\n);") if err != nil { return err } diff --git a/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go index 350c0f200..0416c5439 100644 --- a/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go +++ b/pkg/migrations/sqlite3/20210307201830_add_deposits_table.go @@ -14,7 +14,7 @@ func init() { func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(64) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(64) NOT NULL,\n `time` DATETIME(3) NOT NULL\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL\n);") if err != nil { return err } From 75c6a2791c9cd386f23880d4b7bead3dbcafc4fb Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:43:00 +0800 Subject: [PATCH 22/26] reduce log --- pkg/exchange/batch/batch.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 14bb48dde..e18c83347 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -48,7 +48,6 @@ func (e ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTi for _, o := range orders { if _, ok := orderIDs[o.OrderID]; ok { - logrus.Infof("skipping duplicated order id: %d", o.OrderID) continue } @@ -224,7 +223,6 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti for _, o := range rewards { if _, ok := rewardKeys[o.UUID]; ok { - logrus.Debugf("skipping duplicated order id: %s", o.UUID) continue } From 0246e298d2c5c8ea7ec601c049725fccc260dc7b Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:43:19 +0800 Subject: [PATCH 23/26] apply launch date if since time is empty --- pkg/exchange/max/exchange.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index bb03b7deb..6a8082c35 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -515,6 +515,15 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, startTime := since limit := 1000 txIDs := map[string]struct{}{} + + emptyTime := time.Time{} + if startTime == emptyTime { + startTime, err = e.getLaunchDate() + if err != nil { + return nil, err + } + } + for startTime.Before(until) { // startTime ~ endTime must be in 90 days endTime := startTime.AddDate(0, 0, 60) From 54ba240317ef35023365fd686962804cf31983e4 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:43:42 +0800 Subject: [PATCH 24/26] implement deposit sync --- pkg/service/deposit.go | 47 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/pkg/service/deposit.go b/pkg/service/deposit.go index 1df6a95a5..2a066daf0 100644 --- a/pkg/service/deposit.go +++ b/pkg/service/deposit.go @@ -1,6 +1,9 @@ package service import ( + "context" + "time" + "github.com/jmoiron/sqlx" "github.com/c9s/bbgo/pkg/types" @@ -10,6 +13,50 @@ type DepositService struct { DB *sqlx.DB } +// Sync syncs the withdraw records into db +func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { + txnIDs := map[string]struct{}{} + + // query descending + records, err := s.QueryLast(ex.Name(), 100) + if err != nil { + return err + } + + for _, record := range records { + txnIDs[record.TransactionID] = struct{}{} + } + + transferApi, ok := ex.(types.ExchangeTransferService) + if !ok { + return ErrNotImplemented + } + + since := time.Time{} + if len(records) > 0 { + since = records[len(records)-1].Time.Time() + } + + // asset "" means all assets + deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now()) + if err != nil { + return err + } + + for _, deposit := range deposits { + if _, exists := txnIDs[deposit.TransactionID]; exists { + continue + } + + if err := s.Insert(deposit); err != nil { + return err + } + } + + return nil +} + + func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) { sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ From 38b9baf3407ce30aebfb03a51eb2d7fa97cb8805 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:44:02 +0800 Subject: [PATCH 25/26] connect sync with deposit and withdraw services --- pkg/service/sync.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/pkg/service/sync.go b/pkg/service/sync.go index 74cae1181..69f68ed39 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -31,6 +31,18 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc } } + if err := s.DepositService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { + return err + } + } + + if err := s.WithdrawService.Sync(ctx, exchange); err != nil { + if err != ErrNotImplemented { + return err + } + } + if err := s.RewardService.Sync(ctx, exchange); err != nil { if err != ErrExchangeRewardServiceNotImplemented { return err From afb81056946fec798b6d6c21aafe83c130a039c8 Mon Sep 17 00:00:00 2001 From: c9s Date: Sun, 14 Mar 2021 10:44:16 +0800 Subject: [PATCH 26/26] add reward service todo --- pkg/service/reward.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/service/reward.go b/pkg/service/reward.go index 8d09c8a16..c90f73abb 100644 --- a/pkg/service/reward.go +++ b/pkg/service/reward.go @@ -15,6 +15,10 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +// RewardService collects the reward records from the exchange, +// currently it's only available for MAX exchange. +// TODO: add summary query for calculating the reward amounts +// CREATE VIEW reward_summary_by_years AS SELECT YEAR(created_at) as year, reward_type, currency, SUM(quantity) FROM rewards WHERE reward_type != 'airdrop' GROUP BY YEAR(created_at), reward_type, currency ORDER BY year DESC; type RewardService struct { DB *sqlx.DB }