fix withdraw sync and improve withdraw string format

This commit is contained in:
c9s 2022-06-02 13:38:54 +08:00
parent 813166dd92
commit 5d98674ab5
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
7 changed files with 87 additions and 51 deletions

View File

@ -23,7 +23,7 @@ sync:
filledOrders: true filledOrders: true
# since is the start date of your trading data # since is the start date of your trading data
since: 2022-01-01 since: 2019-01-01
# sessions is the list of session names you want to sync # sessions is the list of session names you want to sync
# by default, BBGO sync all your available sessions. # by default, BBGO sync all your available sessions.
@ -48,5 +48,5 @@ sync:
- USDT - USDT
# depositHistory: true # depositHistory: true
rewardHistory: true # rewardHistory: true
withdrawHistory: true withdrawHistory: true

View File

@ -587,6 +587,8 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
sessions = environ.SelectSessions(selectedSessions...) sessions = environ.SelectSessions(selectedSessions...)
} }
since := userConfig.Sync.Since.Time()
for _, session := range sessions { for _, session := range sessions {
if err := environ.syncSession(ctx, session, syncSymbols...); err != nil { if err := environ.syncSession(ctx, session, syncSymbols...); err != nil {
return err return err
@ -599,7 +601,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
} }
if userConfig.Sync.WithdrawHistory { if userConfig.Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil {
return err return err
} }
} }
@ -612,7 +614,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *
if userConfig.Sync.MarginHistory { if userConfig.Sync.MarginHistory {
if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange, if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange,
userConfig.Sync.Since.Time(), since,
userConfig.Sync.MarginAssets...); err != nil { userConfig.Sync.MarginAssets...); err != nil {
return err return err
} }
@ -644,6 +646,8 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
return environ.syncWithUserConfig(ctx, userConfig[0]) return environ.syncWithUserConfig(ctx, userConfig[0])
} }
since := time.Now().AddDate(0, -6, 0)
// the default sync logics // the default sync logics
for _, session := range environ.sessions { for _, session := range environ.sessions {
if err := environ.syncSession(ctx, session); err != nil { if err := environ.syncSession(ctx, session); err != nil {
@ -661,7 +665,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
} }
if userConfig[0].Sync.WithdrawHistory { if userConfig[0].Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil {
return err return err
} }
} }

View File

@ -61,12 +61,17 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
listRef := reflect.ValueOf(sliceInf) listRef := reflect.ValueOf(sliceInf)
listLen := listRef.Len() listLen := listRef.Len()
log.Debugf("batch querying %T: %d remote records", q.Type, listLen)
if listLen == 0 { if listLen == 0 {
if q.JumpIfEmpty > 0 { if q.JumpIfEmpty > 0 {
startTime = startTime.Add(q.JumpIfEmpty) startTime = startTime.Add(q.JumpIfEmpty)
log.Debugf("batch querying %T: empty records jump to %s", q.Type, startTime)
continue continue
} }
log.Debugf("batch querying %T: empty records, query is completed", q.Type)
return return
} }
@ -94,7 +99,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
obj := item.Interface() obj := item.Interface()
id := q.ID(obj) id := q.ID(obj)
if _, exists := idMap[id]; exists { if _, exists := idMap[id]; exists {
log.Debugf("batch querying %T: duplicated id %s", q.Type, id) log.Debugf("batch querying %T: ignore duplicated record, id = %s", q.Type, id)
continue continue
} }
@ -102,10 +107,11 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
cRef.Send(item) cRef.Send(item)
sentAny = true sentAny = true
startTime = entryTime startTime = entryTime.Add(time.Millisecond)
} }
if !sentAny { if !sentAny {
log.Debugf("batch querying %T: %d/%d records are not sent", q.Type, listLen, listLen)
return return
} }
} }

View File

