From fc9409673f8d3d0c7278991f09d8fea53122cfd6 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 12 Nov 2020 14:50:08 +0800 Subject: [PATCH] add graceful shutdown --- config/bollgrid.yaml | 4 ++-- pkg/bbgo/graceful_callbacks.go | 18 +++++++++++++++ pkg/bbgo/trader.go | 21 ++++++++++++++++- pkg/cmd/backtest.go | 5 +++++ pkg/cmd/run.go | 17 ++++++++++---- pkg/strategy/bollgrid/strategy.go | 12 ++++++++++ pkg/strategy/flashcrash/strategy.go | 35 ++++++++++++----------------- pkg/strategy/grid/strategy.go | 11 +++++++++ 8 files changed, 95 insertions(+), 28 deletions(-) create mode 100644 pkg/bbgo/graceful_callbacks.go diff --git a/config/bollgrid.yaml b/config/bollgrid.yaml index 76effe775..cceff81ba 100644 --- a/config/bollgrid.yaml +++ b/config/bollgrid.yaml @@ -30,7 +30,7 @@ riskControls: # This is the session-based risk controller, which let you configure different risk controller by session. sessionBased: # "binance" is the session name that you want to configure the risk control - binance: + max: # orderExecutor is one of the risk control orderExecutor: # symbol-routed order executor @@ -59,7 +59,7 @@ backtest: USDT: 10000.0 exchangeStrategies: -- on: binance +- on: max bollgrid: symbol: BTCUSDT interval: 5m diff --git a/pkg/bbgo/graceful_callbacks.go b/pkg/bbgo/graceful_callbacks.go new file mode 100644 index 000000000..848b6fdcd --- /dev/null +++ b/pkg/bbgo/graceful_callbacks.go @@ -0,0 +1,18 @@ +// Code generated by "callbackgen -type Graceful"; DO NOT EDIT. + +package bbgo + +import ( + "context" + "sync" +) + +func (g *Graceful) OnShutdown(cb func(ctx context.Context, wg *sync.WaitGroup)) { + g.shutdownCallbacks = append(g.shutdownCallbacks, cb) +} + +func (g *Graceful) EmitShutdown(ctx context.Context, wg *sync.WaitGroup) { + for _, cb := range g.shutdownCallbacks { + cb(ctx, wg) + } +} diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index f0c4514c0..80208e1b2 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -3,6 +3,7 @@ package bbgo import ( "context" "reflect" + "sync" log "github.com/sirupsen/logrus" @@ -26,6 +27,19 @@ type CrossExchangeStrategy interface { Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error } + +//go:generate callbackgen -type Graceful +type Graceful struct { + shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup) +} + +func (g *Graceful) Shutdown(ctx context.Context) { + var wg sync.WaitGroup + g.EmitShutdown(ctx, &wg) + wg.Wait() +} + + type Logging interface { EnableLogging() DisableLogging() @@ -52,6 +66,8 @@ type Trader struct { exchangeStrategies map[string][]SingleExchangeStrategy logger Logger + + Graceful Graceful } func NewTrader(environ *Environment) *Trader { @@ -97,7 +113,6 @@ func (trader *Trader) SetRiskControls(riskControls *RiskControls) { } func (trader *Trader) Run(ctx context.Context) error { - // pre-subscribe the data for sessionName, strategies := range trader.exchangeStrategies { session := trader.environment.sessions[sessionName] @@ -147,6 +162,10 @@ func (trader *Trader) Run(ctx context.Context) error { // get the struct element rs = rs.Elem() + if err := injectField(rs, "Graceful", &trader.Graceful, true) ; err != nil { + log.WithError(err).Errorf("strategy Graceful injection failed") + } + if err := injectField(rs, "Logger", &trader.logger, false); err != nil { log.WithError(err).Errorf("strategy Logger injection failed") } diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 7e78008e7..d62ae14af 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -219,6 +219,11 @@ var BacktestCmd = &cobra.Command{ <-backtestExchange.Done() + log.Infof("shutting down trader...") + shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + trader.Graceful.Shutdown(shutdownCtx) + cancel() + // put the logger back to print the pnl log.SetLevel(log.InfoLevel) for _, session := range environ.Sessions() { diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index dd7c6ed3f..a25602911 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -11,6 +11,7 @@ import ( "runtime" "syscall" "text/template" + "time" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -25,8 +26,6 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -var errSlackTokenUndefined = errors.New("slack token is not defined.") - func init() { RunCmd.Flags().Bool("no-compile", false, "do not compile wrapper binary") RunCmd.Flags().String("os", runtime.GOOS, "GOOS") @@ -158,7 +157,17 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error { } } - return trader.Run(ctx) + if err := trader.Run(ctx); err != nil { + return err + } + + cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + + shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second)) + defer cancel() + + trader.Graceful.Shutdown(shutdownCtx) + return nil } var RunCmd = &cobra.Command{ @@ -196,7 +205,7 @@ var RunCmd = &cobra.Command{ if err := runConfig(ctx, userConfig); err != nil { return err } - cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM) + return nil } diff --git a/pkg/strategy/bollgrid/strategy.go b/pkg/strategy/bollgrid/strategy.go index 43b2a484c..c41952cb1 100644 --- a/pkg/strategy/bollgrid/strategy.go +++ b/pkg/strategy/bollgrid/strategy.go @@ -2,6 +2,7 @@ package bollgrid import ( "context" + "sync" "github.com/sirupsen/logrus" @@ -41,6 +42,9 @@ type Strategy struct { // This field will be injected automatically since we defined the Symbol field. *bbgo.StandardIndicatorSet + // Graceful let you define the graceful shutdown handler + *bbgo.Graceful + // Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc // This field will be injected automatically since we defined the Symbol field. types.Market @@ -315,6 +319,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.submitReverseOrder(o) }) + s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + log.Infof("canceling active orders...") + + if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { + log.WithError(err).Errorf("cancel order error") + } + }) + // avoid using time ticker since we will need back testing here session.Stream.OnKLineClosed(func(kline types.KLine) { // skip kline events that does not belong to this symbol diff --git a/pkg/strategy/flashcrash/strategy.go b/pkg/strategy/flashcrash/strategy.go index 9a5dce4d9..5a9e15815 100644 --- a/pkg/strategy/flashcrash/strategy.go +++ b/pkg/strategy/flashcrash/strategy.go @@ -4,6 +4,7 @@ package flashcrash import ( "context" + "sync" log "github.com/sirupsen/logrus" @@ -44,6 +45,9 @@ type Strategy struct { // StandardIndicatorSet contains the standard indicators of a market (symbol) // This field will be injected automatically since we defined the Symbol field. *bbgo.StandardIndicatorSet + + // Graceful shutdown function + *bbgo.Graceful // -------------------------- // ewma is the exponential weighted moving average indicator @@ -94,26 +98,6 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb s.activeOrders.Add(orders...) } -func (s *Strategy) orderUpdateHandler(order types.Order) { - if order.Symbol != s.Symbol { - return - } - - log.Infof("received order update: %+v", order) - - switch order.Status { - case types.OrderStatusFilled: - s.activeOrders.Remove(order) - - case types.OrderStatusCanceled, types.OrderStatusRejected: - log.Infof("order status %s, removing %d from the active order pool...", order.Status, order.OrderID) - s.activeOrders.Remove(order) - - case types.OrderStatusPartiallyFilled: - s.activeOrders.Add(order) - } -} - func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: string(s.Interval)}) } @@ -121,12 +105,21 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { // we don't persist orders so that we can not clear the previous orders for now. just need time to support this. s.activeOrders = bbgo.NewLocalActiveOrderBook() + s.activeOrders.BindStream(session.Stream) + + s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + log.Infof("canceling active orders...") + + if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { + log.WithError(err).Errorf("cancel order error") + } + }) + s.ewma = s.StandardIndicatorSet.GetEWMA(types.IntervalWindow{ Interval: s.Interval, Window: 25, }) - session.Stream.OnOrderUpdate(s.orderUpdateHandler) session.Stream.OnKLineClosed(func(kline types.KLine) { s.updateOrders(orderExecutor, session) }) diff --git a/pkg/strategy/grid/strategy.go b/pkg/strategy/grid/strategy.go index 7aaa283ba..96106eda5 100644 --- a/pkg/strategy/grid/strategy.go +++ b/pkg/strategy/grid/strategy.go @@ -2,6 +2,7 @@ package grid import ( "context" + "sync" "sync/atomic" "github.com/sirupsen/logrus" @@ -27,6 +28,8 @@ type Strategy struct { // This field will be injected automatically since it's a single exchange strategy. *bbgo.Notifiability + *bbgo.Graceful + // OrderExecutor is an interface for submitting order. // This field will be injected automatically since it's a single exchange strategy. bbgo.OrderExecutor @@ -193,6 +196,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // we don't persist orders so that we can not clear the previous orders for now. just need time to support this. s.activeOrders = bbgo.NewLocalActiveOrderBook() + s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { + log.Infof("canceling active orders...") + + if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { + log.WithError(err).Errorf("cancel order error") + } + }) + session.Stream.OnOrderUpdate(s.orderUpdateHandler) session.Stream.OnTradeUpdate(s.tradeUpdateHandler) session.Stream.OnConnect(func() {