From 5a7cf05701bdfea986988f1c554b9ef09f40f848 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 23 Feb 2021 16:39:48 +0800 Subject: [PATCH] integrate reward service into the sync service --- examples/max-rewards/main.go | 4 + .../20210223080622_add_rewards_table.sql | 4 +- .../20210223080622_add_rewards_table.sql | 2 +- pkg/bbgo/environment.go | 16 ++- pkg/bbgo/session.go | 2 +- pkg/exchange/max/convert.go | 21 ++++ pkg/exchange/max/exchange.go | 62 +++++++-- pkg/exchange/max/maxapi/restapi.go | 14 +++ pkg/exchange/max/maxapi/reward.go | 60 +++++++++ .../mysql/20210223080622_add_rewards_table.go | 2 +- .../20210223080622_add_rewards_table.go | 2 +- pkg/service/reward.go | 44 ++++--- pkg/service/sync.go | 118 +++++++++++++++++- pkg/types/exchange.go | 4 + pkg/types/reward.go | 25 +++- pkg/types/trade.go | 2 +- 16 files changed, 341 insertions(+), 41 deletions(-) diff --git a/examples/max-rewards/main.go b/examples/max-rewards/main.go index dbe978db0..3f75a7476 100644 --- a/examples/max-rewards/main.go +++ b/examples/max-rewards/main.go @@ -31,8 +31,12 @@ func main() { } else { req = api.RewardService.NewRewardsRequest() } + // req.From(1613931192) // req.From(1613240048) + // req.From(maxapi.TimestampSince) + // req.To(maxapi.TimestampSince + 3600 * 24) + req.Limit(100) rewards, err := req.Do(ctx) if err != nil { diff --git a/migrations/mysql/20210223080622_add_rewards_table.sql b/migrations/mysql/20210223080622_add_rewards_table.sql index c686a8ab5..ca7c40386 100644 --- a/migrations/mysql/20210223080622_add_rewards_table.sql +++ b/migrations/mysql/20210223080622_add_rewards_table.sql @@ -20,12 +20,12 @@ CREATE TABLE `rewards` `created_at` DATETIME NOT NULL, - `used` BOOLEAN NOT NULL DEFAULT FALSE, + `spent` BOOLEAN NOT NULL DEFAULT FALSE, `note` TEXT NULL, PRIMARY KEY (`gid`), - UNIQUE KEY `id` (`id`) + UNIQUE KEY `uuid` (`exchange`, `uuid`) ); -- +down diff --git a/migrations/sqlite3/20210223080622_add_rewards_table.sql b/migrations/sqlite3/20210223080622_add_rewards_table.sql index 71f5efd82..ff6b3ae58 100644 --- a/migrations/sqlite3/20210223080622_add_rewards_table.sql +++ b/migrations/sqlite3/20210223080622_add_rewards_table.sql @@ -20,7 +20,7 @@ CREATE TABLE `rewards` `created_at` DATETIME NOT NULL, - `used` BOOLEAN NOT NULL DEFAULT FALSE, + `spent` BOOLEAN NOT NULL DEFAULT FALSE, `note` TEXT NULL ); diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 4cf1eedc3..5630a6d16 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -68,7 +68,8 @@ type Environment struct { DatabaseService *service.DatabaseService OrderService *service.OrderService TradeService *service.TradeService - TradeSync *service.SyncService + RewardService *service.RewardService + SyncService *service.SyncService // startTime is the time of start point (which is used in the backtest) startTime time.Time @@ -157,9 +158,12 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver db := environ.DatabaseService.DB environ.OrderService = &service.OrderService{DB: db} environ.TradeService = &service.TradeService{DB: db} - environ.TradeSync = &service.SyncService{ - TradeService: environ.TradeService, - OrderService: environ.OrderService, + environ.RewardService = &service.RewardService{DB: db} + + environ.SyncService = &service.SyncService{ + TradeService: environ.TradeService, + OrderService: environ.OrderService, + RewardService: environ.RewardService, } return nil @@ -528,7 +532,7 @@ func (environ *Environment) SyncSession(ctx context.Context, session *ExchangeSe } func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSession, defaultSymbols ...string) error { - if err := session.Init(ctx, environ) ; err != nil { + if err := session.Init(ctx, environ); err != nil { return err } @@ -539,7 +543,7 @@ func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSe log.Infof("syncing symbols %v from session %s", symbols, session.Name) - return environ.TradeSync.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) + return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) } func getSessionSymbols(session *ExchangeSession, defaultSymbols ...string) ([]string, error) { diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index c77eab6cc..9f82e32c4 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -291,7 +291,7 @@ func (session *ExchangeSession) InitSymbol(ctx context.Context, environ *Environ var err error var trades []types.Trade - if environ.TradeSync != nil { + if environ.SyncService != nil { tradingFeeCurrency := session.Exchange.PlatformFeeCurrency() if strings.HasPrefix(symbol, tradingFeeCurrency) { trades, err = environ.TradeService.QueryForTradingFeeCurrency(session.Exchange.Name(), symbol, tradingFeeCurrency) diff --git a/pkg/exchange/max/convert.go b/pkg/exchange/max/convert.go index 2940d55fa..54c9c3f64 100644 --- a/pkg/exchange/max/convert.go +++ b/pkg/exchange/max/convert.go @@ -49,6 +49,27 @@ func toGlobalSideType(v string) types.SideType { return types.SideType(v) } +func toGlobalRewards(maxRewards []max.Reward) ([]types.Reward, error) { + // convert to global reward + var rewards []types.Reward + for _, r := range maxRewards { + // ignore "accepted" + if r.State != "done" { + continue + } + + + reward, err := r.Reward() + if err != nil { + return nil, err + } + + rewards = append(rewards, *reward) + } + + return rewards, nil +} + func toGlobalOrderStatus(orderStatus max.OrderState, executedVolume, remainingVolume fixedpoint.Value) types.OrderStatus { switch orderStatus { diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index cfdfd03ac..c9de4871a 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "sort" "strconv" "time" @@ -69,7 +70,7 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke } func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) { - if err := marketDataLimiter.Wait(ctx) ; err != nil { + if err := marketDataLimiter.Wait(ctx); err != nil { return nil, err } @@ -174,7 +175,7 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [ // lastOrderID is not supported on MAX func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) { - if err := closedOrderQueryLimiter.Wait(ctx) ; err != nil { + if err := closedOrderQueryLimiter.Wait(ctx); err != nil { return nil, err } @@ -376,7 +377,7 @@ func (e *Exchange) PlatformFeeCurrency() string { } func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) { - if err := accountQueryLimiter.Wait(ctx) ; err != nil { + if err := accountQueryLimiter.Wait(ctx); err != nil { return nil, err } @@ -521,7 +522,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) { - if err := accountQueryLimiter.Wait(ctx) ; err != nil { + if err := accountQueryLimiter.Wait(ctx); err != nil { return nil, err } @@ -544,7 +545,7 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, } func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) { - if err := tradeQueryLimiter.Wait(ctx) ; err != nil { + if err := tradeQueryLimiter.Wait(ctx); err != nil { return nil, err } @@ -583,8 +584,56 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type return trades, nil } +func (e *Exchange) QueryRewards(ctx context.Context, startTime time.Time) ([]types.Reward, error) { + var from = startTime + var emptyTime = time.Time{} + + if from == emptyTime { + from = time.Unix(maxapi.TimestampSince, 0) + } + + var now = time.Now() + for { + if from.After(now) { + break + } + + // scan by 30 days + // an user might get most 14 commission records by currency per day + // limit 1000 / 14 = 71 days + to := from.Add(time.Hour * 24 * 30) + req := e.client.RewardService.NewRewardsRequest() + req.From(from.Unix()) + req.To(to.Unix()) + req.Limit(1000) + + maxRewards, err := req.Do(ctx) + if err != nil { + return nil, err + } + + if len(maxRewards) == 0 { + // next page + from = to + continue + } + + rewards, err := toGlobalRewards(maxRewards) + if err != nil { + return nil, err + } + + // sort them in the ascending order + sort.Reverse(types.RewardSliceByCreationTime{rewards}) + + return rewards, err + } + + return nil, errors.New("unknown error") +} + func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) { - if err := marketDataLimiter.Wait(ctx) ; err != nil { + if err := marketDataLimiter.Wait(ctx); err != nil { return nil, err } @@ -605,7 +654,6 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type return nil, errors.New("start time can not be empty") } - log.Infof("querying kline %s %s %+v", symbol, interval, options) localKLines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), string(interval), *options.StartTime, limit) if err != nil { diff --git a/pkg/exchange/max/maxapi/restapi.go b/pkg/exchange/max/maxapi/restapi.go index 77b53c286..6b27d1dc1 100644 --- a/pkg/exchange/max/maxapi/restapi.go +++ b/pkg/exchange/max/maxapi/restapi.go @@ -20,6 +20,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" + "github.com/spf13/viper" "github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/version" @@ -32,8 +33,16 @@ const ( UserAgent = "bbgo/" + version.Version defaultHTTPTimeout = time.Second * 30 + + TimestampSince = 1535760000 ) +var debugMaxRequestPayload = true + +func init() { + debugMaxRequestPayload = viper.GetBool("MAX_DEBUG_REQUEST_PAYLOAD") +} + var logger = log.WithField("exchange", "max") var htmlTagPattern = regexp.MustCompile("<[/]?[a-zA-Z-]+.*?>") @@ -187,6 +196,10 @@ func (c *RestClient) newAuthenticatedRequest(m string, refURL string, data inter p, err = json.Marshal(d) } + if debugMaxRequestPayload { + log.Infof("request payload: %s", p) + } + if err != nil { return nil, err } @@ -204,6 +217,7 @@ func (c *RestClient) newAuthenticatedRequest(m string, refURL string, data inter return nil, err } + encoded := base64.StdEncoding.EncodeToString(p) req.Header.Add("Content-Type", "application/json") diff --git a/pkg/exchange/max/maxapi/reward.go b/pkg/exchange/max/maxapi/reward.go index 7154e64cd..dea3bebbb 100644 --- a/pkg/exchange/max/maxapi/reward.go +++ b/pkg/exchange/max/maxapi/reward.go @@ -4,8 +4,11 @@ import ( "context" "encoding/json" "fmt" + "strings" + "github.com/c9s/bbgo/pkg/datatype" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" ) type RewardType string @@ -55,7 +58,34 @@ func (t *RewardType) UnmarshalJSON(o []byte) error { return nil } +func (t RewardType) RewardType() (types.RewardType, error) { + switch t { + + case RewardAirdrop: + return types.RewardAirdrop, nil + + case RewardCommission: + return types.RewardCommission, nil + + case RewardHolding: + return types.RewardHolding, nil + + case RewardMining: + return types.RewardMining, nil + + case RewardTrading: + return types.RewardTrading, nil + + case RewardVipRebate: + return types.RewardVipRebate, nil + + } + + return types.RewardType(""), fmt.Errorf("unknown reward type: %s", t) +} + type Reward struct { + // UUID here is more like SN, not the real UUID UUID string `json:"uuid"` Type RewardType `json:"type"` Currency string `json:"currency"` @@ -67,6 +97,25 @@ type Reward struct { CreatedAt Timestamp `json:"created_at"` } +func (reward Reward) Reward() (*types.Reward, error) { + rt, err := reward.Type.RewardType() + if err != nil { + return nil, err + } + + return &types.Reward{ + UUID: reward.UUID, + Exchange: types.ExchangeMax, + Type: rt, + Currency: strings.ToUpper(reward.Currency), + Quantity: reward.Amount, + State: reward.State, + Note: reward.Note, + Used: false, + CreatedAt: datatype.Time(reward.CreatedAt), + }, nil +} + type RewardService struct { client *RestClient } @@ -91,6 +140,8 @@ type RewardsRequest struct { // To Unix-timestamp to *int64 + + limit *int } func (r *RewardsRequest) Currency(currency string) *RewardsRequest { @@ -103,6 +154,11 @@ func (r *RewardsRequest) From(from int64) *RewardsRequest { return r } +func (r *RewardsRequest) Limit(limit int) *RewardsRequest { + r.limit = &limit + return r +} + func (r *RewardsRequest) To(to int64) *RewardsRequest { r.to = &to return r @@ -123,6 +179,10 @@ func (r *RewardsRequest) Do(ctx context.Context) (rewards []Reward, err error) { payload["from"] = r.from } + if r.limit != nil { + payload["limit"] = r.limit + } + refURL := "v2/rewards" if r.pathType != nil { diff --git a/pkg/migrations/mysql/20210223080622_add_rewards_table.go b/pkg/migrations/mysql/20210223080622_add_rewards_table.go index fd14ef680..edf0f9ab8 100644 --- a/pkg/migrations/mysql/20210223080622_add_rewards_table.go +++ b/pkg/migrations/mysql/20210223080622_add_rewards_table.go @@ -14,7 +14,7 @@ func init() { func upAddRewardsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `used` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`id`)\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `spent` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `uuid` (`exchange`, `uuid`)\n);") if err != nil { return err } diff --git a/pkg/migrations/sqlite3/20210223080622_add_rewards_table.go b/pkg/migrations/sqlite3/20210223080622_add_rewards_table.go index e04b023ee..5118f443f 100644 --- a/pkg/migrations/sqlite3/20210223080622_add_rewards_table.go +++ b/pkg/migrations/sqlite3/20210223080622_add_rewards_table.go @@ -14,7 +14,7 @@ func init() { func upAddRewardsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) { // This code is executed when the migration is applied. - _, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `used` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL\n);") + _, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `spent` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL\n);") if err != nil { return err } diff --git a/pkg/service/reward.go b/pkg/service/reward.go index d81b49591..a7fd3bae0 100644 --- a/pkg/service/reward.go +++ b/pkg/service/reward.go @@ -15,11 +15,24 @@ type RewardService struct { func NewRewardService(db *sqlx.DB) *RewardService { return &RewardService{db} } -func (s *RewardService) Query(ex types.ExchangeName, rewardType string, from time.Time) ([]types.Trade, error) { - rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC`, map[string]interface{}{ - "exchange": ex, - "reward_type": rewardType, - "from": from.Unix(), + +func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Reward, error) { + rows, err := s.DB.NamedQuery(`SELECT * FROM rewards WHERE exchange = :exchange ORDER BY created_at DESC LIMIT :limit`, map[string]interface{}{ + "exchange": ex, + "limit": limit, + }) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows) +} + +func (s *RewardService) QueryUnspent(ex types.ExchangeName, from time.Time) ([]types.Reward, error) { + rows, err := s.DB.NamedQuery(`SELECT * FROM rewards WHERE exchange = :exchange AND spent IS FALSE ORDER BY created_at ASC`, map[string]interface{}{ + "exchange": ex, + "from": from, }) if err != nil { return nil, err @@ -30,24 +43,23 @@ func (s *RewardService) Query(ex types.ExchangeName, rewardType string, from tim return s.scanRows(rows) } -func (s *RewardService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err error) { +func (s *RewardService) scanRows(rows *sqlx.Rows) (rewards []types.Reward, err error) { for rows.Next() { - var trade types.Trade - if err := rows.StructScan(&trade); err != nil { - return trades, err + var reward types.Reward + if err := rows.StructScan(&reward); err != nil { + return rewards, err } - trades = append(trades, trade) + rewards = append(rewards, reward) } - return trades, rows.Err() + return rewards, rows.Err() } -func (s *RewardService) Insert(trade types.Trade) error { +func (s *RewardService) Insert(reward types.Reward) error { _, err := s.DB.NamedExec(` - INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated) - VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at, :is_margin, :is_isolated)`, - trade) + INSERT INTO rewards (exchange, uuid, reward_type, quantity, state, created_at) + VALUES (:exchange, :uuid, :reward_type, :quantity, :state, :created_at)`, + reward) return err } - diff --git a/pkg/service/sync.go b/pkg/service/sync.go index cb9d5e7e8..eca058290 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -2,19 +2,124 @@ package service import ( "context" + "errors" "time" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) +var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface") + type SyncService struct { - TradeService *TradeService - OrderService *OrderService + TradeService *TradeService + OrderService *OrderService + RewardService *RewardService } +func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error { + service, ok := exchange.(types.ExchangeRewardService) + if !ok { + return ErrNotImplemented + } + + + var startTime time.Time + lastRecords, err := s.RewardService.QueryLast(exchange.Name(), 10) + if err != nil { + return err + } + if len(lastRecords) > 0 { + end := len(lastRecords) - 1 + lastRecord := lastRecords[end] + startTime = lastRecord.CreatedAt.Time() + } + + batchQuery := &RewardBatchQuery{Service: service} + rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now()) + + for reward := range rewardsC { + select { + + case <-ctx.Done(): + return ctx.Err() + + case err := <-errC: + if err != nil { + return err + } + + default: + + } + + if err := s.RewardService.Insert(reward); err != nil { + return err + } + } + + return <-errC +} + +type RewardBatchQuery struct { + Service types.ExchangeRewardService +} + +func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) { + c = make(chan types.Reward, 500) + errC = make(chan error, 1) + + go func() { + limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) + + defer close(c) + defer close(errC) + + rewardKeys := make(map[string]struct{}, 500) + + for startTime.Before(endTime) { + if err := limiter.Wait(ctx); err != nil { + logrus.WithError(err).Error("rate limit error") + } + + logrus.Infof("batch querying rewards %s <=> %s", startTime, endTime) + + rewards, err := q.Service.QueryRewards(ctx, startTime) + if err != nil { + errC <- err + return + } + + if len(rewards) == 0 { + return + } + + for _, o := range rewards { + if _, ok := rewardKeys[o.UUID]; ok { + logrus.Infof("skipping duplicated order id: %s", o.UUID) + continue + } + + if o.CreatedAt.Time().After(endTime) { + // stop batch query + return + } + + c <- o + startTime = o.CreatedAt.Time() + rewardKeys[o.UUID] = struct{}{} + } + } + + }() + + return c, errC +} + + func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { isMargin := false isIsolated := false @@ -147,6 +252,15 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil { return err } + + + if err := s.SyncRewards(ctx, exchange) ; err != nil { + if err == ErrNotImplemented { + continue + } + + return err + } } return nil diff --git a/pkg/types/exchange.go b/pkg/types/exchange.go index e65f5b57f..1e36346ad 100644 --- a/pkg/types/exchange.go +++ b/pkg/types/exchange.go @@ -85,6 +85,10 @@ type Exchange interface { CancelOrders(ctx context.Context, orders ...Order) error } +type ExchangeRewardService interface { + QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error) +} + type TradeQueryOptions struct { StartTime *time.Time EndTime *time.Time diff --git a/pkg/types/reward.go b/pkg/types/reward.go index a1d233a7e..e95a27595 100644 --- a/pkg/types/reward.go +++ b/pkg/types/reward.go @@ -1,6 +1,8 @@ package types import ( + "time" + "github.com/c9s/bbgo/pkg/datatype" "github.com/c9s/bbgo/pkg/fixedpoint" ) @@ -18,13 +20,30 @@ const ( type Reward struct { UUID string `json:"uuid" db:"uuid"` + Exchange ExchangeName `json:"exchange" db:"exchange"` Type RewardType `json:"reward_type" db:"reward_type"` Currency string `json:"currency" db:"currency"` - Amount fixedpoint.Value `json:"quantity" db:"quantity"` + Quantity fixedpoint.Value `json:"quantity" db:"quantity"` State string `json:"state" db:"state"` Note string `json:"note" db:"note"` - Used bool `json:"used" db:"used"` + Used bool `json:"spent" db:"spent"` // Unix timestamp in seconds - CreatedAt datatype.Time `json:"created_at"` + CreatedAt datatype.Time `json:"created_at" db:"created_at"` +} + +type RewardSlice []Reward + +func (s RewardSlice) Len() int { return len(s) } +func (s RewardSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +type RewardSliceByCreationTime struct { + RewardSlice +} + +// Less reports whether x[i] should be ordered before x[j] +func (s RewardSliceByCreationTime) Less(i, j int) bool { + return time.Time(s.RewardSlice[i].CreatedAt).After( + time.Time(s.RewardSlice[j].CreatedAt), + ) } diff --git a/pkg/types/trade.go b/pkg/types/trade.go index 0d8ce7fde..71cfa4d70 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -90,7 +90,7 @@ func (trade Trade) SlackAttachment() slack.Attachment { {Title: "Exchange", Value: trade.Exchange, Short: true}, {Title: "Price", Value: util.FormatFloat(trade.Price, 2), Short: true}, {Title: "Volume", Value: util.FormatFloat(trade.Quantity, 4), Short: true}, - {Title: "Amount", Value: util.FormatFloat(trade.QuoteQuantity, 2)}, + {Title: "Quantity", Value: util.FormatFloat(trade.QuoteQuantity, 2)}, {Title: "Fee", Value: util.FormatFloat(trade.Fee, 4), Short: true}, {Title: "FeeCurrency", Value: trade.FeeCurrency, Short: true}, },