Merge pull request #1159 from c9s/fix/concurrent-write

FIX: avoid global persistenceServiceFacade concurrent write
This commit is contained in:
Yo-An Lin 2023-04-26 00:51:07 +08:00 committed by GitHub
commit d0ea7d8b47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 18 additions and 16 deletions

View File

@ -46,7 +46,7 @@ func BootstrapEnvironment(ctx context.Context, environ *Environment, userConfig
} }
} }
if err := environ.ConfigureNotificationSystem(userConfig); err != nil { if err := environ.ConfigureNotificationSystem(ctx, userConfig); err != nil {
return errors.Wrap(err, "notification configure error") return errors.Wrap(err, "notification configure error")
} }

View File

@ -605,13 +605,14 @@ func (environ *Environment) syncSession(ctx context.Context, session *ExchangeSe
return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...) return environ.SyncService.SyncSessionSymbols(ctx, session.Exchange, environ.syncStartTime, symbols...)
} }
func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) error { func (environ *Environment) ConfigureNotificationSystem(ctx context.Context, userConfig *Config) error {
// setup default notification config // setup default notification config
if userConfig.Notifications == nil { if userConfig.Notifications == nil {
userConfig.Notifications = &NotificationConfig{} userConfig.Notifications = &NotificationConfig{}
} }
var persistence = persistenceServiceFacade.Get() var isolation = GetIsolationFromContext(ctx)
var persistence = isolation.persistenceServiceFacade.Get()
err := environ.setupInteraction(persistence) err := environ.setupInteraction(persistence)
if err != nil { if err != nil {

View File

@ -18,7 +18,7 @@ type Isolation struct {
func NewDefaultIsolation() *Isolation { func NewDefaultIsolation() *Isolation {
return &Isolation{ return &Isolation{
gracefulShutdown: GracefulShutdown{}, gracefulShutdown: GracefulShutdown{},
persistenceServiceFacade: persistenceServiceFacade, persistenceServiceFacade: defaultPersistenceServiceFacade,
} }
} }

View File

@ -20,5 +20,5 @@ func TestNewDefaultIsolation(t *testing.T) {
assert.NotNil(t, isolation) assert.NotNil(t, isolation)
assert.NotNil(t, isolation.persistenceServiceFacade) assert.NotNil(t, isolation.persistenceServiceFacade)
assert.NotNil(t, isolation.gracefulShutdown) assert.NotNil(t, isolation.gracefulShutdown)
assert.Equal(t, persistenceServiceFacade, isolation.persistenceServiceFacade) assert.Equal(t, defaultPersistenceServiceFacade, isolation.persistenceServiceFacade)
} }

View File

@ -17,8 +17,6 @@ var defaultPersistenceServiceFacade = &service.PersistenceServiceFacade{
Memory: service.NewMemoryService(), Memory: service.NewMemoryService(),
} }
var persistenceServiceFacade = defaultPersistenceServiceFacade
// Sync syncs the object properties into the persistence layer // Sync syncs the object properties into the persistence layer
func Sync(ctx context.Context, obj interface{}) { func Sync(ctx context.Context, obj interface{}) {
id := dynamic.CallID(obj) id := dynamic.CallID(obj)
@ -111,7 +109,6 @@ func ConfigurePersistence(ctx context.Context, environ *Environment, conf *Persi
isolation := GetIsolationFromContext(ctx) isolation := GetIsolationFromContext(ctx)
isolation.persistenceServiceFacade = facade isolation.persistenceServiceFacade = facade
persistenceServiceFacade = facade
environ.PersistentService = facade environ.PersistentService = facade
return nil return nil
} }

View File

@ -233,7 +233,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
return errors.New("strategy object is not a struct") return errors.New("strategy object is not a struct")
} }
if err := trader.injectCommonServices(strategy); err != nil { if err := trader.injectCommonServices(ctx, strategy); err != nil {
return err return err
} }
@ -303,7 +303,7 @@ func (trader *Trader) injectFieldsAndSubscribe(ctx context.Context) error {
continue continue
} }
if err := trader.injectCommonServices(strategy); err != nil { if err := trader.injectCommonServices(ctx, strategy); err != nil {
return err return err
} }
@ -425,7 +425,11 @@ func (trader *Trader) Shutdown(ctx context.Context) {
trader.gracefulShutdown.Shutdown(ctx) trader.gracefulShutdown.Shutdown(ctx)
} }
func (trader *Trader) injectCommonServices(s interface{}) error { func (trader *Trader) injectCommonServices(ctx context.Context, s interface{}) error {
isolation := GetIsolationFromContext(ctx)
ps := isolation.persistenceServiceFacade
// a special injection for persistence selector: // a special injection for persistence selector:
// if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer // if user defined the selector, the facade pointer will be nil, hence we need to update the persistence facade pointer
sv := reflect.ValueOf(s).Elem() sv := reflect.ValueOf(s).Elem()
@ -437,7 +441,7 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
return fmt.Errorf("field Persistence is not a struct element, %s given", field) return fmt.Errorf("field Persistence is not a struct element, %s given", field)
} }
if err := dynamic.InjectField(elem, "Facade", persistenceServiceFacade, true); err != nil { if err := dynamic.InjectField(elem, "Facade", ps, true); err != nil {
return err return err
} }
@ -457,6 +461,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
trader.environment.DatabaseService, trader.environment.DatabaseService,
trader.environment.AccountService, trader.environment.AccountService,
trader.environment, trader.environment,
persistenceServiceFacade, // if the strategy use persistence facade separately ps, // if the strategy use persistence facade separately
) )
} }