implement Persistent API for strategy

This commit is contained in:
c9s 2020-12-07 11:43:17 +08:00
parent 341f735bc3
commit 2d98336fb6
7 changed files with 157 additions and 23 deletions

2
go.mod
View File

@ -8,7 +8,7 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0 github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/adshao/go-binance v0.0.0-20201015231210-37cee298310e github.com/adshao/go-binance v0.0.0-20201015231210-37cee298310e
github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b github.com/c9s/goose v0.0.0-20200415105707-8da682162a5b
github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482 // indirect github.com/codingconcepts/env v0.0.0-20200821220118-a8fbf8d84482
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-redis/redis/v8 v8.4.0 github.com/go-redis/redis/v8 v8.4.0
github.com/go-sql-driver/mysql v1.5.0 github.com/go-sql-driver/mysql v1.5.0

View File

@ -7,6 +7,7 @@ import (
"strings" "strings"
"time" "time"
"github.com/codingconcepts/env"
"github.com/jmoiron/sqlx" "github.com/jmoiron/sqlx"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -40,6 +41,8 @@ type Environment struct {
// note that, for back tests, we don't need notification. // note that, for back tests, we don't need notification.
Notifiability Notifiability
PersistenceServiceFacade *PersistenceServiceFacade
TradeService *service.TradeService TradeService *service.TradeService
TradeSync *service.SyncService TradeSync *service.SyncService
@ -47,6 +50,7 @@ type Environment struct {
startTime time.Time startTime time.Time
tradeScanTime time.Time tradeScanTime time.Time
sessions map[string]*ExchangeSession sessions map[string]*ExchangeSession
} }
func NewEnvironment() *Environment { func NewEnvironment() *Environment {
@ -221,27 +225,35 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
return nil return nil
} }
func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) { func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error {
var facade = &PersistenceServiceFacade{} var facade = &PersistenceServiceFacade{}
if conf.Redis != nil { if conf.Redis != nil {
if err := env.Set(conf.Redis) ; err != nil {
return err
}
facade.Redis = NewRedisPersistenceService(conf.Redis) facade.Redis = NewRedisPersistenceService(conf.Redis)
} }
if conf.Json != nil { if conf.Json != nil {
if _, err := os.Stat(conf.Json.Directory) ; os.IsNotExist(err) { if _, err := os.Stat(conf.Json.Directory); os.IsNotExist(err) {
if err2 := os.MkdirAll(conf.Json.Directory, 0777) ; err2 != nil { if err2 := os.MkdirAll(conf.Json.Directory, 0777); err2 != nil {
log.WithError(err2).Errorf("can not create directory: %s", conf.Json.Directory) log.WithError(err2).Errorf("can not create directory: %s", conf.Json.Directory)
return err2
} }
} }
facade.Json = &JsonPersistenceService{Directory: conf.Json.Directory} facade.Json = &JsonPersistenceService{Directory: conf.Json.Directory}
} }
environ.PersistenceServiceFacade = facade
return nil
} }
// configure notification rules // configure notification rules
// for symbol-based routes, we should register the same symbol rules for each session. // for symbol-based routes, we should register the same symbol rules for each session.
// for session-based routes, we should set the fixed callbacks for each session // for session-based routes, we should set the fixed callbacks for each session
func (environ *Environment) ConfigureNotification(conf *NotificationConfig) { func (environ *Environment) ConfigureNotification(conf *NotificationConfig) error {
// configure routing here // configure routing here
if conf.SymbolChannels != nil { if conf.SymbolChannels != nil {
environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels) environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
@ -383,6 +395,7 @@ func (environ *Environment) ConfigureNotification(conf *NotificationConfig) {
} }
} }
return nil
} }
func (environ *Environment) SetStartTime(t time.Time) *Environment { func (environ *Environment) SetStartTime(t time.Time) *Environment {

View File

@ -20,9 +20,9 @@ func isSymbolBasedStrategy(rs reflect.Value) (string, bool) {
return field.String(), true return field.String(), true
} }
func hasField(rs reflect.Value, fieldName string) bool { func hasField(rs reflect.Value, fieldName string) (field reflect.Value, ok bool) {
field := rs.FieldByName(fieldName) field = rs.FieldByName(fieldName)
return field.IsValid() return field, field.IsValid()
} }
func injectField(rs reflect.Value, fieldName string, obj interface{}, pointerOnly bool) error { func injectField(rs reflect.Value, fieldName string, obj interface{}, pointerOnly bool) error {

View File

@ -1,6 +1,66 @@
package bbgo package bbgo
import "fmt"
type PersistenceSelector struct {
// StoreID is the store you want to use.
StoreID string `json:"store" yaml:"store"`
// Type is the persistence type
Type string `json:"type" yaml:"type"`
}
// Persistence is used for strategy to inject the persistence.
type Persistence struct {
PersistenceSelector *PersistenceSelector `json:"persistence,omitempty" yaml:"persistence,omitempty"`
Facade *PersistenceServiceFacade `json:"-" yaml:"-"`
}
func (p *Persistence) backendService(t string) (service PersistenceService, err error) {
switch t {
case "json":
service = p.Facade.Json
case "redis":
service = p.Facade.Redis
default:
err = fmt.Errorf("unsupported persistent type %s", t)
}
return service, err
}
func (p *Persistence) Load(val interface{}, subIDs ...string) error {
service, err := p.backendService(p.PersistenceSelector.Type)
if err != nil {
return err
}
if p.PersistenceSelector.StoreID == "" {
return fmt.Errorf("persistence.store can not be empty")
}
store := service.NewStore(p.PersistenceSelector.StoreID, subIDs...)
return store.Load(val)
}
func (p *Persistence) Save(val interface{}, subIDs ...string) error {
service, err := p.backendService(p.PersistenceSelector.Type)
if err != nil {
return err
}
if p.PersistenceSelector.StoreID == "" {
return fmt.Errorf("persistence.store can not be empty")
}
store := service.NewStore(p.PersistenceSelector.StoreID, subIDs...)
return store.Save(val)
}
type PersistenceServiceFacade struct { type PersistenceServiceFacade struct {
Redis *RedisPersistenceService Redis *RedisPersistenceService
Json *JsonPersistenceService Json *JsonPersistenceService
} }

View File

@ -5,13 +5,15 @@ import (
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"net" "net"
"os"
"path/filepath" "path/filepath"
"strings"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
) )
type PersistenceService interface { type PersistenceService interface {
NewStore(id string) Store NewStore(id string, subIDs ...string) Store
} }
type Store interface { type Store interface {
@ -23,34 +25,52 @@ type JsonPersistenceService struct {
Directory string Directory string
} }
func (s *JsonPersistenceService) NewStore(id string) *JsonStore { func (s *JsonPersistenceService) NewStore(id string, subIDs ...string) Store {
return &JsonStore{ return &JsonStore{
ID: id, ID: id,
Filepath: filepath.Join(s.Directory, id) + ".json", Directory: filepath.Join(append([]string{s.Directory}, subIDs...)...),
} }
} }
type JsonStore struct { type JsonStore struct {
ID string ID string
Filepath string Directory string
} }
func (store JsonStore) Load(val interface{}) error { func (store JsonStore) Load(val interface{}) error {
data, err := ioutil.ReadFile(store.Filepath) if _, err := os.Stat(store.Directory); os.IsNotExist(err) {
if err2 := os.Mkdir(store.Directory, 0777); err2 != nil {
return err2
}
}
p := filepath.Join(store.Directory, store.ID) + ".json"
data, err := ioutil.ReadFile(p)
if err != nil { if err != nil {
return err return err
} }
if len(data) == 0 {
return nil
}
return json.Unmarshal(data, val) return json.Unmarshal(data, val)
} }
func (store JsonStore) Save(val interface{}) error { func (store JsonStore) Save(val interface{}) error {
if _, err := os.Stat(store.Directory); os.IsNotExist(err) {
if err2 := os.Mkdir(store.Directory, 0777); err2 != nil {
return err2
}
}
data, err := json.Marshal(val) data, err := json.Marshal(val)
if err != nil { if err != nil {
return err return err
} }
return ioutil.WriteFile(store.Filepath, data, 0666) p := filepath.Join(store.Directory, store.ID) + ".json"
return ioutil.WriteFile(p, data, 0666)
} }
type RedisPersistenceService struct { type RedisPersistenceService struct {
@ -70,7 +90,11 @@ func NewRedisPersistenceService(config *RedisPersistenceConfig) *RedisPersistenc
} }
} }
func (s *RedisPersistenceService) NewStore(id string) *RedisStore { func (s *RedisPersistenceService) NewStore(id string, subIDs ...string) Store {
if len(subIDs) > 0 {
id += ":" + strings.Join(subIDs, ":")
}
return &RedisStore{ return &RedisStore{
redis: s.redis, redis: s.redis,
ID: id, ID: id,
@ -87,9 +111,17 @@ func (store *RedisStore) Load(val interface{}) error {
cmd := store.redis.Get(context.Background(), store.ID) cmd := store.redis.Get(context.Background(), store.ID)
data, err := cmd.Result() data, err := cmd.Result()
if err != nil { if err != nil {
if err == redis.Nil {
return nil
}
return err return err
} }
if len(data) == 0 {
return nil
}
if err := json.Unmarshal([]byte(data), val); err != nil { if err := json.Unmarshal([]byte(data), val); err != nil {
return err return err
} }

View File

@ -2,6 +2,7 @@ package bbgo
import ( import (
"context" "context"
"fmt"
"reflect" "reflect"
"sync" "sync"
@ -174,44 +175,51 @@ func (trader *Trader) Run(ctx context.Context) error {
if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil { if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil {
log.WithError(err).Errorf("strategy Graceful injection failed") log.WithError(err).Errorf("strategy Graceful injection failed")
return err
} }
if err := injectField(rs, "Logger", &trader.logger, false); err != nil { if err := injectField(rs, "Logger", &trader.logger, false); err != nil {
log.WithError(err).Errorf("strategy Logger injection failed") log.WithError(err).Errorf("strategy Logger injection failed")
return err
} }
if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil { if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil {
log.WithError(err).Errorf("strategy Notifiability injection failed") log.WithError(err).Errorf("strategy Notifiability injection failed")
return err
} }
if err := injectField(rs, "OrderExecutor", orderExecutor, false); err != nil { if err := injectField(rs, "OrderExecutor", orderExecutor, false); err != nil {
log.WithError(err).Errorf("strategy OrderExecutor injection failed") log.WithError(err).Errorf("strategy OrderExecutor injection failed")
return err
} }
if symbol, ok := isSymbolBasedStrategy(rs); ok { if symbol, ok := isSymbolBasedStrategy(rs); ok {
log.Infof("found symbol based strategy from %s", rs.Type()) log.Infof("found symbol based strategy from %s", rs.Type())
if hasField(rs, "Market") { if _, ok := hasField(rs, "Market"); ok {
if market, ok := session.Market(symbol); ok { if market, ok := session.Market(symbol); ok {
// let's make the market object passed by pointer // let's make the market object passed by pointer
if err := injectField(rs, "Market", &market, false); err != nil { if err := injectField(rs, "Market", &market, false); err != nil {
log.WithError(err).Errorf("strategy %T Market injection failed", strategy) log.WithError(err).Errorf("strategy %T Market injection failed", strategy)
return err
} }
} }
} }
// StandardIndicatorSet // StandardIndicatorSet
if hasField(rs, "StandardIndicatorSet") { if _, ok := hasField(rs, "StandardIndicatorSet"); ok {
if indicatorSet, ok := session.StandardIndicatorSet(symbol); ok { if indicatorSet, ok := session.StandardIndicatorSet(symbol); ok {
if err := injectField(rs, "StandardIndicatorSet", indicatorSet, true); err != nil { if err := injectField(rs, "StandardIndicatorSet", indicatorSet, true); err != nil {
log.WithError(err).Errorf("strategy %T StandardIndicatorSet injection failed", strategy) log.WithError(err).Errorf("strategy %T StandardIndicatorSet injection failed", strategy)
return err
} }
} }
} }
if hasField(rs, "MarketDataStore") { if _, ok := hasField(rs, "MarketDataStore"); ok {
if store, ok := session.MarketDataStore(symbol); ok { if store, ok := session.MarketDataStore(symbol); ok {
if err := injectField(rs, "MarketDataStore", store, true); err != nil { if err := injectField(rs, "MarketDataStore", store, true); err != nil {
log.WithError(err).Errorf("strategy %T MarketDataStore injection failed", strategy) log.WithError(err).Errorf("strategy %T MarketDataStore injection failed", strategy)
return err
} }
} }
} }
@ -236,16 +244,33 @@ func (trader *Trader) Run(ctx context.Context) error {
// get the struct element // get the struct element
rs = rs.Elem() rs = rs.Elem()
if field, ok := hasField(rs, "Persistence"); ok {
log.Infof("found Persistence field, injecting...")
elem := field.Elem()
if elem.Kind() != reflect.Struct {
return fmt.Errorf("the field Persistence is not a struct element")
}
if err := injectField(elem, "Facade", trader.environment.PersistenceServiceFacade, true); err != nil {
log.WithError(err).Errorf("strategy Persistence injection failed")
return err
}
}
if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil { if err := injectField(rs, "Graceful", &trader.Graceful, true); err != nil {
log.WithError(err).Errorf("strategy Graceful injection failed") log.WithError(err).Errorf("strategy Graceful injection failed")
return err
} }
if err := injectField(rs, "Logger", &trader.logger, false); err != nil { if err := injectField(rs, "Logger", &trader.logger, false); err != nil {
log.WithError(err).Errorf("strategy Logger injection failed") log.WithError(err).Errorf("strategy Logger injection failed")
return err
} }
if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil { if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil {
log.WithError(err).Errorf("strategy Notifiability injection failed") log.WithError(err).Errorf("strategy Notifiability injection failed")
return err
} }
} }

View File

@ -139,11 +139,15 @@ func runConfig(basectx context.Context, userConfig *bbgo.Config) error {
environ.Notifiability = notification environ.Notifiability = notification
if userConfig.Notifications != nil { if userConfig.Notifications != nil {
environ.ConfigureNotification(userConfig.Notifications) if err := environ.ConfigureNotification(userConfig.Notifications); err != nil {
return err
}
} }
if userConfig.Persistence != nil { if userConfig.Persistence != nil {
environ.ConfigurePersistence(userConfig.Persistence) if err := environ.ConfigurePersistence(userConfig.Persistence); err != nil {
return err
}
} }
trader := bbgo.NewTrader(environ) trader := bbgo.NewTrader(environ)