bbgo/pkg/bbgo/persistence.go

123 lines
3.4 KiB
Go

package bbgo
import (
"context"
"os"
"reflect"
"sync"
"github.com/codingconcepts/env"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"git.qtrade.icu/lychiyu/bbgo/pkg/dynamic"
"git.qtrade.icu/lychiyu/bbgo/pkg/service"
)
var defaultPersistenceServiceFacade = &service.PersistenceServiceFacade{
Memory: service.NewMemoryService(),
}
// Sync syncs the object properties into the persistence layer
func Sync(ctx context.Context, obj interface{}) {
id := dynamic.CallID(obj)
if len(id) == 0 {
log.Warnf("InstanceID() is not provided, can not sync persistence")
return
}
isolation := GetIsolationFromContext(ctx)
ps := isolation.persistenceServiceFacade.Get()
locker, ok := obj.(sync.Locker)
if ok {
locker.Lock()
defer locker.Unlock()
}
err := storePersistenceFields(obj, id, ps)
if err != nil {
log.WithError(err).Errorf("persistence sync failed")
}
}
func loadPersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
return dynamic.IterateFieldsByTag(obj, "persistence", true, func(tag string, field reflect.StructField, value reflect.Value) error {
log.Debugf("[loadPersistenceFields] loading value into field %v, tag = %s, original value = %v", field, tag, value)
newValueInf := dynamic.NewTypeValueInterface(value.Type())
// inf := value.Interface()
store := persistence.NewStore("state", id, tag)
if err := store.Load(&newValueInf); err != nil {
if err == service.ErrPersistenceNotExists {
log.Debugf("[loadPersistenceFields] state key does not exist, id = %v, tag = %s", id, tag)
return nil
}
return err
}
newValue := reflect.ValueOf(newValueInf)
if value.Kind() != reflect.Ptr && newValue.Kind() == reflect.Ptr {
newValue = newValue.Elem()
}
log.Debugf("[loadPersistenceFields] %v = %v -> %v\n", field, value, newValue)
value.Set(newValue)
return nil
})
}
func storePersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
return dynamic.IterateFieldsByTag(obj, "persistence", true, func(tag string, ft reflect.StructField, fv reflect.Value) error {
log.Debugf("[storePersistenceFields] storing value from field %v, tag = %s, original value = %v", ft, tag, fv)
inf := fv.Interface()
store := persistence.NewStore("state", id, tag)
return store.Save(inf)
})
}
func NewPersistenceServiceFacade(conf *PersistenceConfig) (*service.PersistenceServiceFacade, error) {
facade := &service.PersistenceServiceFacade{
Memory: service.NewMemoryService(),
}
if conf.Redis != nil {
if err := env.Set(conf.Redis); err != nil {
return nil, err
}
redisPersistence := service.NewRedisPersistenceService(conf.Redis)
facade.Redis = redisPersistence
}
if conf.Json != nil {
if _, err := os.Stat(conf.Json.Directory); os.IsNotExist(err) {
if err2 := os.MkdirAll(conf.Json.Directory, 0777); err2 != nil {
return nil, errors.Wrapf(err2, "can not create directory: %s", conf.Json.Directory)
}
}
jsonPersistence := &service.JsonPersistenceService{Directory: conf.Json.Directory}
facade.Json = jsonPersistence
}
return facade, nil
}
func ConfigurePersistence(ctx context.Context, environ *Environment, conf *PersistenceConfig) error {
facade, err := NewPersistenceServiceFacade(conf)
if err != nil {
return err
}
isolation := GetIsolationFromContext(ctx)
isolation.persistenceServiceFacade = facade
environ.PersistentService = facade
return nil
}