implement sync method on reward service

This commit is contained in:
c9s 2021-03-14 11:03:22 +08:00
parent 5a02cdbda3
commit 8fc7c4798e

View File

@ -8,7 +8,9 @@ import (
"time"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -31,6 +33,64 @@ func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Rew
return s.scanRows(rows)
}
func (s *RewardService) Sync(ctx context.Context, exchange types.Exchange) error {
service, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrNotImplemented
}
var rewardKeys = map[string]struct{}{}
var startTime time.Time
records, err := s.QueryLast(exchange.Name(), 50)
if err != nil {
return err
}
if len(records) > 0 {
lastRecord := records[0]
startTime = lastRecord.CreatedAt.Time()
for _, record := range records {
rewardKeys[record.UUID] = struct{}{}
}
}
batchQuery := &batch.RewardBatchQuery{Service: service}
rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now())
for reward := range rewardsC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if _, ok := rewardKeys[reward.UUID]; ok {
continue
}
logrus.Infof("inserting reward: %s %s %s %f %s", reward.Exchange, reward.Type, reward.Currency, reward.Quantity.Float64(), reward.CreatedAt)
if err := s.Insert(reward); err != nil {
return err
}
}
return <-errC
}
type CurrencyPositionMap map[string]fixedpoint.Value
func (s *RewardService) AggregateUnspentCurrencyPosition(ctx context.Context, ex types.ExchangeName, since time.Time) (CurrencyPositionMap, error) {