service/sync: add onLoad event support

This commit is contained in:
c9s 2022-06-01 18:30:10 +08:00
parent fb63346732
commit 415450acb7
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -8,24 +8,33 @@ import (
"github.com/Masterminds/squirrel" "github.com/Masterminds/squirrel"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
) )
// SyncTask defines the behaviors for syncing remote records // SyncTask defines the behaviors for syncing remote records
type SyncTask struct { type SyncTask struct {
Select squirrel.SelectBuilder // Type is the element type of this sync task
Type interface{} // Since it will create a []Type slice from this type, you should not set pointer to this field
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) Type interface{}
// Select is the select query builder for querying db records
Select squirrel.SelectBuilder
// OnLoad is called when the records are loaded from the database
OnLoad func(objs interface{})
// ID is a function that returns the unique identity of the object // ID is a function that returns the unique identity of the object
ID func(obj interface{}) string ID func(obj interface{}) string
// Time is a function that returns the time of the object // Time is a function that returns the time of the object
Time func(obj interface{}) time.Time Time func(obj interface{}) time.Time
// BatchQuery is used for querying remote records.
BatchQuery func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error)
} }
func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error { func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Time) error {
// query from db // query from db
recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type) recordSlice, err := selectAndScanType(ctx, db, sel.Select, sel.Type)
if err != nil { if err != nil {
@ -37,10 +46,17 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
recordSliceRef = recordSliceRef.Elem() recordSliceRef = recordSliceRef.Elem()
} }
logrus.Debugf("loaded %d records", recordSliceRef.Len()) logrus.Debugf("loaded %d %T records", recordSliceRef.Len(), sel.Type)
ids := buildIdMap(sel, recordSliceRef) ids := buildIdMap(sel, recordSliceRef)
sortRecords(sel, recordSliceRef)
if err := sortRecords(sel, recordSliceRef); err != nil {
return err
}
if sel.OnLoad != nil {
sel.OnLoad(recordSliceRef.Interface())
}
// default since time point // default since time point
since := lastRecordTime(sel, recordSliceRef, startTime) since := lastRecordTime(sel, recordSliceRef, startTime)
@ -70,7 +86,7 @@ func (sel SyncTask) execute(ctx context.Context, db *sqlx.DB, startTime time.Tim
continue continue
} }
logrus.Debugf("inserting %T: %+v", obj, obj) logrus.Infof("inserting %T: %+v", obj, obj)
if err := insertType(db, obj); err != nil { if err := insertType(db, obj); err != nil {
return err return err
} }
@ -82,19 +98,24 @@ func lastRecordTime(sel SyncTask, recordSlice reflect.Value, defaultTime time.Ti
since := defaultTime since := defaultTime
length := recordSlice.Len() length := recordSlice.Len()
if length > 0 { if length > 0 {
since = sel.Time(recordSlice.Index(length - 1)) since = sel.Time(recordSlice.Index(length - 1).Interface())
} }
return since return since
} }
func sortRecords(sel SyncTask, recordSlice reflect.Value) { func sortRecords(sel SyncTask, recordSlice reflect.Value) error {
if sel.Time == nil {
return errors.New("time field is not set, can not sort records")
}
// always sort // always sort
sort.Slice(recordSlice.Interface(), func(i, j int) bool { sort.Slice(recordSlice.Interface(), func(i, j int) bool {
a := sel.Time(recordSlice.Index(i).Interface()) a := sel.Time(recordSlice.Index(i).Interface())
b := sel.Time(recordSlice.Index(j).Interface()) b := sel.Time(recordSlice.Index(j).Interface())
return a.Before(b) return a.Before(b)
}) })
return nil
} }
func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} { func buildIdMap(sel SyncTask, recordSliceRef reflect.Value) map[string]struct{} {