Merge pull request #150 from c9s/fix/pnl

feature: add deposit service and withdraw service for sync
This commit is contained in:
Yo-An Lin 2021-03-15 09:01:24 +08:00 committed by GitHub
commit e0d7fefbf2
31 changed files with 1062 additions and 289 deletions

View File

@ -10,6 +10,7 @@ services:
before_install: before_install:
- mysql -e 'CREATE DATABASE bbgo;' - mysql -e 'CREATE DATABASE bbgo;'
- mysql -e 'CREATE DATABASE bbgo_dev;'
install: install:
- go get github.com/c9s/rockhopper/cmd/rockhopper - go get github.com/c9s/rockhopper/cmd/rockhopper

View File

@ -341,6 +341,7 @@ for lorca
make embed && go run -tags web ./cmd/bbgo-lorca make embed && go run -tags web ./cmd/bbgo-lorca
``` ```
## Support ## Support
### By contributing pull requests ### By contributing pull requests

View File

@ -0,0 +1,28 @@
-- +up
-- +begin
CREATE TABLE `withdraws`
(
`gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`exchange` VARCHAR(24) NOT NULL DEFAULT '',
-- asset is the asset name (currency)
`asset` VARCHAR(10) NOT NULL,
`address` VARCHAR(128) NOT NULL,
`network` VARCHAR(32) NOT NULL DEFAULT '',
`amount` DECIMAL(16, 8) NOT NULL,
`txn_id` VARCHAR(256) NOT NULL,
`txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,
`txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',
`time` DATETIME(3) NOT NULL,
PRIMARY KEY (`gid`),
UNIQUE KEY `txn_id` (`exchange`, `txn_id`)
);
-- +end
-- +down
-- +begin
DROP TABLE IF EXISTS `withdraws`;
-- +end

View File

@ -0,0 +1,26 @@
-- +up
-- +begin
CREATE TABLE `deposits`
(
`gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
`exchange` VARCHAR(24) NOT NULL,
-- asset is the asset name (currency)
`asset` VARCHAR(10) NOT NULL,
`address` VARCHAR(128) NOT NULL DEFAULT '',
`amount` DECIMAL(16, 8) NOT NULL,
`txn_id` VARCHAR(256) NOT NULL,
`time` DATETIME(3) NOT NULL,
PRIMARY KEY (`gid`),
UNIQUE KEY `txn_id` (`exchange`, `txn_id`)
);
-- +end
-- +down
-- +begin
DROP TABLE IF EXISTS `deposits`;
-- +end

View File

@ -0,0 +1,36 @@
-- +up
-- +begin
CREATE TABLE `withdraws`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR(24) NOT NULL DEFAULT '',
-- asset is the asset name (currency)
`asset` VARCHAR(10) NOT NULL,
`address` VARCHAR(128) NOT NULL,
`network` VARCHAR(32) NOT NULL DEFAULT '',
`amount` DECIMAL(16, 8) NOT NULL,
`txn_id` VARCHAR(256) NOT NULL,
`txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,
`txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',
`time` DATETIME(3) NOT NULL
);
-- +end
-- +begin
CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`);
-- +end
-- +down
-- +begin
DROP INDEX IF EXISTS `withdraws_txn_id`;
-- +end
-- +begin
DROP TABLE IF EXISTS `withdraws`;
-- +end

View File

@ -0,0 +1,31 @@
-- +up
-- +begin
CREATE TABLE `deposits`
(
`gid` INTEGER PRIMARY KEY AUTOINCREMENT,
`exchange` VARCHAR(24) NOT NULL,
-- asset is the asset name (currency)
`asset` VARCHAR(10) NOT NULL,
`address` VARCHAR(128) NOT NULL DEFAULT '',
`amount` DECIMAL(16, 8) NOT NULL,
`txn_id` VARCHAR(256) NOT NULL,
`time` DATETIME(3) NOT NULL
);
-- +end
-- +begin
CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`);
-- +end
-- +down
-- +begin
DROP INDEX IF EXISTS `deposits_txn_id`;
-- +end
-- +begin
DROP TABLE IF EXISTS `deposits`;
-- +end

View File

@ -161,9 +161,11 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver
environ.RewardService = &service.RewardService{DB: db} environ.RewardService = &service.RewardService{DB: db}
environ.SyncService = &service.SyncService{ environ.SyncService = &service.SyncService{
TradeService: environ.TradeService, TradeService: environ.TradeService,
OrderService: environ.OrderService, OrderService: environ.OrderService,
RewardService: environ.RewardService, RewardService: environ.RewardService,
WithdrawService: &service.WithdrawService{DB: db},
DepositService: &service.DepositService{DB: db},
} }
return nil return nil

View File

