integrate reward service into the sync service

This commit is contained in:
c9s 2021-02-23 16:39:48 +08:00
parent c02213c8e5
commit 5a7cf05701
16 changed files with 341 additions and 41 deletions

View File

@ -31,8 +31,12 @@ func main() {
} else {
req = api.RewardService.NewRewardsRequest()
}
// req.From(1613931192)
// req.From(1613240048)
// req.From(maxapi.TimestampSince)
// req.To(maxapi.TimestampSince + 3600 * 24)
req.Limit(100)
rewards, err := req.Do(ctx)
if err != nil {

View File

@ -20,12 +20,12 @@ CREATE TABLE `rewards`
`created_at` DATETIME NOT NULL,
`used` BOOLEAN NOT NULL DEFAULT FALSE,
`spent` BOOLEAN NOT NULL DEFAULT FALSE,
`note` TEXT NULL,
PRIMARY KEY (`gid`),
UNIQUE KEY `id` (`id`)
UNIQUE KEY `uuid` (`exchange`, `uuid`)
);
-- +down

View File

@ -20,7 +20,7 @@ CREATE TABLE `rewards`
`created_at` DATETIME NOT NULL,
`used` BOOLEAN NOT NULL DEFAULT FALSE,
`spent` BOOLEAN NOT NULL DEFAULT FALSE,
`note` TEXT NULL
);

View File

