From 6635fd749dcd89c5362694e7f5a4028e4bad8f1b Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 5 May 2022 15:05:38 +0800 Subject: [PATCH] xmaker: migrate xmaker persistence --- pkg/strategy/xmaker/state.go | 7 +- pkg/strategy/xmaker/strategy.go | 116 ++++++++++++++++---------------- 2 files changed, 64 insertions(+), 59 deletions(-) diff --git a/pkg/strategy/xmaker/state.go b/pkg/strategy/xmaker/state.go index ea82bc96f..097a8ebe8 100644 --- a/pkg/strategy/xmaker/state.go +++ b/pkg/strategy/xmaker/state.go @@ -9,12 +9,17 @@ import ( type State struct { CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty"` + + // Deprecated: Position *types.Position `json:"position,omitempty"` + + // Deprecated: ProfitStats ProfitStats `json:"profitStats,omitempty"` } type ProfitStats struct { - types.ProfitStats + *types.ProfitStats + lock sync.Mutex MakerExchange types.ExchangeName `json:"makerExchange"` diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 3d572b4e7..53ae10fde 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -13,7 +13,6 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/indicator" - "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" ) @@ -98,6 +97,11 @@ type Strategy struct { state *State + // persistence fields + Position *types.Position `json:"position,omitempty" persistence:"position"` + ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"` + CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"` + book *types.StreamOrderBook activeMakerOrders *bbgo.LocalActiveOrderBook @@ -119,6 +123,10 @@ func (s *Strategy) ID() string { return ID } +func (s *Strategy) InstanceID() string { + return fmt.Sprintf("%s:%s", ID, s.Symbol) +} + func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) { sourceSession, ok := sessions[s.SourceExchange] if !ok { @@ -270,7 +278,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or // 1. place bid orders when we already bought too much // 2. place ask orders when we already sold too much if s.MaxExposurePosition.Sign() > 0 { - pos := s.state.Position.GetBase() + pos := s.Position.GetBase() if pos.Compare(s.MaxExposurePosition.Neg()) > 0 { // stop sell if we over-sell @@ -565,9 +573,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { // if it's selling, than we should add positive position if side == types.SideTypeSell { - s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity) + s.CoveredPosition = s.CoveredPosition.Add(quantity) } else { - s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity.Neg()) + s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg()) } s.orderStore.Add(returnOrders...) @@ -593,39 +601,10 @@ func (s *Strategy) LoadState() error { var state State // load position - if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err != nil { - if err != service.ErrPersistenceNotExists { - return err - } - - s.state = &State{} - } else { + if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err == nil { s.state = &state } - // if position is nil, we need to allocate a new position for calculation - if s.state.Position == nil { - s.state.Position = types.NewPositionFromMarket(s.makerMarket) - } - s.state.Position.Market = s.makerMarket - - s.state.ProfitStats.Symbol = s.makerMarket.Symbol - s.state.ProfitStats.BaseCurrency = s.makerMarket.BaseCurrency - s.state.ProfitStats.QuoteCurrency = s.makerMarket.QuoteCurrency - s.state.ProfitStats.MakerExchange = s.makerSession.ExchangeName - if s.state.ProfitStats.AccumulatedSince == 0 { - s.state.ProfitStats.AccumulatedSince = time.Now().Unix() - } - - return nil -} - -func (s *Strategy) SaveState() error { - if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil { - return err - } else { - log.Infof("%s state is saved => %+v", ID, s.state) - } return nil } @@ -708,25 +687,54 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order }, 1.0) // restore state - instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol) + instanceID := s.InstanceID() s.groupID = util.FNV32(instanceID) log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID) if err := s.LoadState(); err != nil { return err } else { - s.Notify("xmaker: %s position is restored", s.Symbol, s.state.Position) + s.Notify("xmaker: %s position is restored", s.Symbol, s.Position) + } + + if s.Position == nil { + if s.state != nil && s.state.Position != nil { + s.Position = s.state.Position + } else { + s.Position = types.NewPositionFromMarket(s.makerMarket) + } + + // force update for legacy code + s.Position.Market = s.makerMarket + } + + if s.ProfitStats == nil { + if s.state != nil { + p2 := s.state.ProfitStats + s.ProfitStats = &p2 + } else { + s.ProfitStats = &ProfitStats{ + ProfitStats: types.NewProfitStats(s.makerMarket), + MakerExchange: s.makerSession.ExchangeName, + } + } + } + + if s.CoveredPosition.IsZero() { + if s.state != nil && !s.CoveredPosition.IsZero() { + s.CoveredPosition = s.state.CoveredPosition + } } if s.makerSession.MakerFeeRate.Sign() > 0 || s.makerSession.TakerFeeRate.Sign() > 0 { - s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{ + s.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{ MakerFeeRate: s.makerSession.MakerFeeRate, TakerFeeRate: s.makerSession.TakerFeeRate, }) } if s.sourceSession.MakerFeeRate.Sign() > 0 || s.sourceSession.TakerFeeRate.Sign() > 0 { - s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{ + s.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{ MakerFeeRate: s.sourceSession.MakerFeeRate, TakerFeeRate: s.sourceSession.TakerFeeRate, }) @@ -742,7 +750,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order s.orderStore.BindStream(s.sourceSession.UserDataStream) s.orderStore.BindStream(s.makerSession.UserDataStream) - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore) + s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) if s.NotifyTrade { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { @@ -753,27 +761,23 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { c := trade.PositionChange() if trade.Exchange == s.sourceSession.ExchangeName { - s.state.CoveredPosition = s.state.CoveredPosition.Add(c) + s.CoveredPosition = s.CoveredPosition.Add(c) } - s.state.ProfitStats.AddTrade(trade) + s.ProfitStats.AddTrade(trade) if profit.Compare(fixedpoint.Zero) == 0 { - s.Environment.RecordPosition(s.state.Position, trade, nil) + s.Environment.RecordPosition(s.Position, trade, nil) } else { log.Infof("%s generated profit: %v", s.Symbol, profit) - p := s.state.Position.NewProfit(trade, profit, netProfit) + p := s.Position.NewProfit(trade, profit, netProfit) p.Strategy = ID p.StrategyInstanceID = instanceID s.Notify(&p) - s.state.ProfitStats.AddProfit(p) + s.ProfitStats.AddProfit(p) - s.Environment.RecordPosition(s.state.Position, trade, &p) - } - - if err := s.SaveState(); err != nil { - log.WithError(err).Error("save state error") + s.Environment.RecordPosition(s.Position, trade, &p) } }) @@ -824,7 +828,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order s.updateQuote(ctx, orderExecutionRouter) case <-reportTicker.C: - s.Notifiability.Notify(&s.state.ProfitStats) + s.Notifiability.Notify(&s.ProfitStats) case <-tradeScanTicker.C: log.Infof("scanning trades from %s ago...", tradeScanInterval) @@ -846,15 +850,15 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order // uncover position = -5 - -3 (covered position) = -2 s.tradeCollector.Process() - position := s.state.Position.GetBase() + position := s.Position.GetBase() - uncoverPosition := position.Sub(s.state.CoveredPosition) + uncoverPosition := position.Sub(s.CoveredPosition) absPos := uncoverPosition.Abs() if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 { log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", s.Symbol, position, - s.state.CoveredPosition, + s.CoveredPosition, uncoverPosition, ) @@ -879,11 +883,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order log.WithError(err).Errorf("graceful cancel error") } - if err := s.SaveState(); err != nil { - log.WithError(err).Errorf("can not save state: %+v", s.state) - } else { - s.Notify("%s: %s position is saved", ID, s.Symbol, s.state.Position) - } + s.Notify("%s: %s position", ID, s.Symbol, s.Position) }) return nil