add graceful shutdown

This commit is contained in:
c9s 2020-11-12 14:50:08 +08:00
parent de2a8d6adc
commit fc9409673f
8 changed files with 95 additions and 28 deletions

View File

@ -30,7 +30,7 @@ riskControls:
# This is the session-based risk controller, which let you configure different risk controller by session. # This is the session-based risk controller, which let you configure different risk controller by session.
sessionBased: sessionBased:
# "binance" is the session name that you want to configure the risk control # "binance" is the session name that you want to configure the risk control
binance: max:
# orderExecutor is one of the risk control # orderExecutor is one of the risk control
orderExecutor: orderExecutor:
# symbol-routed order executor # symbol-routed order executor
@ -59,7 +59,7 @@ backtest:
USDT: 10000.0 USDT: 10000.0
exchangeStrategies: exchangeStrategies:
- on: binance - on: max
bollgrid: bollgrid:
symbol: BTCUSDT symbol: BTCUSDT
interval: 5m interval: 5m

View File

@ -0,0 +1,18 @@
// Code generated by "callbackgen -type Graceful"; DO NOT EDIT.
package bbgo
import (
"context"
"sync"
)
func (g *Graceful) OnShutdown(cb func(ctx context.Context, wg *sync.WaitGroup)) {
g.shutdownCallbacks = append(g.shutdownCallbacks, cb)
}
func (g *Graceful) EmitShutdown(ctx context.Context, wg *sync.WaitGroup) {
for _, cb := range g.shutdownCallbacks {
cb(ctx, wg)
}
}

View File