@ -17,7 +17,7 @@ func (e *WithdrawBatchQuery) Query(ctx context.Context, asset string, startTime,
query := &AsyncTimeRangedBatchQuery{ query := &AsyncTimeRangedBatchQuery{
Type: types.Withdraw{}, Type: types.Withdraw{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 30, JumpIfEmpty: time.Hour * 24 * 80,
Q: func(startTime, endTime time.Time) (interface{}, error) { Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.ExchangeTransferService.QueryWithdrawHistory(ctx, asset, startTime, endTime) return e.ExchangeTransferService.QueryWithdrawHistory(ctx, asset, startTime, endTime)
}, },

View File

@ -100,9 +100,9 @@ func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exc
return nil return nil
} }
func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange) error { func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange, startTime time.Time) error {
log.Infof("syncing %s withdraw records...", exchange.Name()) log.Infof("syncing %s withdraw records...", exchange.Name())
if err := s.WithdrawService.Sync(ctx, exchange); err != nil { if err := s.WithdrawService.Sync(ctx, exchange, startTime); err != nil {
if err != ErrNotImplemented { if err != ErrNotImplemented {
log.Warnf("%s withdraw service is not supported", exchange.Name()) log.Warnf("%s withdraw service is not supported", exchange.Name())
return err return err

View File

@ -2,13 +2,12 @@ package service
import ( import (
"context" "context"
"fmt"
"time" "time"
sq "github.com/Masterminds/squirrel" sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -17,50 +16,52 @@ type WithdrawService struct {
} }
// Sync syncs the withdrawal records into db // Sync syncs the withdrawal records into db
func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error {
txnIDs := map[string]struct{}{} isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex)
if isMargin || isFutures || isIsolated {
// query descending // only works in spot
records, err := s.QueryLast(ex.Name(), 10) return nil
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
} }
transferApi, ok := ex.(types.ExchangeTransferService) transferApi, ok := ex.(types.ExchangeTransferService)
if !ok { if !ok {
return ErrNotImplemented return nil
} }
since := time.Time{} tasks := []SyncTask{
if len(records) > 0 { {
since = records[len(records)-1].ApplyTime.Time() Type: types.Withdraw{},
Select: SelectLastWithdraws(ex.Name(), 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.WithdrawBatchQuery{
ExchangeTransferService: transferApi,
}
return query.Query(ctx, "", startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.Withdraw).ApplyTime.Time()
},
ID: func(obj interface{}) string {
withdraw := obj.(types.Withdraw)
return withdraw.TransactionID
},
Filter: func(obj interface{}) bool {
withdraw := obj.(types.Withdraw)
if withdraw.Status == "rejected" {
return false
}
if len(withdraw.TransactionID) == 0 {
return false
}
return true
},
},
} }
// asset "" means all assets for _, sel := range tasks {
withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now()) if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err != nil {
return err
}
for _, withdraw := range withdraws {
if _, exists := txnIDs[withdraw.TransactionID]; exists {
continue
}
if withdraw.Status == "rejected" {
log.Warnf("skip record, withdraw transaction rejected: %+v", withdraw)
continue
}
if len(withdraw.TransactionID) == 0 {
return fmt.Errorf("empty withdraw transacion ID: %+v", withdraw)
}
if err := s.Insert(withdraw); err != nil {
return err return err
} }
} }

View File

@ -2,8 +2,9 @@ package types
import ( import (
"fmt" "fmt"
"github.com/c9s/bbgo/pkg/fixedpoint"
"time" "time"
"github.com/c9s/bbgo/pkg/fixedpoint"
) )
type Withdraw struct { type Withdraw struct {
@ -23,8 +24,32 @@ type Withdraw struct {
Network string `json:"network" db:"network"` Network string `json:"network" db:"network"`
} }
func (w Withdraw) String() string { func cutstr(s string, maxLen, head, tail int) string {
return fmt.Sprintf("withdraw %s %v to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time()) if len(s) > maxLen {
l := len(s)
return s[0:head] + "..." + s[l-tail:]
}
return s
}
func (w Withdraw) String() (o string) {
o = fmt.Sprintf("withdraw %s %v -> ", w.Asset, w.Amount)
if len(w.Network) > 0 && w.Network != w.Asset {
o += w.Network + ":"
}
o += fmt.Sprintf("%s at %s", w.Address, w.ApplyTime.Time())
if !w.TransactionFee.IsZero() {
o += fmt.Sprintf("fee %f %s", w.TransactionFee.Float64(), w.TransactionFeeCurrency)
}
if len(w.TransactionID) > 0 {
o += fmt.Sprintf("txID: %s", cutstr(w.TransactionID, 12, 4, 4))
}
return o
} }
func (w Withdraw) EffectiveTime() time.Time { func (w Withdraw) EffectiveTime() time.Time {