service: refactor sync task

This commit is contained in:
c9s 2022-06-01 12:02:15 +08:00
parent f116b7b2d0
commit 279e4d8682
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 161 additions and 144 deletions

View File

@ -3,32 +3,16 @@ package service
import (
"context"
"fmt"
"reflect"
"sort"
"strconv"
"time"
sq "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/batch"
"github.com/c9s/bbgo/pkg/types"
)
// SyncSelect defines the behaviors for syncing remote records
type SyncSelect struct {
Select sq.SelectBuilder
Type interface{}
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
// ID is a function that returns the unique identity of the object
ID func(obj interface{}) string
// Time is a function that returns the time of the object
Time func(obj interface{}) time.Time
}
type MarginService struct {
DB *sqlx.DB
}
@ -49,7 +33,7 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
return fmt.Errorf("exchange instance %s is not using margin", ex.Name())
}
sels := []SyncSelect{
tasks := []SyncTask{
{
Select: SelectLastMarginLoans(ex.Name(), 100),
Type: types.MarginLoan{},
@ -106,111 +90,15 @@ func (s *MarginService) Sync(ctx context.Context, ex types.Exchange, asset strin
},
}
NextQuery:
for _, sel := range sels {
// query from db
recordSlice, err := s.executeDbQuery(ctx, sel.Select, sel.Type)
if err != nil {
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
return err
}
recordSliceRef := reflect.ValueOf(recordSlice)
if recordSliceRef.Kind() == reflect.Ptr {
recordSliceRef = recordSliceRef.Elem()
}
logrus.Debugf("loaded %d records", recordSliceRef.Len())
ids := buildIdMap(sel, recordSliceRef)
sortRecords(sel, recordSliceRef)
// default since time point
since := lastRecordTime(sel, recordSliceRef, startTime)
// asset "" means all assets
dataC, errC := sel.BatchQuery(ctx, since, time.Now())
dataCRef := reflect.ValueOf(dataC)
for {
select {
case <-ctx.Done():
return nil
case err := <-errC:
return err
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
if err != nil {
return err
}
// closed chan, skip to next query
continue NextQuery
}
obj := v.Interface()
id := sel.ID(obj)
if _, exists := ids[id]; exists {
continue
}
logrus.Debugf("inserting %T: %+v", obj, obj)
if err := s.Insert(obj); err != nil {
return err
}
}
}
}
return nil
}
func (s *MarginService) executeDbQuery(ctx context.Context, sel sq.SelectBuilder, tpe interface{}) (interface{}, error) {
sql, args, err := sel.ToSql()
if err != nil {
return nil, err
}
rows, err := s.DB.QueryxContext(ctx, sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return s.scanRows(rows, tpe)
}
func (s *MarginService) scanRows(rows *sqlx.Rows, tpe interface{}) (interface{}, error) {
refType := reflect.TypeOf(tpe)
if refType.Kind() == reflect.Ptr {
refType = refType.Elem()
}
sliceRef := reflect.New(reflect.SliceOf(refType))
for rows.Next() {
var recordRef = reflect.New(refType)
var record = recordRef.Interface()
if err := rows.StructScan(&record); err != nil {
return sliceRef.Interface(), err
}
sliceRef = reflect.Append(sliceRef, recordRef)
}
return sliceRef.Interface(), rows.Err()
}
func (s *MarginService) Insert(record interface{}) error {
sql := dbCache.InsertSqlOf(record)
_, err := s.DB.NamedExec(sql, record)
return err
}
func SelectLastMarginLoans(ex types.ExchangeName, limit uint64) sq.SelectBuilder {
return sq.Select("*").
From("margin_loans").
@ -243,32 +131,3 @@ func SelectLastMarginLiquidations(ex types.ExchangeName, limit uint64) sq.Select
Limit(limit)
}
func lastRecordTime(sel SyncSelect, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime
length := recordSlice.Len()
if length > 0 {
since = sel.Time(recordSlice.Index(length - 1))
}
return since
}
func sortRecords(sel SyncSelect, recordSlice reflect.Value) {
// always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b)
})
}
func buildIdMap(sel SyncSelect, recordSliceRef reflect.Value) map[string]struct{} {
ids := map[string]struct{}{}
for i := 0; i < recordSliceRef.Len(); i++ {
entryRef := recordSliceRef.Index(i)
id := sel.ID(entryRef.Interface())
ids[id] = struct{}{}
}
return ids
}

