Merge pull request #648 from c9s/feature/binance-margin-history

feature: binance margin history sync support
This commit is contained in:
Yo-An Lin 2022-06-01 19:43:07 +08:00 committed by GitHub
commit bef73cf880
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 451 additions and 375 deletions

View File

@ -4,6 +4,13 @@ sessions:
exchange: binance
envVarPrefix: binance
binance_margin_dotusdt:
exchange: binance
envVarPrefix: binance
margin: true
isolatedMargin: true
isolatedMarginSymbol: DOTUSDT
max:
exchange: max
envVarPrefix: max
@ -16,12 +23,13 @@ sync:
filledOrders: true
# since is the start date of your trading data
since: 2019-11-01
since: 2022-01-01
# sessions is the list of session names you want to sync
# by default, BBGO sync all your available sessions.
sessions:
- binance
- binance_margin_dotusdt
- max
# symbols is the list of symbols you want to sync
@ -29,8 +37,15 @@ sync:
symbols:
- BTCUSDT
- ETHUSDT
- LINKUSDT
depositHistory: true
# marginHistory enables the margin history sync
marginHistory: true
# marginAssets lists the assets that are used in the margin.
# including loan, repay, interest and liquidation
marginAssets:
- USDT
# depositHistory: true
rewardHistory: true
withdrawHistory: true
# withdrawHistory: true

View File

