Merge pull request #693 from c9s/fix/binance-deposit-history-sync

fix: fix and rewrite binance deposit history sync
This commit is contained in:
Yo-An Lin 2022-06-08 19:16:10 +08:00 committed by GitHub
commit 60af0b08e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 152 additions and 147 deletions

View File

@ -600,7 +600,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
}
if userConfig.Sync.DepositHistory {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange, since); err != nil {
return err
}
}
@ -664,7 +664,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
}
if userConfig[0].Sync.DepositHistory {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange, since); err != nil {
return err
}
}

View File

@ -0,0 +1,36 @@
package batch
import (
"context"
"time"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/types"
)
type DepositBatchQuery struct {
types.ExchangeTransferService
}
func (e *DepositBatchQuery) Query(ctx context.Context, asset string, startTime, endTime time.Time) (c chan types.Deposit, errC chan error) {
query := &AsyncTimeRangedBatchQuery{
Type: types.Deposit{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 80,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.ExchangeTransferService.QueryDepositHistory(ctx, asset, startTime, endTime)
},
T: func(obj interface{}) time.Time {
return time.Time(obj.(types.Deposit).Time)
},
ID: func(obj interface{}) string {
deposit := obj.(types.Deposit)
return deposit.TransactionID
},
}
c = make(chan types.Deposit, 100)
errC = query.Query(ctx, c, startTime, endTime)
return c, errC
}

View File

