From bf92e28461de2b4c039c6c03e49e312a2871b51e Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 31 May 2022 17:08:52 +0800 Subject: [PATCH] service: implement margin service for syncing margin related data --- go.mod | 3 + go.sum | 6 + .../mysql/20220531013542_margin_interests.sql | 2 - .../20220531015005_margin_liquidations.sql | 2 +- .../20220531013541_margin_interests.sql | 2 - .../20220531015005_margin_liquidations.sql | 14 +- pkg/exchange/batch/margin_liquidation.go | 36 +++ pkg/exchange/batch/margin_loan.go | 2 +- .../binance/binanceapi/client_test.go | 25 +- pkg/service/margin.go | 260 ++++++++++++++++++ pkg/service/margin_test.go | 47 ++++ pkg/service/reflect_test.go | 2 +- pkg/testutil/auth.go | 25 ++ pkg/types/margin.go | 2 +- 14 files changed, 392 insertions(+), 36 deletions(-) create mode 100644 pkg/exchange/batch/margin_liquidation.go create mode 100644 pkg/service/margin.go create mode 100644 pkg/service/margin_test.go create mode 100644 pkg/testutil/auth.go diff --git a/go.mod b/go.mod index 6bfba370c..c2de845d6 100644 --- a/go.mod +++ b/go.mod @@ -52,6 +52,7 @@ require ( ) require ( + github.com/Masterminds/squirrel v1.5.3 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bitly/go-simplejson v0.5.0 // indirect github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect @@ -75,6 +76,8 @@ require ( github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 // indirect github.com/json-iterator/go v1.1.11 // indirect + github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect + github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/leodido/go-urn v1.2.1 // indirect github.com/lestrrat-go/strftime v1.0.0 // indirect github.com/lib/pq v1.10.5 // indirect diff --git a/go.sum b/go.sum index 6d3a17450..bd8f396bd 100644 --- a/go.sum +++ b/go.sum @@ -39,6 +39,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/DATA-DOG/go-sqlmock v1.5.0 h1:Shsta01QNfFxHCfpW6YH2STWB0MudeXXEWMr20OEh60= github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= +github.com/Masterminds/squirrel v1.5.3 h1:YPpoceAcxuzIljlr5iWpNKaql7hLeG1KLSrhvdHpkZc= +github.com/Masterminds/squirrel v1.5.3/go.mod h1:NNaOrjSoIDfDA40n7sr2tPNZRfjzjA400rg+riTZj10= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/adshao/go-binance/v2 v2.3.5 h1:WVYZecm0w8l14YoWlnKZj6xxZT2AKMTHpMQSqIX1xxA= github.com/adshao/go-binance/v2 v2.3.5/go.mod h1:8Pg/FGTLyAhq8QXA0IkoReKyRpoxJcK3LVujKDAZV/c= @@ -303,6 +305,10 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw= +github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk= +github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw= github.com/leekchan/accounting v0.0.0-20191218023648-17a4ce5f94d4 h1:KZzDAtJ7ZLm0zSWVhN/zgyB8Ksx5H+P9irwbTcJ9FwI= github.com/leekchan/accounting v0.0.0-20191218023648-17a4ce5f94d4/go.mod h1:3timm6YPhY3YDaGxl0q3eaflX0eoSx3FXn7ckHe4tO0= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= diff --git a/migrations/mysql/20220531013542_margin_interests.sql b/migrations/mysql/20220531013542_margin_interests.sql index 30e898769..90169526b 100644 --- a/migrations/mysql/20220531013542_margin_interests.sql +++ b/migrations/mysql/20220531013542_margin_interests.sql @@ -3,8 +3,6 @@ CREATE TABLE `margin_interests` ( `gid` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT, - `transaction_id` BIGINT UNSIGNED NOT NULL, - `exchange` VARCHAR(24) NOT NULL DEFAULT '', `asset` VARCHAR(24) NOT NULL DEFAULT '', diff --git a/migrations/mysql/20220531015005_margin_liquidations.sql b/migrations/mysql/20220531015005_margin_liquidations.sql index 86f0675b7..82ea81f1d 100644 --- a/migrations/mysql/20220531015005_margin_liquidations.sql +++ b/migrations/mysql/20220531015005_margin_liquidations.sql @@ -23,7 +23,7 @@ CREATE TABLE `margin_liquidations` `time_in_force` VARCHAR(5) NOT NULL DEFAULT '', - `updated_time` DATETIME(3) NOT NULL, + `time` DATETIME(3) NOT NULL, PRIMARY KEY (`gid`), UNIQUE KEY (`order_id`, `exchange`) diff --git a/migrations/sqlite3/20220531013541_margin_interests.sql b/migrations/sqlite3/20220531013541_margin_interests.sql index b82c033ba..f088f2581 100644 --- a/migrations/sqlite3/20220531013541_margin_interests.sql +++ b/migrations/sqlite3/20220531013541_margin_interests.sql @@ -3,8 +3,6 @@ CREATE TABLE `margin_interests` ( `gid` INTEGER PRIMARY KEY AUTOINCREMENT, - `transaction_id` INTEGER NOT NULL, - `exchange` VARCHAR(24) NOT NULL DEFAULT '', `asset` VARCHAR(24) NOT NULL DEFAULT '', diff --git a/migrations/sqlite3/20220531015005_margin_liquidations.sql b/migrations/sqlite3/20220531015005_margin_liquidations.sql index 9cd4cc116..5a99afc36 100644 --- a/migrations/sqlite3/20220531015005_margin_liquidations.sql +++ b/migrations/sqlite3/20220531015005_margin_liquidations.sql @@ -3,13 +3,13 @@ CREATE TABLE `margin_liquidations` ( `gid` INTEGER PRIMARY KEY AUTOINCREMENT, - `exchange` VARCHAR(24) NOT NULL DEFAULT '', + `exchange` VARCHAR(24) NOT NULL DEFAULT '', - `symbol` VARCHAR(24) NOT NULL DEFAULT '', + `symbol` VARCHAR(24) NOT NULL DEFAULT '', - `order_id` INTEGER NOT NULL, + `order_id` INTEGER NOT NULL, - `is_isolated` BOOL NOT NULL DEFAULT false, + `is_isolated` BOOL NOT NULL DEFAULT false, `average_price` DECIMAL(16, 8) NOT NULL, @@ -19,11 +19,11 @@ CREATE TABLE `margin_liquidations` `executed_quantity` DECIMAL(16, 8) NOT NULL, - `side` VARCHAR(5) NOT NULL DEFAULT '', + `side` VARCHAR(5) NOT NULL DEFAULT '', - `time_in_force` VARCHAR(5) NOT NULL DEFAULT '', + `time_in_force` VARCHAR(5) NOT NULL DEFAULT '', - `updated_time` DATETIME(3) NOT NULL + `time` DATETIME(3) NOT NULL ); -- +down diff --git a/pkg/exchange/batch/margin_liquidation.go b/pkg/exchange/batch/margin_liquidation.go new file mode 100644 index 000000000..babd98b82 --- /dev/null +++ b/pkg/exchange/batch/margin_liquidation.go @@ -0,0 +1,36 @@ +package batch + +import ( + "context" + "strconv" + "time" + + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/types" +) + +type MarginLiquidationBatchQuery struct { + types.MarginHistory +} + +func (e *MarginLiquidationBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.MarginLiquidation, errC chan error) { + query := &AsyncTimeRangedBatchQuery{ + Type: types.MarginLiquidation{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Q: func(startTime, endTime time.Time) (interface{}, error) { + return e.QueryLiquidationHistory(ctx, &startTime, &endTime) + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.MarginLiquidation).UpdatedTime) + }, + ID: func(obj interface{}) string { + liquidation := obj.(types.MarginLiquidation) + return strconv.FormatUint(liquidation.OrderID, 10) + }, + } + + c = make(chan types.MarginLiquidation, 100) + errC = query.Query(ctx, c, startTime, endTime) + return c, errC +} diff --git a/pkg/exchange/batch/margin_loan.go b/pkg/exchange/batch/margin_loan.go index 3cfa5b57b..2df49cb8b 100644 --- a/pkg/exchange/batch/margin_loan.go +++ b/pkg/exchange/batch/margin_loan.go @@ -22,7 +22,7 @@ func (e *MarginLoanBatchQuery) Query(ctx context.Context, asset string, startTim return e.QueryLoanHistory(ctx, asset, &startTime, &endTime) }, T: func(obj interface{}) time.Time { - return time.Time(obj.(types.MarginRepay).Time) + return time.Time(obj.(types.MarginLoan).Time) }, ID: func(obj interface{}) string { loan := obj.(types.MarginLoan) diff --git a/pkg/exchange/binance/binanceapi/client_test.go b/pkg/exchange/binance/binanceapi/client_test.go index 84a6542f8..ac32a3f18 100644 --- a/pkg/exchange/binance/binanceapi/client_test.go +++ b/pkg/exchange/binance/binanceapi/client_test.go @@ -4,32 +4,15 @@ import ( "context" "log" "net/http/httputil" - "os" - "regexp" "testing" "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/testutil" ) -func maskSecret(s string) string { - re := regexp.MustCompile(`\b(\w{4})\w+\b`) - s = re.ReplaceAllString(s, "$1******") - return s -} - -func integrationTestConfigured(t *testing.T, prefix string) (key, secret string, ok bool) { - var hasKey, hasSecret bool - key, hasKey = os.LookupEnv(prefix + "_API_KEY") - secret, hasSecret = os.LookupEnv(prefix + "_API_SECRET") - ok = hasKey && hasSecret && os.Getenv("TEST_"+prefix) == "1" - if ok { - t.Logf(prefix+" api integration test enabled, key = %s, secret = %s", maskSecret(key), maskSecret(secret)) - } - return key, secret, ok -} - func getTestClientOrSkip(t *testing.T) *RestClient { - key, secret, ok := integrationTestConfigured(t, "BINANCE") + key, secret, ok := testutil.IntegrationTestConfigured(t, "BINANCE") if !ok { t.SkipNow() return nil @@ -119,7 +102,7 @@ func TestClient_NewGetMarginInterestRateHistoryRequest(t *testing.T) { } func TestClient_privateCall(t *testing.T) { - key, secret, ok := integrationTestConfigured(t, "BINANCE") + key, secret, ok := testutil.IntegrationTestConfigured(t, "BINANCE") if !ok { t.SkipNow() } diff --git a/pkg/service/margin.go b/pkg/service/margin.go new file mode 100644 index 000000000..f6e42bb65 --- /dev/null +++ b/pkg/service/margin.go @@ -0,0 +1,260 @@ +package service + +import ( + "context" + "fmt" + "reflect" + "sort" + "strconv" + "time" + + sq "github.com/Masterminds/squirrel" + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/types" +) + +// SyncSelect defines the behaviors for syncing remote records +type SyncSelect struct { + Select sq.SelectBuilder + Type interface{} + BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) + + // ID is a function that returns the unique identity of the object + ID func(obj interface{}) string + + // Time is a function that returns the time of the object + Time func(obj interface{}) time.Time +} + +type MarginService struct { + DB *sqlx.DB +} + +func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset string, startTime time.Time) error { + api, ok := ex.(types.MarginHistory) + if !ok { + return ErrNotImplemented + } + + marginExchange, ok := ex.(types.MarginExchange) + if !ok { + return fmt.Errorf("%T does not implement margin service", ex) + } + + marginSettings := marginExchange.GetMarginSettings() + if !marginSettings.IsMargin { + return fmt.Errorf("exchange instance %s is not using margin", ex.Name()) + } + + sels := []SyncSelect{ + { + Select: SelectLastMarginLoans(ex.Name(), 100), + Type: types.MarginLoan{}, + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.MarginLoanBatchQuery{ + MarginHistory: api, + } + return query.Query(ctx, asset, startTime, endTime) + }, + ID: func(obj interface{}) string { + return strconv.FormatUint(obj.(types.MarginLoan).TransactionID, 10) + }, + }, + { + Select: SelectLastMarginRepays(ex.Name(), 100), + Type: types.MarginRepay{}, + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.MarginRepayBatchQuery{ + MarginHistory: api, + } + return query.Query(ctx, asset, startTime, endTime) + }, + ID: func(obj interface{}) string { + return strconv.FormatUint(obj.(types.MarginRepay).TransactionID, 10) + }, + }, + { + Select: SelectLastMarginInterests(ex.Name(), 100), + Type: types.MarginInterest{}, + BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) { + query := &batch.MarginInterestBatchQuery{ + MarginHistory: api, + } + return query.Query(ctx, asset, startTime, endTime) + }, + ID: func(obj interface{}) string { + m := obj.(types.MarginInterest) + return m.Asset + m.IsolatedSymbol + strconv.FormatInt(m.Time.UnixMilli(), 10) + }, + }, + } + +NextQuery: + for _, sel := range sels { + // query from db + recordSlice, err := s.executeDbQuery(ctx, sel.Select, sel.Type) + if err != nil { + return err + } + + recordSliceRef := reflect.ValueOf(recordSlice) + if recordSliceRef.Kind() == reflect.Ptr { + recordSliceRef = recordSliceRef.Elem() + } + + logrus.Debugf("loaded %d records", recordSliceRef.Len()) + + ids := buildIdMap(sel, recordSliceRef) + sortRecords(sel, recordSliceRef) + + // default since time point + since := lastRecordTime(sel, recordSliceRef, startTime) + + // asset "" means all assets + dataC, errC := sel.BatchQuery(ctx, since, time.Now()) + dataCRef := reflect.ValueOf(dataC) + + for { + select { + case <-ctx.Done(): + return nil + + case err := <-errC: + return err + + default: + v, ok := dataCRef.Recv() + if !ok { + err := <-errC + if err != nil { + return err + } + + // closed chan, skip to next query + continue NextQuery + } + + obj := v.Interface() + id := sel.ID(obj) + if _, exists := ids[id]; exists { + continue + } + + logrus.Debugf("inserting %T: %+v", obj, obj) + if err := s.Insert(obj); err != nil { + return err + } + } + } + } + + return nil +} + +func (s *MarginService) executeDbQuery(ctx context.Context, sel sq.SelectBuilder, tpe interface{}) (interface{}, error) { + sql, args, err := sel.ToSql() + + if err != nil { + return nil, err + } + + rows, err := s.DB.QueryxContext(ctx, sql, args...) + if err != nil { + return nil, err + } + + defer rows.Close() + return s.scanRows(rows, tpe) +} + +func (s *MarginService) scanRows(rows *sqlx.Rows, tpe interface{}) (interface{}, error) { + refType := reflect.TypeOf(tpe) + + if refType.Kind() == reflect.Ptr { + refType = refType.Elem() + } + + sliceRef := reflect.New(reflect.SliceOf(refType)) + for rows.Next() { + var recordRef = reflect.New(refType) + var record = recordRef.Interface() + if err := rows.StructScan(&record); err != nil { + return sliceRef.Interface(), err + } + + sliceRef = reflect.Append(sliceRef, recordRef) + } + + return sliceRef.Interface(), rows.Err() +} + +func (s *MarginService) Insert(record interface{}) error { + sql := dbCache.InsertSqlOf(record) + _, err := s.DB.NamedExec(sql, record) + return err +} + +func SelectLastMarginLoans(ex types.ExchangeName, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("margin_loans"). + Where(sq.Eq{"exchange": ex}). + OrderBy("time"). + Limit(limit) +} + +func SelectLastMarginRepays(ex types.ExchangeName, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("margin_repays"). + Where(sq.Eq{"exchange": ex}). + OrderBy("time"). + Limit(limit) +} + +func SelectLastMarginInterests(ex types.ExchangeName, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("margin_interests"). + Where(sq.Eq{"exchange": ex}). + OrderBy("time"). + Limit(limit) +} + +func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.SelectBuilder { + return sq.Select("*"). + From("margin_liquidations"). + Where(sq.Eq{"exchange": ex}). + OrderBy("time"). + Limit(limit) +} + +func lastRecordTime(sel SyncSelect, recordSlice reflect.Value, defaultTime time.Time) time.Time { + since := defaultTime + length := recordSlice.Len() + if length > 0 { + since = sel.Time(recordSlice.Index(length - 1)) + } + + return since +} + +func sortRecords(sel SyncSelect, recordSlice reflect.Value) { + // always sort + sort.Slice(recordSlice.Interface(), func(i, j int) bool { + a := sel.Time(recordSlice.Index(i).Interface()) + b := sel.Time(recordSlice.Index(j).Interface()) + return a.Before(b) + }) +} + +func buildIdMap(sel SyncSelect, recordSliceRef reflect.Value) map[string]struct{} { + ids := map[string]struct{}{} + for i := 0; i < recordSliceRef.Len(); i++ { + entryRef := recordSliceRef.Index(i) + id := sel.ID(entryRef.Interface()) + ids[id] = struct{}{} + } + + return ids +} diff --git a/pkg/service/margin_test.go b/pkg/service/margin_test.go new file mode 100644 index 000000000..387577eac --- /dev/null +++ b/pkg/service/margin_test.go @@ -0,0 +1,47 @@ +package service + +import ( + "context" + "testing" + "time" + + "github.com/jmoiron/sqlx" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + + "github.com/c9s/bbgo/pkg/exchange/binance" + "github.com/c9s/bbgo/pkg/testutil" +) + +func TestMarginService(t *testing.T) { + key, secret, ok := testutil.IntegrationTestConfigured(t, "BINANCE") + if !ok { + t.SkipNow() + return + } + + ex := binance.New(key, secret) + ex.MarginSettings.IsMargin = true + ex.MarginSettings.IsIsolatedMargin = true + ex.MarginSettings.IsolatedMarginSymbol = "DOTUSDT" + + db, err := prepareDB(t) + if err != nil { + t.Fatal(err) + } + + defer db.Close() + + ctx := context.Background() + + logrus.SetLevel(logrus.DebugLevel) + + dbx := sqlx.NewDb(db.DB, "sqlite3") + service := &MarginService{DB: dbx} + err = service.Sync(ctx, ex, "USDT", time.Date(2022, time.February, 1, 0, 0, 0, 0, time.UTC)) + assert.NoError(t, err) + + // sync second time to ensure that we can query records + err = service.Sync(ctx, ex, "USDT", time.Date(2022, time.February, 1, 0, 0, 0, 0, time.UTC)) + assert.NoError(t, err) +} diff --git a/pkg/service/reflect_test.go b/pkg/service/reflect_test.go index b2299aff0..9eb525ae9 100644 --- a/pkg/service/reflect_test.go +++ b/pkg/service/reflect_test.go @@ -58,7 +58,7 @@ func Test_fieldsNamesOf(t *testing.T) { { name: "MarginInterest", args: args{record: &types.MarginInterest{}}, - want: []string{"asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"}, + want: []string{"exchange", "asset", "principle", "interest", "interest_rate", "isolated_symbol", "time"}, }, } for _, tt := range tests { diff --git a/pkg/testutil/auth.go b/pkg/testutil/auth.go new file mode 100644 index 000000000..164207e29 --- /dev/null +++ b/pkg/testutil/auth.go @@ -0,0 +1,25 @@ +package testutil + +import ( + "os" + "regexp" + "testing" +) + +func maskSecret(s string) string { + re := regexp.MustCompile(`\b(\w{4})\w+\b`) + s = re.ReplaceAllString(s, "$1******") + return s +} + +func IntegrationTestConfigured(t *testing.T, prefix string) (key, secret string, ok bool) { + var hasKey, hasSecret bool + key, hasKey = os.LookupEnv(prefix + "_API_KEY") + secret, hasSecret = os.LookupEnv(prefix + "_API_SECRET") + ok = hasKey && hasSecret && os.Getenv("TEST_"+prefix) == "1" + if ok { + t.Logf(prefix+" api integration test enabled, key = %s, secret = %s", maskSecret(key), maskSecret(secret)) + } + + return key, secret, ok +} diff --git a/pkg/types/margin.go b/pkg/types/margin.go index 5aa569c3f..c517954d0 100644 --- a/pkg/types/margin.go +++ b/pkg/types/margin.go @@ -98,7 +98,7 @@ type MarginLiquidation struct { Symbol string `json:"symbol" db:"symbol"` TimeInForce TimeInForce `json:"timeInForce" db:"time_in_force"` IsIsolated bool `json:"isIsolated" db:"is_isolated"` - UpdatedTime Time `json:"updatedTime" db:"updated_time"` + UpdatedTime Time `json:"updatedTime" db:"time"` } // MarginHistory provides the service of querying loan history and repay history