Merge pull request #1116 from c9s/fix/save-state

FIX: fix gracefully shutdown SaveState call for Isolation
This commit is contained in:
Yo-An Lin 2023-03-15 23:02:16 +08:00 committed by GitHub
commit c3ee89cac5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 20 additions and 18 deletions

View File

@ -88,6 +88,8 @@ type Trader struct {
crossExchangeStrategies []CrossExchangeStrategy
exchangeStrategies map[string][]SingleExchangeStrategy
// gracefulShutdown is used for registering strategy's Shutdown calls
// when strategy implements Shutdown(ctx), the func ref will be stored in the callback.
gracefulShutdown GracefulShutdown
logger Logger
@ -400,18 +402,17 @@ func (trader *Trader) IterateStrategies(f func(st StrategyID) error) error {
return nil
}
func (trader *Trader) SaveState() error {
// NOTICE: the ctx here is the trading context, which could already be canceled.
func (trader *Trader) SaveState(ctx context.Context) error {
if trader.environment.BacktestService != nil {
return nil
}
if persistenceServiceFacade == nil {
return nil
}
isolation := GetIsolationFromContext(ctx)
ps := persistenceServiceFacade.Get()
ps := isolation.persistenceServiceFacade.Get()
log.Infof("saving strategies states...")
log.Debugf("saving strategy persistence states...")
return trader.IterateStrategies(func(strategy StrategyID) error {
id := dynamic.CallID(strategy)
if len(id) == 0 {

View File

@ -124,7 +124,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
_ = grpcBind
_ = enableGrpc
ctx, cancelTrading := context.WithCancel(basectx)
tradingCtx, cancelTrading := context.WithCancel(basectx)
defer cancelTrading()
environ := bbgo.NewEnvironment()
@ -135,21 +135,21 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
}
if lightweight {
if err := bbgo.BootstrapEnvironmentLightweight(ctx, environ, userConfig); err != nil {
if err := bbgo.BootstrapEnvironmentLightweight(tradingCtx, environ, userConfig); err != nil {
return err
}
} else {
if err := bbgo.BootstrapEnvironment(ctx, environ, userConfig); err != nil {
if err := bbgo.BootstrapEnvironment(tradingCtx, environ, userConfig); err != nil {
return err
}
}
if err := environ.Init(ctx); err != nil {
if err := environ.Init(tradingCtx); err != nil {
return err
}
if !noSync {
if err := environ.Sync(ctx, userConfig); err != nil {
if err := environ.Sync(tradingCtx, userConfig); err != nil {
return err
}
@ -167,7 +167,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
return err
}
if err := trader.Run(ctx); err != nil {
if err := trader.Run(tradingCtx); err != nil {
return err
}
@ -179,7 +179,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
Trader: trader,
}
if err := s.Run(ctx, webServerBind); err != nil {
if err := s.Run(tradingCtx, webServerBind); err != nil {
log.WithError(err).Errorf("http server bind error")
}
}()
@ -198,18 +198,19 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
}()
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cmdutil.WaitForSignal(tradingCtx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
gracefulShutdownPeriod := 30 * time.Second
shtCtx, cancelShutdown := context.WithTimeout(bbgo.NewTodoContextWithExistingIsolation(ctx), gracefulShutdownPeriod)
shtCtx, cancelShutdown := context.WithTimeout(bbgo.NewTodoContextWithExistingIsolation(tradingCtx), gracefulShutdownPeriod)
bbgo.Shutdown(shtCtx)
cancelShutdown()
if err := trader.SaveState(); err != nil {
log.WithError(err).Errorf("can not save strategy states")
if err := trader.SaveState(shtCtx); err != nil {
log.WithError(err).Errorf("can not save strategy persistence states")
}
cancelShutdown()
for _, session := range environ.Sessions() {
if err := session.MarketDataStream.Close(); err != nil {
log.WithError(err).Errorf("[%s] market data stream close error", session.Name)