@ -2,7 +2,10 @@ package cmd
import ( import (
"context" "context"
"fmt"
"os"
"strings" "strings"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -11,14 +14,14 @@ import (
"github.com/c9s/bbgo/pkg/accounting" "github.com/c9s/bbgo/pkg/accounting"
"github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
func init() { func init() {
PnLCmd.Flags().String("exchange", "", "target exchange") PnLCmd.Flags().String("session", "", "target exchange")
PnLCmd.Flags().String("symbol", "BTCUSDT", "trading symbol") PnLCmd.Flags().String("symbol", "", "trading symbol")
PnLCmd.Flags().Bool("include-transfer", false, "convert transfer records into trades")
PnLCmd.Flags().Int("limit", 500, "number of trades") PnLCmd.Flags().Int("limit", 500, "number of trades")
RootCmd.AddCommand(PnLCmd) RootCmd.AddCommand(PnLCmd)
} }
@ -30,12 +33,26 @@ var PnLCmd = &cobra.Command{
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
ctx := context.Background() ctx := context.Background()
exchangeNameStr, err := cmd.Flags().GetString("exchange") configFile, err := cmd.Flags().GetString("config")
if err != nil { if err != nil {
return err return err
} }
exchangeName, err := types.ValidExchangeName(exchangeNameStr) if len(configFile) == 0 {
return errors.New("--config option is required")
}
if _, err := os.Stat(configFile); os.IsNotExist(err) {
return err
}
userConfig, err := bbgo.Load(configFile, false)
if err != nil {
return err
}
sessionName, err := cmd.Flags().GetString("session")
if err != nil { if err != nil {
return err return err
} }
@ -45,23 +62,73 @@ var PnLCmd = &cobra.Command{
return err return err
} }
if len(symbol) == 0 {
return errors.New("--symbol [SYMBOL] is required")
}
limit, err := cmd.Flags().GetInt("limit") limit, err := cmd.Flags().GetInt("limit")
if err != nil { if err != nil {
return err return err
} }
exchange, err := cmdutil.NewExchange(exchangeName) environ := bbgo.NewEnvironment()
if err := environ.ConfigureDatabase(ctx); err != nil {
return err
}
if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err
}
session, ok := environ.Session(sessionName)
if !ok {
return fmt.Errorf("session %s not found", sessionName)
}
if err := environ.Sync(ctx) ; err != nil {
return err
}
exchange := session.Exchange
market, ok := session.Market(symbol)
if !ok {
return fmt.Errorf("market config %s not found", symbol)
}
since := time.Now().AddDate(-1, 0, 0)
until := time.Now()
includeTransfer, err := cmd.Flags().GetBool("include-transfer")
if err != nil { if err != nil {
return err return err
} }
if includeTransfer {
transferService, ok := exchange.(types.ExchangeTransferService)
if !ok {
return fmt.Errorf("session exchange %s does not implement transfer service", sessionName)
}
environ := bbgo.NewEnvironment() deposits, err := transferService.QueryDepositHistory(ctx, market.BaseCurrency, since, until)
if err := environ.ConfigureDatabase(ctx) ; err != nil { if err != nil {
return err return err
}
_ = deposits
withdrawals, err := transferService.QueryWithdrawHistory(ctx, market.BaseCurrency, since, until)
if err != nil {
return err
}
_ = withdrawals
// we need the backtest klines for the daily prices
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if err := backtestService.SyncKLineByInterval(ctx, exchange, symbol, types.Interval1d, since, until); err != nil {
return err
}
} }
var trades []types.Trade var trades []types.Trade
tradingFeeCurrency := exchange.PlatformFeeCurrency() tradingFeeCurrency := exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) { if strings.HasPrefix(symbol, tradingFeeCurrency) {
@ -71,7 +138,7 @@ var PnLCmd = &cobra.Command{
trades, err = environ.TradeService.Query(service.QueryTradesOptions{ trades, err = environ.TradeService.Query(service.QueryTradesOptions{
Exchange: exchange.Name(), Exchange: exchange.Name(),
Symbol: symbol, Symbol: symbol,
Limit: limit, Limit: limit,
}) })
} }

View File

@ -101,7 +101,11 @@ var TransferHistoryCmd = &cobra.Command{
var records timeSlice var records timeSlice
exchange := session.Exchange exchange, ok := session.Exchange.(types.ExchangeTransferService)
if !ok {
return fmt.Errorf("exchange session %s does not implement transfer service", sessionName)
}
deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until) deposits, err := exchange.QueryDepositHistory(ctx, asset, since, until)
if err != nil { if err != nil {
return err return err

View File

@ -48,7 +48,6 @@ func (e ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTi
for _, o := range orders { for _, o := range orders {
if _, ok := orderIDs[o.OrderID]; ok { if _, ok := orderIDs[o.OrderID]; ok {
logrus.Infof("skipping duplicated order id: %d", o.OrderID)
continue continue
} }
@ -224,7 +223,6 @@ func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Ti
for _, o := range rewards { for _, o := range rewards {
if _, ok := rewardKeys[o.UUID]; ok { if _, ok := rewardKeys[o.UUID]; ok {
logrus.Debugf("skipping duplicated order id: %s", o.UUID)
continue continue
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
@ -193,9 +194,27 @@ func (e *Exchange) QueryIsolatedMarginAccount(ctx context.Context, symbols ...st
return toGlobalIsolatedMarginAccount(account), nil return toGlobalIsolatedMarginAccount(account), nil
} }
func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { func (e *Exchange) getLaunchDate() (time.Time, error) {
// binance launch date 12:00 July 14th, 2017
loc, err := time.LoadLocation("Asia/Shanghai")
if err != nil {
return time.Time{}, err
}
return time.Date(2017, time.July, 14, 0, 0, 0, 0, loc), nil
}
func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) {
startTime := since startTime := since
var emptyTime = time.Time{}
if startTime == emptyTime {
startTime, err = e.getLaunchDate()
if err != nil {
return nil, err
}
}
txIDs := map[string]struct{}{} txIDs := map[string]struct{}{}
for startTime.Before(until) { for startTime.Before(until) {
@ -247,7 +266,8 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
txIDs[d.TxID] = struct{}{} txIDs[d.TxID] = struct{}{}
allWithdraws = append(allWithdraws, types.Withdraw{ allWithdraws = append(allWithdraws, types.Withdraw{
ApplyTime: time.Unix(0, d.ApplyTime*int64(time.Millisecond)), Exchange: types.ExchangeBinance,
ApplyTime: datatype.Time(time.Unix(0, d.ApplyTime*int64(time.Millisecond))),
Asset: d.Asset, Asset: d.Asset,
Amount: d.Amount, Amount: d.Amount,
Address: d.Address, Address: d.Address,
@ -311,7 +331,8 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
txIDs[d.TxID] = struct{}{} txIDs[d.TxID] = struct{}{}
allDeposits = append(allDeposits, types.Deposit{ allDeposits = append(allDeposits, types.Deposit{
Time: time.Unix(0, d.InsertTime*int64(time.Millisecond)), Exchange: types.ExchangeBinance,
Time: datatype.Time(time.Unix(0, d.InsertTime*int64(time.Millisecond))),
Asset: d.Asset, Asset: d.Asset,
Amount: d.Amount, Amount: d.Amount,
Address: d.Address, Address: d.Address,

View File

@ -14,6 +14,7 @@ import (
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/datatype"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi" maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -376,6 +377,16 @@ func (e *Exchange) PlatformFeeCurrency() string {
return toGlobalCurrency("max") return toGlobalCurrency("max")
} }
func (e *Exchange) getLaunchDate() (time.Time, error) {
// MAX launch date June 21th, 2018
loc, err := time.LoadLocation("Asia/Taipei")
if err != nil {
return time.Time{}, err
}
return time.Date(2018, time.June, 21, 0, 0, 0, 0, loc), nil
}
func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
if err := accountQueryLimiter.Wait(ctx); err != nil { if err := accountQueryLimiter.Wait(ctx); err != nil {
return nil, err return nil, err
@ -406,10 +417,19 @@ func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) { func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []types.Withdraw, err error) {
startTime := since startTime := since
limit := 1000
txIDs := map[string]struct{}{} txIDs := map[string]struct{}{}
emptyTime := time.Time{}
if startTime == emptyTime {
startTime, err = e.getLaunchDate()
if err != nil {
return nil, err
}
}
for startTime.Before(until) { for startTime.Before(until) {
// startTime ~ endTime must be in 90 days // startTime ~ endTime must be in 60 days
endTime := startTime.AddDate(0, 0, 60) endTime := startTime.AddDate(0, 0, 60)
if endTime.After(until) { if endTime.After(until) {
endTime = until endTime = until
@ -424,13 +444,20 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
withdraws, err := req. withdraws, err := req.
From(startTime.Unix()). From(startTime.Unix()).
To(endTime.Unix()). To(endTime.Unix()).
Limit(limit).
Do(ctx) Do(ctx)
if err != nil { if err != nil {
return allWithdraws, err return allWithdraws, err
} }
for _, d := range withdraws { if len(withdraws) == 0 {
startTime = endTime
continue
}
for i := len(withdraws) - 1; i >= 0; i-- {
d := withdraws[i]
if _, ok := txIDs[d.TxID]; ok { if _, ok := txIDs[d.TxID]; ok {
continue continue
} }
@ -455,21 +482,30 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
} }
txIDs[d.TxID] = struct{}{} txIDs[d.TxID] = struct{}{}
allWithdraws = append(allWithdraws, types.Withdraw{ withdraw := types.Withdraw{
ApplyTime: time.Unix(d.CreatedAt, 0), Exchange: types.ExchangeMax,
Asset: toGlobalCurrency(d.Currency), ApplyTime: datatype.Time(time.Unix(d.CreatedAt, 0)),
Amount: util.MustParseFloat(d.Amount), Asset: toGlobalCurrency(d.Currency),
Address: "", Amount: util.MustParseFloat(d.Amount),
AddressTag: "", Address: "",
TransactionID: d.TxID, AddressTag: "",
TransactionFee: util.MustParseFloat(d.Fee), TransactionID: d.TxID,
TransactionFee: util.MustParseFloat(d.Fee),
TransactionFeeCurrency: d.FeeCurrency,
// WithdrawOrderID: d.WithdrawOrderID, // WithdrawOrderID: d.WithdrawOrderID,
// Network: d.Network, // Network: d.Network,
Status: status, Status: status,
}) }
allWithdraws = append(allWithdraws, withdraw)
} }
startTime = endTime // go next time frame
if len(withdraws) < limit {
startTime = endTime
} else {
// its in descending order, so we get the first record
startTime = time.Unix(withdraws[0].CreatedAt, 0)
}
} }
return allWithdraws, nil return allWithdraws, nil
@ -477,7 +513,17 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) { func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) {
startTime := since startTime := since
limit := 1000
txIDs := map[string]struct{}{} txIDs := map[string]struct{}{}
emptyTime := time.Time{}
if startTime == emptyTime {
startTime, err = e.getLaunchDate()
if err != nil {
return nil, err
}
}
for startTime.Before(until) { for startTime.Before(until) {
// startTime ~ endTime must be in 90 days // startTime ~ endTime must be in 90 days
endTime := startTime.AddDate(0, 0, 60) endTime := startTime.AddDate(0, 0, 60)
@ -486,6 +532,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
} }
log.Infof("querying deposit history %s: %s <=> %s", asset, startTime, endTime) log.Infof("querying deposit history %s: %s <=> %s", asset, startTime, endTime)
req := e.client.AccountService.NewGetDepositHistoryRequest() req := e.client.AccountService.NewGetDepositHistoryRequest()
if len(asset) > 0 { if len(asset) > 0 {
req.Currency(toLocalCurrency(asset)) req.Currency(toLocalCurrency(asset))
@ -493,19 +540,23 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
deposits, err := req. deposits, err := req.
From(startTime.Unix()). From(startTime.Unix()).
To(endTime.Unix()).Do(ctx) To(endTime.Unix()).
Limit(limit).
Do(ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, d := range deposits { for i := len(deposits) - 1; i >= 0; i-- {
d := deposits[i]
if _, ok := txIDs[d.TxID]; ok { if _, ok := txIDs[d.TxID]; ok {
continue continue
} }
allDeposits = append(allDeposits, types.Deposit{ allDeposits = append(allDeposits, types.Deposit{
Time: time.Unix(d.CreatedAt, 0), Exchange: types.ExchangeMax,
Time: datatype.Time(time.Unix(d.CreatedAt, 0)),
Amount: util.MustParseFloat(d.Amount), Amount: util.MustParseFloat(d.Amount),
Asset: toGlobalCurrency(d.Currency), Asset: toGlobalCurrency(d.Currency),
Address: "", // not supported Address: "", // not supported
@ -515,7 +566,11 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
}) })
} }
startTime = endTime if len(deposits) < limit {
startTime = endTime
} else {
startTime = time.Unix(deposits[0].CreatedAt, 0)
}
} }
return allDeposits, err return allDeposits, err

View File

@ -118,7 +118,7 @@ type Deposit struct {
Fee string `json:"fee"` Fee string `json:"fee"`
TxID string `json:"txid"` TxID string `json:"txid"`
State string `json:"state"` State string `json:"state"`
Confirmations string `json:"confirmations"` Confirmations string `json:"confirmations"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"` UpdatedAt int64 `json:"updated_at"`
} }
@ -187,14 +187,13 @@ func (s *AccountService) NewGetDepositHistoryRequest() *GetDepositHistoryRequest
} }
} }
type Withdraw struct { type Withdraw struct {
UUID string `json:"uuid"` UUID string `json:"uuid"`
Currency string `json:"currency"` Currency string `json:"currency"`
CurrencyVersion string `json:"currency_version"` // "eth" CurrencyVersion string `json:"currency_version"` // "eth"
Amount string `json:"amount"` Amount string `json:"amount"`
Fee string `json:"fee"` Fee string `json:"fee"`
FeeCurrency string `json:"fee_currency"`
TxID string `json:"txid"` TxID string `json:"txid"`
// State can be "submitting", "submitted", // State can be "submitting", "submitted",
@ -203,10 +202,10 @@ type Withdraw struct {
// "failed", "pending", "confirmed", // "failed", "pending", "confirmed",
// "kgi_manually_processing", "kgi_manually_confirmed", "kgi_possible_failed", // "kgi_manually_processing", "kgi_manually_confirmed", "kgi_possible_failed",
// "sygna_verifying" // "sygna_verifying"
State string `json:"state"` State string `json:"state"`
Confirmations int `json:"confirmations"` Confirmations int `json:"confirmations"`
CreatedAt int64 `json:"created_at"` CreatedAt int64 `json:"created_at"`
UpdatedAt int64 `json:"updated_at"` UpdatedAt int64 `json:"updated_at"`
} }
type GetWithdrawHistoryRequestParams struct { type GetWithdrawHistoryRequestParams struct {

View File

@ -0,0 +1,34 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddWithdrawsTable, downAddWithdrawsTable)
}
func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);")
if err != nil {
return err
}
return err
}
func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,34 @@
package mysql
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddDepositsTable, downAddDepositsTable)
}
func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `txn_id` (`exchange`, `txn_id`)\n);")
if err != nil {
return err
}
return err
}
func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,44 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddWithdrawsTable, downAddWithdrawsTable)
}
func upAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `withdraws`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL,\n `network` VARCHAR(32) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `txn_fee` DECIMAL(16, 8) NOT NULL DEFAULT 0,\n `txn_fee_currency` VARCHAR(32) NOT NULL DEFAULT '',\n `time` DATETIME(3) NOT NULL\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `withdraws_txn_id` ON `withdraws` (`exchange`, `txn_id`);")
if err != nil {
return err
}
return err
}
func downAddWithdrawsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `withdraws_txn_id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `withdraws`;")
if err != nil {
return err
}
return err
}

View File

@ -0,0 +1,44 @@
package sqlite3
import (
"context"
"github.com/c9s/rockhopper"
)
func init() {
AddMigration(upAddDepositsTable, downAddDepositsTable)
}
func upAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `deposits`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n `exchange` VARCHAR(24) NOT NULL,\n -- asset is the asset name (currency)\n `asset` VARCHAR(10) NOT NULL,\n `address` VARCHAR(128) NOT NULL DEFAULT '',\n `amount` DECIMAL(16, 8) NOT NULL,\n `txn_id` VARCHAR(256) NOT NULL,\n `time` DATETIME(3) NOT NULL\n);")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "CREATE UNIQUE INDEX `deposits_txn_id` ON `deposits` (`exchange`, `txn_id`);")
if err != nil {
return err
}
return err
}
func downAddDepositsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is rolled back.
_, err = tx.ExecContext(ctx, "DROP INDEX IF EXISTS `deposits_txn_id`;")
if err != nil {
return err
}
_, err = tx.ExecContext(ctx, "DROP TABLE IF EXISTS `deposits`;")
if err != nil {
return err
}
return err
}

View File

@ -17,33 +17,41 @@ type BacktestService struct {
DB *sqlx.DB DB *sqlx.DB
} }
func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
now := time.Now() log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
for interval := range types.SupportedIntervals {
log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())
lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval) lastKLine, err := s.QueryLast(exchange.Name(), symbol, interval)
if err != nil { if err != nil {
return err
}
if lastKLine != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime)
startTime = lastKLine.StartTime.Add(time.Minute)
}
batch := &batch2.KLineBatchQuery{Exchange: exchange}
// should use channel here
klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime)
// var previousKLine types.KLine
for k := range klineC {
if err := s.Insert(k); err != nil {
return err return err
} }
}
if lastKLine != nil { if err := <-errC; err != nil {
log.Infof("found last checkpoint %s", lastKLine.EndTime) return err
startTime = lastKLine.StartTime.Add(time.Minute) }
}
batch := &batch2.KLineBatchQuery{Exchange: exchange} return nil
}
// should use channel here func (s *BacktestService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
klineC, errC := batch.Query(ctx, symbol, interval, startTime, now) endTime := time.Now()
// var previousKLine types.KLine for interval := range types.SupportedIntervals {
for k := range klineC { if err := s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime) ; err != nil {
if err := s.Insert(k); err != nil {
return err
}
}
if err := <-errC; err != nil {
return err return err
} }
} }

107
pkg/service/deposit.go Normal file
View File

@ -0,0 +1,107 @@
package service
import (
"context"
"time"
"github.com/jmoiron/sqlx"
"github.com/c9s/bbgo/pkg/types"
)
type DepositService struct {
DB *sqlx.DB
}
// Sync syncs the withdraw records into db
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 100)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].Time.Time()
}
// asset "" means all assets
deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, deposit := range deposits {
if _, exists := txnIDs[deposit.TransactionID]; exists {
continue
}
if err := s.Insert(deposit); err != nil {
return err
}
}
return nil
}
func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) {
sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"limit": limit,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *DepositService) Query(exchangeName types.ExchangeName) ([]types.Deposit, error) {
args := map[string]interface{}{
"exchange": exchangeName,
}
sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` ASC"
rows, err := s.DB.NamedQuery(sql, args)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *DepositService) scanRows(rows *sqlx.Rows) (deposits []types.Deposit, err error) {
for rows.Next() {
var deposit types.Deposit
if err := rows.StructScan(&deposit); err != nil {
return deposits, err
}
deposits = append(deposits, deposit)
}
return deposits, rows.Err()
}
func (s *DepositService) Insert(deposit types.Deposit) error {
sql := `INSERT INTO deposits (exchange, asset, address, amount, txn_id, time)
VALUES (:exchange, :asset, :address, :amount, :txn_id, :time)`
_, err := s.DB.NamedExec(sql, deposit)
return err
}

View File

@ -0,0 +1,39 @@
package service
import (
"testing"
"time"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/types"
)
func TestDepositService(t *testing.T) {
db, err := prepareDB(t)
if err != nil {
t.Fatal(err)
}
defer db.Close()
xdb := sqlx.NewDb(db.DB, "sqlite3")
service := &DepositService{DB: xdb}
err = service.Insert(types.Deposit{
Exchange: types.ExchangeMax,
Time: datatype.Time(time.Now()),
Amount: 0.001,
Asset: "BTC",
Address: "test",
TransactionID: "02",
Status: types.DepositSuccess,
})
assert.NoError(t, err)
deposits, err := service.Query(types.ExchangeMax)
assert.NoError(t, err)
assert.NotEmpty(t, deposits)
}

View File

@ -1,13 +1,16 @@
package service package service
import ( import (
"context"
"strconv" "strconv"
"strings" "strings"
"time"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -15,6 +18,65 @@ type OrderService struct {
DB *sqlx.DB DB *sqlx.DB
} }
func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
orderKeys := make(map[uint64]struct{})
var lastID uint64 = 0
if len(records) > 0 {
for _, record := range records {
orderKeys[record.OrderID] = struct{}{}
}
lastID = records[0].OrderID
startTime = records[0].CreationTime.Time()
}
b := &batch.ClosedOrderBatchQuery{Exchange: exchange}
ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, exists := orderKeys[order.OrderID]; exists {
continue
}
if err := s.Insert(order); err != nil {
return err
}
}
return <-errC
}
// QueryLast queries the last order from the database // QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isIsolated bool, limit int) ([]types.Order, error) { func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isIsolated bool, limit int) ([]types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated) log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated)

View File

@ -8,11 +8,17 @@ import (
"time" "time"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
// RewardService collects the reward records from the exchange,
// currently it's only available for MAX exchange.
// TODO: add summary query for calculating the reward amounts
// CREATE VIEW reward_summary_by_years AS SELECT YEAR(created_at) as year, reward_type, currency, SUM(quantity) FROM rewards WHERE reward_type != 'airdrop' GROUP BY YEAR(created_at), reward_type, currency ORDER BY year DESC;
type RewardService struct { type RewardService struct {
DB *sqlx.DB DB *sqlx.DB
} }
@ -31,6 +37,64 @@ func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Rew
return s.scanRows(rows) return s.scanRows(rows)
} }
func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange) error {
service, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrExchangeRewardServiceNotImplemented
}
var rewardKeys = map[string]struct{}{}
var startTime time.Time
records, err := s.QueryLast(exchange.Name(), 50)
if err != nil {
return err
}
if len(records) > 0 {
lastRecord := records[0]
startTime = lastRecord.CreatedAt.Time()
for _, record := range records {
rewardKeys[record.UUID] = struct{}{}
}
}
batchQuery := &batch.RewardBatchQuery{Service: service}
rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now())
for reward := range rewardsC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, ok := rewardKeys[reward.UUID]; ok {
continue
}
logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt)
if err := s.Insert(reward); err != nil {
return err
}
}
return <-errC
}
type CurrencyPositionMap map[string]fixedpoint.Value type CurrencyPositionMap map[string]fixedpoint.Value
func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) { func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) {

View File

@ -5,221 +5,46 @@ import (
"errors" "errors"
"time" "time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") var ErrNotImplemented = errors.New("not implemented")
var ErrExchangeRewardServiceNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface")
type SyncService struct { type SyncService struct {
TradeService *TradeService TradeService *TradeService
OrderService *OrderService OrderService *OrderService
RewardService *RewardService RewardService *RewardService
} WithdrawService *WithdrawService
DepositService *DepositService
func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error {
service, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrNotImplemented
}
var rewardKeys = map[string]struct{}{}
var startTime time.Time
records, err := s.RewardService.QueryLast(exchange.Name(), 50)
if err != nil {
return err
}
if len(records) > 0 {
lastRecord := records[0]
startTime = lastRecord.CreatedAt.Time()
for _, record := range records {
rewardKeys[record.UUID] = struct{}{}
}
}
batchQuery := &batch.RewardBatchQuery{Service: service}
rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now())
for reward := range rewardsC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, ok := rewardKeys[reward.UUID]; ok {
continue
}
logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt)
if err := s.RewardService.Insert(reward); err != nil {
return err
}
}
return <-errC
}
func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
records, err := s.OrderService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
orderKeys := make(map[uint64]struct{})
var lastID uint64 = 0
if len(records) > 0 {
for _, record := range records {
orderKeys[record.OrderID] = struct{}{}
}
lastID = records[0].OrderID
startTime = records[0].CreationTime.Time()
}
b := &batch.ClosedOrderBatchQuery{Exchange: exchange}
ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID)
for order := range ordersC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, exists := orderKeys[order.OrderID]; exists {
continue
}
if err := s.OrderService.Insert(order); err != nil {
return err
}
}
return <-errC
}
func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, symbol string) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
// records descending ordered
records, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID int64 = 1
if len(records) > 0 {
for _, record := range records {
tradeKeys[record.Key()] = struct{}{}
}
lastTradeID = records[0].ID
}
b := &batch.TradeBatchQuery{Exchange: exchange}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
})
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
key := trade.Key()
if _, exists := tradeKeys[key]; exists {
continue
}
tradeKeys[key] = struct{}{}
logrus.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s",
trade.Exchange,
trade.ID,
trade.Symbol,
trade.Side,
trade.Price,
trade.Quantity,
trade.MakerOrTakerLabel(),
trade.Time.String())
if err := s.TradeService.Insert(trade); err != nil {
return err
}
}
return <-errC
} }
// SyncSessionSymbols syncs the trades from the given exchange session // SyncSessionSymbols syncs the trades from the given exchange session
func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error { func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exchange, startTime time.Time, symbols ...string) error {
for _, symbol := range symbols { for _, symbol := range symbols {
if err := s.SyncTrades(ctx, exchange, symbol); err != nil { if err := s.TradeService.Sync(ctx, exchange, symbol); err != nil {
return err return err
} }
if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { if err := s.OrderService.Sync(ctx, exchange, symbol, startTime); err != nil {
return err return err
} }
}
if err := s.SyncRewards(ctx, exchange); err != nil { if err := s.DepositService.Sync(ctx, exchange); err != nil {
if err == ErrNotImplemented { if err != ErrNotImplemented {
continue return err
} }
}
if err := s.WithdrawService.Sync(ctx, exchange); err != nil {
if err != ErrNotImplemented {
return err
}
}
if err := s.RewardService.Sync(ctx, exchange); err != nil {
if err != ErrExchangeRewardServiceNotImplemented {
return err return err
} }
} }

View File

@ -11,6 +11,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -49,6 +50,78 @@ func NewTradeService(db *sqlx.DB) *TradeService {
return &TradeService{db} return &TradeService{db}
} }
func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string) error {
isMargin := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
// records descending ordered
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 50)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID int64 = 1
if len(records) > 0 {
for _, record := range records {
tradeKeys[record.Key()] = struct{}{}
}
lastTradeID = records[0].ID
}
b := &batch.TradeBatchQuery{Exchange: exchange}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
})
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
key := trade.Key()
if _, exists := tradeKeys[key]; exists {
continue
}
tradeKeys[key] = struct{}{}
log.Infof("inserting trade: %s %d %s %-4s price: %-13f volume: %-11f %5s %s",
trade.Exchange,
trade.ID,
trade.Symbol,
trade.Side,
trade.Price,
trade.Quantity,
trade.MakerOrTakerLabel(),
trade.Time.String())
if err := s.Insert(trade); err != nil {
return err
}
}
return <-errC
}
func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) { func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) {
args := map[string]interface{}{ args := map[string]interface{}{
// "symbol": symbol, // "symbol": symbol,

106
pkg/service/withdraw.go Normal file
View File

@ -0,0 +1,106 @@
package service
import (
"context"
"time"
"github.com/jmoiron/sqlx"
"github.com/c9s/bbgo/pkg/types"
)
type WithdrawService struct {
DB *sqlx.DB
}
// Sync syncs the withdraw records into db
func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 100)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].ApplyTime.Time()
}
// asset "" means all assets
withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, withdraw := range withdraws {
if _, exists := txnIDs[withdraw.TransactionID]; exists {
continue
}
if err := s.Insert(withdraw); err != nil {
return err
}
}
return nil
}
func (s *WithdrawService) QueryLast(ex types.ExchangeName, limit int) ([]types.Withdraw, error) {
sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"limit": limit,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *WithdrawService) Query(exchangeName types.ExchangeName) ([]types.Withdraw, error) {
args := map[string]interface{}{
"exchange": exchangeName,
}
sql := "SELECT * FROM `withdraws` WHERE `exchange` = :exchange ORDER BY `time` ASC"
rows, err := s.DB.NamedQuery(sql, args)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *WithdrawService) scanRows(rows *sqlx.Rows) (withdraws []types.Withdraw, err error) {
for rows.Next() {
var withdraw types.Withdraw
if err := rows.StructScan(&withdraw); err != nil {
return withdraws, err
}
withdraws = append(withdraws, withdraw)
}
return withdraws, rows.Err()
}
func (s *WithdrawService) Insert(withdrawal types.Withdraw) error {
sql := `INSERT INTO withdraws (exchange, asset, network, address, amount, txn_id, txn_fee, time)
VALUES (:exchange, :asset, :network, :address, :amount, :txn_id, :txn_fee, :time)`
_, err := s.DB.NamedExec(sql, withdrawal)
return err
}

