diff --git a/pkg/bbgo/graceful_shutdown.go b/pkg/bbgo/graceful_shutdown.go index b35482ce2..c3248b0c8 100644 --- a/pkg/bbgo/graceful_shutdown.go +++ b/pkg/bbgo/graceful_shutdown.go @@ -3,18 +3,40 @@ package bbgo import ( "context" "sync" + "time" + + "github.com/sirupsen/logrus" ) +var graceful = &Graceful{} + //go:generate callbackgen -type Graceful type Graceful struct { shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup) } +// Shutdown is a blocking call to emit all shutdown callbacks at the same time. func (g *Graceful) Shutdown(ctx context.Context) { var wg sync.WaitGroup wg.Add(len(g.shutdownCallbacks)) - go g.EmitShutdown(ctx, &wg) + // for each shutdown callback, we give them 10 second + shtCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + + go g.EmitShutdown(shtCtx, &wg) wg.Wait() + cancel() +} + +func OnShutdown(f func(ctx context.Context, wg *sync.WaitGroup)) { + graceful.OnShutdown(f) +} + +func Shutdown() { + logrus.Infof("shutting down...") + + ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second) + graceful.Shutdown(ctx) + cancel() } diff --git a/pkg/bbgo/persistence_test.go b/pkg/bbgo/persistence_test.go index 1c14eb2e9..0eea57ed5 100644 --- a/pkg/bbgo/persistence_test.go +++ b/pkg/bbgo/persistence_test.go @@ -24,7 +24,6 @@ func (s *TestStructWithoutInstanceID) ID() string { type TestStruct struct { *Environment - *Graceful Position *types.Position `persistence:"position"` Integer int64 `persistence:"integer"` diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index f8dbc7203..86371a53b 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -73,8 +73,6 @@ type Trader struct { exchangeStrategies map[string][]SingleExchangeStrategy logger Logger - - Graceful Graceful } func NewTrader(environ *Environment) *Trader { @@ -416,7 +414,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error { } return parseStructAndInject(s, - &trader.Graceful, &trader.logger, Notification, trader.environment.TradeService, diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 363d2d282..1c1ac11a4 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -443,9 +443,7 @@ var BacktestCmd = &cobra.Command{ cmdutil.WaitForSignal(runCtx, syscall.SIGINT, syscall.SIGTERM) log.Infof("shutting down trader...") - shutdownCtx, cancelShutdown := context.WithDeadline(runCtx, time.Now().Add(10*time.Second)) - trader.Graceful.Shutdown(shutdownCtx) - cancelShutdown() + bbgo.Shutdown() // put the logger back to print the pnl log.SetLevel(log.InfoLevel) diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index ca634c0a5..c6f1841d6 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -8,7 +8,6 @@ import ( "path/filepath" "runtime/pprof" "syscall" - "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -78,12 +77,7 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) cancelTrading() - // graceful period = 15 second - shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(15*time.Second)) - - log.Infof("shutting down...") - trader.Graceful.Shutdown(shutdownCtx) - cancelShutdown() + bbgo.Shutdown() return nil } @@ -216,10 +210,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) cancelTrading() - log.Infof("shutting down...") - shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) - trader.Graceful.Shutdown(shutdownCtx) - cancelShutdown() + bbgo.Shutdown() if err := trader.SaveState(); err != nil { log.WithError(err).Errorf("can not save strategy states") diff --git a/pkg/dynamic/merge_test.go b/pkg/dynamic/merge_test.go index 2e8929ff0..bc65e49c8 100644 --- a/pkg/dynamic/merge_test.go +++ b/pkg/dynamic/merge_test.go @@ -5,7 +5,6 @@ import ( "github.com/stretchr/testify/assert" - "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/types" ) @@ -21,14 +20,14 @@ type TestStrategy struct { func Test_reflectMergeStructFields(t *testing.T) { t.Run("zero value", func(t *testing.T) { a := &TestStrategy{Symbol: "BTCUSDT"} - b := &bbgo.CumulatedVolumeTakeProfit{Symbol: ""} + b := &struct{ Symbol string }{Symbol: ""} MergeStructValues(b, a) assert.Equal(t, "BTCUSDT", b.Symbol) }) t.Run("non-zero value", func(t *testing.T) { a := &TestStrategy{Symbol: "BTCUSDT"} - b := &bbgo.CumulatedVolumeTakeProfit{Symbol: "ETHUSDT"} + b := &struct{ Symbol string }{Symbol: "ETHUSDT"} MergeStructValues(b, a) assert.Equal(t, "ETHUSDT", b.Symbol, "should be the original value") }) @@ -40,7 +39,10 @@ func Test_reflectMergeStructFields(t *testing.T) { }{ IntervalWindow: iw, } - b := &bbgo.CumulatedVolumeTakeProfit{} + b := &struct { + Symbol string + types.IntervalWindow + }{} MergeStructValues(b, a) assert.Equal(t, iw, b.IntervalWindow) }) @@ -52,7 +54,9 @@ func Test_reflectMergeStructFields(t *testing.T) { }{ IntervalWindow: iw, } - b := &bbgo.CumulatedVolumeTakeProfit{ + b := &struct { + types.IntervalWindow + }{ IntervalWindow: types.IntervalWindow{Interval: types.Interval5m, Window: 9}, } MergeStructValues(b, a) diff --git a/pkg/strategy/bollgrid/strategy.go b/pkg/strategy/bollgrid/strategy.go index 8d689f7d4..676c8fea0 100644 --- a/pkg/strategy/bollgrid/strategy.go +++ b/pkg/strategy/bollgrid/strategy.go @@ -41,9 +41,6 @@ type Strategy struct { // This field will be injected automatically since we defined the Symbol field. *bbgo.StandardIndicatorSet - // Graceful let you define the graceful shutdown handler - *bbgo.Graceful - // Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc // This field will be injected automatically since we defined the Symbol field. types.Market @@ -350,7 +347,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.profitOrders.BindStream(session.UserDataStream) // setup graceful shutting down handler - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { // call Done to notify the main process. defer wg.Done() log.Infof("canceling active orders...") diff --git a/pkg/strategy/bollmaker/strategy.go b/pkg/strategy/bollmaker/strategy.go index 93b135b9d..2bcf6e2e9 100644 --- a/pkg/strategy/bollmaker/strategy.go +++ b/pkg/strategy/bollmaker/strategy.go @@ -49,7 +49,6 @@ type BollingerSetting struct { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Environment *bbgo.Environment @@ -616,7 +615,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // s.book = types.NewStreamBook(s.Symbol) // s.book.BindStreamForBackground(session.MarketDataStream) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() _ = s.orderExecutor.GracefulCancel(ctx) diff --git a/pkg/strategy/dca/strategy.go b/pkg/strategy/dca/strategy.go index f0e86aa53..38e6c9d14 100644 --- a/pkg/strategy/dca/strategy.go +++ b/pkg/strategy/dca/strategy.go @@ -47,7 +47,6 @@ func (b BudgetPeriod) Duration() time.Duration { // Strategy is the Dollar-Cost-Average strategy type Strategy struct { - *bbgo.Graceful Environment *bbgo.Environment Symbol string `json:"symbol"` diff --git a/pkg/strategy/emastop/strategy.go b/pkg/strategy/emastop/strategy.go index 89c837b37..7c9c4a190 100644 --- a/pkg/strategy/emastop/strategy.go +++ b/pkg/strategy/emastop/strategy.go @@ -25,7 +25,6 @@ func init() { } type Strategy struct { - *bbgo.Graceful SourceExchangeName string `json:"sourceExchange"` @@ -217,7 +216,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.place(ctx, orderExecutor, session, indicator, closePrice) }) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() log.Infof("canceling trailingstop order...") s.clear(ctx, orderExecutor) @@ -261,7 +260,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se s.place(ctx, &orderExecutor, session, indicator, closePrice) }) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() log.Infof("canceling trailingstop order...") s.clear(ctx, &orderExecutor) diff --git a/pkg/strategy/ewoDgtrd/strategy.go b/pkg/strategy/ewoDgtrd/strategy.go index bce4cf07a..4a926c901 100644 --- a/pkg/strategy/ewoDgtrd/strategy.go +++ b/pkg/strategy/ewoDgtrd/strategy.go @@ -51,7 +51,6 @@ type Strategy struct { KLineEndTime types.Time *bbgo.Environment - *bbgo.Graceful bbgo.StrategyController activeMakerOrders *bbgo.ActiveOrderBook @@ -1221,7 +1220,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se } } }) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() log.Infof("canceling active orders...") s.CancelAll(ctx) diff --git a/pkg/strategy/flashcrash/strategy.go b/pkg/strategy/flashcrash/strategy.go index b15fcbfb7..4b5c80577 100644 --- a/pkg/strategy/flashcrash/strategy.go +++ b/pkg/strategy/flashcrash/strategy.go @@ -49,10 +49,6 @@ type Strategy struct { // This field will be injected automatically since we defined the Symbol field. *bbgo.StandardIndicatorSet - // Graceful shutdown function - *bbgo.Graceful - // -------------------------- - // ewma is the exponential weighted moving average indicator ewma *indicator.EWMA } @@ -114,7 +110,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol) s.activeOrders.BindStream(session.UserDataStream) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() log.Infof("canceling active orders...") diff --git a/pkg/strategy/fmaker/strategy.go b/pkg/strategy/fmaker/strategy.go index d67367569..c6f7068be 100644 --- a/pkg/strategy/fmaker/strategy.go +++ b/pkg/strategy/fmaker/strategy.go @@ -31,7 +31,6 @@ type IntervalWindowSetting struct { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Environment *bbgo.Environment diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 75e781783..7fc899576 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -45,8 +45,6 @@ type State struct { } type Strategy struct { - *bbgo.Graceful `json:"-" yaml:"-"` - *bbgo.Persistence // OrderExecutor is an interface for submitting order. @@ -621,7 +619,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se }) s.tradeCollector.BindStream(session.UserDataStream) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() if err := s.SaveState(); err != nil { diff --git a/pkg/strategy/pivotshort/strategy.go b/pkg/strategy/pivotshort/strategy.go index 1ce58c7a2..4d8f87338 100644 --- a/pkg/strategy/pivotshort/strategy.go +++ b/pkg/strategy/pivotshort/strategy.go @@ -70,8 +70,6 @@ type Entry struct { } type Strategy struct { - *bbgo.Graceful - Environment *bbgo.Environment Symbol string `json:"symbol"` Market types.Market @@ -124,6 +122,8 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { for i := range s.ExitMethods { m := s.ExitMethods[i] + + // we need to pass some information from the strategy configuration to the exit methods, like symbol, interval and window dynamic.MergeStructValues(&m, s) m.Subscribe(session) } @@ -369,7 +369,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se }) } - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { _, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String()) wg.Done() }) diff --git a/pkg/strategy/rsmaker/strategy.go b/pkg/strategy/rsmaker/strategy.go index 0d0c5a8fe..0704f8ae8 100644 --- a/pkg/strategy/rsmaker/strategy.go +++ b/pkg/strategy/rsmaker/strategy.go @@ -29,9 +29,6 @@ func init() { } type Strategy struct { - *bbgo.Graceful - *bbgo.Notifiability - Environment *bbgo.Environment StandardIndicatorSet *bbgo.StandardIndicatorSet Market types.Market diff --git a/pkg/strategy/supertrend/strategy.go b/pkg/strategy/supertrend/strategy.go index e91313be3..2f456a7d2 100644 --- a/pkg/strategy/supertrend/strategy.go +++ b/pkg/strategy/supertrend/strategy.go @@ -30,7 +30,6 @@ func init() { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Environment *bbgo.Environment @@ -391,7 +390,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se }) // Graceful shutdown - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() close(s.stopC) diff --git a/pkg/strategy/support/strategy.go b/pkg/strategy/support/strategy.go index 4a02e443a..d9a37b135 100644 --- a/pkg/strategy/support/strategy.go +++ b/pkg/strategy/support/strategy.go @@ -134,7 +134,6 @@ func (control *TrailingStopControl) GenerateStopOrder(quantity fixedpoint.Value) type Strategy struct { *bbgo.Persistence `json:"-"` *bbgo.Environment `json:"-"` - *bbgo.Graceful `json:"-"` session *bbgo.ExchangeSession @@ -582,7 +581,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se } }) - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() // Cancel trailing stop order diff --git a/pkg/strategy/wall/strategy.go b/pkg/strategy/wall/strategy.go index 824cc28cf..ffc9cef4a 100644 --- a/pkg/strategy/wall/strategy.go +++ b/pkg/strategy/wall/strategy.go @@ -30,7 +30,6 @@ func init() { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Environment *bbgo.Environment @@ -377,7 +376,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se } }() - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() close(s.stopC) diff --git a/pkg/strategy/xbalance/strategy.go b/pkg/strategy/xbalance/strategy.go index 60f0ac6ae..ea4aa1e04 100644 --- a/pkg/strategy/xbalance/strategy.go +++ b/pkg/strategy/xbalance/strategy.go @@ -135,8 +135,6 @@ func (a *Address) UnmarshalJSON(body []byte) error { } type Strategy struct { - *bbgo.Graceful - Interval types.Duration `json:"interval"` Addresses map[string]Address `json:"addresses"` @@ -342,7 +340,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se s.State = s.newDefaultState() } - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() }) diff --git a/pkg/strategy/xgap/strategy.go b/pkg/strategy/xgap/strategy.go index 102ea2a42..7406e5551 100644 --- a/pkg/strategy/xgap/strategy.go +++ b/pkg/strategy/xgap/strategy.go @@ -57,7 +57,6 @@ func (s *State) Reset() { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Symbol string `json:"symbol"` @@ -193,7 +192,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se } } - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() close(s.stopC) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 289b94208..aca499413 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -33,7 +33,6 @@ func init() { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence Environment *bbgo.Environment @@ -879,7 +878,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order } }() - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() close(s.stopC) diff --git a/pkg/strategy/xnav/strategy.go b/pkg/strategy/xnav/strategy.go index f11582efb..ea9526f47 100644 --- a/pkg/strategy/xnav/strategy.go +++ b/pkg/strategy/xnav/strategy.go @@ -58,7 +58,6 @@ func (s *State) Reset() { } type Strategy struct { - *bbgo.Graceful *bbgo.Persistence *bbgo.Environment @@ -180,7 +179,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se return err } - s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done() s.SaveState()