Merge pull request #973 from c9s/refactor/shutdown-handler

refactor/feature: add isolation context support
This commit is contained in:
Yo-An Lin 2022-10-03 16:40:01 +08:00 committed by GitHub
commit ef6a22d142
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 118 additions and 76 deletions

View File

@ -1,18 +0,0 @@
// Code generated by "callbackgen -type Graceful"; DO NOT EDIT.
package bbgo
import (
"context"
"sync"
)
func (g *Graceful) OnShutdown(cb ShutdownHandler) {
g.shutdownCallbacks = append(g.shutdownCallbacks, cb)
}
func (g *Graceful) EmitShutdown(ctx context.Context, wg *sync.WaitGroup) {
for _, cb := range g.shutdownCallbacks {
cb(ctx, wg)
}
}

View File

@ -8,17 +8,15 @@ import (
"github.com/sirupsen/logrus"
)
var graceful = &Graceful{}
type ShutdownHandler func(ctx context.Context, wg *sync.WaitGroup)
//go:generate callbackgen -type Graceful
type Graceful struct {
//go:generate callbackgen -type GracefulShutdown
type GracefulShutdown struct {
shutdownCallbacks []ShutdownHandler
}
// Shutdown is a blocking call to emit all shutdown callbacks at the same time.
func (g *Graceful) Shutdown(ctx context.Context) {
func (g *GracefulShutdown) Shutdown(ctx context.Context) {
var wg sync.WaitGroup
wg.Add(len(g.shutdownCallbacks))
@ -31,14 +29,18 @@ func (g *Graceful) Shutdown(ctx context.Context) {
cancel()
}
func OnShutdown(f ShutdownHandler) {
graceful.OnShutdown(f)
func OnShutdown(ctx context.Context, f ShutdownHandler) {
isolatedContext := NewIsolationFromContext(ctx)
isolatedContext.gracefulShutdown.OnShutdown(f)
}
func Shutdown() {
func Shutdown(ctx context.Context) {
logrus.Infof("shutting down...")
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
graceful.Shutdown(ctx)
isolatedContext := NewIsolationFromContext(ctx)
todo := context.WithValue(context.TODO(), IsolationContextKey, isolatedContext)
timeoutCtx, cancel := context.WithTimeout(todo, 30*time.Second)
defaultIsolation.gracefulShutdown.Shutdown(timeoutCtx)
cancel()
}

View File

@ -0,0 +1,18 @@
// Code generated by "callbackgen -type GracefulShutdown"; DO NOT EDIT.
package bbgo
import (
"context"
"sync"
)
func (g *GracefulShutdown) OnShutdown(cb ShutdownHandler) {
g.shutdownCallbacks = append(g.shutdownCallbacks, cb)
}
func (g *GracefulShutdown) EmitShutdown(ctx context.Context, wg *sync.WaitGroup) {
for _, cb := range g.shutdownCallbacks {
cb(ctx, wg)
}
}

30
pkg/bbgo/isolation.go Normal file
View File

@ -0,0 +1,30 @@
package bbgo
import (
"context"
)
const IsolationContextKey = "bbgo"
var defaultIsolation *Isolation = nil
func init() {
defaultIsolation = NewIsolation()
}
type Isolation struct {
gracefulShutdown GracefulShutdown
}
func NewIsolation() *Isolation {
return &Isolation{}
}
func NewIsolationFromContext(ctx context.Context) *Isolation {
isolatedContext, ok := ctx.Value(IsolationContextKey).(*Isolation)
if ok {
return isolatedContext
}
return defaultIsolation
}

View File

@ -88,6 +88,8 @@ type Trader struct {
crossExchangeStrategies []CrossExchangeStrategy
exchangeStrategies map[string][]SingleExchangeStrategy
gracefulShutdown GracefulShutdown
logger Logger
}
@ -218,8 +220,7 @@ func (trader *Trader) RunSingleExchangeStrategy(ctx context.Context, strategy Si
}
if shutdown, ok := strategy.(StrategyShutdown); ok {
// Register the shutdown callback
OnShutdown(shutdown.Shutdown)
trader.gracefulShutdown.OnShutdown(shutdown.Shutdown)
}
return strategy.Run(ctx, orderExecutor, session)
@ -429,6 +430,10 @@ func (trader *Trader) SaveState() error {
})
}
func (trader *Trader) Shutdown(ctx context.Context) {
trader.gracefulShutdown.Shutdown(ctx)
}
var defaultPersistenceSelector = &PersistenceSelector{
StoreID: "default",
Type: "memory",

View File

@ -484,7 +484,7 @@ var BacktestCmd = &cobra.Command{
cmdutil.WaitForSignal(runCtx, syscall.SIGINT, syscall.SIGTERM)
log.Infof("shutting down trader...")
bbgo.Shutdown()
bbgo.Shutdown(ctx)
// put the logger back to print the pnl
log.SetLevel(log.InfoLevel)

View File

@ -77,7 +77,7 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
bbgo.Shutdown()
bbgo.Shutdown(ctx)
return nil
}
@ -196,7 +196,7 @@ func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *bbgo.Con
cmdutil.WaitForSignal(ctx, syscall.SIGINT, syscall.SIGTERM)
cancelTrading()
bbgo.Shutdown()
bbgo.Shutdown(ctx)
if err := trader.SaveState(); err != nil {
log.WithError(err).Errorf("can not save strategy states")

View File

@ -6,10 +6,11 @@ import (
"os"
"sync"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)
const ID = "audacitymaker"
@ -99,7 +100,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel active orders
_ = s.orderExecutor.GracefulCancel(ctx)
// Close 100% position
//_ = s.ClosePosition(ctx, fixedpoint.One)
// _ = s.ClosePosition(ctx, fixedpoint.One)
})
// initial required information
@ -123,7 +124,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.OrderFlow.Bind(session, s.orderExecutor)
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())

View File

@ -347,7 +347,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.profitOrders.BindStream(session.UserDataStream)
// setup graceful shutting down handler
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
// call Done to notify the main process.
defer wg.Done()
log.Infof("canceling active orders...")

View File

@ -596,7 +596,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// s.book = types.NewStreamBook(s.Symbol)
// s.book.BindStreamForBackground(session.MarketDataStream)
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_ = s.orderExecutor.GracefulCancel(ctx)

View File

@ -400,9 +400,9 @@ func (s *Strategy) DrawIndicators(time types.Time) *types.Canvas {
hi := s.drift.drift.Abs().Highest(Length)
ratio := highestPrice / highestDrift
//canvas.Plot("upband", s.ma.Add(s.stdevHigh), time, Length)
// canvas.Plot("upband", s.ma.Add(s.stdevHigh), time, Length)
canvas.Plot("ma", s.ma, time, Length)
//canvas.Plot("downband", s.ma.Minus(s.stdevLow), time, Length)
// canvas.Plot("downband", s.ma.Minus(s.stdevLow), time, Length)
fmt.Printf("%f %f\n", highestPrice, hi)
canvas.Plot("trend", s.trendLine, time, Length)
@ -844,12 +844,12 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.highestPrice = 0
s.lowestPrice = 0
} else if s.p.IsLong() {
s.buyPrice = s.p.ApproximateAverageCost.Float64() //trade.Price.Float64()
s.buyPrice = s.p.ApproximateAverageCost.Float64() // trade.Price.Float64()
s.sellPrice = 0
s.highestPrice = math.Max(s.buyPrice, s.highestPrice)
s.lowestPrice = s.buyPrice
} else if s.p.IsShort() {
s.sellPrice = s.p.ApproximateAverageCost.Float64() //trade.Price.Float64()
s.sellPrice = s.p.ApproximateAverageCost.Float64() // trade.Price.Float64()
s.buyPrice = 0
s.highestPrice = s.sellPrice
if s.lowestPrice == 0 {
@ -951,7 +951,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
var buffer bytes.Buffer

View File

@ -10,13 +10,14 @@ import (
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
"github.com/sirupsen/logrus"
)
const ID = "elliottwave"
@ -379,7 +380,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
var buffer bytes.Buffer
for _, daypnl := range s.TradeStats.IntervalProfits[types.Interval1d].GetNonProfitableIntervals() {
fmt.Fprintf(&buffer, "%s\n", daypnl)

View File

@ -211,7 +211,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.place(ctx, orderExecutor, session, indicator, closePrice)
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling trailingstop order...")
s.clear(ctx, orderExecutor)
@ -255,7 +255,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
s.place(ctx, &orderExecutor, session, indicator, closePrice)
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling trailingstop order...")
s.clear(ctx, &orderExecutor)

View File

@ -1202,7 +1202,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling active orders...")

View File

@ -6,10 +6,11 @@ import (
"os"
"sync"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)
const ID = "factorzoo"
@ -97,7 +98,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel active orders
_ = s.orderExecutor.GracefulCancel(ctx)
// Close 100% position
//_ = s.ClosePosition(ctx, fixedpoint.One)
// _ = s.ClosePosition(ctx, fixedpoint.One)
})
// initial required information
@ -121,7 +122,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.Linear.Bind(session, s.orderExecutor)
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())

View File

@ -110,7 +110,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)
s.activeOrders.BindStream(session.UserDataStream)
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
log.Infof("canceling active orders...")

View File

@ -619,7 +619,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
})
s.tradeCollector.BindStream(session.UserDataStream)
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
if err := s.SaveState(); err != nil {

View File

@ -151,7 +151,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel active orders
_ = s.orderExecutor.GracefulCancel(ctx)
// Close 100% position
//_ = s.ClosePosition(ctx, fixedpoint.One)
// _ = s.ClosePosition(ctx, fixedpoint.One)
})
// initial required information
@ -162,14 +162,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats)
//modify := func(p float64) float64 {
// modify := func(p float64) float64 {
// return p
//}
//if s.GraphPNLDeductFee {
// }
// if s.GraphPNLDeductFee {
// modify = func(p float64) float64 {
// return p * (1. - Fee)
// }
//}
// }
profit := floats.Slice{1., 1.}
price, _ := s.session.LastPrice(s.Symbol)
initAsset := s.CalcAssetValue(price).Float64()
@ -195,8 +195,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.nrr.LoadK((*klines)[0:])
}
//startTime := s.Environment.StartTime()
//s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1h, startTime))
// startTime := s.Environment.StartTime()
// s.TradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1h, startTime))
s.session.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, s.Interval, func(kline types.KLine) {
@ -253,7 +253,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
bbgo.SendPhoto(&buffer)
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())
@ -270,7 +270,7 @@ func (s *Strategy) CalcAssetValue(price fixedpoint.Value) fixedpoint.Value {
func (s *Strategy) DrawPNL(profit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
//log.Errorf("pnl Highest: %f, Lowest: %f", types.Highest(profit, profit.Length()), types.Lowest(profit, profit.Length()))
// log.Errorf("pnl Highest: %f, Lowest: %f", types.Highest(profit, profit.Length()), types.Lowest(profit, profit.Length()))
length := profit.Length()
if s.GraphPNLDeductFee {
canvas.PlotRaw("pnl (with Fee Deducted)", profit, length)

View File

@ -183,7 +183,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.FailedBreakHigh.Bind(session, s.orderExecutor)
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())

View File

@ -643,7 +643,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}))
// Graceful shutdown
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// Output accumulated profit report

