From 279e4d8682da16596e0ddf998ba5b5de52377ffe Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 12:02:15 +0800 Subject: [PATCH 01/16] service: refactor sync task --- pkg/service/margin.go | 147 +-------------------------------------- pkg/service/reflect.go | 49 +++++++++++++ pkg/service/sync_task.go | 109 +++++++++++++++++++++++++++++ 3 files changed, 161 insertions(+), 144 deletions(-) create mode 100644 pkg/service/sync_task.go diff --git a/pkg/service/margin.go b/pkg/service/margin.go index f2e712350..536a7b37d 100644 --- a/pkg/service/margin.go +++ b/pkg/service/margin.go @@ -3,32 +3,16 @@ package service import ( "context" "fmt" - "reflect" - "sort" "strconv" "time" sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" - "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) -// SyncSelect defines the behaviors for syncing remote records -type SyncSelect struct { - Select sq.SelectBuilder - Type interface{} - BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) - - // ID is a function that returns the unique identity of the object - ID func(obj interface{}) string - - // Time is a function that returns the time of the object - Time func(obj interface{}) time.Time -} - type MarginService struct { DB *sqlx.DB } @@ -49,7 +33,7 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin return fmt.Errorf("exchange instance %s is not using margin", ex.Name()) } - sels := []SyncSelect{ + tasks := []SyncTask{ { Select: SelectLastMarginLoans(ex.Name(), 100), Type: types.MarginLoan{}, @@ -106,111 +90,15 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin }, } -NextQuery: - for _, sel := range sels { - // query from db - recordSlice, err := s.executeDbQuery(ctx, sel.Select, sel.Type) - if err != nil { + for _, sel := range tasks { + if err := sel.execute(ctx, s.DB, startTime); err != nil { return err } - - recordSliceRef := reflect.ValueOf(recordSlice) - if recordSliceRef.Kind() == reflect.Ptr { - recordSliceRef = recordSliceRef.Elem() - } - - logrus.Debugf("loaded %d records", recordSliceRef.Len()) - - ids := buildIdMap(sel, recordSliceRef) - sortRecords(sel, recordSliceRef) - - // default since time point - since := lastRecordTime(sel, recordSliceRef, startTime) - - // asset "" means all assets - dataC, errC := sel.BatchQuery(ctx, since, time.Now()) - dataCRef := reflect.ValueOf(dataC) - - for { - select { - case <-ctx.Done(): - return nil - - case err := <-errC: - return err - - default: - v, ok := dataCRef.Recv() - if !ok { - err := <-errC - if err != nil { - return err - } - - // closed chan, skip to next query - continue NextQuery - } - - obj := v.Interface() - id := sel.ID(obj) - if _, exists := ids[id]; exists { - continue - } - - logrus.Debugf("inserting %T: %+v", obj, obj) - if err := s.Insert(obj); err != nil { - return err - } - } - } } return nil } -func (s *MarginService) executeDbQuery(ctx context.Context, sel sq.SelectBuilder, tpe interface{}) (interface{}, error) { - sql, args, err := sel.ToSql() - - if err != nil { - return nil, err - } - - rows, err := s.DB.QueryxContext(ctx, sql, args...) - if err != nil { - return nil, err - } - - defer rows.Close() - return s.scanRows(rows, tpe) -} - -func (s *MarginService) scanRows(rows *sqlx.Rows, tpe interface{}) (interface{}, error) { - refType := reflect.TypeOf(tpe) - - if refType.Kind() == reflect.Ptr { - refType = refType.Elem() - } - - sliceRef := reflect.New(reflect.SliceOf(refType)) - for rows.Next() { - var recordRef = reflect.New(refType) - var record = recordRef.Interface() - if err := rows.StructScan(&record); err != nil { - return sliceRef.Interface(), err - } - - sliceRef = reflect.Append(sliceRef, recordRef) - } - - return sliceRef.Interface(), rows.Err() -} - -func (s *MarginService) Insert(record interface{}) error { - sql := dbCache.InsertSqlOf(record) - _, err := s.DB.NamedExec(sql, record) - return err -} - func SelectLastMarginLoans(ex types.ExchangeName, limit uint64) sq.SelectBuilder { return sq.Select("*"). From("margin_loans"). @@ -243,32 +131,3 @@ func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.Select Limit(limit) } -func lastRecordTime(sel SyncSelect, recordSlice reflect.Value, defaultTime time.Time) time.Time { - since := defaultTime - length := recordSlice.Len() - if length > 0 { - since = sel.Time(recordSlice.Index(length - 1)) - } - - return since -} - -func sortRecords(sel SyncSelect, recordSlice reflect.Value) { - // always sort - sort.Slice(recordSlice.Interface(), func(i, j int) bool { - a := sel.Time(recordSlice.Index(i).Interface()) - b := sel.Time(recordSlice.Index(j).Interface()) - return a.Before(b) - }) -} - -func buildIdMap(sel SyncSelect, recordSliceRef reflect.Value) map[string]struct{} { - ids := map[string]struct{}{} - for i := 0; i < recordSliceRef.Len(); i++ { - entryRef := recordSliceRef.Index(i) - id := sel.ID(entryRef.Interface()) - ids[id] = struct{}{} - } - - return ids -} diff --git a/pkg/service/reflect.go b/pkg/service/reflect.go index f8d938b87..a20aebb7b 100644 --- a/pkg/service/reflect.go +++ b/pkg/service/reflect.go @@ -1,11 +1,14 @@ package service import ( + "context" "reflect" "strings" + "github.com/Masterminds/squirrel" "github.com/fatih/camelcase" gopluralize "github.com/gertd/go-pluralize" + "github.com/jmoiron/sqlx" ) var pluralize = gopluralize.NewClient() @@ -152,3 +155,49 @@ func (c *ReflectCache) FieldsOf(t interface{}) []string { c.fields[typeName] = fields return fields } + +// scanRowsOfType use the given type to scan rows +// this is usually slower than the native one since it uses reflect. +func scanRowsOfType(rows *sqlx.Rows, tpe interface{}) (interface{}, error) { + refType := reflect.TypeOf(tpe) + + if refType.Kind() == reflect.Ptr { + refType = refType.Elem() + } + + sliceRef := reflect.New(reflect.SliceOf(refType)) + for rows.Next() { + var recordRef = reflect.New(refType) + var record = recordRef.Interface() + if err := rows.StructScan(&record); err != nil { + return sliceRef.Interface(), err + } + + sliceRef = reflect.Append(sliceRef, recordRef) + } + + return sliceRef.Interface(), rows.Err() +} + +func insertType(db *sqlx.DB, record interface{}) error { + sql := dbCache.InsertSqlOf(record) + _, err := db.NamedExec(sql, record) + return err +} + +func selectAndScanType(ctx context.Context, db *sqlx.DB, sel squirrel.SelectBuilder, tpe interface{}) (interface{}, error) { + sql, args, err := sel.ToSql() + + if err != nil { + return nil, err + } + + rows, err := db.QueryxContext(ctx, sql, args...) + if err != nil { + return nil, err + } + + defer rows.Close() + return scanRowsOfType(rows, tpe) +} + diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go new file mode 100644 index 000000000..19d43a992 --- /dev/null +++ b/pkg/service/sync_task.go @@ -0,0 +1,109 @@ +package service + +import ( + "context" + "reflect" + "sort" + "time" + + "github.com/Masterminds/squirrel" + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" +) + +// SyncTask defines the behaviors for syncing remote records +type SyncTask struct { + Select squirrel.SelectBuilder + Type interface{} + BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) + + // ID is a function that returns the unique identity of the object + ID func(obj interface{}) string + + // Time is a function that returns the time of the object + Time func(obj interface{}) time.Time +} + +func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error { + + // query from db + recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type) + if err != nil { + return err + } + + recordSliceRef := reflect.ValueOf(recordSlice) + if recordSliceRef.Kind() == reflect.Ptr { + recordSliceRef = recordSliceRef.Elem() + } + + logrus.Debugf("loaded %d records", recordSliceRef.Len()) + + ids := buildIdMap(sel, recordSliceRef) + sortRecords(sel, recordSliceRef) + + // default since time point + since := lastRecordTime(sel, recordSliceRef, startTime) + + // asset "" means all assets + dataC, errC := sel.BatchQuery(ctx, since, time.Now()) + dataCRef := reflect.ValueOf(dataC) + + for { + select { + case <-ctx.Done(): + return nil + + case err := <-errC: + return err + + default: + v, ok := dataCRef.Recv() + if !ok { + err := <-errC + return err + } + + obj := v.Interface() + id := sel.ID(obj) + if _, exists := ids[id]; exists { + continue + } + + logrus.Debugf("inserting %T: %+v", obj, obj) + if err := insertType(db, obj); err != nil { + return err + } + } + } +} + +func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time { + since := defaultTime + length := recordSlice.Len() + if length > 0 { + since = sel.Time(recordSlice.Index(length - 1)) + } + + return since +} + +func sortRecords(sel SyncTask, recordSlice reflect.Value) { + // always sort + sort.Slice(recordSlice.Interface(), func(i, j int) bool { + a := sel.Time(recordSlice.Index(i).Interface()) + b := sel.Time(recordSlice.Index(j).Interface()) + return a.Before(b) + }) +} + +func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} { + ids := map[string]struct{}{} + for i := 0; i < recordSliceRef.Len(); i++ { + entryRef := recordSliceRef.Index(i) + id := sel.ID(entryRef.Interface()) + ids[id] = struct{}{} + } + + return ids +} From bdc76e8db646b40302efee3b3b8071dd92736f1a Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:51:32 +0800 Subject: [PATCH 02/16] types: add gid field --- pkg/service/db_test.go | 2 +- pkg/service/margin_test.go | 11 ++++++++--- pkg/service/reflect_test.go | 2 +- pkg/types/margin.go | 4 ++++ 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/pkg/service/db_test.go b/pkg/service/db_test.go index 34029a799..3093f7dd1 100644 --- a/pkg/service/db_test.go +++ b/pkg/service/db_test.go @@ -38,7 +38,7 @@ func prepareDB(t *testing.T) (*rockhopper.DB, error) { ctx := context.Background() err = rockhopper.Up(ctx, db, migrations, 0, 0) - assert.NoError(t, err) + assert.NoError(t, err, "should migrate successfully") return db, err } diff --git a/pkg/service/margin_test.go b/pkg/service/margin_test.go index 387577eac..5fa85265d 100644 --- a/pkg/service/margin_test.go +++ b/pkg/service/margin_test.go @@ -25,19 +25,24 @@ func TestMarginService(t *testing.T) { ex.MarginSettings.IsIsolatedMargin = true ex.MarginSettings.IsolatedMarginSymbol = "DOTUSDT" + logrus.SetLevel(logrus.ErrorLevel) db, err := prepareDB(t) + + assert.NoError(t, err) + if err != nil { - t.Fatal(err) + t.Fail() + return } defer db.Close() ctx := context.Background() - logrus.SetLevel(logrus.DebugLevel) - dbx := sqlx.NewDb(db.DB, "sqlite3") service := &MarginService{DB: dbx} + + logrus.SetLevel(logrus.DebugLevel) err = service.Sync(ctx, ex, "USDT", time.Date(2022, time.February, 1, 0, 0, 0, 0, time.UTC)) assert.NoError(t, err) diff --git a/pkg/service/reflect_test.go b/pkg/service/reflect_test.go index 9eb525ae9..fa19056b7 100644 --- a/pkg/service/reflect_test.go +++ b/pkg/service/reflect_test.go @@ -58,7 +58,7 @@ func Test_fieldsNamesOf(t *testing.T) { { name: "MarginInterest", args: args{record: &types.MarginInterest{}}, - want: []string{"exchange", "asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"}, + want: []string{"gid", "exchange", "asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"}, }, } for _, tt := range tests { diff --git a/pkg/types/margin.go b/pkg/types/margin.go index c517954d0..fbec9e2a0 100644 --- a/pkg/types/margin.go +++ b/pkg/types/margin.go @@ -60,6 +60,7 @@ type MarginBorrowRepayService interface { } type MarginInterest struct { + GID uint64 `json:"gid" db:"gid"` Exchange ExchangeName `json:"exchange" db:"exchange"` Asset string `json:"asset" db:"asset"` Principle fixedpoint.Value `json:"principle" db:"principle"` @@ -70,6 +71,7 @@ type MarginInterest struct { } type MarginLoan struct { + GID uint64 `json:"gid" db:"gid"` Exchange ExchangeName `json:"exchange" db:"exchange"` TransactionID uint64 `json:"transactionID" db:"transaction_id"` Asset string `json:"asset" db:"asset"` @@ -79,6 +81,7 @@ type MarginLoan struct { } type MarginRepay struct { + GID uint64 `json:"gid" db:"gid"` Exchange ExchangeName `json:"exchange" db:"exchange"` TransactionID uint64 `json:"transactionID" db:"transaction_id"` Asset string `json:"asset" db:"asset"` @@ -88,6 +91,7 @@ type MarginRepay struct { } type MarginLiquidation struct { + GID uint64 `json:"gid" db:"gid"` Exchange ExchangeName `json:"exchange" db:"exchange"` AveragePrice fixedpoint.Value `json:"averagePrice" db:"average_price"` ExecutedQuantity fixedpoint.Value `json:"executedQuantity" db:"executed_quantity"` From f4e7f4f6f6a27c0e3ca539d821de56572cecd346 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:51:45 +0800 Subject: [PATCH 03/16] add margin history entry in config --- pkg/bbgo/config.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 15ad56708..e79fd87e8 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -215,15 +215,20 @@ type SyncConfig struct { // Symbols is the list of symbol to sync, if ignored, symbols wlll be discovered by your existing crypto balances Symbols []string `json:"symbols,omitempty" yaml:"symbols,omitempty"` - // DepositHistory for syncing deposit history + // DepositHistory is for syncing deposit history DepositHistory bool `json:"depositHistory" yaml:"depositHistory"` - // WithdrawHistory for syncing withdraw history + // WithdrawHistory is for syncing withdraw history WithdrawHistory bool `json:"withdrawHistory" yaml:"withdrawHistory"` - // RewardHistory for syncing reward history + // RewardHistory is for syncing reward history RewardHistory bool `json:"rewardHistory" yaml:"rewardHistory"` + // MarginHistory is for syncing margin related history: loans, repays, interests and liquidations + MarginHistory bool `json:"marginHistory" yaml:"marginHistory"` + + MarginAssets []string `json:"marginAssets" yaml:"marginAssets"` + // Since is the date where you want to start syncing data Since *types.LooseFormatTime `json:"since,omitempty"` From cf19ed6f26deeea0c821a19473ac24acc554abfb Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:52:00 +0800 Subject: [PATCH 04/16] refactor environment sync method --- pkg/bbgo/environment.go | 89 ++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 33 deletions(-) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index ebcfa0fe1..ff582dc77 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -81,8 +81,11 @@ type Environment struct { PositionService *service.PositionService BacktestService *service.BacktestService RewardService *service.RewardService + MarginService *service.MarginService SyncService *service.SyncService AccountService *service.AccountService + WithdrawService *service.WithdrawService + DepositService *service.DepositService // startTime is the time of start point (which is used in the backtest) startTime time.Time @@ -176,11 +179,14 @@ func (environ *Environment) ConfigureDatabaseDriver(ctx context.Context, driver environ.AccountService = &service.AccountService{DB: db} environ.ProfitService = &service.ProfitService{DB: db} environ.PositionService = &service.PositionService{DB: db} - + environ.MarginService = &service.MarginService{DB: db} + environ.WithdrawService = &service.WithdrawService{DB: db} + environ.DepositService = &service.DepositService{DB: db} environ.SyncService = &service.SyncService{ TradeService: environ.TradeService, OrderService: environ.OrderService, RewardService: environ.RewardService, + MarginService: environ.MarginService, WithdrawService: &service.WithdrawService{DB: db}, DepositService: &service.DepositService{DB: db}, } @@ -573,12 +579,60 @@ func (environ *Environment) setSyncing(status SyncStatus) { environ.syncStatusMutex.Unlock() } +func (environ *Environment) syncWithUserConfig(ctx context.Context, userConfig *Config) error { + syncSymbols := userConfig.Sync.Symbols + sessions := environ.sessions + selectedSessions := userConfig.Sync.Sessions + if len(selectedSessions) > 0 { + sessions = environ.SelectSessions(selectedSessions...) + } + + for _, session := range sessions { + if err := environ.syncSession(ctx, session, syncSymbols...); err != nil { + return err + } + + if userConfig.Sync.DepositHistory { + if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil { + return err + } + } + + if userConfig.Sync.WithdrawHistory { + if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { + return err + } + } + + if userConfig.Sync.RewardHistory { + if err := environ.SyncService.SyncRewardHistory(ctx, session.Exchange); err != nil { + return err + } + } + + if userConfig.Sync.MarginHistory { + if err := environ.SyncService.SyncMarginHistory(ctx, session.Exchange, + userConfig.Sync.Since.Time(), + userConfig.Sync.MarginAssets...); err != nil { + return err + } + } + } + + return nil +} + // Sync syncs all registered exchange sessions func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) error { if environ.SyncService == nil { return nil } + // for paper trade mode, skip sync + if util.IsPaperTrade() { + return nil + } + environ.syncMutex.Lock() defer environ.syncMutex.Unlock() @@ -587,38 +641,7 @@ func (environ *Environment) Sync(ctx context.Context, userConfig ...*Config) err // sync by the defined user config if len(userConfig) > 0 && userConfig[0] != nil && userConfig[0].Sync != nil { - syncSymbols := userConfig[0].Sync.Symbols - sessions := environ.sessions - selectedSessions := userConfig[0].Sync.Sessions - if len(selectedSessions) > 0 { - sessions = environ.SelectSessions(selectedSessions...) - } - - for _, session := range sessions { - if err := environ.syncSession(ctx, session, syncSymbols...); err != nil { - return err - } - - if userConfig[0].Sync.DepositHistory { - if err := environ.SyncService.SyncDepositHistory(ctx, session.Exchange); err != nil { - return err - } - } - - if userConfig[0].Sync.WithdrawHistory { - if err := environ.SyncService.SyncWithdrawHistory(ctx, session.Exchange); err != nil { - return err - } - } - - if userConfig[0].Sync.RewardHistory { - if err := environ.SyncService.SyncRewardHistory(ctx, session.Exchange); err != nil { - return err - } - } - } - - return nil + return environ.syncWithUserConfig(ctx, userConfig[0]) } // the default sync logics From 63ad635f62e265c92b49a2d5766031757cf9567c Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:52:11 +0800 Subject: [PATCH 05/16] cmd: rewrite sync command --- pkg/cmd/sync.go | 36 +++++++++--------------------------- pkg/util/paper_trade.go | 6 ++++++ 2 files changed, 15 insertions(+), 27 deletions(-) create mode 100644 pkg/util/paper_trade.go diff --git a/pkg/cmd/sync.go b/pkg/cmd/sync.go index aa56cbb4f..2aa7ed331 100644 --- a/pkg/cmd/sync.go +++ b/pkg/cmd/sync.go @@ -6,7 +6,6 @@ import ( "time" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/c9s/bbgo/pkg/bbgo" @@ -88,35 +87,18 @@ var SyncCmd = &cobra.Command{ environ.SetSyncStartTime(syncStartTime) - // syncSymbols is the symbol list to sync - var syncSymbols []string - - if userConfig.Sync != nil && len(userConfig.Sync.Symbols) > 0 { - syncSymbols = userConfig.Sync.Symbols - } - if len(symbol) > 0 { - syncSymbols = []string{symbol} - } - - var selectedSessions []string - - if userConfig.Sync != nil && len(userConfig.Sync.Sessions) > 0 { - selectedSessions = userConfig.Sync.Sessions - } - if len(sessionName) > 0 { - selectedSessions = []string{sessionName} - } - - sessions := environ.SelectSessions(selectedSessions...) - for _, session := range sessions { - if err := environ.SyncSession(ctx, session, syncSymbols...); err != nil { - return err + if userConfig.Sync != nil && len(userConfig.Sync.Symbols) > 0 { + userConfig.Sync.Symbols = []string{symbol} } - - log.Infof("exchange session %s synchronization done", session.Name) } - return nil + if len(sessionName) > 0 { + if userConfig.Sync != nil && len(userConfig.Sync.Sessions) > 0 { + userConfig.Sync.Sessions = []string{sessionName} + } + } + + return environ.Sync(ctx, userConfig) }, } diff --git a/pkg/util/paper_trade.go b/pkg/util/paper_trade.go new file mode 100644 index 000000000..b3a09d68b --- /dev/null +++ b/pkg/util/paper_trade.go @@ -0,0 +1,6 @@ +package util + +func IsPaperTrade() bool { + v, ok := GetEnvVarBool("PAPER_TRADE") + return ok && v +} From 5eaa4706f08fdd05841349fe47ec01254b52d92e Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:52:27 +0800 Subject: [PATCH 06/16] binance: set exchange field for margin records --- pkg/exchange/binance/convert_margin.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/exchange/binance/convert_margin.go b/pkg/exchange/binance/convert_margin.go index d74494e9d..e04bad07e 100644 --- a/pkg/exchange/binance/convert_margin.go +++ b/pkg/exchange/binance/convert_margin.go @@ -10,6 +10,7 @@ import ( func toGlobalLoan(record binanceapi.MarginLoanRecord) types.MarginLoan { return types.MarginLoan{ + Exchange: types.ExchangeBinance, TransactionID: uint64(record.TxId), Asset: record.Asset, Principle: record.Principal, @@ -20,6 +21,7 @@ func toGlobalLoan(record binanceapi.MarginLoanRecord) types.MarginLoan { func toGlobalRepay(record binanceapi.MarginRepayRecord) types.MarginRepay { return types.MarginRepay{ + Exchange: types.ExchangeBinance, TransactionID: record.TxId, Asset: record.Asset, Principle: record.Principal, @@ -30,6 +32,7 @@ func toGlobalRepay(record binanceapi.MarginRepayRecord) types.MarginRepay { func toGlobalInterest(record binanceapi.MarginInterest) types.MarginInterest { return types.MarginInterest{ + Exchange: types.ExchangeBinance, Asset: record.Asset, Principle: record.Principal, Interest: record.Interest, @@ -41,6 +44,7 @@ func toGlobalInterest(record binanceapi.MarginInterest) types.MarginInterest { func toGlobalLiquidation(record binanceapi.MarginLiquidationRecord) types.MarginLiquidation { return types.MarginLiquidation{ + Exchange: types.ExchangeBinance, AveragePrice: record.AveragePrice, ExecutedQuantity: record.ExecutedQuantity, OrderID: record.OrderId, From 5a4a2db66f991b7d11bc040cdd61b5e229598f4a Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:52:45 +0800 Subject: [PATCH 07/16] service: add time function --- pkg/service/margin.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/pkg/service/margin.go b/pkg/service/margin.go index 536a7b37d..302699fc3 100644 --- a/pkg/service/margin.go +++ b/pkg/service/margin.go @@ -2,7 +2,6 @@ package service import ( "context" - "fmt" "strconv" "time" @@ -20,17 +19,17 @@ type MarginService struct { func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset string, startTime time.Time) error { api, ok := ex.(types.MarginHistory) if !ok { - return ErrNotImplemented + return nil } marginExchange, ok := ex.(types.MarginExchange) if !ok { - return fmt.Errorf("%T does not implement margin service", ex) + return nil } marginSettings := marginExchange.GetMarginSettings() if !marginSettings.IsMargin { - return fmt.Errorf("exchange instance %s is not using margin", ex.Name()) + return nil } tasks := []SyncTask{ @@ -43,6 +42,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin } return query.Query(ctx, asset, startTime, endTime) }, + Time: func(obj interface{}) time.Time { + return obj.(types.MarginLoan).Time.Time() + }, ID: func(obj interface{}) string { return strconv.FormatUint(obj.(types.MarginLoan).TransactionID, 10) }, @@ -56,6 +58,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin } return query.Query(ctx, asset, startTime, endTime) }, + Time: func(obj interface{}) time.Time { + return obj.(types.MarginRepay).Time.Time() + }, ID: func(obj interface{}) string { return strconv.FormatUint(obj.(types.MarginRepay).TransactionID, 10) }, @@ -69,6 +74,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin } return query.Query(ctx, asset, startTime, endTime) }, + Time: func(obj interface{}) time.Time { + return obj.(types.MarginInterest).Time.Time() + }, ID: func(obj interface{}) string { m := obj.(types.MarginInterest) return m.Asset + m.IsolatedSymbol + strconv.FormatInt(m.Time.UnixMilli(), 10) @@ -83,6 +91,9 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin } return query.Query(ctx, startTime, endTime) }, + Time: func(obj interface{}) time.Time { + return obj.(types.MarginLiquidation).UpdatedTime.Time() + }, ID: func(obj interface{}) string { m := obj.(types.MarginLiquidation) return strconv.FormatUint(m.OrderID, 10) @@ -130,4 +141,3 @@ func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.Select OrderBy("time"). Limit(limit) } - From 118dc07e10c6ac4980dd81f71d280f39749747a7 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:53:08 +0800 Subject: [PATCH 08/16] service: fix reflect rows scan --- pkg/service/reflect.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/service/reflect.go b/pkg/service/reflect.go index a20aebb7b..86d46ca91 100644 --- a/pkg/service/reflect.go +++ b/pkg/service/reflect.go @@ -165,15 +165,16 @@ func scanRowsOfType(rows *sqlx.Rows, tpe interface{}) (interface{}, error) { refType = refType.Elem() } - sliceRef := reflect.New(reflect.SliceOf(refType)) + sliceRef := reflect.MakeSlice(reflect.SliceOf(refType), 0, 100) + // sliceRef := reflect.New(reflect.SliceOf(refType)) for rows.Next() { var recordRef = reflect.New(refType) var record = recordRef.Interface() - if err := rows.StructScan(&record); err != nil { + if err := rows.StructScan(record); err != nil { return sliceRef.Interface(), err } - sliceRef = reflect.Append(sliceRef, recordRef) + sliceRef = reflect.Append(sliceRef, recordRef.Elem()) } return sliceRef.Interface(), rows.Err() @@ -200,4 +201,3 @@ func selectAndScanType(ctx context.Context, db *sqlx.DB, sel squirrel.SelectBuil defer rows.Close() return scanRowsOfType(rows, tpe) } - From 484fc62892226b2ed920a9fd322d6c4ae3e949c2 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:53:37 +0800 Subject: [PATCH 09/16] batch: set jump if empty field --- pkg/exchange/batch/margin_loan.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/exchange/batch/margin_loan.go b/pkg/exchange/batch/margin_loan.go index 2df49cb8b..a32c7ea15 100644 --- a/pkg/exchange/batch/margin_loan.go +++ b/pkg/exchange/batch/margin_loan.go @@ -16,8 +16,9 @@ type MarginLoanBatchQuery struct { func (e *MarginLoanBatchQuery) Query(ctx context.Context, asset string, startTime, endTime time.Time) (c chan types.MarginLoan, errC chan error) { query := &AsyncTimeRangedBatchQuery{ - Type: types.MarginLoan{}, - Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Type: types.MarginLoan{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + JumpIfEmpty: time.Hour * 24 * 30, Q: func(startTime, endTime time.Time) (interface{}, error) { return e.QueryLoanHistory(ctx, asset, &startTime, &endTime) }, From 5bb98734fba43f087348764b91943ac33281e2cf Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:54:27 +0800 Subject: [PATCH 10/16] batch: set jump if empty field --- pkg/exchange/batch/margin_interest.go | 1 + pkg/exchange/batch/margin_liquidation.go | 1 + pkg/exchange/batch/margin_repay.go | 1 + 3 files changed, 3 insertions(+) diff --git a/pkg/exchange/batch/margin_interest.go b/pkg/exchange/batch/margin_interest.go index 4332db1b3..5ffe823b5 100644 --- a/pkg/exchange/batch/margin_interest.go +++ b/pkg/exchange/batch/margin_interest.go @@ -17,6 +17,7 @@ func (e *MarginInterestBatchQuery) Query(ctx context.Context, asset string, star query := &AsyncTimeRangedBatchQuery{ Type: types.MarginInterest{}, Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + JumpIfEmpty: time.Hour * 24 * 30, Q: func(startTime, endTime time.Time) (interface{}, error) { return e.QueryInterestHistory(ctx, asset, &startTime, &endTime) }, diff --git a/pkg/exchange/batch/margin_liquidation.go b/pkg/exchange/batch/margin_liquidation.go index babd98b82..1f4425382 100644 --- a/pkg/exchange/batch/margin_liquidation.go +++ b/pkg/exchange/batch/margin_liquidation.go @@ -18,6 +18,7 @@ func (e *MarginLiquidationBatchQuery) Query(ctx context.Context, startTime, endT query := &AsyncTimeRangedBatchQuery{ Type: types.MarginLiquidation{}, Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + JumpIfEmpty: time.Hour * 24 * 30, Q: func(startTime, endTime time.Time) (interface{}, error) { return e.QueryLiquidationHistory(ctx, &startTime, &endTime) }, diff --git a/pkg/exchange/batch/margin_repay.go b/pkg/exchange/batch/margin_repay.go index 60e2bdc69..0d5c29e25 100644 --- a/pkg/exchange/batch/margin_repay.go +++ b/pkg/exchange/batch/margin_repay.go @@ -18,6 +18,7 @@ func (e *MarginRepayBatchQuery) Query(ctx context.Context, asset string, startTi query := &AsyncTimeRangedBatchQuery{ Type: types.MarginRepay{}, Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + JumpIfEmpty: time.Hour * 24 * 30, Q: func(startTime, endTime time.Time) (interface{}, error) { return e.QueryRepayHistory(ctx, asset, &startTime, &endTime) }, From 1a85e629934234d53b1c4ff4dd53c7ebd24e366e Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 15:54:50 +0800 Subject: [PATCH 11/16] service: integrate margin service into the sync service --- pkg/service/sync.go | 41 ++++++++++++++++++++++++++++++----------- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/pkg/service/sync.go b/pkg/service/sync.go index 5db3b549b..c0d30443b 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -10,7 +10,6 @@ import ( log "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" ) var ErrNotImplemented = errors.New("not implemented") @@ -22,11 +21,7 @@ type SyncService struct { RewardService *RewardService WithdrawService *WithdrawService DepositService *DepositService -} - -func paperTrade() bool { - v, ok := util.GetEnvVarBool("PAPER_TRADE") - return ok && v + MarginService *MarginService } // SyncSessionSymbols syncs the trades from the given exchange session @@ -50,20 +45,44 @@ func (s *SyncService) SyncSessionSymbols(ctx context.Context, exchange types.Exc } } - if paperTrade() { + return nil +} + +func (s *SyncService) SyncMarginHistory(ctx context.Context, exchange types.Exchange, startTime time.Time, assets ...string) error { + if _, implemented := exchange.(types.MarginHistory); !implemented { + log.Debugf("exchange %T does not support types.MarginHistory", exchange) return nil } + if marginExchange, implemented := exchange.(types.MarginExchange); !implemented { + log.Debugf("exchange %T does not implement types.MarginExchange", exchange) + return nil + } else { + marginSettings := marginExchange.GetMarginSettings() + if !marginSettings.IsMargin { + log.Debugf("exchange %T is not using margin", exchange) + return nil + } + } + + log.Infof("syncing %s margin history: %v...", exchange.Name(), assets) + for _, asset := range assets { + if err := s.MarginService.Sync(ctx, exchange, asset, startTime); err != nil { + return err + } + } + return nil } func (s *SyncService) SyncRewardHistory(ctx context.Context, exchange types.Exchange) error { + if _, implemented := exchange.(types.ExchangeRewardService); !implemented { + return nil + } + log.Infof("syncing %s reward records...", exchange.Name()) if err := s.RewardService.Sync(ctx, exchange); err != nil { - if err != ErrExchangeRewardServiceNotImplemented { - log.Warnf("%s reward service is not supported", exchange.Name()) - return err - } + return err } return nil From 991d13cb322166742cc156623fa3f1f5d5638469 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 18:29:18 +0800 Subject: [PATCH 12/16] cmd/sync: support multiple session names --- pkg/cmd/sync.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/cmd/sync.go b/pkg/cmd/sync.go index 2aa7ed331..5517d19dc 100644 --- a/pkg/cmd/sync.go +++ b/pkg/cmd/sync.go @@ -12,7 +12,7 @@ import ( ) func init() { - SyncCmd.Flags().String("session", "", "the exchange session name for sync") + SyncCmd.Flags().StringArray("session", []string{}, "the exchange session name for sync") SyncCmd.Flags().String("symbol", "", "symbol of market for syncing") SyncCmd.Flags().String("since", "", "sync from time") RootCmd.AddCommand(SyncCmd) @@ -57,7 +57,7 @@ var SyncCmd = &cobra.Command{ return err } - sessionName, err := cmd.Flags().GetString("session") + sessionNames, err := cmd.Flags().GetStringArray("session") if err != nil { return err } @@ -93,9 +93,9 @@ var SyncCmd = &cobra.Command{ } } - if len(sessionName) > 0 { + if len(sessionNames) > 0 { if userConfig.Sync != nil && len(userConfig.Sync.Sessions) > 0 { - userConfig.Sync.Sessions = []string{sessionName} + userConfig.Sync.Sessions = sessionNames } } From dfe29e07e7224a11993690d80482938c91473dac Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 18:29:34 +0800 Subject: [PATCH 13/16] service/margin: fix query ordering --- pkg/service/margin.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/service/margin.go b/pkg/service/margin.go index 302699fc3..de45c9cda 100644 --- a/pkg/service/margin.go +++ b/pkg/service/margin.go @@ -114,7 +114,7 @@ func SelectLastMarginLoans(ex types.ExchangeName, limit uint64) sq.SelectBuilder return sq.Select("*"). From("margin_loans"). Where(sq.Eq{"exchange": ex}). - OrderBy("time"). + OrderBy("time DESC"). Limit(limit) } @@ -122,7 +122,7 @@ func SelectLastMarginRepays(ex types.ExchangeName, limit uint64) sq.SelectBuilde return sq.Select("*"). From("margin_repays"). Where(sq.Eq{"exchange": ex}). - OrderBy("time"). + OrderBy("time DESC"). Limit(limit) } @@ -130,7 +130,7 @@ func SelectLastMarginInterests(ex types.ExchangeName, limit uint64) sq.SelectBui return sq.Select("*"). From("margin_interests"). Where(sq.Eq{"exchange": ex}). - OrderBy("time"). + OrderBy("time DESC"). Limit(limit) } @@ -138,6 +138,6 @@ func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.Select return sq.Select("*"). From("margin_liquidations"). Where(sq.Eq{"exchange": ex}). - OrderBy("time"). + OrderBy("time DESC"). Limit(limit) } From fb633467327d147fae6afff4cc23f314ad7d42b6 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 18:29:54 +0800 Subject: [PATCH 14/16] service/reflect: add more debug logs --- pkg/service/reflect.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/service/reflect.go b/pkg/service/reflect.go index 86d46ca91..89f548ed6 100644 --- a/pkg/service/reflect.go +++ b/pkg/service/reflect.go @@ -9,6 +9,7 @@ import ( "github.com/fatih/camelcase" gopluralize "github.com/gertd/go-pluralize" "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" ) var pluralize = gopluralize.NewClient() @@ -188,11 +189,13 @@ func insertType(db *sqlx.DB, record interface{}) error { func selectAndScanType(ctx context.Context, db *sqlx.DB, sel squirrel.SelectBuilder, tpe interface{}) (interface{}, error) { sql, args, err := sel.ToSql() - if err != nil { return nil, err } + logrus.Debugf("selectAndScanType: %T <- %s", tpe, sql) + logrus.Debugf("queryArgs: %v", args) + rows, err := db.QueryxContext(ctx, sql, args...) if err != nil { return nil, err From 415450acb7906f7eb3b85b14293ca379be1751a4 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 18:30:10 +0800 Subject: [PATCH 15/16] service/sync: add onLoad event support --- pkg/service/sync_task.go | 39 ++++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/pkg/service/sync_task.go b/pkg/service/sync_task.go index 19d43a992..94a2bbebb 100644 --- a/pkg/service/sync_task.go +++ b/pkg/service/sync_task.go @@ -8,24 +8,33 @@ import ( "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" + "github.com/pkg/errors" "github.com/sirupsen/logrus" ) // SyncTask defines the behaviors for syncing remote records type SyncTask struct { - Select squirrel.SelectBuilder - Type interface{} - BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) + // Type is the element type of this sync task + // Since it will create a []Type slice from this type, you should not set pointer to this field + Type interface{} + + // Select is the select query builder for querying db records + Select squirrel.SelectBuilder + + // OnLoad is called when the records are loaded from the database + OnLoad func(objs interface{}) // ID is a function that returns the unique identity of the object ID func(obj interface{}) string // Time is a function that returns the time of the object Time func(obj interface{}) time.Time + + // BatchQuery is used for querying remote records. + BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) } func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error { - // query from db recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type) if err != nil { @@ -37,10 +46,17 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim recordSliceRef = recordSliceRef.Elem() } - logrus.Debugf("loaded %d records", recordSliceRef.Len()) + logrus.Debugf("loaded %d %T records", recordSliceRef.Len(), sel.Type) ids := buildIdMap(sel, recordSliceRef) - sortRecords(sel, recordSliceRef) + + if err := sortRecords(sel, recordSliceRef); err != nil { + return err + } + + if sel.OnLoad != nil { + sel.OnLoad(recordSliceRef.Interface()) + } // default since time point since := lastRecordTime(sel, recordSliceRef, startTime) @@ -70,7 +86,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim continue } - logrus.Debugf("inserting %T: %+v", obj, obj) + logrus.Infof("inserting %T: %+v", obj, obj) if err := insertType(db, obj); err != nil { return err } @@ -82,19 +98,24 @@ func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Ti since := defaultTime length := recordSlice.Len() if length > 0 { - since = sel.Time(recordSlice.Index(length - 1)) + since = sel.Time(recordSlice.Index(length - 1).Interface()) } return since } -func sortRecords(sel SyncTask, recordSlice reflect.Value) { +func sortRecords(sel SyncTask, recordSlice reflect.Value) error { + if sel.Time == nil { + return errors.New("time field is not set, can not sort records") + } + // always sort sort.Slice(recordSlice.Interface(), func(i, j int) bool { a := sel.Time(recordSlice.Index(i).Interface()) b := sel.Time(recordSlice.Index(j).Interface()) return a.Before(b) }) + return nil } func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} { From b070952b32f54bd8a363196d3fa68c58bc8ffe84 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 1 Jun 2022 18:30:24 +0800 Subject: [PATCH 16/16] service/sync: rewrite trade sync with syncTask --- config/sync.yaml | 23 ++++- pkg/service/order.go | 25 +----- pkg/service/trade.go | 203 +++++++++++++++++++------------------------ 3 files changed, 112 insertions(+), 139 deletions(-) diff --git a/config/sync.yaml b/config/sync.yaml index ec31dd986..99fede4e9 100644 --- a/config/sync.yaml +++ b/config/sync.yaml @@ -4,6 +4,13 @@ sessions: exchange: binance envVarPrefix: binance + binance_margin_dotusdt: + exchange: binance + envVarPrefix: binance + margin: true + isolatedMargin: true + isolatedMarginSymbol: DOTUSDT + max: exchange: max envVarPrefix: max @@ -16,12 +23,13 @@ sync: filledOrders: true # since is the start date of your trading data - since: 2019-11-01 + since: 2022-01-01 # sessions is the list of session names you want to sync # by default, BBGO sync all your available sessions. sessions: - binance + - binance_margin_dotusdt - max # symbols is the list of symbols you want to sync @@ -29,8 +37,15 @@ sync: symbols: - BTCUSDT - ETHUSDT - - LINKUSDT - depositHistory: true + # marginHistory enables the margin history sync + marginHistory: true + + # marginAssets lists the assets that are used in the margin. + # including loan, repay, interest and liquidation + marginAssets: + - USDT + + # depositHistory: true rewardHistory: true - withdrawHistory: true + # withdrawHistory: true diff --git a/pkg/service/order.go b/pkg/service/order.go index 471af49d0..b4b7dccc7 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -19,26 +19,10 @@ type OrderService struct { } func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - isMargin := false - isFutures := false - isIsolated := false - - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } - } - - if futuresExchange, ok := exchange.(types.FuturesExchange); ok { - futuresSettings := futuresExchange.GetFuturesSettings() - isFutures = futuresSettings.IsFutures - isIsolated = futuresSettings.IsIsolatedFutures - if futuresSettings.IsIsolatedFutures { - symbol = futuresSettings.IsolatedFuturesSymbol - } + isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange) + // override symbol if isolatedSymbol is not empty + if isIsolated && len(isolatedSymbol) > 0 { + symbol = isolatedSymbol } records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50) @@ -99,7 +83,6 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol return <-errC } - // QueryLast queries the last order from the database func (s *OrderService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Order, error) { log.Infof("querying last order exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated) diff --git a/pkg/service/trade.go b/pkg/service/trade.go index d6cc51fcb..48c07410d 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -7,6 +7,7 @@ import ( "strings" "time" + sq "github.com/Masterminds/squirrel" "github.com/jmoiron/sqlx" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -51,86 +52,58 @@ func NewTradeService(db *sqlx.DB) *TradeService { } func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time) error { - isMargin := false - isFutures := false - isIsolated := false - - if marginExchange, ok := exchange.(types.MarginExchange); ok { - marginSettings := marginExchange.GetMarginSettings() - isMargin = marginSettings.IsMargin - isIsolated = marginSettings.IsIsolatedMargin - if marginSettings.IsIsolatedMargin { - symbol = marginSettings.IsolatedMarginSymbol - } + isMargin, isFutures, isIsolated, isolatedSymbol := getExchangeAttributes(exchange) + // override symbol if isolatedSymbol is not empty + if isIsolated && len(isolatedSymbol) > 0 { + symbol = isolatedSymbol } - if futuresExchange, ok := exchange.(types.FuturesExchange); ok { - futuresSettings := futuresExchange.GetFuturesSettings() - isFutures = futuresSettings.IsFutures - isIsolated = futuresSettings.IsIsolatedFutures - if futuresSettings.IsIsolatedFutures { - symbol = futuresSettings.IsolatedFuturesSymbol - } - } - - // buffer 50 trades and use the trades ID to scan if the new trades are duplicated - records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100) - if err != nil { - return err - } - - var tradeKeys = map[types.TradeKey]struct{}{} - - // for exchange supports trade id query, we should always try to query from the first trade. - // 0 means disable. - var lastTradeID uint64 = 1 - var now = time.Now() - if len(records) > 0 { - for _, record := range records { - tradeKeys[record.Key()] = struct{}{} - } - - end := len(records) - 1 - last := records[end] - lastTradeID = last.ID - startTime = last.Time.Time() - } - - exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService) + api, ok := exchange.(types.ExchangeTradeHistoryService) if !ok { return nil } - b := &batch.TradeBatchQuery{ - ExchangeTradeHistoryService: exchangeTradeHistoryService, + lastTradeID := uint64(1) + tasks := []SyncTask{ + { + Type: types.Trade{}, + Select: SelectLastTrades(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100), + OnLoad: func(objs interface{}) { + // update last trade ID + trades := objs.([]types.Trade) + if len(trades) > 0 { + end := len(trades) - 1 + last := trades[end] + lastTradeID = last.ID + } + }, + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.TradeBatchQuery{ + ExchangeTradeHistoryService: api, + } + return query.Query(ctx, symbol, &types.TradeQueryOptions{ + StartTime: &startTime, + EndTime: &endTime, + LastTradeID: lastTradeID, + }) + }, + Time: func(obj interface{}) time.Time { + return obj.(types.Trade).Time.Time() + }, + ID: func(obj interface{}) string { + trade := obj.(types.Trade) + return strconv.FormatUint(trade.ID, 10) + trade.Side.String() + }, + }, } - tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ - LastTradeID: lastTradeID, - StartTime: &startTime, - EndTime: &now, - }) - - for trade := range tradeC { - select { - case <-ctx.Done(): - return ctx.Err() - - case err := <-errC: - if err != nil { - return err - } - - default: + for _, sel := range tasks { + if err := sel.execute(ctx, s.DB, startTime); err != nil { + return err } + } - key := trade.Key() - if _, exists := tradeKeys[key]; exists { - continue - } - - tradeKeys[key] = struct{}{} - + /* log.Infof("inserting trade: %s %d %s %-4s price: %-13v volume: %-11v %5s %s", trade.Exchange, trade.ID, @@ -140,13 +113,8 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol trade.Quantity, trade.Liquidity(), trade.Time.String()) - - if err := s.Insert(trade); err != nil { - return err - } - } - - return <-errC + */ + return nil } func (s *TradeService) QueryTradingVolume(startTime time.Time, options TradingVolumeQueryOptions) ([]TradingVolume, error) { @@ -472,43 +440,8 @@ func (s *TradeService) scanRows(rows *sqlx.Rows) (trades []types.Trade, err erro } func (s *TradeService) Insert(trade types.Trade) error { - _, err := s.DB.NamedExec(` - INSERT INTO trades ( - id, - exchange, - order_id, - symbol, - price, - quantity, - quote_quantity, - side, - is_buyer, - is_maker, - fee, - fee_currency, - traded_at, - is_margin, - is_futures, - is_isolated) - VALUES ( - :id, - :exchange, - :order_id, - :symbol, - :price, - :quantity, - :quote_quantity, - :side, - :is_buyer, - :is_maker, - :fee, - :fee_currency, - :traded_at, - :is_margin, - :is_futures, - :is_isolated - )`, - trade) + sql := dbCache.InsertSqlOf(trade) + _, err := s.DB.NamedExec(sql, trade) return err } @@ -516,3 +449,45 @@ func (s *TradeService) DeleteAll() error { _, err := s.DB.Exec(`DELETE FROM trades`) return err } + +func SelectLastTrades(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("trades"). + Where(sq.And{ + sq.Eq{"symbol": symbol}, + sq.Eq{"exchange": ex}, + sq.Eq{"is_margin": isMargin}, + sq.Eq{"is_futures": isFutures}, + sq.Eq{"is_isolated": isIsolated}, + }). + OrderBy("traded_at DESC"). + Limit(limit) +} + + + +func getExchangeAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) { + if marginExchange, ok := exchange.(types.MarginExchange); ok { + marginSettings := marginExchange.GetMarginSettings() + isMargin = marginSettings.IsMargin + if isMargin { + isIsolated = marginSettings.IsIsolatedMargin + if marginSettings.IsIsolatedMargin { + isolatedSymbol = marginSettings.IsolatedMarginSymbol + } + } + } + + if futuresExchange, ok := exchange.(types.FuturesExchange); ok { + futuresSettings := futuresExchange.GetFuturesSettings() + isFutures = futuresSettings.IsFutures + if isFutures { + isIsolated = futuresSettings.IsIsolatedFutures + if futuresSettings.IsIsolatedFutures { + isolatedSymbol = futuresSettings.IsolatedFuturesSymbol + } + } + } + + return isMargin, isFutures, isIsolated, isolatedSymbol +}