@ -215,15 +215,20 @@ type SyncConfig struct {
// Symbols is the list of symbol to sync, if ignored, symbols wlll be discovered by your existing crypto balances
Symbols []string `json:"symbols,omitempty" yaml:"symbols,omitempty"`
// DepositHistory for syncing deposit history
// DepositHistory is for syncing deposit history
DepositHistory bool `json:"depositHistory" yaml:"depositHistory"`
// WithdrawHistory for syncing withdraw history
// WithdrawHistory is for syncing withdraw history
WithdrawHistory bool `json:"withdrawHistory" yaml:"withdrawHistory"`
// RewardHistory for syncing reward history
// RewardHistory is for syncing reward history
RewardHistory bool `json:"rewardHistory" yaml:"rewardHistory"`
// MarginHistory is for syncing margin related history: loans, repays, interests and liquidations
MarginHistory bool `json:"marginHistory" yaml:"marginHistory"`
MarginAssets []string `json:"marginAssets" yaml:"marginAssets"`
// Since is the date where you want to start syncing data
Since *types.LooseFormatTime `json:"since,omitempty"`

View File

@ -81,8 +81,11 @@ type Environment struct {
PositionService *service.PositionService
BacktestService *service.BacktestService
RewardService *service.RewardService
MarginService *service.MarginService
SyncService *service.SyncService
AccountService *service.AccountService
WithdrawService *service.WithdrawService
DepositService *service.DepositService
// startTime is the time of start point (which is used in the backtest)
startTime time.Time
@ -176,11 +179,14 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver
environ.AccountService = &service.AccountService{DB: db}
environ.ProfitService = &service.ProfitService{DB: db}
environ.PositionService = &service.PositionService{DB: db}
environ.MarginService = &service.MarginService{DB: db}
environ.WithdrawService = &service.WithdrawService{DB: db}
environ.DepositService = &service.DepositService{DB: db}
environ.SyncService = &service.SyncService{
TradeService: environ.TradeService,
OrderService: environ.OrderService,
RewardService: environ.RewardService,
MarginService: environ.MarginService,
WithdrawService: &service.WithdrawService{DB: db},
DepositService: &service.DepositService{DB: db},
}
@ -573,12 +579,60 @@ func (environ *Environment) setSyncing(status SyncStatus) {
environ.syncStatusMutex.Unlock()
}
func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *Config) error {
syncSymbols := userConfig.Sync.Symbols
sessions := environ.sessions
selectedSessions := userConfig.Sync.Sessions
if len(selectedSessions) > 0 {
sessions = environ.SelectSessions(selectedSessions...)
}
for _, session := range sessions {
if err := environ.syncSession(ctx, session, syncSymbols...); err != nil {
return err
}
if userConfig.Sync.DepositHistory {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil {
return err
}
}
if userConfig.Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil {
return err
}
}
if userConfig.Sync.RewardHistory {
if err := environ.SyncService.SyncRewardHistory(ctx, session.Exchange); err != nil {
return err
}
}
if userConfig.Sync.MarginHistory {
if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange,
userConfig.Sync.Since.Time(),
userConfig.Sync.MarginAssets...); err != nil {
return err
}
}
}
return nil
}
// Sync syncs all registered exchange sessions
func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) error {
if environ.SyncService == nil {
return nil
}
// for paper trade mode, skip sync
if util.IsPaperTrade() {
return nil
}
environ.syncMutex.Lock()
defer environ.syncMutex.Unlock()
@ -587,38 +641,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err
// sync by the defined user config
if len(userConfig) > 0 && userConfig[0] != nil && userConfig[0].Sync != nil {
syncSymbols := userConfig[0].Sync.Symbols
sessions := environ.sessions
selectedSessions := userConfig[0].Sync.Sessions
if len(selectedSessions) > 0 {
sessions = environ.SelectSessions(selectedSessions...)
}
for _, session := range sessions {
if err := environ.syncSession(ctx, session, syncSymbols...); err != nil {
return err
}
if userConfig[0].Sync.DepositHistory {
if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil {
return err
}
}
if userConfig[0].Sync.WithdrawHistory {
if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil {
return err
}
}
if userConfig[0].Sync.RewardHistory {
if err := environ.SyncService.SyncRewardHistory(ctx, session.Exchange); err != nil {
return err
}
}
}
return nil
return environ.syncWithUserConfig(ctx, userConfig[0])
}
// the default sync logics

View File

@ -6,14 +6,13 @@ import (
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/c9s/bbgo/pkg/bbgo"
)
func init() {
SyncCmd.Flags().String("session", "", "the exchange session name for sync")
SyncCmd.Flags().StringArray("session", []string{}, "the exchange session name for sync")
SyncCmd.Flags().String("symbol", "", "symbol of market for syncing")
SyncCmd.Flags().String("since", "", "sync from time")
RootCmd.AddCommand(SyncCmd)
@ -58,7 +57,7 @@ var SyncCmd = &cobra.Command{
return err
}
sessionName, err := cmd.Flags().GetString("session")
sessionNames, err := cmd.Flags().GetStringArray("session")
if err != nil {
return err
}
@ -88,35 +87,18 @@ var SyncCmd = &cobra.Command{
environ.SetSyncStartTime(syncStartTime)
// syncSymbols is the symbol list to sync
var syncSymbols []string
if userConfig.Sync != nil && len(userConfig.Sync.Symbols) > 0 {
syncSymbols = userConfig.Sync.Symbols
}
if len(symbol) > 0 {
syncSymbols = []string{symbol}
}
var selectedSessions []string
if userConfig.Sync != nil && len(userConfig.Sync.Sessions) > 0 {
selectedSessions = userConfig.Sync.Sessions
}
if len(sessionName) > 0 {
selectedSessions = []string{sessionName}
}
sessions := environ.SelectSessions(selectedSessions...)
for _, session := range sessions {
if err := environ.SyncSession(ctx, session, syncSymbols...); err != nil {
return err
if userConfig.Sync != nil && len(userConfig.Sync.Symbols) > 0 {
userConfig.Sync.Symbols = []string{symbol}
}
log.Infof("exchange session %s synchronization done", session.Name)
}
return nil
if len(sessionNames) > 0 {
if userConfig.Sync != nil && len(userConfig.Sync.Sessions) > 0 {
userConfig.Sync.Sessions = sessionNames
}
}
return environ.Sync(ctx, userConfig)
},
}

View File

@ -17,6 +17,7 @@ func (e *MarginInterestBatchQuery) Query(ctx context.Context, asset string, star
query := &AsyncTimeRangedBatchQuery{
Type: types.MarginInterest{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 30,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.QueryInterestHistory(ctx, asset, &startTime, &endTime)
},

View File

@ -18,6 +18,7 @@ func (e *MarginLiquidationBatchQuery) Query(ctx context.Context, startTime, endT
query := &AsyncTimeRangedBatchQuery{
Type: types.MarginLiquidation{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 30,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.QueryLiquidationHistory(ctx, &startTime, &endTime)
},

View File

@ -16,8 +16,9 @@ type MarginLoanBatchQuery struct {
func (e *MarginLoanBatchQuery) Query(ctx context.Context, asset string, startTime, endTime time.Time) (c chan types.MarginLoan, errC chan error) {
query := &AsyncTimeRangedBatchQuery{
Type: types.MarginLoan{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
Type: types.MarginLoan{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 30,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.QueryLoanHistory(ctx, asset, &startTime, &endTime)
},

View File

@ -18,6 +18,7 @@ func (e *MarginRepayBatchQuery) Query(ctx context.Context, asset string, startTi
query := &AsyncTimeRangedBatchQuery{
Type: types.MarginRepay{},
Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
JumpIfEmpty: time.Hour * 24 * 30,
Q: func(startTime, endTime time.Time) (interface{}, error) {
return e.QueryRepayHistory(ctx, asset, &startTime, &endTime)
},

View File

@ -10,6 +10,7 @@ import (
func toGlobalLoan(record binanceapi.MarginLoanRecord) types.MarginLoan {
return types.MarginLoan{
Exchange: types.ExchangeBinance,
TransactionID: uint64(record.TxId),
Asset: record.Asset,
Principle: record.Principal,
@ -20,6 +21,7 @@ func toGlobalLoan(record binanceapi.MarginLoanRecord) types.MarginLoan {
func toGlobalRepay(record binanceapi.MarginRepayRecord) types.MarginRepay {
return types.MarginRepay{
Exchange: types.ExchangeBinance,
TransactionID: record.TxId,
Asset: record.Asset,
Principle: record.Principal,
@ -30,6 +32,7 @@ func toGlobalRepay(record binanceapi.MarginRepayRecord) types.MarginRepay {
func toGlobalInterest(record binanceapi.MarginInterest) types.MarginInterest {
return types.MarginInterest{
Exchange: types.ExchangeBinance,
Asset: record.Asset,
Principle: record.Principal,
Interest: record.Interest,
@ -41,6 +44,7 @@ func toGlobalInterest(record binanceapi.MarginInterest) types.MarginInterest {
func toGlobalLiquidation(record binanceapi.MarginLiquidationRecord) types.MarginLiquidation {
return types.MarginLiquidation{
Exchange: types.ExchangeBinance,
AveragePrice: record.AveragePrice,
ExecutedQuantity: record.ExecutedQuantity,
OrderID: record.OrderId,

View File

@ -38,7 +38,7 @@ func prepareDB(t *testing.T) (*rockhopper.DB, error) {
ctx := context.Background()
err = rockhopper.Up(ctx, db, migrations, 0, 0)
assert.NoError(t, err)
assert.NoError(t, err, "should migrate successfully")
return db, err
}

View File

@ -2,33 +2,16 @@ package service
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
// SyncSelect defines the behaviors for syncing remote records
type SyncSelect struct {
Select sq.SelectBuilder
Type interface{}
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
// ID is a function that returns the unique identity of the object
ID func(obj interface{}) string
// Time is a function that returns the time of the object
Time func(obj interface{}) time.Time
}
type MarginService struct {
DB *sqlx.DB
}
@ -36,20 +19,20 @@ type MarginService struct {
func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset string, startTime time.Time) error {
api, ok := ex.(types.MarginHistory)
if !ok {
return ErrNotImplemented
return nil
}
marginExchange, ok := ex.(types.MarginExchange)
if !ok {
return fmt.Errorf("%T does not implement margin service", ex)
return nil
}
marginSettings := marginExchange.GetMarginSettings()
if !marginSettings.IsMargin {
return fmt.Errorf("exchange instance %s is not using margin", ex.Name())
return nil
}
sels := []SyncSelect{
tasks := []SyncTask{
{
Select: SelectLastMarginLoans(ex.Name(), 100),
Type: types.MarginLoan{},
@ -59,6 +42,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
}
return query.Query(ctx, asset, startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.MarginLoan).Time.Time()
},
ID: func(obj interface{}) string {
return strconv.FormatUint(obj.(types.MarginLoan).TransactionID, 10)
},
@ -72,6 +58,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
}
return query.Query(ctx, asset, startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.MarginRepay).Time.Time()
},
ID: func(obj interface{}) string {
return strconv.FormatUint(obj.(types.MarginRepay).TransactionID, 10)
},
@ -85,6 +74,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
}
return query.Query(ctx, asset, startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.MarginInterest).Time.Time()
},
ID: func(obj interface{}) string {
m := obj.(types.MarginInterest)
return m.Asset + m.IsolatedSymbol + strconv.FormatInt(m.Time.UnixMilli(), 10)
@ -99,6 +91,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
}
return query.Query(ctx, startTime, endTime)
},
Time: func(obj interface{}) time.Time {
return obj.(types.MarginLiquidation).UpdatedTime.Time()
},
ID: func(obj interface{}) string {
m := obj.(types.MarginLiquidation)
return strconv.FormatUint(m.OrderID, 10)
@ -106,116 +101,20 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
},
}
NextQuery:
for _, sel := range sels {
// query from db
recordSlice, err := s.executeDbQuery(ctx, sel.Select, sel.Type)
if err != nil {
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
recordSliceRef := reflect.ValueOf(recordSlice)
if recordSliceRef.Kind() == reflect.Ptr {
recordSliceRef = recordSliceRef.Elem()
}
logrus.Debugf("loaded %d records", recordSliceRef.Len())
ids := buildIdMap(sel, recordSliceRef)
sortRecords(sel, recordSliceRef)
// default since time point
since := lastRecordTime(sel, recordSliceRef, startTime)
// asset "" means all assets
dataC, errC := sel.BatchQuery(ctx, since, time.Now())
dataCRef := reflect.ValueOf(dataC)
for {
select {
case <-ctx.Done():
return nil
case err := <-errC:
return err
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
if err != nil {
return err
}
// closed chan, skip to next query
continue NextQuery
}
obj := v.Interface()
id := sel.ID(obj)
if _, exists := ids[id]; exists {
continue
}
logrus.Debugf("inserting %T: %+v", obj, obj)
if err := s.Insert(obj); err != nil {
return err
}
}
}
}
return nil
}
func (s *MarginService) executeDbQuery(ctx context.Context, sel sq.SelectBuilder, tpe interface{}) (interface{}, error) {
sql, args, err := sel.ToSql()
if err != nil {
return nil, err
}
rows, err := s.DB.QueryxContext(ctx, sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows, tpe)
}
func (s *MarginService) scanRows(rows *sqlx.Rows, tpe interface{}) (interface{}, error) {
refType := reflect.TypeOf(tpe)
if refType.Kind() == reflect.Ptr {
refType = refType.Elem()
}
sliceRef := reflect.New(reflect.SliceOf(refType))
for rows.Next() {
var recordRef = reflect.New(refType)
var record = recordRef.Interface()
if err := rows.StructScan(&record); err != nil {
return sliceRef.Interface(), err
}
sliceRef = reflect.Append(sliceRef, recordRef)
}
return sliceRef.Interface(), rows.Err()
}
func (s *MarginService) Insert(record interface{}) error {
sql := dbCache.InsertSqlOf(record)
_, err := s.DB.NamedExec(sql, record)
return err
}
func SelectLastMarginLoans(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("margin_loans").
Where(sq.Eq{"exchange": ex}).
OrderBy("time").
OrderBy("time DESC").
Limit(limit)
}
@ -223,7 +122,7 @@ func SelectLastMarginRepays(ex types.ExchangeName, limit uint64) sq.SelectBuilde
return sq.Select("*").
From("margin_repays").
Where(sq.Eq{"exchange": ex}).
OrderBy("time").
OrderBy("time DESC").
Limit(limit)
}
@ -231,7 +130,7 @@ func SelectLastMarginInterests(ex types.ExchangeName, limit uint64) sq.SelectBui
return sq.Select("*").
From("margin_interests").
Where(sq.Eq{"exchange": ex}).
OrderBy("time").
OrderBy("time DESC").
Limit(limit)
}
@ -239,36 +138,6 @@ func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.Select
return sq.Select("*").
From("margin_liquidations").
Where(sq.Eq{"exchange": ex}).
OrderBy("time").
OrderBy("time DESC").
Limit(limit)
}
func lastRecordTime(sel SyncSelect, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime
length := recordSlice.Len()
if length > 0 {
since = sel.Time(recordSlice.Index(length - 1))
}
return since
}
func sortRecords(sel SyncSelect, recordSlice reflect.Value) {
// always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b)
})
}
func buildIdMap(sel SyncSelect, recordSliceRef reflect.Value) map[string]struct{} {
ids := map[string]struct{}{}
for i := 0; i < recordSliceRef.Len(); i++ {
entryRef := recordSliceRef.Index(i)
id := sel.ID(entryRef.Interface())
ids[id] = struct{}{}
}
return ids
}

View File

@ -25,19 +25,24 @@ func TestMarginService(t *testing.T) {
ex.MarginSettings.IsIsolatedMargin = true
ex.MarginSettings.IsolatedMarginSymbol = "DOTUSDT"
logrus.SetLevel(logrus.ErrorLevel)
db, err := prepareDB(t)
assert.NoError(t, err)
if err != nil {
t.Fatal(err)
t.Fail()
return
}
defer db.Close()
ctx := context.Background()
logrus.SetLevel(logrus.DebugLevel)
dbx := sqlx.NewDb(db.DB, "sqlite3")
service := &MarginService{DB: dbx}
logrus.SetLevel(logrus.DebugLevel)
err = service.Sync(ctx, ex, "USDT", time.Date(2022, time.February, 1, 0, 0, 0, 0, time.UTC))
assert.NoError(t, err)

View File

@ -19,26 +19,10 @@ type OrderService struct {
}
func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isFutures := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
symbol = futuresSettings.IsolatedFuturesSymbol
}
isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50)
@ -99,7 +83,6 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
return <-errC
}
// QueryLast queries the last order from the database
func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Order, error) {
log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated)

View File

@ -1,11 +1,15 @@
package service
import (
"context"
"reflect"
"strings"
"github.com/Masterminds/squirrel"
"github.com/fatih/camelcase"
gopluralize "github.com/gertd/go-pluralize"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
var pluralize = gopluralize.NewClient()
@ -152,3 +156,51 @@ func (c *ReflectCache) FieldsOf(t interface{}) []string {
c.fields[typeName] = fields
return fields
}
// scanRowsOfType use the given type to scan rows
// this is usually slower than the native one since it uses reflect.
func scanRowsOfType(rows *sqlx.Rows, tpe interface{}) (interface{}, error) {
refType := reflect.TypeOf(tpe)
if refType.Kind() == reflect.Ptr {
refType = refType.Elem()
}
sliceRef := reflect.MakeSlice(reflect.SliceOf(refType), 0, 100)
// sliceRef := reflect.New(reflect.SliceOf(refType))
for rows.Next() {
var recordRef = reflect.New(refType)
var record = recordRef.Interface()
if err := rows.StructScan(record); err != nil {
return sliceRef.Interface(), err
}
sliceRef = reflect.Append(sliceRef, recordRef.Elem())
}
return sliceRef.Interface(), rows.Err()
}
func insertType(db *sqlx.DB, record interface{}) error {
sql := dbCache.InsertSqlOf(record)
_, err := db.NamedExec(sql, record)
return err
}
func selectAndScanType(ctx context.Context, db *sqlx.DB, sel squirrel.SelectBuilder, tpe interface{}) (interface{}, error) {
sql, args, err := sel.ToSql()
if err != nil {
return nil, err
}
logrus.Debugf("selectAndScanType: %T <- %s", tpe, sql)
logrus.Debugf("queryArgs: %v", args)
rows, err := db.QueryxContext(ctx, sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanRowsOfType(rows, tpe)
}

View File

@ -58,7 +58,7 @@ func Test_fieldsNamesOf(t *testing.T) {
{
name: "MarginInterest",
args: args{record: &types.MarginInterest{}},
want: []string{"exchange", "asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"},
want: []string{"gid", "exchange", "asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"},
},
}
for _, tt := range tests {

View File

@ -10,7 +10,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
var ErrNotImplemented = errors.New("not implemented")
@ -22,11 +21,7 @@ type SyncService struct {
RewardService *RewardService
WithdrawService *WithdrawService
DepositService *DepositService
}
func paperTrade() bool {
v, ok := util.GetEnvVarBool("PAPER_TRADE")
return ok && v
MarginService *MarginService
}
// SyncSessionSymbols syncs the trades from the given exchange session
@ -50,20 +45,44 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc
}
}
if paperTrade() {
return nil
}
func (s *SyncService) SyncMarginHistory(ctx context.Context, exchange types.Exchange, startTime time.Time, assets ...string) error {
if _, implemented := exchange.(types.MarginHistory); !implemented {
log.Debugf("exchange %T does not support types.MarginHistory", exchange)
return nil
}
if marginExchange, implemented := exchange.(types.MarginExchange); !implemented {
log.Debugf("exchange %T does not implement types.MarginExchange", exchange)
return nil
} else {
marginSettings := marginExchange.GetMarginSettings()
if !marginSettings.IsMargin {
log.Debugf("exchange %T is not using margin", exchange)
return nil
}
}
log.Infof("syncing %s margin history: %v...", exchange.Name(), assets)
for _, asset := range assets {
if err := s.MarginService.Sync(ctx, exchange, asset, startTime); err != nil {
return err
}
}
return nil
}
func (s *SyncService) SyncRewardHistory(ctx context.Context, exchange types.Exchange) error {
if _, implemented := exchange.(types.ExchangeRewardService); !implemented {
return nil
}
log.Infof("syncing %s reward records...", exchange.Name())
if err := s.RewardService.Sync(ctx, exchange); err != nil {
if err != ErrExchangeRewardServiceNotImplemented {
log.Warnf("%s reward service is not supported", exchange.Name())
return err
}
return err
}
return nil

130
pkg/service/sync_task.go Normal file
View File

@ -0,0 +1,130 @@
package service
import (
"context"
"reflect"
"sort"
"time"
"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// SyncTask defines the behaviors for syncing remote records
type SyncTask struct {
// Type is the element type of this sync task
// Since it will create a []Type slice from this type, you should not set pointer to this field
Type interface{}
// Select is the select query builder for querying db records
Select squirrel.SelectBuilder
// OnLoad is called when the records are loaded from the database
OnLoad func(objs interface{})
// ID is a function that returns the unique identity of the object
ID func(obj interface{}) string
// Time is a function that returns the time of the object
Time func(obj interface{}) time.Time
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
}
func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error {
// query from db
recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type)
if err != nil {
return err
}
recordSliceRef := reflect.ValueOf(recordSlice)
if recordSliceRef.Kind() == reflect.Ptr {
recordSliceRef = recordSliceRef.Elem()
}
logrus.Debugf("loaded %d %T records", recordSliceRef.Len(), sel.Type)
ids := buildIdMap(sel, recordSliceRef)
if err := sortRecords(sel, recordSliceRef); err != nil {
return err
}
if sel.OnLoad != nil {
sel.OnLoad(recordSliceRef.Interface())
}
// default since time point
since := lastRecordTime(sel, recordSliceRef, startTime)
// asset "" means all assets
dataC, errC := sel.BatchQuery(ctx, since, time.Now())
dataCRef := reflect.ValueOf(dataC)
for {
select {
case <-ctx.Done():
return nil
case err := <-errC:
return err
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
return err
}
obj := v.Interface()
id := sel.ID(obj)
if _, exists := ids[id]; exists {
continue
}
logrus.Infof("inserting %T: %+v", obj, obj)
if err := insertType(db, obj); err != nil {
return err
}
}
}
}
func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime
length := recordSlice.Len()
if length > 0 {
since = sel.Time(recordSlice.Index(length - 1).Interface())
}
return since
}
func sortRecords(sel SyncTask, recordSlice reflect.Value) error {
if sel.Time == nil {
return errors.New("time field is not set, can not sort records")
}
// always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b)
})
return nil
}
func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} {
ids := map[string]struct{}{}
for i := 0; i < recordSliceRef.Len(); i++ {
entryRef := recordSliceRef.Index(i)
id := sel.ID(entryRef.Interface())
ids[id] = struct{}{}
}
return ids
}

View File

@ -7,6 +7,7 @@ import (
"strings"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -51,86 +52,58 @@ func NewTradeService(db *sqlx.DB) *TradeService {
}
func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error {
isMargin := false
isFutures := false
isIsolated := false
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
symbol = marginSettings.IsolatedMarginSymbol
}
isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
symbol = isolatedSymbol
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
symbol = futuresSettings.IsolatedFuturesSymbol
}
}
// buffer 50 trades and use the trades ID to scan if the new trades are duplicated
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
// for exchange supports trade id query, we should always try to query from the first trade.
// 0 means disable.
var lastTradeID uint64 = 1
var now = time.Now()
if len(records) > 0 {
for _, record := range records {
tradeKeys[record.Key()] = struct{}{}
}
end := len(records) - 1
last := records[end]
lastTradeID = last.ID
startTime = last.Time.Time()
}
exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService)
api, ok := exchange.(types.ExchangeTradeHistoryService)
if !ok {
return nil
}
b := &batch.TradeBatchQuery{
ExchangeTradeHistoryService: exchangeTradeHistoryService,
lastTradeID := uint64(1)
tasks := []SyncTask{
{
Type: types.Trade{},
Select: SelectLastTrades(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100),
OnLoad: func(objs interface{}) {
// update last trade ID
trades := objs.([]types.Trade)
if len(trades) > 0 {
end := len(trades) - 1
last := trades[end]
lastTradeID = last.ID
}
},
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
query := &batch.TradeBatchQuery{
ExchangeTradeHistoryService: api,
}
return query.Query(ctx, symbol, &types.TradeQueryOptions{
StartTime: &startTime,
EndTime: &endTime,
LastTradeID: lastTradeID,
})
},
Time: func(obj interface{}) time.Time {
return obj.(types.Trade).Time.Time()
},
ID: func(obj interface{}) string {
trade := obj.(types.Trade)
return strconv.FormatUint(trade.ID, 10) + trade.Side.String()
},
},
}
tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
LastTradeID: lastTradeID,
StartTime: &startTime,
EndTime: &now,
})
for trade := range tradeC {
select {
case <-ctx.Done():
return ctx.Err()
case err := <-errC:
if err != nil {
return err
}
default:
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
}
key := trade.Key()
if _, exists := tradeKeys[key]; exists {
continue
}
tradeKeys[key] = struct{}{}
/*
log.Infof("inserting trade: %s %d %s %-4s price: %-13v volume: %-11v %5s %s",
trade.Exchange,
trade.ID,
@ -140,13 +113,8 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
trade.Quantity,
trade.Liquidity(),
trade.Time.String())
if err := s.Insert(trade); err != nil {
return err
}
}
return <-errC
*/
return nil
}
func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) {
@ -472,43 +440,8 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro
}
func (s *TradeService) Insert(trade types.Trade) error {
_, err := s.DB.NamedExec(`
INSERT INTO trades (
id,
exchange,
order_id,
symbol,
price,
quantity,
quote_quantity,
side,
is_buyer,
is_maker,
fee,
fee_currency,
traded_at,
is_margin,
is_futures,
is_isolated)
VALUES (
:id,
:exchange,
:order_id,
:symbol,
:price,
:quantity,
:quote_quantity,
:side,
:is_buyer,
:is_maker,
:fee,
:fee_currency,
:traded_at,
:is_margin,
:is_futures,
:is_isolated
)`,
trade)
sql := dbCache.InsertSqlOf(trade)
_, err := s.DB.NamedExec(sql, trade)
return err
}
@ -516,3 +449,45 @@ func (s *TradeService) DeleteAll() error {
_, err := s.DB.Exec(`DELETE FROM trades`)
return err
}
func SelectLastTrades(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("trades").
Where(sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"exchange": ex},
sq.Eq{"is_margin": isMargin},
sq.Eq{"is_futures": isFutures},
sq.Eq{"is_isolated": isIsolated},
}).
OrderBy("traded_at DESC").
Limit(limit)
}
func getExchangeAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) {
if marginExchange, ok := exchange.(types.MarginExchange); ok {
marginSettings := marginExchange.GetMarginSettings()
isMargin = marginSettings.IsMargin
if isMargin {
isIsolated = marginSettings.IsIsolatedMargin
if marginSettings.IsIsolatedMargin {
isolatedSymbol = marginSettings.IsolatedMarginSymbol
}
}
}
if futuresExchange, ok := exchange.(types.FuturesExchange); ok {
futuresSettings := futuresExchange.GetFuturesSettings()
isFutures = futuresSettings.IsFutures
if isFutures {
isIsolated = futuresSettings.IsIsolatedFutures
if futuresSettings.IsIsolatedFutures {
isolatedSymbol = futuresSettings.IsolatedFuturesSymbol
}
}
}
return isMargin, isFutures, isIsolated, isolatedSymbol
}

