bbgo_origin/pkg/bbgo/persistence.go

146 lines
4.0 KiB
Go
Raw Permalink Normal View History

package bbgo
import (
	"errors"
	"fmt"
	"reflect"

	log "github.com/sirupsen/logrus"

	"github.com/c9s/bbgo/pkg/dynamic"
	"github.com/c9s/bbgo/pkg/service"
)
2020-12-07 03:43:17 +00:00
// PersistenceSelector picks which persistence backend and which store inside
// that backend should be used.
type PersistenceSelector struct {
	// StoreID names the store to use. Load and Save substitute "default"
	// when it is left empty.
	StoreID string `json:"store" yaml:"store"`

	// Type names the persistence backend. backendService recognizes "json",
	// "redis" and "memory".
	Type string `json:"type" yaml:"type"`
}
var (
	// DefaultPersistenceServiceFacade is the facade used when nothing else is
	// installed. Only its Memory backend is populated.
	DefaultPersistenceServiceFacade = &service.PersistenceServiceFacade{
		Memory: service.NewMemoryService(),
	}

	// PersistenceServiceFacade is the facade consulted by the persistence
	// helpers in this package. It starts out pointing at
	// DefaultPersistenceServiceFacade.
	PersistenceServiceFacade = DefaultPersistenceServiceFacade
)
2020-12-07 03:43:17 +00:00
// Persistence is the handle a strategy uses to reach the persistence layer.
// It carries the optional selector that decides which backend and store the
// Load and Save methods operate on.
type Persistence struct {
	// PersistenceSelector is read from the "persistence" key of the
	// JSON/YAML configuration. It is a pointer and is omitted from output
	// when nil.
	PersistenceSelector *PersistenceSelector `json:"persistence,omitempty" yaml:"persistence,omitempty"`
}
2021-03-18 09:20:07 +00:00
func (p *Persistence) backendService(t string) (service.PersistenceService, error) {
2020-12-07 03:43:17 +00:00
switch t {
case "json":
return PersistenceServiceFacade.Json, nil
2020-12-07 03:43:17 +00:00
case "redis":
if PersistenceServiceFacade.Redis == nil {
log.Warn("redis persistence is not available, fallback to memory backend")
return PersistenceServiceFacade.Memory, nil
}
return PersistenceServiceFacade.Redis, nil
2020-12-07 03:43:17 +00:00
2020-12-07 04:03:56 +00:00
case "memory":
return PersistenceServiceFacade.Memory, nil
2020-12-07 04:03:56 +00:00
2020-12-07 03:43:17 +00:00
}
2021-03-18 09:20:07 +00:00
return nil, fmt.Errorf("unsupported persistent type %s", t)
2020-12-07 03:43:17 +00:00
}
func (p *Persistence) Load(val interface{}, subIDs ...string) error {
2021-03-18 09:20:07 +00:00
ps, err := p.backendService(p.PersistenceSelector.Type)
2020-12-07 03:43:17 +00:00
if err != nil {
return err
}
2022-03-14 13:21:04 +00:00
log.Debugf("using persistence store %T for loading", ps)
2020-12-07 03:43:17 +00:00
if p.PersistenceSelector.StoreID == "" {
2021-03-18 09:20:07 +00:00
p.PersistenceSelector.StoreID = "default"
2020-12-07 03:43:17 +00:00
}
2021-03-18 09:20:07 +00:00
store := ps.NewStore(p.PersistenceSelector.StoreID, subIDs...)
2020-12-07 03:43:17 +00:00
return store.Load(val)
}
func (p *Persistence) Save(val interface{}, subIDs ...string) error {
2021-03-18 09:20:07 +00:00
ps, err := p.backendService(p.PersistenceSelector.Type)
2020-12-07 03:43:17 +00:00
if err != nil {
return err
}
2022-03-14 13:21:04 +00:00
log.Debugf("using persistence store %T for storing", ps)
2020-12-07 03:43:17 +00:00
if p.PersistenceSelector.StoreID == "" {
2021-03-18 09:20:07 +00:00
p.PersistenceSelector.StoreID = "default"
2020-12-07 03:43:17 +00:00
}
2021-03-18 09:20:07 +00:00
store := ps.NewStore(p.PersistenceSelector.StoreID, subIDs...)
2020-12-07 03:43:17 +00:00
return store.Save(val)
}
2022-05-05 10:02:08 +00:00
func (p *Persistence) Sync(obj interface{}) error {
2022-08-22 18:12:26 +00:00
id := dynamic.CallID(obj)
2022-05-05 10:02:08 +00:00
if len(id) == 0 {
return nil
}
ps := PersistenceServiceFacade.Get()
return storePersistenceFields(obj, id, ps)
}
// Sync syncs the object properties into the persistence layer
2022-06-21 07:57:26 +00:00
func Sync(obj interface{}) {
2022-08-22 18:12:26 +00:00
id := dynamic.CallID(obj)
if len(id) == 0 {
2022-06-21 07:57:26 +00:00
log.Warnf("InstanceID() is not provided, can not sync persistence")
return
}
ps := PersistenceServiceFacade.Get()
2022-06-21 07:57:26 +00:00
err := storePersistenceFields(obj, id, ps)
if err != nil {
log.WithError(err).Errorf("persistence sync failed")
}
2022-05-05 10:02:08 +00:00
}
2022-05-05 05:05:01 +00:00
func loadPersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
return dynamic.IterateFieldsByTag(obj, "persistence", func(tag string, field reflect.StructField, value reflect.Value) error {
log.Debugf("[loadPersistenceFields] loading value into field %v, tag = %s, original value = %v", field, tag, value)
newValueInf := dynamic.NewTypeValueInterface(value.Type())
// inf := value.Interface()
2022-05-05 09:56:41 +00:00
store := persistence.NewStore("state", id, tag)
if err := store.Load(&newValueInf); err != nil {
if err == service.ErrPersistenceNotExists {
log.Debugf("[loadPersistenceFields] state key does not exist, id = %v, tag = %s", id, tag)
return nil
}
return err
}
newValue := reflect.ValueOf(newValueInf)
if value.Kind() != reflect.Ptr && newValue.Kind() == reflect.Ptr {
newValue = newValue.Elem()
}
log.Debugf("[loadPersistenceFields] %v = %v -> %v\n", field, value, newValue)
value.Set(newValue)
return nil
})
}
2022-05-05 05:05:01 +00:00
func storePersistenceFields(obj interface{}, id string, persistence service.PersistenceService) error {
return dynamic.IterateFieldsByTag(obj, "persistence", func(tag string, ft reflect.StructField, fv reflect.Value) error {
log.Debugf("[storePersistenceFields] storing value from field %v, tag = %s, original value = %v", ft, tag, fv)
inf := fv.Interface()
2022-05-05 09:56:41 +00:00
store := persistence.NewStore("state", id, tag)
return store.Save(inf)
})
}