fix: avoid global persistenceServiceFacade concurrent write

This commit is contained in:
c9s 2023-04-26 00:37:13 +08:00
parent 152149fcee
commit a13ad2f6ab
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
5 changed files with 17 additions and 13 deletions

View File

@ -46,7 +46,7 @@ func BootstrapEnvironment(ctx context.Context, environ *Environment, userConfig
} }
} }
if err := environ.ConfigureNotificationSystem(userConfig); err != nil { if err := environ.ConfigureNotificationSystem(ctx, userConfig); err != nil {
return errors.Wrap(err, "notification configure error") return errors.Wrap(err, "notification configure error")
} }
@ -55,4 +55,4 @@ func BootstrapEnvironment(ctx context.Context, environ *Environment, userConfig
func BootstrapBacktestEnvironment(ctx context.Context, environ *Environment) error { func BootstrapBacktestEnvironment(ctx context.Context, environ *Environment) error {
return environ.ConfigureDatabase(ctx) return environ.ConfigureDatabase(ctx)
} }

View File

@ -605,13 +605,14 @@ func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSe
return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...)
} }
func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) error { func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error {
// setup default notification config // setup default notification config
if userConfig.Notifications == nil { if userConfig.Notifications == nil {
userConfig.Notifications = &NotificationConfig{} userConfig.Notifications = &NotificationConfig{}
} }
var persistence = persistenceServiceFacade.Get() var isolation = GetIsolationFromContext(ctx)
var persistence = isolation.persistenceServiceFacade.Get()
err := environ.setupInteraction(persistence) err := environ.setupInteraction(persistence)
if err != nil { if err != nil {
@ -985,4 +986,4 @@ func (session *ExchangeSession) getSessionSymbols(defaultSymbols ...string) ([]s
} }
return session.FindPossibleSymbols() return session.FindPossibleSymbols()
} }

View File

@ -18,7 +18,7 @@ type Isolation struct {
func NewDefaultIsolation() *Isolation { func NewDefaultIsolation() *Isolation {
return &Isolation{ return &Isolation{
gracefulShutdown: GracefulShutdown{}, gracefulShutdown: GracefulShutdown{},
persistenceServiceFacade: persistenceServiceFacade, persistenceServiceFacade: defaultPersistenceServiceFacade,
} }
} }

View File

@ -111,7 +111,6 @@ func ConfigurePersistence(ctx context.Context, environ *Environment, conf *Persi
isolation := GetIsolationFromContext(ctx) isolation := GetIsolationFromContext(ctx)
isolation.persistenceServiceFacade = facade isolation.persistenceServiceFacade = facade
persistenceServiceFacade = facade
environ.PersistentService = facade environ.PersistentService = facade
return nil return nil
} }

View File

@ -233,7 +233,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
return errors.New("strategy object is not a struct") return errors.New("strategy object is not a struct")
} }
if err := trader.injectCommonServices(strategy); err != nil { if err := trader.injectCommonServices(ctx, strategy); err != nil {
return err return err
} }
@ -303,7 +303,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
continue continue
} }
if err := trader.injectCommonServices(strategy); err != nil { if err := trader.injectCommonServices(ctx, strategy); err != nil {
return err return err
} }
@ -425,7 +425,11 @@ func (trader *Trader) Shutdown(ctx context.Context) {
trader.gracefulShutdown.Shutdown(ctx) trader.gracefulShutdown.Shutdown(ctx)
} }
func (trader *Trader) injectCommonServices(s interface{}) error { func (trader *Trader) injectCommonServices(ctx context.Context, s interface{}) error {
isolation := GetIsolationFromContext(ctx)
ps := isolation.persistenceServiceFacade
// a special injection for persistence selector: // a special injection for persistence selector:
// if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer // if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer
sv := reflect.ValueOf(s).Elem() sv := reflect.ValueOf(s).Elem()
@ -437,7 +441,7 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
return fmt.Errorf("field Persistence is not a struct element, %s given", field) return fmt.Errorf("field Persistence is not a struct element, %s given", field)
} }
if err := dynamic.InjectField(elem, "Facade", persistenceServiceFacade, true); err != nil { if err := dynamic.InjectField(elem, "Facade", ps, true); err != nil {
return err return err
} }
@ -457,6 +461,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
trader.environment.DatabaseService, trader.environment.DatabaseService,
trader.environment.AccountService, trader.environment.AccountService,
trader.environment, trader.environment,
persistenceServiceFacade, // if the strategy use persistence facade separately ps, // if the strategy use persistence facade separately
) )
} }