all: add context parameter to Sync()

This commit is contained in:
c9s 2022-10-03 18:45:24 +08:00
parent ce318fff3b
commit 8a50474ad1
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
20 changed files with 27 additions and 37 deletions

View File

@ -1,6 +1,7 @@
package bbgo package bbgo
import ( import (
"context"
"reflect" "reflect"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -16,7 +17,7 @@ var DefaultPersistenceServiceFacade = &service.PersistenceServiceFacade{
var PersistenceServiceFacade = DefaultPersistenceServiceFacade var PersistenceServiceFacade = DefaultPersistenceServiceFacade
// Sync syncs the object properties into the persistence layer // Sync syncs the object properties into the persistence layer
func Sync(obj interface{}) { func Sync(ctx context.Context, obj interface{}) {
id := dynamic.CallID(obj) id := dynamic.CallID(obj)
if len(id) == 0 { if len(id) == 0 {
log.Warnf("InstanceID() is not provided, can not sync persistence") log.Warnf("InstanceID() is not provided, can not sync persistence")

View File

@ -111,7 +111,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats) s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol) s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

View File

@ -516,7 +516,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.Bind() s.orderExecutor.Bind()
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.ExitMethods.Bind(session, s.orderExecutor) s.ExitMethods.Bind(session, s.orderExecutor)
@ -531,7 +531,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.OnSuspend(func() { s.OnSuspend(func() {
_ = s.orderExecutor.GracefulCancel(ctx) _ = s.orderExecutor.GracefulCancel(ctx)
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.OnEmergencyStop(func() { s.OnEmergencyStop(func() {

View File

@ -47,7 +47,6 @@ func (b BudgetPeriod) Duration() time.Duration {
// Strategy is the Dollar-Cost-Average strategy // Strategy is the Dollar-Cost-Average strategy
type Strategy struct { type Strategy struct {
Environment *bbgo.Environment Environment *bbgo.Environment
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Market types.Market Market types.Market
@ -110,7 +109,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.orderExecutor.BindEnvironment(s.Environment) s.orderExecutor.BindEnvironment(s.Environment)
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()

View File

@ -795,7 +795,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats) s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats) s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.GeneralOrderExecutor.Bind() s.GeneralOrderExecutor.Bind()

View File

@ -310,7 +310,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats) s.GeneralOrderExecutor.BindProfitStats(s.ProfitStats)
s.GeneralOrderExecutor.BindTradeStats(s.TradeStats) s.GeneralOrderExecutor.BindTradeStats(s.TradeStats)
s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(p *types.Position) { s.GeneralOrderExecutor.TradeCollector().OnPositionUpdate(func(p *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.GeneralOrderExecutor.Bind() s.GeneralOrderExecutor.Bind()

View File

@ -708,7 +708,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
// s.orderExecutor.BindTradeStats(s.TradeStats) // s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()

View File

@ -109,7 +109,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats) s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol) s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

View File

@ -533,14 +533,6 @@ func (s *Strategy) LoadState() error {
return nil return nil
} }
func (s *Strategy) SaveState() error {
log.Infof("backing up grid state...")
submitOrders := s.activeOrders.Backup()
s.State.Orders = submitOrders
bbgo.Sync(s)
return nil
}
// InstanceID returns the instance identifier from the current grid configuration parameters // InstanceID returns the instance identifier from the current grid configuration parameters
func (s *Strategy) InstanceID() string { func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s-%s-%d-%d-%d", ID, s.Symbol, s.GridNum, s.UpperPrice.Int(), s.LowerPrice.Int()) return fmt.Sprintf("%s-%s-%d-%d-%d", ID, s.Symbol, s.GridNum, s.UpperPrice.Int(), s.LowerPrice.Int())
@ -603,11 +595,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
if err := s.SaveState(); err != nil { submitOrders := s.activeOrders.Backup()
log.WithError(err).Errorf("can not save state: %+v", s.State) s.State.Orders = submitOrders
} else { bbgo.Sync(ctx, s)
bbgo.Notify("%s: %s grid is saved", ID, s.Symbol)
}
// now we can cancel the open orders // now we can cancel the open orders
log.Infof("canceling active orders...") log.Infof("canceling active orders...")

View File

@ -179,7 +179,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
cumProfit.Update(s.CalcAssetValue(trade.Price).Float64()) cumProfit.Update(s.CalcAssetValue(trade.Price).Float64())
}) })
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol) s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

View File

@ -165,7 +165,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats) s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()

View File

@ -175,7 +175,7 @@ func (s *Strategy) Suspend(ctx context.Context) error {
log.WithError(err).Errorf("graceful cancel order error") log.WithError(err).Errorf("graceful cancel order error")
} }
bbgo.Sync(s) bbgo.Sync(ctx, s)
return nil return nil
} }
@ -416,7 +416,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats) s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()

View File

@ -88,7 +88,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
instanceID := s.InstanceID() instanceID := s.InstanceID()
s.orderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position) s.orderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()

View File

@ -104,7 +104,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Update our counter and sync the changes to the persistence layer on time // Update our counter and sync the changes to the persistence layer on time
// If you don't do this, BBGO will sync it automatically when BBGO shuts down. // If you don't do this, BBGO will sync it automatically when BBGO shuts down.
s.State.Counter++ s.State.Counter++
bbgo.Sync(s) bbgo.Sync(ctx, s)
// To check if we have the quote balance // To check if we have the quote balance
// When symbol = "BTCUSDT", the quote currency is USDT // When symbol = "BTCUSDT", the quote currency is USDT

View File

@ -544,14 +544,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Sync position to redis on trade // Sync position to redis on trade
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
// StrategyController // StrategyController
s.Status = types.StrategyStatusRunning s.Status = types.StrategyStatusRunning
s.OnSuspend(func() { s.OnSuspend(func() {
_ = s.orderExecutor.GracefulCancel(ctx) _ = s.orderExecutor.GracefulCancel(ctx)
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.OnEmergencyStop(func() { s.OnEmergencyStop(func() {
_ = s.orderExecutor.GracefulCancel(ctx) _ = s.orderExecutor.GracefulCancel(ctx)

View File

@ -346,7 +346,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.OnSuspend(func() { s.OnSuspend(func() {
// Cancel all order // Cancel all order
_ = s.orderExecutor.GracefulCancel(ctx) _ = s.orderExecutor.GracefulCancel(ctx)
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.OnEmergencyStop(func() { s.OnEmergencyStop(func() {

View File

@ -112,7 +112,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor.BindProfitStats(s.ProfitStats) s.orderExecutor.BindProfitStats(s.ProfitStats)
s.orderExecutor.BindTradeStats(s.TradeStats) s.orderExecutor.BindTradeStats(s.TradeStats)
s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) { s.orderExecutor.TradeCollector().OnPositionUpdate(func(position *types.Position) {
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
s.orderExecutor.Bind() s.orderExecutor.Bind()
s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol) s.activeOrders = bbgo.NewActiveOrderBook(s.Symbol)

View File

@ -279,7 +279,7 @@ func (s *Strategy) checkBalance(ctx context.Context, sessions map[string]*bbgo.E
s.State.DailyNumberOfTransfers += 1 s.State.DailyNumberOfTransfers += 1
s.State.DailyAmountOfTransfers = s.State.DailyAmountOfTransfers.Add(requiredAmount) s.State.DailyAmountOfTransfers = s.State.DailyAmountOfTransfers.Add(requiredAmount)
bbgo.Sync(s) bbgo.Sync(ctx, s)
} }
} }

View File

@ -182,7 +182,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
close(s.stopC) close(s.stopC)
bbgo.Sync(s) bbgo.Sync(context.Background(), s)
}) })
// from here, set data binding // from here, set data binding

View File

@ -129,7 +129,7 @@ func (s *Strategy) recordNetAssetValue(ctx context.Context, sessions map[string]
if s.State.IsOver24Hours() { if s.State.IsOver24Hours() {
s.State.Reset() s.State.Reset()
} }
bbgo.Sync(s) bbgo.Sync(ctx, s)
} }
} }
@ -146,7 +146,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
bbgo.Sync(s) bbgo.Sync(ctx, s)
}) })
if s.ReportOnStart { if s.ReportOnStart {