@ -485,70 +485,57 @@ func (e *Exchange) QueryWithdrawHistory(ctx context.Context, asset string, since
}
func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, until time.Time) (allDeposits []types.Deposit, err error) {
startTime := since
var emptyTime = time.Time{}
if startTime == emptyTime {
startTime, err = getLaunchDate()
if since == emptyTime {
since, err = getLaunchDate()
if err != nil {
return nil, err
}
}
txIDs := map[string]struct{}{}
for startTime.Before(until) {
// startTime ~ endTime must be in 90 days
endTime := startTime.AddDate(0, 0, 60)
if endTime.After(until) {
endTime = until
// startTime ~ endTime must be in 90 days
historyDayRangeLimit := time.Hour * 24 * 89
if until.Sub(since) >= historyDayRangeLimit {
until = since.Add(historyDayRangeLimit)
}
req := e.client2.NewGetDepositHistoryRequest()
if len(asset) > 0 {
req.Coin(asset)
}
req.StartTime(since).
EndTime(until)
records, err := req.Do(ctx)
if err != nil {
return nil, err
}
for _, d := range records {
// 0(0:pending,6: credited but cannot withdraw, 1:success)
// set the default status
status := types.DepositStatus(fmt.Sprintf("code: %d", d.Status))
switch d.Status {
case 0:
status = types.DepositPending
case 6:
// https://www.binance.com/en/support/faq/115003736451
status = types.DepositCredited
case 1:
status = types.DepositSuccess
}
req := e.client.NewListDepositsService()
if len(asset) > 0 {
req.Coin(asset)
}
deposits, err := req.
StartTime(startTime.UnixNano() / int64(time.Millisecond)).
EndTime(endTime.UnixNano() / int64(time.Millisecond)).
Do(ctx)
if err != nil {
return nil, err
}
for _, d := range deposits {
if _, ok := txIDs[d.TxID]; ok {
continue
}
// 0(0:pending,6: credited but cannot withdraw, 1:success)
status := types.DepositStatus(fmt.Sprintf("code: %d", d.Status))
switch d.Status {
case 0:
status = types.DepositPending
case 6:
// https://www.binance.com/en/support/faq/115003736451
status = types.DepositCredited
case 1:
status = types.DepositSuccess
}
txIDs[d.TxID] = struct{}{}
allDeposits = append(allDeposits, types.Deposit{
Exchange: types.ExchangeBinance,
Time: types.Time(time.Unix(0, d.InsertTime*int64(time.Millisecond))),
Asset: d.Coin,
Amount: fixedpoint.MustNewFromString(d.Amount),
Address: d.Address,
AddressTag: d.AddressTag,
TransactionID: d.TxID,
Status: status,
})
}
startTime = endTime
allDeposits = append(allDeposits, types.Deposit{
Exchange: types.ExchangeBinance,
Time: types.Time(d.InsertTime.Time()),
Asset: d.Coin,
Amount: d.Amount,
Address: d.Address,
AddressTag: d.AddressTag,
TransactionID: d.TxId,
Status: status,
})
}
return allDeposits, nil

View File

@ -4,8 +4,10 @@ import (
"context"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -14,41 +16,49 @@ type DepositService struct {
}
// Sync syncs the withdraw records into db
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 10)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error {
isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex)
if isMargin || isFutures || isIsolated {
// only works in spot
return nil
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
return nil
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].Time.Time()
tasks := []SyncTask{
{
Type: types.Deposit{},
Select: SelectLastDeposits(ex.Name(), 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.DepositBatchQuery{
ExchangeTransferService: transferApi,
}
return query.Query(ctx, "", startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.Deposit).Time.Time()
},
ID: func(obj interface{}) string {
deposit := obj.(types.Deposit)
return deposit.TransactionID
},
Filter: func(obj interface{}) bool {
deposit := obj.(types.Deposit)
if len(deposit.TransactionID) == 0 {
return false
}
return true
},
LogInsert: true,
},
}
// asset "" means all assets
deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, deposit := range deposits {
if _, exists := txnIDs[deposit.TransactionID]; exists {
continue
}
if err := s.Insert(deposit); err != nil {
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
@ -56,20 +66,6 @@ func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
return nil
}
func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) {
sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex,
"limit": limit,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *DepositService) Query(exchangeName types.ExchangeName) ([]types.Deposit, error) {
args := map[string]interface{}{
"exchange": exchangeName,
@ -98,9 +94,12 @@ func (s *DepositService) scanRows(rows *sqlx.Rows) (deposits []types.Deposit, er
return deposits, rows.Err()
}
func (s *DepositService) Insert(deposit types.Deposit) error {
sql := `INSERT INTO deposits (exchange, asset, address, amount, txn_id, time)
VALUES (:exchange, :asset, :address, :amount, :txn_id, :time)`
_, err := s.DB.NamedExec(sql, deposit)
return err
func SelectLastDeposits(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("deposits").
Where(sq.And{
sq.Eq{"exchange": ex},
}).
OrderBy("time DESC").
Limit(limit)
}

View File

@ -1,39 +1 @@
package service
import (
"testing"
"time"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
func TestDepositService(t *testing.T) {
db, err := prepareDB(t)
if err != nil {
t.Fatal(err)
}
defer db.Close()
xdb := sqlx.NewDb(db.DB, "sqlite3")
service := &DepositService{DB: xdb}
err = service.Insert(types.Deposit{
Exchange: types.ExchangeMax,
Time: types.Time(time.Now()),
Amount: fixedpoint.NewFromFloat(0.001),
Asset: "BTC",
Address: "test",
TransactionID: "02",
Status: types.DepositSuccess,
})
assert.NoError(t, err)
deposits, err := service.Query(types.ExchangeMax)
assert.NoError(t, err)
assert.NotEmpty(t, deposits)
}

View File

@ -88,9 +88,9 @@ func (s *SyncService) SyncRewardHistory(ctx context.Context, exchange types.Exch
return nil
}
func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exchange) error {
func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exchange, startTime time.Time) error {
log.Infof("syncing %s deposit records...", exchange.Name())
if err := s.DepositService.Sync(ctx, exchange); err != nil {
if err := s.DepositService.Sync(ctx, exchange, startTime); err != nil {
if err != ErrNotImplemented {
log.Warnf("%s deposit service is not supported", exchange.Name())
return err

View File

@ -1,8 +1,10 @@
package types
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"fmt"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
type DepositStatus string
@ -37,3 +39,22 @@ type Deposit struct {
func (d Deposit) EffectiveTime() time.Time {
return d.Time.Time()
}
func (d Deposit) String() (o string) {
o = fmt.Sprintf("%s deposit %s %v <- ", d.Exchange, d.Asset, d.Amount)
if len(d.AddressTag) > 0 {
o += fmt.Sprintf("%s (tag: %s) at %s", d.Address, d.AddressTag, d.Time.Time())
} else {
o += fmt.Sprintf("%s at %s", d.Address, d.Time.Time())
}
if len(d.TransactionID) > 0 {
o += fmt.Sprintf("txID: %s", cutstr(d.TransactionID, 12, 4, 4))
}
if len(d.Status) > 0 {
o += "status: " + string(d.Status)
}
return o
}

View File

@ -146,7 +146,7 @@ func trimTrailingZero(a float64) string {
// String is for console output
func (trade Trade) String() string {
return fmt.Sprintf("TRADE %s %s %4s %-4s @ %6s | amount %s | fee %s %s | orderID %d | %s",
return fmt.Sprintf("TRADE %s %s %4s %-4s @ %-6s | amount %s | fee %s %s | orderID %d | %s",
trade.Exchange.String(),
trade.Symbol,
trade.Side,

View File

@ -33,7 +33,7 @@ func cutstr(s string, maxLen, head, tail int) string {
}
func (w Withdraw) String() (o string) {
o = fmt.Sprintf("withdraw %s %v -> ", w.Asset, w.Amount)
o = fmt.Sprintf("%s withdraw %s %v -> ", w.Exchange, w.Asset, w.Amount)
if len(w.Network) > 0 && w.Network != w.Asset {
o += w.Network + ":"