View File

@ -1,11 +1,14 @@
package service
import (
"context"
"reflect"
"strings"
"github.com/Masterminds/squirrel"
"github.com/fatih/camelcase"
gopluralize "github.com/gertd/go-pluralize"
"github.com/jmoiron/sqlx"
)
var pluralize = gopluralize.NewClient()
@ -152,3 +155,49 @@ func (c *ReflectCache) FieldsOf(t interface{}) []string {
c.fields[typeName] = fields
return fields
}
// scanRowsOfType use the given type to scan rows
// this is usually slower than the native one since it uses reflect.
func scanRowsOfType(rows *sqlx.Rows, tpe interface{}) (interface{}, error) {
refType := reflect.TypeOf(tpe)
if refType.Kind() == reflect.Ptr {
refType = refType.Elem()
}
sliceRef := reflect.New(reflect.SliceOf(refType))
for rows.Next() {
var recordRef = reflect.New(refType)
var record = recordRef.Interface()
if err := rows.StructScan(&record); err != nil {
return sliceRef.Interface(), err
}
sliceRef = reflect.Append(sliceRef, recordRef)
}
return sliceRef.Interface(), rows.Err()
}
func insertType(db *sqlx.DB, record interface{}) error {
sql := dbCache.InsertSqlOf(record)
_, err := db.NamedExec(sql, record)
return err
}
func selectAndScanType(ctx context.Context, db *sqlx.DB, sel squirrel.SelectBuilder, tpe interface{}) (interface{}, error) {
sql, args, err := sel.ToSql()
if err != nil {
return nil, err
}
rows, err := db.QueryxContext(ctx, sql, args...)
if err != nil {
return nil, err
}
defer rows.Close()
return scanRowsOfType(rows, tpe)
}

109
pkg/service/sync_task.go Normal file
View File

@ -0,0 +1,109 @@
package service
import (
"context"
"reflect"
"sort"
"time"
"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
"github.com/sirupsen/logrus"
)
// SyncTask defines the behaviors for syncing remote records
type SyncTask struct {
Select squirrel.SelectBuilder
Type interface{}
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
// ID is a function that returns the unique identity of the object
ID func(obj interface{}) string
// Time is a function that returns the time of the object
Time func(obj interface{}) time.Time
}
func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error {
// query from db
recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type)
if err != nil {
return err
}
recordSliceRef := reflect.ValueOf(recordSlice)
if recordSliceRef.Kind() == reflect.Ptr {
recordSliceRef = recordSliceRef.Elem()
}
logrus.Debugf("loaded %d records", recordSliceRef.Len())
ids := buildIdMap(sel, recordSliceRef)
sortRecords(sel, recordSliceRef)
// default since time point
since := lastRecordTime(sel, recordSliceRef, startTime)
// asset "" means all assets
dataC, errC := sel.BatchQuery(ctx, since, time.Now())
dataCRef := reflect.ValueOf(dataC)
for {
select {
case <-ctx.Done():
return nil
case err := <-errC:
return err
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
return err
}
obj := v.Interface()
id := sel.ID(obj)
if _, exists := ids[id]; exists {
continue
}
logrus.Debugf("inserting %T: %+v", obj, obj)
if err := insertType(db, obj); err != nil {
return err
}
}
}
}
func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime
length := recordSlice.Len()
if length > 0 {
since = sel.Time(recordSlice.Index(length - 1))
}
return since
}
func sortRecords(sel SyncTask, recordSlice reflect.Value) {
// always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b)
})
}
func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} {
ids := map[string]struct{}{}
for i := 0; i < recordSliceRef.Len(); i++ {
entryRef := recordSliceRef.Index(i)
id := sel.ID(entryRef.Interface())
ids[id] = struct{}{}
}
return ids
}