bbgo_origin/pkg/service/sync_task.go

214 lines
5.6 KiB
Go
Raw Permalink Normal View History

2022-06-01 04:02:15 +00:00
package service
import (
"context"
"reflect"
"sort"
"time"
"github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx"
2022-06-01 10:30:10 +00:00
"github.com/pkg/errors"
2022-06-01 04:02:15 +00:00
"github.com/sirupsen/logrus"
)
// SyncTask defines the behaviors for syncing remote records
type SyncTask struct {
2022-06-01 10:30:10 +00:00
// Type is the element type of this sync task
// Since it will create a []Type slice from this type, you should not set pointer to this field
Type interface{}
// ID is a function that returns the unique identity of the object
2022-06-02 13:27:28 +00:00
// This function will be used for detecting duplicated objects.
ID func(obj interface{}) string
// Time is a function that returns the time of the object
2022-06-02 13:27:28 +00:00
// This function will be used for sorting records
Time func(obj interface{}) time.Time
2022-06-02 13:27:28 +00:00
// Select is the select query builder for querying existing db records
// The built SQL will be used for querying existing db records.
// And then the ID function will be used for filtering duplicated object.
2022-06-01 10:30:10 +00:00
Select squirrel.SelectBuilder
// OnLoad is an optional field, which is called when the records are loaded from the database
2022-06-01 10:30:10 +00:00
OnLoad func(objs interface{})
2022-06-01 04:02:15 +00:00
// Filter is an optional field, which is used for filtering the remote records
// Return true to keep the record,
// Return false to filter the record.
Filter func(obj interface{}) bool
2022-06-01 04:02:15 +00:00
2022-06-06 09:21:31 +00:00
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
// Insert is an option field, which is used for customizing the record insert
Insert func(obj interface{}) error
2022-06-01 10:30:10 +00:00
2022-06-06 09:21:31 +00:00
// Insert is an option field, which is used for customizing the record batch insert
BatchInsert func(obj interface{}) error
BatchInsertBuffer int
2022-06-06 04:24:18 +00:00
// LogInsert logs the insert record in INFO level
LogInsert bool
2022-06-01 04:02:15 +00:00
}
2024-06-19 06:18:21 +00:00
func (sel SyncTask) execute(
ctx context.Context,
db *sqlx.DB, startTime time.Time, endTimeArgs ...time.Time,
) error {
2022-06-06 09:21:31 +00:00
batchBufferRefVal := reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)
2022-06-01 04:02:15 +00:00
// query from db
recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type)
if err != nil {
return err
}
recordSliceRef := reflect.ValueOf(recordSlice)
if recordSliceRef.Kind() == reflect.Ptr {
recordSliceRef = recordSliceRef.Elem()
}
2022-06-01 10:30:10 +00:00
logrus.Debugf("loaded %d %T records", recordSliceRef.Len(), sel.Type)
2022-06-01 04:02:15 +00:00
ids := buildIdMap(sel, recordSliceRef)
2022-06-01 10:30:10 +00:00
2022-06-23 09:49:28 +00:00
if err := sortRecordsAscending(sel, recordSliceRef); err != nil {
2022-06-01 10:30:10 +00:00
return err
}
if sel.OnLoad != nil {
sel.OnLoad(recordSliceRef.Interface())
}
2022-06-01 04:02:15 +00:00
// default since time point
2022-06-02 08:40:24 +00:00
startTime = lastRecordTime(sel, recordSliceRef, startTime)
endTime := time.Now()
2024-06-19 06:18:21 +00:00
if len(endTimeArgs) > 0 {
endTime = endTimeArgs[0]
2022-06-02 08:40:24 +00:00
}
2022-06-01 04:02:15 +00:00
// asset "" means all assets
2022-06-02 08:40:24 +00:00
dataC, errC := sel.BatchQuery(ctx, startTime, endTime)
2022-06-01 04:02:15 +00:00
dataCRef := reflect.ValueOf(dataC)
2022-06-06 09:21:31 +00:00
defer func() {
if sel.BatchInsert != nil && batchBufferRefVal.Len() > 0 {
slice := batchBufferRefVal.Interface()
if err := sel.BatchInsert(slice); err != nil {
logrus.WithError(err).Errorf("batch insert error: %+v", slice)
2022-06-06 09:21:31 +00:00
}
}
}()
2022-06-01 04:02:15 +00:00
for {
select {
case <-ctx.Done():
2022-06-02 13:27:28 +00:00
logrus.Warnf("context is cancelled, stop syncing")
return ctx.Err()
2022-06-01 04:02:15 +00:00
default:
v, ok := dataCRef.Recv()
if !ok {
err := <-errC
return err
}
obj := v.Interface()
id := sel.ID(obj)
if _, exists := ids[id]; exists {
2022-06-24 09:14:30 +00:00
logrus.Debugf("object %s already exists, skipping", id)
2022-06-01 04:02:15 +00:00
continue
}
tt := sel.Time(obj)
2022-06-23 09:51:45 +00:00
if tt.Before(startTime) || tt.After(endTime) {
2022-06-24 09:14:30 +00:00
logrus.Debugf("object %s time %s is outside of the time range", id, tt)
continue
}
if sel.Filter != nil {
if !sel.Filter(obj) {
2022-06-24 09:14:30 +00:00
logrus.Debugf("object %s is filtered", id)
continue
}
}
ids[id] = struct{}{}
2022-06-06 09:21:31 +00:00
if sel.BatchInsert != nil {
if batchBufferRefVal.Len() > sel.BatchInsertBuffer-1 {
2022-06-06 09:21:31 +00:00
if sel.LogInsert {
logrus.Infof("batch inserting %d %T", batchBufferRefVal.Len(), obj)
} else {
logrus.Debugf("batch inserting %d %T", batchBufferRefVal.Len(), obj)
}
2022-06-06 09:21:31 +00:00
if err := sel.BatchInsert(batchBufferRefVal.Interface()); err != nil {
return err
}
batchBufferRefVal = reflect.MakeSlice(reflect.SliceOf(reflect.TypeOf(sel.Type)), 0, sel.BatchInsertBuffer)
}
2022-06-06 09:21:31 +00:00
batchBufferRefVal = reflect.Append(batchBufferRefVal, v)
} else {
2022-06-06 09:21:31 +00:00
if sel.LogInsert {
logrus.Infof("inserting %T: %+v", obj, obj)
} else {
logrus.Debugf("inserting %T: %+v", obj, obj)
}
if sel.Insert != nil {
// for custom insert
if err := sel.Insert(obj); err != nil {
logrus.WithError(err).Errorf("can not insert record: %v", obj)
2022-06-06 09:21:31 +00:00
return err
}
} else {
if err := insertType(db, obj); err != nil {
logrus.WithError(err).Errorf("can not insert record: %v", obj)
2022-06-06 09:21:31 +00:00
return err
}
}
2022-06-01 04:02:15 +00:00
}
}
}
}
func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Time) time.Time {
since := defaultTime
length := recordSlice.Len()
if length > 0 {
2022-06-24 07:27:51 +00:00
last := recordSlice.Index(length - 1)
since = sel.Time(last.Interface())
2022-06-01 04:02:15 +00:00
}
return since
}
2022-06-23 09:49:28 +00:00
func sortRecordsAscending(sel SyncTask, recordSlice reflect.Value) error {
2022-06-01 10:30:10 +00:00
if sel.Time == nil {
return errors.New("time field is not set, can not sort records")
}
2022-06-01 04:02:15 +00:00
// always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b)
})
2022-06-01 10:30:10 +00:00
return nil
2022-06-01 04:02:15 +00:00
}
// buildIdMap collects the identities of all records in recordSliceRef.
//
// Each element is passed to sel.ID and the returned string becomes a key
// of the result set; records sharing an ID collapse into a single key.
func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} {
	total := recordSliceRef.Len()
	seen := make(map[string]struct{}, total)

	for idx := 0; idx != total; idx++ {
		record := recordSliceRef.Index(idx).Interface()
		seen[sel.ID(record)] = struct{}{}
	}

	return seen
}