View File

@ -0,0 +1,41 @@
package service
import (
"testing"
"time"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/types"
)
func TestWithdrawService(t *testing.T) {
db, err := prepareDB(t)
if err != nil {
t.Fatal(err)
}
defer db.Close()
xdb := sqlx.NewDb(db.DB, "sqlite3")
service := &WithdrawService{DB: xdb}
err = service.Insert(types.Withdraw{
Exchange: types.ExchangeMax,
Asset: "BTC",
Amount: 0.0001,
Address: "test",
TransactionID: "01",
TransactionFee: 0.0001,
Network: "omni",
ApplyTime: datatype.Time(time.Now()),
})
assert.NoError(t, err)
withdraws, err := service.Query(types.ExchangeMax)
assert.NoError(t, err)
assert.NotEmpty(t, withdraws)
assert.Equal(t, types.ExchangeMax, withdraws[0].Exchange)
}

View File

@ -1,6 +1,10 @@
package types package types
import "time" import (
"time"
"github.com/c9s/bbgo/pkg/datatype"
)
type DepositStatus string type DepositStatus string
@ -20,15 +24,17 @@ const (
) )
type Deposit struct { type Deposit struct {
Time time.Time `json:"time"` GID int64 `json:"gid" db:"gid"`
Amount float64 `json:"amount"` Exchange ExchangeName `json:"exchange" db:"exchange"`
Asset string `json:"asset"` Time datatype.Time `json:"time" db:"time"`
Address string `json:"address"` Amount float64 `json:"amount" db:"amount"`
Asset string `json:"asset" db:"asset"`
Address string `json:"address" db:"address"`
AddressTag string `json:"addressTag"` AddressTag string `json:"addressTag"`
TransactionID string `json:"txId"` TransactionID string `json:"transactionID" db:"txn_id"`
Status DepositStatus `json:"status"` Status DepositStatus `json:"status"`
} }
func (d Deposit) EffectiveTime() time.Time { func (d Deposit) EffectiveTime() time.Time {
return d.Time return d.Time.Time()
} }

