From a13ad2f6ab3e3b502950085168ae52f978f88089 Mon Sep 17 00:00:00 2001 From: c9s Date: Wed, 26 Apr 2023 00:37:13 +0800 Subject: [PATCH] fix: avoid global persistenceServiceFacade concurrent write --- pkg/bbgo/bootstrap.go | 4 ++-- pkg/bbgo/environment.go | 7 ++++--- pkg/bbgo/isolation.go | 2 +- pkg/bbgo/persistence.go | 3 +-- pkg/bbgo/trader.go | 14 +++++++++----- 5 files changed, 17 insertions(+), 13 deletions(-) diff --git a/pkg/bbgo/bootstrap.go b/pkg/bbgo/bootstrap.go index 78ec8eaeb..63b2dc452 100644 --- a/pkg/bbgo/bootstrap.go +++ b/pkg/bbgo/bootstrap.go @@ -46,7 +46,7 @@ func BootstrapEnvironment(ctx context.Context, environ *Environment, userConfig } } - if err := environ.ConfigureNotificationSystem(userConfig); err != nil { + if err := environ.ConfigureNotificationSystem(ctx, userConfig); err != nil { return errors.Wrap(err, "notification configure error") } @@ -55,4 +55,4 @@ func BootstrapEnvironment(ctx context.Context, environ *Environment, userConfig func BootstrapBacktestEnvironment(ctx context.Context, environ *Environment) error { return environ.ConfigureDatabase(ctx) -} \ No newline at end of file +} diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index d35e12596..6bdad5e42 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -605,13 +605,14 @@ func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSe return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) } -func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) error { +func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error { // setup default notification config if userConfig.Notifications == nil { userConfig.Notifications = &NotificationConfig{} } - var persistence = persistenceServiceFacade.Get() + var isolation = GetIsolationFromContext(ctx) + var persistence = isolation.persistenceServiceFacade.Get() err := environ.setupInteraction(persistence) if err != nil { @@ -985,4 +986,4 @@ func (session *ExchangeSession) getSessionSymbols(defaultSymbols ...string) ([]s } return session.FindPossibleSymbols() -} \ No newline at end of file +} diff --git a/pkg/bbgo/isolation.go b/pkg/bbgo/isolation.go index df3d43ae0..51da5a310 100644 --- a/pkg/bbgo/isolation.go +++ b/pkg/bbgo/isolation.go @@ -18,7 +18,7 @@ type Isolation struct { func NewDefaultIsolation() *Isolation { return &Isolation{ gracefulShutdown: GracefulShutdown{}, - persistenceServiceFacade: persistenceServiceFacade, + persistenceServiceFacade: defaultPersistenceServiceFacade, } } diff --git a/pkg/bbgo/persistence.go b/pkg/bbgo/persistence.go index bbc7c75b4..4a0ae302f 100644 --- a/pkg/bbgo/persistence.go +++ b/pkg/bbgo/persistence.go @@ -111,7 +111,6 @@ func ConfigurePersistence(ctx context.Context, environ *Environment, conf *Persi isolation := GetIsolationFromContext(ctx) isolation.persistenceServiceFacade = facade - persistenceServiceFacade = facade environ.PersistentService = facade return nil -} \ No newline at end of file +} diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index fee49d90f..0d0c0ca8b 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -233,7 +233,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error { return errors.New("strategy object is not a struct") } - if err := trader.injectCommonServices(strategy); err != nil { + if err := trader.injectCommonServices(ctx, strategy); err != nil { return err } @@ -303,7 +303,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error { continue } - if err := trader.injectCommonServices(strategy); err != nil { + if err := trader.injectCommonServices(ctx, strategy); err != nil { return err } @@ -425,7 +425,11 @@ func (trader *Trader) Shutdown(ctx context.Context) { trader.gracefulShutdown.Shutdown(ctx) } -func (trader *Trader) injectCommonServices(s interface{}) error { +func (trader *Trader) injectCommonServices(ctx context.Context, s interface{}) error { + isolation := GetIsolationFromContext(ctx) + + ps := isolation.persistenceServiceFacade + // a special injection for persistence selector: // if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer sv := reflect.ValueOf(s).Elem() @@ -437,7 +441,7 @@ func (trader *Trader) injectCommonServices(s interface{}) error { return fmt.Errorf("field Persistence is not a struct element, %s given", field) } - if err := dynamic.InjectField(elem, "Facade", persistenceServiceFacade, true); err != nil { + if err := dynamic.InjectField(elem, "Facade", ps, true); err != nil { return err } @@ -457,6 +461,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error { trader.environment.DatabaseService, trader.environment.AccountService, trader.environment, - persistenceServiceFacade, // if the strategy use persistence facade separately + ps, // if the strategy use persistence facade separately ) }