@ -68,7 +68,8 @@ type Environment struct {
DatabaseService *service.DatabaseService
OrderService *service.OrderService
TradeService *service.TradeService
TradeSync *service.SyncService
RewardService *service.RewardService
SyncService *service.SyncService
// startTime is the time of start point (which is used in the backtest)
startTime time.Time
@ -157,9 +158,12 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver
db := environ.DatabaseService.DB
environ.OrderService = &service.OrderService{DB: db}
environ.TradeService = &service.TradeService{DB: db}
environ.TradeSync = &service.SyncService{
TradeService: environ.TradeService,
OrderService: environ.OrderService,
environ.RewardService = &service.RewardService{DB: db}
environ.SyncService = &service.SyncService{
TradeService: environ.TradeService,
OrderService: environ.OrderService,
RewardService: environ.RewardService,
}
return nil
@ -528,7 +532,7 @@ func (environ *Environment) SyncSession(ctx context.Context, session *ExchangeSe
}
func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSession, defaultSymbols ...string) error {
if err := session.Init(ctx, environ) ; err != nil {
if err := session.Init(ctx, environ); err != nil {
return err
}
@ -539,7 +543,7 @@ func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSe
log.Infof("syncing symbols %v from session %s", symbols, session.Name)
return environ.TradeSync.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...)
return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...)
}
func getSessionSymbols(session *ExchangeSession, defaultSymbols ...string) ([]string, error) {

View File

@ -291,7 +291,7 @@ func (session *ExchangeSession) InitSymbol(ctx context.Context, environ *Environ
var err error
var trades []types.Trade
if environ.TradeSync != nil {
if environ.SyncService != nil {
tradingFeeCurrency := session.Exchange.PlatformFeeCurrency()
if strings.HasPrefix(symbol, tradingFeeCurrency) {
trades, err = environ.TradeService.QueryForTradingFeeCurrency(session.Exchange.Name(), symbol, tradingFeeCurrency)

View File

@ -49,6 +49,27 @@ func toGlobalSideType(v string) types.SideType {
return types.SideType(v)
}
func toGlobalRewards(maxRewards []max.Reward) ([]types.Reward, error) {
// convert to global reward
var rewards []types.Reward
for _, r := range maxRewards {
// ignore "accepted"
if r.State != "done" {
continue
}
reward, err := r.Reward()
if err != nil {
return nil, err
}
rewards = append(rewards, *reward)
}
return rewards, nil
}
func toGlobalOrderStatus(orderStatus max.OrderState, executedVolume, remainingVolume fixedpoint.Value) types.OrderStatus {
switch orderStatus {

View File

@ -5,6 +5,7 @@ import (
"fmt"
"math"
"os"
"sort"
"strconv"
"time"
@ -69,7 +70,7 @@ func (e *Exchange) QueryTicker(ctx context.Context, symbol string) (*types.Ticke
}
func (e *Exchange) QueryTickers(ctx context.Context, symbol ...string) (map[string]types.Ticker, error) {
if err := marketDataLimiter.Wait(ctx) ; err != nil {
if err := marketDataLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -174,7 +175,7 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
// lastOrderID is not supported on MAX
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
if err := closedOrderQueryLimiter.Wait(ctx) ; err != nil {
if err := closedOrderQueryLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -376,7 +377,7 @@ func (e *Exchange) PlatformFeeCurrency() string {
}
func (e *Exchange) QueryAccount(ctx context.Context) (*types.Account, error) {
if err := accountQueryLimiter.Wait(ctx) ; err != nil {
if err := accountQueryLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -521,7 +522,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since,
}
func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap, error) {
if err := accountQueryLimiter.Wait(ctx) ; err != nil {
if err := accountQueryLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -544,7 +545,7 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (types.BalanceMap,
}
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
if err := tradeQueryLimiter.Wait(ctx) ; err != nil {
if err := tradeQueryLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -583,8 +584,56 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
return trades, nil
}
func (e *Exchange) QueryRewards(ctx context.Context, startTime time.Time) ([]types.Reward, error) {
var from = startTime
var emptyTime = time.Time{}
if from == emptyTime {
from = time.Unix(maxapi.TimestampSince, 0)
}
var now = time.Now()
for {
if from.After(now) {
break
}
// scan by 30 days
// an user might get most 14 commission records by currency per day
// limit 1000 / 14 = 71 days
to := from.Add(time.Hour * 24 * 30)
req := e.client.RewardService.NewRewardsRequest()
req.From(from.Unix())
req.To(to.Unix())
req.Limit(1000)
maxRewards, err := req.Do(ctx)
if err != nil {
return nil, err
}
if len(maxRewards) == 0 {
// next page
from = to
continue
}
rewards, err := toGlobalRewards(maxRewards)
if err != nil {
return nil, err
}
// sort them in the ascending order
sort.Reverse(types.RewardSliceByCreationTime{rewards})
return rewards, err
}
return nil, errors.New("unknown error")
}
func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval types.Interval, options types.KLineQueryOptions) ([]types.KLine, error) {
if err := marketDataLimiter.Wait(ctx) ; err != nil {
if err := marketDataLimiter.Wait(ctx); err != nil {
return nil, err
}
@ -605,7 +654,6 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
return nil, errors.New("start time can not be empty")
}
log.Infof("querying kline %s %s %+v", symbol, interval, options)
localKLines, err := e.client.PublicService.KLines(toLocalSymbol(symbol), string(interval), *options.StartTime, limit)
if err != nil {

View File

@ -20,6 +20,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/version"
@ -32,8 +33,16 @@ const (
UserAgent = "bbgo/" + version.Version
defaultHTTPTimeout = time.Second * 30
TimestampSince = 1535760000
)
var debugMaxRequestPayload = true
func init() {
debugMaxRequestPayload = viper.GetBool("MAX_DEBUG_REQUEST_PAYLOAD")
}
var logger = log.WithField("exchange", "max")
var htmlTagPattern = regexp.MustCompile("<[/]?[a-zA-Z-]+.*?>")
@ -187,6 +196,10 @@ func (c *RestClient) newAuthenticatedRequest(m string, refURL string, data inter
p, err = json.Marshal(d)
}
if debugMaxRequestPayload {
log.Infof("request payload: %s", p)
}
if err != nil {
return nil, err
}
@ -204,6 +217,7 @@ func (c *RestClient) newAuthenticatedRequest(m string, refURL string, data inter
return nil, err
}
encoded := base64.StdEncoding.EncodeToString(p)
req.Header.Add("Content-Type", "application/json")

View File

@ -4,8 +4,11 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type RewardType string
@ -55,7 +58,34 @@ func (t *RewardType) UnmarshalJSON(o []byte) error {
return nil
}
func (t RewardType) RewardType() (types.RewardType, error) {
switch t {
case RewardAirdrop:
return types.RewardAirdrop, nil
case RewardCommission:
return types.RewardCommission, nil
case RewardHolding:
return types.RewardHolding, nil
case RewardMining:
return types.RewardMining, nil
case RewardTrading:
return types.RewardTrading, nil
case RewardVipRebate:
return types.RewardVipRebate, nil
}
return types.RewardType(""), fmt.Errorf("unknown reward type: %s", t)
}
type Reward struct {
// UUID here is more like SN, not the real UUID
UUID string `json:"uuid"`
Type RewardType `json:"type"`
Currency string `json:"currency"`
@ -67,6 +97,25 @@ type Reward struct {
CreatedAt Timestamp `json:"created_at"`
}
func (reward Reward) Reward() (*types.Reward, error) {
rt, err := reward.Type.RewardType()
if err != nil {
return nil, err
}
return &types.Reward{
UUID: reward.UUID,
Exchange: types.ExchangeMax,
Type: rt,
Currency: strings.ToUpper(reward.Currency),
Quantity: reward.Amount,
State: reward.State,
Note: reward.Note,
Used: false,
CreatedAt: datatype.Time(reward.CreatedAt),
}, nil
}
type RewardService struct {
client *RestClient
}
@ -91,6 +140,8 @@ type RewardsRequest struct {
// To Unix-timestamp
to *int64
limit *int
}
func (r *RewardsRequest) Currency(currency string) *RewardsRequest {
@ -103,6 +154,11 @@ func (r *RewardsRequest) From(from int64) *RewardsRequest {
return r
}
func (r *RewardsRequest) Limit(limit int) *RewardsRequest {
r.limit = &limit
return r
}
func (r *RewardsRequest) To(to int64) *RewardsRequest {
r.to = &to
return r
@ -123,6 +179,10 @@ func (r *RewardsRequest) Do(ctx context.Context) (rewards []Reward, err error) {
payload["from"] = r.from
}
if r.limit != nil {
payload["limit"] = r.limit
}
refURL := "v2/rewards"
if r.pathType != nil {

View File

@ -14,7 +14,7 @@ func init() {
func upAddRewardsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `used` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `id` (`id`)\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) UNSIGNED NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `spent` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL,\n PRIMARY KEY (`gid`),\n UNIQUE KEY `uuid` (`exchange`, `uuid`)\n);")
if err != nil {
return err
}

View File

@ -14,7 +14,7 @@ func init() {
func upAddRewardsTable(ctx context.Context, tx rockhopper.SQLExecutor) (err error) {
// This code is executed when the migration is applied.
_, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `used` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL\n);")
_, err = tx.ExecContext(ctx, "CREATE TABLE `rewards`\n(\n `gid` INTEGER PRIMARY KEY AUTOINCREMENT,\n -- for exchange\n `exchange` VARCHAR(24) NOT NULL DEFAULT '',\n -- reward record id\n `uuid` VARCHAR(32) NOT NULL,\n `reward_type` VARCHAR(24) NOT NULL DEFAULT '',\n -- currency symbol, BTC, MAX, USDT ... etc\n `currency` VARCHAR(5) NOT NULL,\n -- the quantity of the rewards\n `quantity` DECIMAL(16, 8) NOT NULL,\n `state` VARCHAR(5) NOT NULL,\n `created_at` DATETIME NOT NULL,\n `spent` BOOLEAN NOT NULL DEFAULT FALSE,\n `note` TEXT NULL\n);")
if err != nil {
return err
}

View File

@ -15,11 +15,24 @@ type RewardService struct {
func NewRewardService(db *sqlx.DB) *RewardService {
return &RewardService{db}
}
func (s *RewardService) Query(ex types.ExchangeName, rewardType string, from time.Time) ([]types.Trade, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND (symbol = :symbol OR fee_currency = :fee_currency) ORDER BY traded_at ASC`, map[string]interface{}{
"exchange": ex,
"reward_type": rewardType,
"from": from.Unix(),
func (s *RewardService) QueryLast(ex types.ExchangeName, limit int) ([]types.Reward, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM rewards WHERE exchange = :exchange ORDER BY created_at DESC LIMIT :limit`, map[string]interface{}{
"exchange": ex,
"limit": limit,
})
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows)
}
func (s *RewardService) QueryUnspent(ex types.ExchangeName, from time.Time) ([]types.Reward, error) {
rows, err := s.DB.NamedQuery(`SELECT * FROM rewards WHERE exchange = :exchange AND spent IS FALSE ORDER BY created_at ASC`, map[string]interface{}{
"exchange": ex,
"from": from,
})
if err != nil {
return nil, err
@ -30,24 +43,23 @@ func (s *RewardService) Query(ex types.ExchangeName, rewardType string, from tim
return s.scanRows(rows)
}
func (s *RewardService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err error) {
func (s *RewardService) scanRows(rows *sqlx.Rows) (rewards []types.Reward, err error) {
for rows.Next() {
var trade types.Trade
if err := rows.StructScan(&trade); err != nil {
return trades, err
var reward types.Reward
if err := rows.StructScan(&reward); err != nil {
return rewards, err
}
trades = append(trades, trade)
rewards = append(rewards, reward)
}
return trades, rows.Err()
return rewards, rows.Err()
}
func (s *RewardService) Insert(trade types.Trade) error {
func (s *RewardService) Insert(reward types.Reward) error {
_, err := s.DB.NamedExec(`
INSERT INTO trades (id, exchange, order_id, symbol, price, quantity, quote_quantity, side, is_buyer, is_maker, fee, fee_currency, traded_at, is_margin, is_isolated)
VALUES (:id, :exchange, :order_id, :symbol, :price, :quantity, :quote_quantity, :side, :is_buyer, :is_maker, :fee, :fee_currency, :traded_at, :is_margin, :is_isolated)`,
trade)
INSERT INTO rewards (exchange, uuid, reward_type, quantity, state, created_at)
VALUES (:exchange, :uuid, :reward_type, :quantity, :state, :created_at)`,
reward)
return err
}

View File

@ -2,19 +2,124 @@ package service
import (
"context"
"errors"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
var ErrNotImplemented = errors.New("exchange does not implement ExchangeRewardService interface")
type SyncService struct {
TradeService *TradeService
OrderService *OrderService
TradeService *TradeService
OrderService *OrderService
RewardService *RewardService
}
func (s *SyncService) SyncRewards(ctx context.Context, exchange types.Exchange) error {
service, ok := exchange.(types.ExchangeRewardService)
if !ok {
return ErrNotImplemented
}
var startTime time.Time
lastRecords, err := s.RewardService.QueryLast(exchange.Name(), 10)
if err != nil {
return err
}
if len(lastRecords) > 0 {
end := len(lastRecords) - 1
lastRecord := lastRecords[end]
startTime = lastRecord.CreatedAt.Time()
}
batchQuery := &RewardBatchQuery{Service: service}
rewardsC, errC := batchQuery.Query(ctx, startTime, time.Now())
for reward := range rewardsC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
}
if err := s.RewardService.Insert(reward); err != nil {
return err
}
}
return <-errC
}
type RewardBatchQuery struct {
Service types.ExchangeRewardService
}
func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) {
c = make(chan types.Reward, 500)
errC = make(chan error, 1)
go func() {
limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
defer close(c)
defer close(errC)
rewardKeys := make(map[string]struct{}, 500)
for startTime.Before(endTime) {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
}
logrus.Infof("batch querying rewards %s <=> %s", startTime, endTime)
rewards, err := q.Service.QueryRewards(ctx, startTime)
if err != nil {
errC <- err
return
}
if len(rewards) == 0 {
return
}
for _, o := range rewards {
if _, ok := rewardKeys[o.UUID]; ok {
logrus.Infof("skipping duplicated order id: %s", o.UUID)
continue
}
if o.CreatedAt.Time().After(endTime) {
// stop batch query
return
}
c <- o
startTime = o.CreatedAt.Time()
rewardKeys[o.UUID] = struct{}{}
}
}
}()
return c, errC
}
func (s *SyncService) SyncOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isIsolated := false
@ -147,6 +252,15 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc
if err := s.SyncOrders(ctx, exchange, symbol, startTime); err != nil {
return err
}
if err := s.SyncRewards(ctx, exchange) ; err != nil {
if err == ErrNotImplemented {
continue
}
return err
}
}
return nil

View File

@ -85,6 +85,10 @@ type Exchange interface {
CancelOrders(ctx context.Context, orders ...Order) error
}
type ExchangeRewardService interface {
QueryRewards(ctx context.Context, startTime time.Time) ([]Reward, error)
}
type TradeQueryOptions struct {
StartTime *time.Time
EndTime *time.Time

View File

@ -1,6 +1,8 @@
package types
import (
"time"
"github.com/c9s/bbgo/pkg/datatype"
"github.com/c9s/bbgo/pkg/fixedpoint"
)
@ -18,13 +20,30 @@ const (
type Reward struct {
UUID string `json:"uuid" db:"uuid"`
Exchange ExchangeName `json:"exchange" db:"exchange"`
Type RewardType `json:"reward_type" db:"reward_type"`
Currency string `json:"currency" db:"currency"`
Amount fixedpoint.Value `json:"quantity" db:"quantity"`
Quantity fixedpoint.Value `json:"quantity" db:"quantity"`
State string `json:"state" db:"state"`
Note string `json:"note" db:"note"`
Used bool `json:"used" db:"used"`
Used bool `json:"spent" db:"spent"`
// Unix timestamp in seconds
CreatedAt datatype.Time `json:"created_at"`
CreatedAt datatype.Time `json:"created_at" db:"created_at"`
}
type RewardSlice []Reward
func (s RewardSlice) Len() int { return len(s) }
func (s RewardSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
type RewardSliceByCreationTime struct {
RewardSlice
}
// Less reports whether x[i] should be ordered before x[j]
func (s RewardSliceByCreationTime) Less(i, j int) bool {
return time.Time(s.RewardSlice[i].CreatedAt).After(
time.Time(s.RewardSlice[j].CreatedAt),
)
}

View File

@ -90,7 +90,7 @@ func (trade Trade) SlackAttachment() slack.Attachment {
{Title: "Exchange", Value: trade.Exchange, Short: true},
{Title: "Price", Value: util.FormatFloat(trade.Price, 2), Short: true},
{Title: "Volume", Value: util.FormatFloat(trade.Quantity, 4), Short: true},
{Title: "Amount", Value: util.FormatFloat(trade.QuoteQuantity, 2)},
{Title: "Quantity", Value: util.FormatFloat(trade.QuoteQuantity, 2)},
{Title: "Fee", Value: util.FormatFloat(trade.Fee, 4), Short: true},
{Title: "FeeCurrency", Value: trade.FeeCurrency, Short: true},
},