implement deposit sync

This commit is contained in:
c9s 2021-03-14 10:43:42 +08:00
parent 0246e298d2
commit 54ba240317

View File

@ -1,6 +1,9 @@
package service
import (
"context"
"time"
"github.com/jmoiron/sqlx"
"github.com/c9s/bbgo/pkg/types"
@ -10,6 +13,50 @@ type DepositService struct {
DB *sqlx.DB
}
// Sync syncs the withdraw records into db
func (s *DepositService) Sync(ctx context.Context, ex types.Exchange) error {
txnIDs := map[string]struct{}{}
// query descending
records, err := s.QueryLast(ex.Name(), 100)
if err != nil {
return err
}
for _, record := range records {
txnIDs[record.TransactionID] = struct{}{}
}
transferApi, ok := ex.(types.ExchangeTransferService)
if !ok {
return ErrNotImplemented
}
since := time.Time{}
if len(records) > 0 {
since = records[len(records)-1].Time.Time()
}
// asset "" means all assets
deposits, err := transferApi.QueryDepositHistory(ctx, "", since, time.Now())
if err != nil {
return err
}
for _, deposit := range deposits {
if _, exists := txnIDs[deposit.TransactionID]; exists {
continue
}
if err := s.Insert(deposit); err != nil {
return err
}
}
return nil
}
func (s *DepositService) QueryLast(ex types.ExchangeName, limit int) ([]types.Deposit, error) {
sql := "SELECT * FROM `deposits` WHERE `exchange` = :exchange ORDER BY `time` DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{