From 2d98336fb6d448e16f39d332936c896bf6c97d42 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 7 Dec 2020 11:43:17 +0800 Subject: [PATCH] implement Persistent API for strategy --- go.mod | 2 +- pkg/bbgo/environment.go | 21 +++++++++--- pkg/bbgo/injection.go | 6 ++-- pkg/bbgo/persistence.go | 62 ++++++++++++++++++++++++++++++++++- pkg/bbgo/redis_persistence.go | 50 +++++++++++++++++++++++----- pkg/bbgo/trader.go | 31 ++++++++++++++++-- pkg/cmd/run.go | 8 +++-- 7 files changed, 157 insertions(+), 23 deletions(-) diff --git a/go.mod b/go.mod index c750f3706..5ee5f02f6 100644 --- a/go.mod +++ b/go.mod @@ -8,7 +8,7 @@ require ( github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/adshao/go-binance v0.0.0-20201015231210-37cee298310e github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b - github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482 // indirect + github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482 github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/go-redis/redis/v8 v8.4.0 github.com/go-sql-driver/mysql v1.5.0 diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 4ddc40375..b38a0baad 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/codingconcepts/env" "github.com/jmoiron/sqlx" log "github.com/sirupsen/logrus" @@ -40,6 +41,8 @@ type Environment struct { // note that, for back tests, we don't need notification. Notifiability + PersistenceServiceFacade *PersistenceServiceFacade + TradeService *service.TradeService TradeSync *service.SyncService @@ -47,6 +50,7 @@ type Environment struct { startTime time.Time tradeScanTime time.Time sessions map[string]*ExchangeSession + } func NewEnvironment() *Environment { @@ -221,27 +225,35 @@ func (environ *Environment) Init(ctx context.Context) (err error) { return nil } -func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) { +func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error { var facade = &PersistenceServiceFacade{} if conf.Redis != nil { + if err := env.Set(conf.Redis) ; err != nil { + return err + } + facade.Redis = NewRedisPersistenceService(conf.Redis) } if conf.Json != nil { - if _, err := os.Stat(conf.Json.Directory) ; os.IsNotExist(err) { - if err2 := os.MkdirAll(conf.Json.Directory, 0777) ; err2 != nil { + if _, err := os.Stat(conf.Json.Directory); os.IsNotExist(err) { + if err2 := os.MkdirAll(conf.Json.Directory, 0777); err2 != nil { log.WithError(err2).Errorf("can not create directory: %s", conf.Json.Directory) + return err2 } } facade.Json = &JsonPersistenceService{Directory: conf.Json.Directory} } + + environ.PersistenceServiceFacade = facade + return nil } // configure notification rules // for symbol-based routes, we should register the same symbol rules for each session. // for session-based routes, we should set the fixed callbacks for each session -func (environ *Environment) ConfigureNotification(conf *NotificationConfig) { +func (environ *Environment) ConfigureNotification(conf *NotificationConfig) error { // configure routing here if conf.SymbolChannels != nil { environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels) @@ -383,6 +395,7 @@ func (environ *Environment) ConfigureNotification(conf *NotificationConfig) { } } + return nil } func (environ *Environment) SetStartTime(t time.Time) *Environment { diff --git a/pkg/bbgo/injection.go b/pkg/bbgo/injection.go index d0e1693b9..25dd14100 100644 --- a/pkg/bbgo/injection.go +++ b/pkg/bbgo/injection.go @@ -20,9 +20,9 @@ func isSymbolBasedStrategy(rs reflect.Value) (string, bool) { return field.String(), true } -func hasField(rs reflect.Value, fieldName string) bool { - field := rs.FieldByName(fieldName) - return field.IsValid() +func hasField(rs reflect.Value, fieldName string) (field reflect.Value, ok bool) { + field = rs.FieldByName(fieldName) + return field, field.IsValid() } func injectField(rs reflect.Value, fieldName string, obj interface{}, pointerOnly bool) error { diff --git a/pkg/bbgo/persistence.go b/pkg/bbgo/persistence.go index 2e8a23619..4afc8547d 100644 --- a/pkg/bbgo/persistence.go +++ b/pkg/bbgo/persistence.go @@ -1,6 +1,66 @@ package bbgo +import "fmt" + +type PersistenceSelector struct { + // StoreID is the store you want to use. + StoreID string `json:"store" yaml:"store"` + + // Type is the persistence type + Type string `json:"type" yaml:"type"` +} + +// Persistence is used for strategy to inject the persistence. +type Persistence struct { + PersistenceSelector *PersistenceSelector `json:"persistence,omitempty" yaml:"persistence,omitempty"` + + Facade *PersistenceServiceFacade `json:"-" yaml:"-"` +} + +func (p *Persistence) backendService(t string) (service PersistenceService, err error) { + switch t { + case "json": + service = p.Facade.Json + + case "redis": + service = p.Facade.Redis + + default: + err = fmt.Errorf("unsupported persistent type %s", t) + } + + return service, err +} + +func (p *Persistence) Load(val interface{}, subIDs ...string) error { + service, err := p.backendService(p.PersistenceSelector.Type) + if err != nil { + return err + } + + if p.PersistenceSelector.StoreID == "" { + return fmt.Errorf("persistence.store can not be empty") + } + + store := service.NewStore(p.PersistenceSelector.StoreID, subIDs...) + return store.Load(val) +} + +func (p *Persistence) Save(val interface{}, subIDs ...string) error { + service, err := p.backendService(p.PersistenceSelector.Type) + if err != nil { + return err + } + + if p.PersistenceSelector.StoreID == "" { + return fmt.Errorf("persistence.store can not be empty") + } + + store := service.NewStore(p.PersistenceSelector.StoreID, subIDs...) + return store.Save(val) +} + type PersistenceServiceFacade struct { Redis *RedisPersistenceService - Json *JsonPersistenceService + Json *JsonPersistenceService } diff --git a/pkg/bbgo/redis_persistence.go b/pkg/bbgo/redis_persistence.go index 8a626a88b..4515da043 100644 --- a/pkg/bbgo/redis_persistence.go +++ b/pkg/bbgo/redis_persistence.go @@ -5,13 +5,15 @@ import ( "encoding/json" "io/ioutil" "net" + "os" "path/filepath" + "strings" "github.com/go-redis/redis/v8" ) type PersistenceService interface { - NewStore(id string) Store + NewStore(id string, subIDs ...string) Store } type Store interface { @@ -23,34 +25,52 @@ type JsonPersistenceService struct { Directory string } -func (s *JsonPersistenceService) NewStore(id string) *JsonStore { +func (s *JsonPersistenceService) NewStore(id string, subIDs ...string) Store { return &JsonStore{ - ID: id, - Filepath: filepath.Join(s.Directory, id) + ".json", + ID: id, + Directory: filepath.Join(append([]string{s.Directory}, subIDs...)...), } } type JsonStore struct { - ID string - Filepath string + ID string + Directory string } func (store JsonStore) Load(val interface{}) error { - data, err := ioutil.ReadFile(store.Filepath) + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { + return err2 + } + } + + p := filepath.Join(store.Directory, store.ID) + ".json" + data, err := ioutil.ReadFile(p) if err != nil { return err } + if len(data) == 0 { + return nil + } + return json.Unmarshal(data, val) } func (store JsonStore) Save(val interface{}) error { + if _, err := os.Stat(store.Directory); os.IsNotExist(err) { + if err2 := os.Mkdir(store.Directory, 0777); err2 != nil { + return err2 + } + } + data, err := json.Marshal(val) if err != nil { return err } - return ioutil.WriteFile(store.Filepath, data, 0666) + p := filepath.Join(store.Directory, store.ID) + ".json" + return ioutil.WriteFile(p, data, 0666) } type RedisPersistenceService struct { @@ -70,7 +90,11 @@ func NewRedisPersistenceService(config *RedisPersistenceConfig) *RedisPersistenc } } -func (s *RedisPersistenceService) NewStore(id string) *RedisStore { +func (s *RedisPersistenceService) NewStore(id string, subIDs ...string) Store { + if len(subIDs) > 0 { + id += ":" + strings.Join(subIDs, ":") + } + return &RedisStore{ redis: s.redis, ID: id, @@ -87,9 +111,17 @@ func (store *RedisStore) Load(val interface{}) error { cmd := store.redis.Get(context.Background(), store.ID) data, err := cmd.Result() if err != nil { + if err == redis.Nil { + return nil + } + return err } + if len(data) == 0 { + return nil + } + if err := json.Unmarshal([]byte(data), val); err != nil { return err } diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 13b9b699d..ec5e2d9c2 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -2,6 +2,7 @@ package bbgo import ( "context" + "fmt" "reflect" "sync" @@ -174,44 +175,51 @@ func (trader *Trader) Run(ctx context.Context) error { if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil { log.WithError(err).Errorf("strategy Graceful injection failed") + return err } if err := injectField(rs, "Logger", &trader.logger, false); err != nil { log.WithError(err).Errorf("strategy Logger injection failed") + return err } if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil { log.WithError(err).Errorf("strategy Notifiability injection failed") + return err } if err := injectField(rs, "OrderExecutor", orderExecutor, false); err != nil { log.WithError(err).Errorf("strategy OrderExecutor injection failed") + return err } if symbol, ok := isSymbolBasedStrategy(rs); ok { log.Infof("found symbol based strategy from %s", rs.Type()) - if hasField(rs, "Market") { + if _, ok := hasField(rs, "Market"); ok { if market, ok := session.Market(symbol); ok { // let's make the market object passed by pointer if err := injectField(rs, "Market", &market, false); err != nil { log.WithError(err).Errorf("strategy %T Market injection failed", strategy) + return err } } } // StandardIndicatorSet - if hasField(rs, "StandardIndicatorSet") { + if _, ok := hasField(rs, "StandardIndicatorSet"); ok { if indicatorSet, ok := session.StandardIndicatorSet(symbol); ok { if err := injectField(rs, "StandardIndicatorSet", indicatorSet, true); err != nil { log.WithError(err).Errorf("strategy %T StandardIndicatorSet injection failed", strategy) + return err } } } - if hasField(rs, "MarketDataStore") { + if _, ok := hasField(rs, "MarketDataStore"); ok { if store, ok := session.MarketDataStore(symbol); ok { if err := injectField(rs, "MarketDataStore", store, true); err != nil { log.WithError(err).Errorf("strategy %T MarketDataStore injection failed", strategy) + return err } } } @@ -236,16 +244,33 @@ func (trader *Trader) Run(ctx context.Context) error { // get the struct element rs = rs.Elem() + if field, ok := hasField(rs, "Persistence"); ok { + log.Infof("found Persistence field, injecting...") + + elem := field.Elem() + if elem.Kind() != reflect.Struct { + return fmt.Errorf("the field Persistence is not a struct element") + } + + if err := injectField(elem, "Facade", trader.environment.PersistenceServiceFacade, true); err != nil { + log.WithError(err).Errorf("strategy Persistence injection failed") + return err + } + } + if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil { log.WithError(err).Errorf("strategy Graceful injection failed") + return err } if err := injectField(rs, "Logger", &trader.logger, false); err != nil { log.WithError(err).Errorf("strategy Logger injection failed") + return err } if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil { log.WithError(err).Errorf("strategy Notifiability injection failed") + return err } } diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 175e2565b..fad94d49b 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -139,11 +139,15 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config) error { environ.Notifiability = notification if userConfig.Notifications != nil { - environ.ConfigureNotification(userConfig.Notifications) + if err := environ.ConfigureNotification(userConfig.Notifications); err != nil { + return err + } } if userConfig.Persistence != nil { - environ.ConfigurePersistence(userConfig.Persistence) + if err := environ.ConfigurePersistence(userConfig.Persistence); err != nil { + return err + } } trader := bbgo.NewTrader(environ)