@ -3,6 +3,7 @@ package bbgo
import ( import (
"context" "context"
"reflect" "reflect"
"sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -26,6 +27,19 @@ type CrossExchangeStrategy interface {
Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error Run(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error
} }
//go:generate callbackgen -type Graceful
type Graceful struct {
shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup)
}
func (g *Graceful) Shutdown(ctx context.Context) {
var wg sync.WaitGroup
g.EmitShutdown(ctx, &wg)
wg.Wait()
}
type Logging interface { type Logging interface {
EnableLogging() EnableLogging()
DisableLogging() DisableLogging()
@ -52,6 +66,8 @@ type Trader struct {
exchangeStrategies map[string][]SingleExchangeStrategy exchangeStrategies map[string][]SingleExchangeStrategy
logger Logger logger Logger
Graceful Graceful
} }
func NewTrader(environ *Environment) *Trader { func NewTrader(environ *Environment) *Trader {
@ -97,7 +113,6 @@ func (trader *Trader) SetRiskControls(riskControls *RiskControls) {
} }
func (trader *Trader) Run(ctx context.Context) error { func (trader *Trader) Run(ctx context.Context) error {
// pre-subscribe the data // pre-subscribe the data
for sessionName, strategies := range trader.exchangeStrategies { for sessionName, strategies := range trader.exchangeStrategies {
session := trader.environment.sessions[sessionName] session := trader.environment.sessions[sessionName]
@ -147,6 +162,10 @@ func (trader *Trader) Run(ctx context.Context) error {
// get the struct element // get the struct element
rs = rs.Elem() rs = rs.Elem()
if err := injectField(rs, "Graceful", &trader.Graceful, true) ; err != nil {
log.WithError(err).Errorf("strategy Graceful injection failed")
}
if err := injectField(rs, "Logger", &trader.logger, false); err != nil { if err := injectField(rs, "Logger", &trader.logger, false); err != nil {
log.WithError(err).Errorf("strategy Logger injection failed") log.WithError(err).Errorf("strategy Logger injection failed")
} }

View File

@ -219,6 +219,11 @@ var BacktestCmd = &cobra.Command{
<-backtestExchange.Done() <-backtestExchange.Done()
log.Infof("shutting down trader...")
shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Second))
trader.Graceful.Shutdown(shutdownCtx)
cancel()
// put the logger back to print the pnl // put the logger back to print the pnl
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)
for _, session := range environ.Sessions() { for _, session := range environ.Sessions() {

View File

@ -11,6 +11,7 @@ import (
"runtime" "runtime"
"syscall" "syscall"
"text/template" "text/template"
"time"
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -25,8 +26,6 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
var errSlackTokenUndefined = errors.New("slack token is not defined.")
func init() { func init() {
RunCmd.Flags().Bool("no-compile", false, "do not compile wrapper binary") RunCmd.Flags().Bool("no-compile", false, "do not compile wrapper binary")
RunCmd.Flags().String("os", runtime.GOOS, "GOOS") RunCmd.Flags().String("os", runtime.GOOS, "GOOS")
@ -158,7 +157,17 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
} }
} }
return trader.Run(ctx) if err := trader.Run(ctx); err != nil {
return err
}
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
defer cancel()
trader.Graceful.Shutdown(shutdownCtx)
return nil
} }
var RunCmd = &cobra.Command{ var RunCmd = &cobra.Command{
@ -196,7 +205,7 @@ var RunCmd = &cobra.Command{
if err := runConfig(ctx, userConfig); err != nil { if err := runConfig(ctx, userConfig); err != nil {
return err return err
} }
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
return nil return nil
} }

View File

@ -2,6 +2,7 @@ package bollgrid
import ( import (
"context" "context"
"sync"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -41,6 +42,9 @@ type Strategy struct {
// This field will be injected automatically since we defined the Symbol field. // This field will be injected automatically since we defined the Symbol field.
*bbgo.StandardIndicatorSet *bbgo.StandardIndicatorSet
// Graceful let you define the graceful shutdown handler
*bbgo.Graceful
// Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc // Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc
// This field will be injected automatically since we defined the Symbol field. // This field will be injected automatically since we defined the Symbol field.
types.Market types.Market
@ -315,6 +319,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.submitReverseOrder(o) s.submitReverseOrder(o)
}) })
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error")
}
})
// avoid using time ticker since we will need back testing here // avoid using time ticker since we will need back testing here
session.Stream.OnKLineClosed(func(kline types.KLine) { session.Stream.OnKLineClosed(func(kline types.KLine) {
// skip kline events that does not belong to this symbol // skip kline events that does not belong to this symbol

View File

@ -4,6 +4,7 @@ package flashcrash
import ( import (
"context" "context"
"sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -44,6 +45,9 @@ type Strategy struct {
// StandardIndicatorSet contains the standard indicators of a market (symbol) // StandardIndicatorSet contains the standard indicators of a market (symbol)
// This field will be injected automatically since we defined the Symbol field. // This field will be injected automatically since we defined the Symbol field.
*bbgo.StandardIndicatorSet *bbgo.StandardIndicatorSet
// Graceful shutdown function
*bbgo.Graceful
// -------------------------- // --------------------------
// ewma is the exponential weighted moving average indicator // ewma is the exponential weighted moving average indicator
@ -94,26 +98,6 @@ func (s *Strategy) updateBidOrders(orderExecutor bbgo.OrderExecutor, session *bb
s.activeOrders.Add(orders...) s.activeOrders.Add(orders...)
} }
func (s *Strategy) orderUpdateHandler(order types.Order) {
if order.Symbol != s.Symbol {
return
}
log.Infof("received order update: %+v", order)
switch order.Status {
case types.OrderStatusFilled:
s.activeOrders.Remove(order)
case types.OrderStatusCanceled, types.OrderStatusRejected:
log.Infof("order status %s, removing %d from the active order pool...", order.Status, order.OrderID)
s.activeOrders.Remove(order)
case types.OrderStatusPartiallyFilled:
s.activeOrders.Add(order)
}
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: string(s.Interval)}) session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: string(s.Interval)})
} }
@ -121,12 +105,21 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this. // we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
s.activeOrders = bbgo.NewLocalActiveOrderBook() s.activeOrders = bbgo.NewLocalActiveOrderBook()
s.activeOrders.BindStream(session.Stream)
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error")
}
})
s.ewma = s.StandardIndicatorSet.GetEWMA(types.IntervalWindow{ s.ewma = s.StandardIndicatorSet.GetEWMA(types.IntervalWindow{
Interval: s.Interval, Interval: s.Interval,
Window: 25, Window: 25,
}) })
session.Stream.OnOrderUpdate(s.orderUpdateHandler)
session.Stream.OnKLineClosed(func(kline types.KLine) { session.Stream.OnKLineClosed(func(kline types.KLine) {
s.updateOrders(orderExecutor, session) s.updateOrders(orderExecutor, session)
}) })

View File

@ -2,6 +2,7 @@ package grid
import ( import (
"context" "context"
"sync"
"sync/atomic" "sync/atomic"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -27,6 +28,8 @@ type Strategy struct {
// This field will be injected automatically since it's a single exchange strategy. // This field will be injected automatically since it's a single exchange strategy.
*bbgo.Notifiability *bbgo.Notifiability
*bbgo.Graceful
// OrderExecutor is an interface for submitting order. // OrderExecutor is an interface for submitting order.
// This field will be injected automatically since it's a single exchange strategy. // This field will be injected automatically since it's a single exchange strategy.
bbgo.OrderExecutor bbgo.OrderExecutor
@ -193,6 +196,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// we don't persist orders so that we can not clear the previous orders for now. just need time to support this. // we don't persist orders so that we can not clear the previous orders for now. just need time to support this.
s.activeOrders = bbgo.NewLocalActiveOrderBook() s.activeOrders = bbgo.NewLocalActiveOrderBook()
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error")
}
})
session.Stream.OnOrderUpdate(s.orderUpdateHandler) session.Stream.OnOrderUpdate(s.orderUpdateHandler)
session.Stream.OnTradeUpdate(s.tradeUpdateHandler) session.Stream.OnTradeUpdate(s.tradeUpdateHandler)
session.Stream.OnConnect(func() { session.Stream.OnConnect(func() {