diff --git a/config/sync.yaml b/config/sync.yaml index 5f97fcee5..c711a3771 100644 --- a/config/sync.yaml +++ b/config/sync.yaml @@ -23,7 +23,7 @@ sync: filledOrders: true # since is the start date of your trading data - since: 2022-01-01 + since: 2019-01-01 # sessions is the list of session names you want to sync # by default, BBGO sync all your available sessions. @@ -48,5 +48,5 @@ sync: - USDT # depositHistory: true - rewardHistory: true + # rewardHistory: true withdrawHistory: true diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index ff582dc77..aca20eefb 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -587,6 +587,8 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig * sessions = environ.SelectSessions(selectedSessions...) } + since := userConfig.Sync.Since.Time() + for _, session := range sessions { if err := environ.syncSession(ctx, session, syncSymbols...); err != nil { return err @@ -599,7 +601,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig * } if userConfig.Sync.WithdrawHistory { - if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { + if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil { return err } } @@ -612,7 +614,7 @@ func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig * if userConfig.Sync.MarginHistory { if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange, - userConfig.Sync.Since.Time(), + since, userConfig.Sync.MarginAssets...); err != nil { return err } @@ -644,6 +646,8 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err return environ.syncWithUserConfig(ctx, userConfig[0]) } + since := time.Now().AddDate(0, -6, 0) + // the default sync logics for _, session := range environ.sessions { if err := environ.syncSession(ctx, session); err != nil { @@ -661,7 +665,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err } if userConfig[0].Sync.WithdrawHistory { - if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { + if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange, since); err != nil { return err } } diff --git a/pkg/exchange/batch/time_range_query.go b/pkg/exchange/batch/time_range_query.go index 954ace647..88ef0b755 100644 --- a/pkg/exchange/batch/time_range_query.go +++ b/pkg/exchange/batch/time_range_query.go @@ -61,12 +61,17 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s listRef := reflect.ValueOf(sliceInf) listLen := listRef.Len() + log.Debugf("batch querying %T: %d remote records", q.Type, listLen) + if listLen == 0 { if q.JumpIfEmpty > 0 { startTime = startTime.Add(q.JumpIfEmpty) + + log.Debugf("batch querying %T: empty records jump to %s", q.Type, startTime) continue } + log.Debugf("batch querying %T: empty records, query is completed", q.Type) return } @@ -94,7 +99,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s obj := item.Interface() id := q.ID(obj) if _, exists := idMap[id]; exists { - log.Debugf("batch querying %T: duplicated id %s", q.Type, id) + log.Debugf("batch querying %T: ignore duplicated record, id = %s", q.Type, id) continue } @@ -102,10 +107,11 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s cRef.Send(item) sentAny = true - startTime = entryTime + startTime = entryTime.Add(time.Millisecond) } if !sentAny { + log.Debugf("batch querying %T: %d/%d records are not sent", q.Type, listLen, listLen) return } } diff --git a/pkg/exchange/batch/withdraw.go b/pkg/exchange/batch/withdraw.go index 6ea42abe3..36fc37489 100644 --- a/pkg/exchange/batch/withdraw.go +++ b/pkg/exchange/batch/withdraw.go @@ -17,7 +17,7 @@ func (e *WithdrawBatchQuery) Query(ctx context.Context, asset string, startTime, query := &AsyncTimeRangedBatchQuery{ Type: types.Withdraw{}, Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), - JumpIfEmpty: time.Hour * 24 * 30, + JumpIfEmpty: time.Hour * 24 * 80, Q: func(startTime, endTime time.Time) (interface{}, error) { return e.ExchangeTransferService.QueryWithdrawHistory(ctx, asset, startTime, endTime) }, diff --git a/pkg/service/sync.go b/pkg/service/sync.go index c0d30443b..53561b1f4 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -100,9 +100,9 @@ func (s *SyncService) SyncDepositHistory(ctx context.Context, exchange types.Exc return nil } -func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange) error { +func (s *SyncService) SyncWithdrawHistory(ctx context.Context, exchange types.Exchange, startTime time.Time) error { log.Infof("syncing %s withdraw records...", exchange.Name()) - if err := s.WithdrawService.Sync(ctx, exchange); err != nil { + if err := s.WithdrawService.Sync(ctx, exchange, startTime); err != nil { if err != ErrNotImplemented { log.Warnf("%s withdraw service is not supported", exchange.Name()) return err diff --git a/pkg/service/withdraw.go b/pkg/service/withdraw.go index d6675c5e4..d5730bb25 100644 --- a/pkg/service/withdraw.go +++ b/pkg/service/withdraw.go @@ -2,13 +2,12 @@ package service import ( "context" - "fmt" "time" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" - log "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -17,50 +16,52 @@ type WithdrawService struct { } // Sync syncs the withdrawal records into db -func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange) error { - txnIDs := map[string]struct{}{} - - // query descending - records, err := s.QueryLast(ex.Name(), 10) - if err != nil { - return err - } - - for _, record := range records { - txnIDs[record.TransactionID] = struct{}{} +func (s *WithdrawService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error { + isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex) + if isMargin || isFutures || isIsolated { + // only works in spot + return nil } transferApi, ok := ex.(types.ExchangeTransferService) if !ok { - return ErrNotImplemented + return nil } - since := time.Time{} - if len(records) > 0 { - since = records[len(records)-1].ApplyTime.Time() + tasks := []SyncTask{ + { + Type: types.Withdraw{}, + Select: SelectLastWithdraws(ex.Name(), 100), + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.WithdrawBatchQuery{ + ExchangeTransferService: transferApi, + } + return query.Query(ctx, "", startTime, endTime) + }, + Time: func(obj interface{}) time.Time { + return obj.(types.Withdraw).ApplyTime.Time() + }, + ID: func(obj interface{}) string { + withdraw := obj.(types.Withdraw) + return withdraw.TransactionID + }, + Filter: func(obj interface{}) bool { + withdraw := obj.(types.Withdraw) + if withdraw.Status == "rejected" { + return false + } + + if len(withdraw.TransactionID) == 0 { + return false + } + + return true + }, + }, } - // asset "" means all assets - withdraws, err := transferApi.QueryWithdrawHistory(ctx, "", since, time.Now()) - if err != nil { - return err - } - - for _, withdraw := range withdraws { - if _, exists := txnIDs[withdraw.TransactionID]; exists { - continue - } - - if withdraw.Status == "rejected" { - log.Warnf("skip record, withdraw transaction rejected: %+v", withdraw) - continue - } - - if len(withdraw.TransactionID) == 0 { - return fmt.Errorf("empty withdraw transacion ID: %+v", withdraw) - } - - if err := s.Insert(withdraw); err != nil { + for _, sel := range tasks { + if err := sel.execute(ctx, s.DB, startTime); err != nil { return err } } diff --git a/pkg/types/withdraw.go b/pkg/types/withdraw.go index 6a3d5dae5..7c74084e9 100644 --- a/pkg/types/withdraw.go +++ b/pkg/types/withdraw.go @@ -2,8 +2,9 @@ package types import ( "fmt" - "github.com/c9s/bbgo/pkg/fixedpoint" "time" + + "github.com/c9s/bbgo/pkg/fixedpoint" ) type Withdraw struct { @@ -23,8 +24,32 @@ type Withdraw struct { Network string `json:"network" db:"network"` } -func (w Withdraw) String() string { - return fmt.Sprintf("withdraw %s %v to %s at %s", w.Asset, w.Amount, w.Address, w.ApplyTime.Time()) +func cutstr(s string, maxLen, head, tail int) string { + if len(s) > maxLen { + l := len(s) + return s[0:head] + "..." + s[l-tail:] + } + return s +} + +func (w Withdraw) String() (o string) { + o = fmt.Sprintf("withdraw %s %v -> ", w.Asset, w.Amount) + + if len(w.Network) > 0 && w.Network != w.Asset { + o += w.Network + ":" + } + + o += fmt.Sprintf("%s at %s", w.Address, w.ApplyTime.Time()) + + if !w.TransactionFee.IsZero() { + o += fmt.Sprintf("fee %f %s", w.TransactionFee.Float64(), w.TransactionFeeCurrency) + } + + if len(w.TransactionID) > 0 { + o += fmt.Sprintf("txID: %s", cutstr(w.TransactionID, 12, 4, 4)) + } + + return o } func (w Withdraw) EffectiveTime() time.Time {