View File

@ -2,6 +2,7 @@ package types
import ( import (
"context" "context"
"database/sql/driver"
"encoding/json" "encoding/json"
"fmt" "fmt"
"strings" "strings"
@ -14,9 +15,13 @@ const DateFormat = "2006-01-02"
type ExchangeName string type ExchangeName string
func (n *ExchangeName) Value() (driver.Value, error) {
return n.String(), nil
}
func (n *ExchangeName) UnmarshalJSON(data []byte) error { func (n *ExchangeName) UnmarshalJSON(data []byte) error {
var s string var s string
if err := json.Unmarshal(data, &s) ; err != nil { if err := json.Unmarshal(data, &s); err != nil {
return err return err
} }
@ -72,10 +77,6 @@ type Exchange interface {
QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error)
QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error)
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)
SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error) SubmitOrders(ctx context.Context, orders ...SubmitOrder) (createdOrders OrderSlice, err error)
QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error) QueryOpenOrders(ctx context.Context, symbol string) (orders []Order, err error)
@ -85,6 +86,11 @@ type Exchange interface {
CancelOrders(ctx context.Context, orders ...Order) error CancelOrders(ctx context.Context, orders ...Order) error
} }
type ExchangeTransferService interface {
QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []Deposit, err error)
QueryWithdrawHistory(ctx context.Context, asset string, since, until time.Time) (allWithdraws []Withdraw, err error)
}
type ExchangeRewardService interface { type ExchangeRewardService interface {
QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error) QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error)
} }