View File

@ -578,7 +578,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
})
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
// Cancel trailing stop order

View File

@ -3,14 +3,16 @@ package trendtrader
import (
"context"
"fmt"
"github.com/c9s/bbgo/pkg/dynamic"
"os"
"sync"
"github.com/c9s/bbgo/pkg/dynamic"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
)
const ID = "trendtrader"
@ -54,8 +56,8 @@ type Strategy struct {
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
//session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Trend.Interval})
//session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: types.Interval1m})
// session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Trend.Interval})
// session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: types.Interval1m})
if s.TrendLine != nil {
dynamic.InheritStructValues(s.TrendLine, s)
@ -99,7 +101,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel active orders
_ = s.orderExecutor.GracefulCancel(ctx)
// Close 100% position
//_ = s.ClosePosition(ctx, fixedpoint.One)
// _ = s.ClosePosition(ctx, fixedpoint.One)
})
// initial required information
@ -123,7 +125,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.TrendLine.Bind(session, s.orderExecutor)
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
_, _ = fmt.Fprintln(os.Stderr, s.TradeStats.String())

View File

@ -376,7 +376,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
}()
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -341,7 +341,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
s.State = s.newDefaultState()
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
})

View File

@ -192,7 +192,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
}
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -880,7 +880,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
}
}()
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
close(s.stopC)

View File

@ -180,7 +180,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
return err
}
bbgo.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()
s.SaveState()