View File

@ -60,6 +60,7 @@ type MarginBorrowRepayService interface {
}
type MarginInterest struct {
GID uint64 `json:"gid" db:"gid"`
Exchange ExchangeName `json:"exchange" db:"exchange"`
Asset string `json:"asset" db:"asset"`
Principle fixedpoint.Value `json:"principle" db:"principle"`
@ -70,6 +71,7 @@ type MarginInterest struct {
}
type MarginLoan struct {
GID uint64 `json:"gid" db:"gid"`
Exchange ExchangeName `json:"exchange" db:"exchange"`
TransactionID uint64 `json:"transactionID" db:"transaction_id"`
Asset string `json:"asset" db:"asset"`
@ -79,6 +81,7 @@ type MarginLoan struct {
}
type MarginRepay struct {
GID uint64 `json:"gid" db:"gid"`
Exchange ExchangeName `json:"exchange" db:"exchange"`
TransactionID uint64 `json:"transactionID" db:"transaction_id"`
Asset string `json:"asset" db:"asset"`
@ -88,6 +91,7 @@ type MarginRepay struct {
}
type MarginLiquidation struct {
GID uint64 `json:"gid" db:"gid"`
Exchange ExchangeName `json:"exchange" db:"exchange"`
AveragePrice fixedpoint.Value `json:"averagePrice" db:"average_price"`
ExecutedQuantity fixedpoint.Value `json:"executedQuantity" db:"executed_quantity"`

6
pkg/util/paper_trade.go Normal file
View File

@ -0,0 +1,6 @@
package util
func IsPaperTrade() bool {
v, ok := GetEnvVarBool("PAPER_TRADE")
return ok && v
}