rewrite deposit sync service

This commit is contained in:
c9s 2022-06-08 15:49:28 +08:00
parent 5f075af24f
commit 6d78b05b41
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -4,8 +4,10 @@ import (
"context"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
@ -14,41 +16,49 @@ type DepositService struct {
}
// Sync syncs the withdraw records into db
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 10)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange, startTime time.Time) error {
isMargin, isFutures, isIsolated, _ := getExchangeAttributes(ex)
if isMargin || isFutures || isIsolated {
// only works in spot
return nil
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
return nil
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].Time.Time()
tasks := []SyncTask{
{
Type: types.Deposit{},
Select: SelectLastDeposits(ex.Name(), 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.DepositBatchQuery{
ExchangeTransferService: transferApi,
}
return query.Query(ctx, "", startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.Deposit).Time.Time()
},
ID: func(obj interface{}) string {
deposit := obj.(types.Deposit)
return deposit.TransactionID
},
Filter: func(obj interface{}) bool {
deposit := obj.(types.Deposit)
if len(deposit.TransactionID) == 0 {
return false
}
return true
},
LogInsert: true,
},
}
// asset "" means all assets
deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, deposit := range deposits {
if _, exists := txnIDs[deposit.TransactionID]; exists {
continue
}
if err := s.Insert(deposit); err != nil {
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
@ -104,3 +114,14 @@ func (s *DepositService) Insert(deposit types.Deposit) error {
_, err := s.DB.NamedExec(sql, deposit)
return err
}
func SelectLastDeposits(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("deposits").
Where(sq.And{
sq.Eq{"exchange": ex},
}).
OrderBy("time DESC").
Limit(limit)
}