View File

@ -1,22 +1,33 @@
package types package types
import "time" import (
"fmt"
"time"
"github.com/c9s/bbgo/pkg/datatype"
)
type Withdraw struct { type Withdraw struct {
ID string `json:"id"` GID int64 `json:"gid" db:"gid"`
Asset string `json:"asset"` Exchange ExchangeName `json:"exchange" db:"exchange"`
Amount float64 `json:"amount"` Asset string `json:"asset" db:"asset"`
Address string `json:"address"` Amount float64 `json:"amount" db:"amount"`
AddressTag string `json:"addressTag"` Address string `json:"address" db:"address"`
Status string `json:"status"` AddressTag string `json:"addressTag"`
Status string `json:"status"`
TransactionID string `json:"txId"` TransactionID string `json:"transactionID" db:"txn_id"`
TransactionFee float64 `json:"transactionFee"` TransactionFee float64 `json:"transactionFee" db:"txn_fee"`
WithdrawOrderID string `json:"withdrawOrderId"` TransactionFeeCurrency string `json:"transactionFeeCurrency" db:"txn_fee_currency"`
ApplyTime time.Time `json:"applyTime"` WithdrawOrderID string `json:"withdrawOrderId"`
Network string `json:"network"` ApplyTime datatype.Time `json:"applyTime" db:"time"`
Network string `json:"network" db:"network"`
}
func (w Withdraw) String() string {
return fmt.Sprintf("withdraw %s %f to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time())
} }
func (w Withdraw) EffectiveTime() time.Time { func (w Withdraw) EffectiveTime() time.Time {
return w.ApplyTime return w.ApplyTime.Time()
} }

View File

@ -1,5 +1,5 @@
--- ---
driver: mysql driver: mysql
dialect: mysql dialect: mysql
dsn: "root@tcp(localhost:3306)/bbgo?parseTime=true" dsn: "root@tcp(localhost:3306)/bbgo_dev?parseTime=true"
migrationsDir: migrations/mysql migrationsDir: migrations/mysql

0
scripts/test-mysql-migrations.sh Normal file → Executable file
View File