all: rewrite and clean up graceful shutdown api

This commit is contained in:
c9s 2022-06-30 13:48:04 +08:00
parent 7d5474e3dd
commit 527070d13d
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
23 changed files with 53 additions and 66 deletions

View File

@ -3,18 +3,40 @@ package bbgo
import (
"context"
"sync"
"time"
"github.com/sirupsen/logrus"
)
var graceful = &Graceful{}
//go:generate callbackgen -type Graceful
type Graceful struct {
shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup)
}
// Shutdown is a blocking call to emit all shutdown callbacks at the same time.
func (g *Graceful) Shutdown(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(g.shutdownCallbacks))
go g.EmitShutdown(ctx, &wg)
// for each shutdown callback, we give them 10 second
shtCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
go g.EmitShutdown(shtCtx, &wg)
wg.Wait()
cancel()
}
func OnShutdown(f func(ctx context.Context, wg *sync.WaitGroup)) {
graceful.OnShutdown(f)
}
func Shutdown() {
logrus.Infof("shutting down...")
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
graceful.Shutdown(ctx)
cancel()
}

View File

@ -24,7 +24,6 @@ func (s *TestStructWithoutInstanceID) ID() string {
type TestStruct struct {
*Environment
*Graceful
Position *types.Position `persistence:"position"`
Integer int64 `persistence:"integer"`

View File

@ -73,8 +73,6 @@ type Trader struct {
exchangeStrategies map[string][]SingleExchangeStrategy
logger Logger
Graceful Graceful
}
func NewTrader(environ *Environment) *Trader {
@ -416,7 +414,6 @@ func (trader *Trader) injectCommonServices(s interface{}) error {
}
return parseStructAndInject(s,
&trader.Graceful,
&trader.logger,
Notification,
trader.environment.TradeService,

View File

@ -443,9 +443,7 @@ var BacktestCmd = &cobra.Command{
cmdutil.WaitForSignal(runCtx, syscall.SIGINT, syscall.SIGTERM)
log.Infof("shutting down trader...")
shutdownCtx, cancelShutdown := context.WithDeadline(runCtx, time.Now().Add(10*time.Second))
trader.Graceful.Shutdown(shutdownCtx)
cancelShutdown()
bbgo.Shutdown()
// put the logger back to print the pnl
log.SetLevel(log.InfoLevel)

View File

@ -8,7 +8,6 @@ import (
"path/filepath"
"runtime/pprof"
"syscall"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
@ -78,12 +77,7 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
// graceful period = 15 second
shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(15*time.Second))
log.Infof("shutting down...")
trader.Graceful.Shutdown(shutdownCtx)
cancelShutdown()
bbgo.Shutdown()
return nil
}
@ -216,10 +210,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
log.Infof("shutting down...")
shutdownCtx, cancelShutdown := context.WithDeadline(ctx, time.Now().Add(30*time.Second))
trader.Graceful.Shutdown(shutdownCtx)
cancelShutdown()
bbgo.Shutdown()
if err := trader.SaveState(); err != nil {
log.WithError(err).Errorf("can not save strategy states")

View File

@ -5,7 +5,6 @@ import (
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -21,14 +20,14 @@ type TestStrategy struct {
func Test_reflectMergeStructFields(t *testing.T) {
t.Run("zero value", func(t *testing.T) {
a := &TestStrategy{Symbol: "BTCUSDT"}
b := &bbgo.CumulatedVolumeTakeProfit{Symbol: ""}
b := &struct{ Symbol string }{Symbol: ""}
MergeStructValues(b, a)
assert.Equal(t, "BTCUSDT", b.Symbol)
})
t.Run("non-zero value", func(t *testing.T) {
a := &TestStrategy{Symbol: "BTCUSDT"}
b := &bbgo.CumulatedVolumeTakeProfit{Symbol: "ETHUSDT"}
b := &struct{ Symbol string }{Symbol: "ETHUSDT"}
MergeStructValues(b, a)
assert.Equal(t, "ETHUSDT", b.Symbol, "should be the original value")
})
@ -40,7 +39,10 @@ func Test_reflectMergeStructFields(t *testing.T) {
}{
IntervalWindow: iw,
}
b := &bbgo.CumulatedVolumeTakeProfit{}
b := &struct {
Symbol string
types.IntervalWindow
}{}
MergeStructValues(b, a)
assert.Equal(t, iw, b.IntervalWindow)
})
@ -52,7 +54,9 @@ func Test_reflectMergeStructFields(t *testing.T) {
}{
IntervalWindow: iw,
}
b := &bbgo.CumulatedVolumeTakeProfit{
b := &struct {
types.IntervalWindow
}{
IntervalWindow: types.IntervalWindow{Interval: types.Interval5m, Window: 9},
}
MergeStructValues(b, a)

View File

@ -41,9 +41,6 @@ type Strategy struct {
// This field will be injected automatically since we defined the Symbol field.
*bbgo.StandardIndicatorSet
// Graceful let you define the graceful shutdown handler
*bbgo.Graceful
// Market stores the configuration of the market, for example, VolumePrecision, PricePrecision, MinLotSize... etc
// This field will be injected automatically since we defined the Symbol field.
types.Market
@ -350,7 +347,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.profitOrders.BindStream(session.UserDataStream)
// setup graceful shutting down handler
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
// call Done to notify the main process.
defer wg.Done()
log.Infof("canceling active orders...")

View File

@ -49,7 +49,6 @@ type BollingerSetting struct {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Environment *bbgo.Environment
@ -616,7 +615,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// s.book = types.NewStreamBook(s.Symbol)
// s.book.BindStreamForBackground(session.MarketDataStream)
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_ = s.orderExecutor.GracefulCancel(ctx)

View File

@ -47,7 +47,6 @@ func (b BudgetPeriod) Duration() time.Duration {
// Strategy is the Dollar-Cost-Average strategy
type Strategy struct {
*bbgo.Graceful
Environment *bbgo.Environment
Symbol string `json:"symbol"`

View File

@ -25,7 +25,6 @@ func init() {
}
type Strategy struct {
*bbgo.Graceful
SourceExchangeName string `json:"sourceExchange"`
@ -217,7 +216,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.place(ctx, orderExecutor, session, indicator, closePrice)
})
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling trailingstop order...")
s.clear(ctx, orderExecutor)
@ -261,7 +260,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
s.place(ctx, &orderExecutor, session, indicator, closePrice)
})
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling trailingstop order...")
s.clear(ctx, &orderExecutor)

View File

@ -51,7 +51,6 @@ type Strategy struct {
KLineEndTime types.Time
*bbgo.Environment
*bbgo.Graceful
bbgo.StrategyController
activeMakerOrders *bbgo.ActiveOrderBook
@ -1221,7 +1220,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
}
})
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling active orders...")
s.CancelAll(ctx)

View File

@ -49,10 +49,6 @@ type Strategy struct {
// This field will be injected automatically since we defined the Symbol field.
*bbgo.StandardIndicatorSet
// Graceful shutdown function
*bbgo.Graceful
// --------------------------
// ewma is the exponential weighted moving average indicator
ewma *indicator.EWMA
}
@ -114,7 +110,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)
s.activeOrders.BindStream(session.UserDataStream)
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling active orders...")

View File

@ -31,7 +31,6 @@ type IntervalWindowSetting struct {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Environment *bbgo.Environment

View File

@ -45,8 +45,6 @@ type State struct {
}
type Strategy struct {
*bbgo.Graceful `json:"-" yaml:"-"`
*bbgo.Persistence
// OrderExecutor is an interface for submitting order.
@ -621,7 +619,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
s.tradeCollector.BindStream(session.UserDataStream)
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if err := s.SaveState(); err != nil {

View File

@ -70,8 +70,6 @@ type Entry struct {
}
type Strategy struct {
*bbgo.Graceful
Environment *bbgo.Environment
Symbol string `json:"symbol"`
Market types.Market
@ -124,6 +122,8 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
for i := range s.ExitMethods {
m := s.ExitMethods[i]
// we need to pass some information from the strategy configuration to the exit methods, like symbol, interval and window
dynamic.MergeStructValues(&m, s)
m.Subscribe(session)
}
@ -369,7 +369,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
}
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())
wg.Done()
})

View File

@ -29,9 +29,6 @@ func init() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Notifiability
Environment *bbgo.Environment
StandardIndicatorSet *bbgo.StandardIndicatorSet
Market types.Market

View File

@ -30,7 +30,6 @@ func init() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Environment *bbgo.Environment
@ -391,7 +390,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
// Graceful shutdown
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -134,7 +134,6 @@ func (control *TrailingStopControl) GenerateStopOrder(quantity fixedpoint.Value)
type Strategy struct {
*bbgo.Persistence `json:"-"`
*bbgo.Environment `json:"-"`
*bbgo.Graceful `json:"-"`
session *bbgo.ExchangeSession
@ -582,7 +581,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// Cancel trailing stop order

View File

@ -30,7 +30,6 @@ func init() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Environment *bbgo.Environment
@ -377,7 +376,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
}()
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -135,8 +135,6 @@ func (a *Address) UnmarshalJSON(body []byte) error {
}
type Strategy struct {
*bbgo.Graceful
Interval types.Duration `json:"interval"`
Addresses map[string]Address `json:"addresses"`
@ -342,7 +340,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
s.State = s.newDefaultState()
}
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
})

View File

@ -57,7 +57,6 @@ func (s *State) Reset() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Symbol string `json:"symbol"`
@ -193,7 +192,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
}
}
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -33,7 +33,6 @@ func init() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
Environment *bbgo.Environment
@ -879,7 +878,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
}
}()
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -58,7 +58,6 @@ func (s *State) Reset() {
}
type Strategy struct {
*bbgo.Graceful
*bbgo.Persistence
*bbgo.Environment
@ -180,7 +179,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
return err
}
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
s.SaveState()