diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index bc681092e..410df8975 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -600,7 +600,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig * } if userConfig.Sync.DepositHistory { - if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil { + if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange, since); err != nil { return err } } @@ -664,7 +664,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err } if userConfig[0].Sync.DepositHistory { - if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil { + if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange, since); err != nil { return err } } diff --git a/pkg/exchange/batch/deposit.go b/pkg/exchange/batch/deposit.go new file mode 100644 index 000000000..fdb471782 --- /dev/null +++ b/pkg/exchange/batch/deposit.go @@ -0,0 +1,36 @@ +package batch + +import ( + "context" + "time" + + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/types" +) + +type DepositBatchQuery struct { + types.ExchangeTransferService +} + +func (e *DepositBatchQuery) Query(ctx context.Context, asset string, startTime, endTime time.Time) (c chan types.Deposit, errC chan error) { + query := &AsyncTimeRangedBatchQuery{ + Type: types.Deposit{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + JumpIfEmpty: time.Hour * 24 * 80, + Q: func(startTime, endTime time.Time) (interface{}, error) { + return e.ExchangeTransferService.QueryDepositHistory(ctx, asset, startTime, endTime) + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.Deposit).Time) + }, + ID: func(obj interface{}) string { + deposit := obj.(types.Deposit) + return deposit.TransactionID + }, + } + + c = make(chan types.Deposit, 100) + errC = query.Query(ctx, c, startTime, endTime) + return c, errC +} diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 18f2612ca..24d48b48b 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -485,70 +485,57 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since } func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) { - startTime := since - var emptyTime = time.Time{} - if startTime == emptyTime { - startTime, err = getLaunchDate() + if since == emptyTime { + since, err = getLaunchDate() if err != nil { return nil, err } } - txIDs := map[string]struct{}{} - for startTime.Before(until) { - // startTime ~ endTime must be in 90 days - endTime := startTime.AddDate(0, 0, 60) - if endTime.After(until) { - endTime = until + // startTime ~ endTime must be in 90 days + historyDayRangeLimit := time.Hour * 24 * 89 + if until.Sub(since) >= historyDayRangeLimit { + until = since.Add(historyDayRangeLimit) + } + + req := e.client2.NewGetDepositHistoryRequest() + if len(asset) > 0 { + req.Coin(asset) + } + + req.StartTime(since). + EndTime(until) + + records, err := req.Do(ctx) + if err != nil { + return nil, err + } + + for _, d := range records { + // 0(0:pending,6: credited but cannot withdraw, 1:success) + // set the default status + status := types.DepositStatus(fmt.Sprintf("code: %d", d.Status)) + switch d.Status { + case 0: + status = types.DepositPending + case 6: + // https://www.binance.com/en/support/faq/115003736451 + status = types.DepositCredited + case 1: + status = types.DepositSuccess } - req := e.client.NewListDepositsService() - if len(asset) > 0 { - req.Coin(asset) - } - - deposits, err := req. - StartTime(startTime.UnixNano() / int64(time.Millisecond)). - EndTime(endTime.UnixNano() / int64(time.Millisecond)). - Do(ctx) - - if err != nil { - return nil, err - } - - for _, d := range deposits { - if _, ok := txIDs[d.TxID]; ok { - continue - } - - // 0(0:pending,6: credited but cannot withdraw, 1:success) - status := types.DepositStatus(fmt.Sprintf("code: %d", d.Status)) - - switch d.Status { - case 0: - status = types.DepositPending - case 6: - // https://www.binance.com/en/support/faq/115003736451 - status = types.DepositCredited - case 1: - status = types.DepositSuccess - } - - txIDs[d.TxID] = struct{}{} - allDeposits = append(allDeposits, types.Deposit{ - Exchange: types.ExchangeBinance, - Time: types.Time(time.Unix(0, d.InsertTime*int64(time.Millisecond))), - Asset: d.Coin, - Amount: fixedpoint.MustNewFromString(d.Amount), - Address: d.Address, - AddressTag: d.AddressTag, - TransactionID: d.TxID, - Status: status, - }) - } - - startTime = endTime + allDeposits = append(allDeposits, types.Deposit{ + Exchange: types.ExchangeBinance, + Time: types.Time(d.InsertTime.Time()), + Asset: d.Coin, + Amount: d.Amount, + Address: d.Address, + AddressTag: d.AddressTag, + TransactionID: d.TxId, + Status: status, + }) } return allDeposits, nil diff --git a/pkg/service/deposit.go b/pkg/service/deposit.go index aff2bf77d..07ad21838 100644 --- a/pkg/service/deposit.go +++ b/pkg/service/deposit.go @@ -4,8 +4,10 @@ import ( "context" "time" + sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -14,41 +16,49 @@ type DepositService struct { } // Sync syncs the withdraw records into db -func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { - txnIDs := map[string]struct{}{} - - // query descending - records, err := s.QueryLast(ex.Name(), 10) - if err != nil { - return err - } - - for _, record := range records { - txnIDs[record.TransactionID] = struct{}{} +func (s *DepositService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error { + isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex) + if isMargin || isFutures || isIsolated { + // only works in spot + return nil } transferApi, ok := ex.(types.ExchangeTransferService) if !ok { - return ErrNotImplemented + return nil } - since := time.Time{} - if len(records) > 0 { - since = records[len(records)-1].Time.Time() + tasks := []SyncTask{ + { + Type: types.Deposit{}, + Select: SelectLastDeposits(ex.Name(), 100), + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.DepositBatchQuery{ + ExchangeTransferService: transferApi, + } + return query.Query(ctx, "", startTime, endTime) + }, + Time: func(obj interface{}) time.Time { + return obj.(types.Deposit).Time.Time() + }, + ID: func(obj interface{}) string { + deposit := obj.(types.Deposit) + return deposit.TransactionID + }, + Filter: func(obj interface{}) bool { + deposit := obj.(types.Deposit) + if len(deposit.TransactionID) == 0 { + return false + } + + return true + }, + LogInsert: true, + }, } - // asset "" means all assets - deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now()) - if err != nil { - return err - } - - for _, deposit := range deposits { - if _, exists := txnIDs[deposit.TransactionID]; exists { - continue - } - - if err := s.Insert(deposit); err != nil { + for _, sel := range tasks { + if err := sel.execute(ctx, s.DB, startTime); err != nil { return err } } @@ -56,20 +66,6 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error { return nil } -func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) { - sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit" - rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ - "exchange": ex, - "limit": limit, - }) - if err != nil { - return nil, err - } - - defer rows.Close() - return s.scanRows(rows) -} - func (s *DepositService) Query(exchangeName types.ExchangeName) ([]types.Deposit, error) { args := map[string]interface{}{ "exchange": exchangeName, @@ -98,9 +94,12 @@ func (s *DepositService) scanRows(rows *sqlx.Rows) (deposits []types.Deposit, er return deposits, rows.Err() } -func (s *DepositService) Insert(deposit types.Deposit) error { - sql := `INSERT INTO deposits (exchange, asset, address, amount, txn_id, time) - VALUES (:exchange, :asset, :address, :amount, :txn_id, :time)` - _, err := s.DB.NamedExec(sql, deposit) - return err +func SelectLastDeposits(ex types.ExchangeName, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("deposits"). + Where(sq.And{ + sq.Eq{"exchange": ex}, + }). + OrderBy("time DESC"). + Limit(limit) } diff --git a/pkg/service/deposit_test.go b/pkg/service/deposit_test.go index d3d60114f..6d43c3366 100644 --- a/pkg/service/deposit_test.go +++ b/pkg/service/deposit_test.go @@ -1,39 +1 @@ package service - -import ( - "testing" - "time" - - "github.com/jmoiron/sqlx" - "github.com/stretchr/testify/assert" - - "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/fixedpoint" -) - -func TestDepositService(t *testing.T) { - db, err := prepareDB(t) - if err != nil { - t.Fatal(err) - } - - defer db.Close() - - xdb := sqlx.NewDb(db.DB, "sqlite3") - service := &DepositService{DB: xdb} - - err = service.Insert(types.Deposit{ - Exchange: types.ExchangeMax, - Time: types.Time(time.Now()), - Amount: fixedpoint.NewFromFloat(0.001), - Asset: "BTC", - Address: "test", - TransactionID: "02", - Status: types.DepositSuccess, - }) - assert.NoError(t, err) - - deposits, err := service.Query(types.ExchangeMax) - assert.NoError(t, err) - assert.NotEmpty(t, deposits) -} diff --git a/pkg/service/sync.go b/pkg/service/sync.go index 53561b1f4..a7af599e6 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -88,9 +88,9 @@ func (s *SyncService) SyncRewardHistory(ctx context.Context, exchange types.Exch return nil } -func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exchange) error { +func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exchange, startTime time.Time) error { log.Infof("syncing %s deposit records...", exchange.Name()) - if err := s.DepositService.Sync(ctx, exchange); err != nil { + if err := s.DepositService.Sync(ctx, exchange, startTime); err != nil { if err != ErrNotImplemented { log.Warnf("%s deposit service is not supported", exchange.Name()) return err diff --git a/pkg/types/deposit.go b/pkg/types/deposit.go index d3340ff8d..2414da8fb 100644 --- a/pkg/types/deposit.go +++ b/pkg/types/deposit.go @@ -1,8 +1,10 @@ package types import ( - "github.com/c9s/bbgo/pkg/fixedpoint" + "fmt" "time" + + "github.com/c9s/bbgo/pkg/fixedpoint" ) type DepositStatus string @@ -37,3 +39,22 @@ type Deposit struct { func (d Deposit) EffectiveTime() time.Time { return d.Time.Time() } + +func (d Deposit) String() (o string) { + o = fmt.Sprintf("%s deposit %s %v <- ", d.Exchange, d.Asset, d.Amount) + + if len(d.AddressTag) > 0 { + o += fmt.Sprintf("%s (tag: %s) at %s", d.Address, d.AddressTag, d.Time.Time()) + } else { + o += fmt.Sprintf("%s at %s", d.Address, d.Time.Time()) + } + + if len(d.TransactionID) > 0 { + o += fmt.Sprintf("txID: %s", cutstr(d.TransactionID, 12, 4, 4)) + } + if len(d.Status) > 0 { + o += "status: " + string(d.Status) + } + + return o +} diff --git a/pkg/types/trade.go b/pkg/types/trade.go index 90053a4f1..2ea6c40c7 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -146,7 +146,7 @@ func trimTrailingZero(a float64) string { // String is for console output func (trade Trade) String() string { - return fmt.Sprintf("TRADE %s %s %4s %-4s @ %6s | amount %s | fee %s %s | orderID %d | %s", + return fmt.Sprintf("TRADE %s %s %4s %-4s @ %-6s | amount %s | fee %s %s | orderID %d | %s", trade.Exchange.String(), trade.Symbol, trade.Side, diff --git a/pkg/types/withdraw.go b/pkg/types/withdraw.go index 7c74084e9..be041df5d 100644 --- a/pkg/types/withdraw.go +++ b/pkg/types/withdraw.go @@ -33,7 +33,7 @@ func cutstr(s string, maxLen, head, tail int) string { } func (w Withdraw) String() (o string) { - o = fmt.Sprintf("withdraw %s %v -> ", w.Asset, w.Amount) + o = fmt.Sprintf("%s withdraw %s %v -> ", w.Exchange, w.Asset, w.Amount) if len(w.Network) > 0 && w.Network != w.Asset { o += w.Network + ":"