From 8af2f2f83faf9365f8263d9474f9bd6ce7760343 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 19 Jul 2022 16:59:56 +0800 Subject: [PATCH 1/2] add defaulter interface --- pkg/bbgo/trader.go | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index e8cbc13ed..57dcca7f4 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -24,10 +24,15 @@ type SingleExchangeStrategy interface { Run(ctx context.Context, orderExecutor OrderExecutor, session *ExchangeSession) error } +// StrategyInitializer's Initialize method is called before the Subscribe method call. type StrategyInitializer interface { Initialize() error } +type StrategyDefaulter interface { + Defaults() error +} + // ExchangeSessionSubscriber provides an interface for collecting subscriptions from different strategies // Subscribe method will be called before the user data stream connection is created. type ExchangeSessionSubscriber interface { @@ -153,6 +158,12 @@ func (trader *Trader) Subscribe() { for sessionName, strategies := range trader.exchangeStrategies { session := trader.environment.sessions[sessionName] for _, strategy := range strategies { + if defaulter, ok := strategy.(StrategyDefaulter) ; ok { + if err := defaulter.Defaults(); err != nil { + panic(err) + } + } + if initializer, ok := strategy.(StrategyInitializer); ok { if err := initializer.Initialize(); err != nil { panic(err) From ab83805b3446ddd974a4e7febb93e7288e81a1a5 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 19 Jul 2022 17:13:35 +0800 Subject: [PATCH 2/2] bbgo: add StrategyShutdown interface --- pkg/bbgo/graceful_callbacks.go | 2 +- pkg/bbgo/graceful_shutdown.go | 6 ++++-- pkg/bbgo/trader.go | 35 +++++++++++++++++++++++++++------- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/pkg/bbgo/graceful_callbacks.go b/pkg/bbgo/graceful_callbacks.go index 848b6fdcd..f194e7b2b 100644 --- a/pkg/bbgo/graceful_callbacks.go +++ b/pkg/bbgo/graceful_callbacks.go @@ -7,7 +7,7 @@ import ( "sync" ) -func (g *Graceful) OnShutdown(cb func(ctx context.Context, wg *sync.WaitGroup)) { +func (g *Graceful) OnShutdown(cb ShutdownHandler) { g.shutdownCallbacks = append(g.shutdownCallbacks, cb) } diff --git a/pkg/bbgo/graceful_shutdown.go b/pkg/bbgo/graceful_shutdown.go index c3248b0c8..4dbf6824a 100644 --- a/pkg/bbgo/graceful_shutdown.go +++ b/pkg/bbgo/graceful_shutdown.go @@ -10,9 +10,11 @@ import ( var graceful = &Graceful{} +type ShutdownHandler func(ctx context.Context, wg *sync.WaitGroup) + //go:generate callbackgen -type Graceful type Graceful struct { - shutdownCallbacks []func(ctx context.Context, wg *sync.WaitGroup) + shutdownCallbacks []ShutdownHandler } // Shutdown is a blocking call to emit all shutdown callbacks at the same time. @@ -29,7 +31,7 @@ func (g *Graceful) Shutdown(ctx context.Context) { cancel() } -func OnShutdown(f func(ctx context.Context, wg *sync.WaitGroup)) { +func OnShutdown(f ShutdownHandler) { graceful.OnShutdown(f) } diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 57dcca7f4..9b5ca86a6 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "github.com/pkg/errors" log "github.com/sirupsen/logrus" @@ -14,6 +15,12 @@ import ( "github.com/c9s/bbgo/pkg/interact" ) +// Strategy method calls: +// -> Defaults() (optional method) +// -> Initialize() (optional method) +// -> Validate() (optional method) +// -> Run() (optional method) +// -> Shutdown(shutdownCtx context.Context, wg *sync.WaitGroup) type StrategyID interface { ID() string } @@ -33,6 +40,14 @@ type StrategyDefaulter interface { Defaults() error } +type StrategyValidator interface { + Validate() error +} + +type StrategyShutdown interface { + Shutdown(ctx context.Context, wg *sync.WaitGroup) +} + // ExchangeSessionSubscriber provides an interface for collecting subscriptions from different strategies // Subscribe method will be called before the user data stream connection is created. type ExchangeSessionSubscriber interface { @@ -48,10 +63,6 @@ type CrossExchangeStrategy interface { CrossRun(ctx context.Context, orderExecutionRouter OrderExecutionRouter, sessions map[string]*ExchangeSession) error } -type Validator interface { - Validate() error -} - type Logging interface { EnableLogging() DisableLogging() @@ -158,7 +169,7 @@ func (trader *Trader) Subscribe() { for sessionName, strategies := range trader.exchangeStrategies { session := trader.environment.sessions[sessionName] for _, strategy := range strategies { - if defaulter, ok := strategy.(StrategyDefaulter) ; ok { + if defaulter, ok := strategy.(StrategyDefaulter); ok { if err := defaulter.Defaults(); err != nil { panic(err) } @@ -179,6 +190,12 @@ func (trader *Trader) Subscribe() { } for _, strategy := range trader.crossExchangeStrategies { + if defaulter, ok := strategy.(StrategyDefaulter); ok { + if err := defaulter.Defaults(); err != nil { + panic(err) + } + } + if initializer, ok := strategy.(StrategyInitializer); ok { if err := initializer.Initialize(); err != nil { panic(err) @@ -240,13 +257,17 @@ func (trader *Trader) RunSingleExchangeStrategy(ctx context.Context, strategy Si } } - // If the strategy has Validate() method, run it and check the error - if v, ok := strategy.(Validator); ok { + if v, ok := strategy.(StrategyValidator); ok { if err := v.Validate(); err != nil { return fmt.Errorf("failed to validate the config: %w", err) } } + if shutdown, ok := strategy.(StrategyShutdown); ok { + // Register the shutdown callback + OnShutdown(shutdown.Shutdown) + } + return strategy.Run(ctx, orderExecutor, session) }