xmaker: migrate xmaker persistence

This commit is contained in:
c9s 2022-05-05 15:05:38 +08:00
parent 10a7928580
commit 6635fd749d
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 64 additions and 59 deletions

View File

@ -9,12 +9,17 @@ import (
type State struct { type State struct {
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty"` CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty"`
// Deprecated:
Position *types.Position `json:"position,omitempty"` Position *types.Position `json:"position,omitempty"`
// Deprecated:
ProfitStats ProfitStats `json:"profitStats,omitempty"` ProfitStats ProfitStats `json:"profitStats,omitempty"`
} }
type ProfitStats struct { type ProfitStats struct {
types.ProfitStats *types.ProfitStats
lock sync.Mutex lock sync.Mutex
MakerExchange types.ExchangeName `json:"makerExchange"` MakerExchange types.ExchangeName `json:"makerExchange"`

View File

@ -13,7 +13,6 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator" "github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
) )
@ -98,6 +97,11 @@ type Strategy struct {
state *State state *State
// persistence fields
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
book *types.StreamOrderBook book *types.StreamOrderBook
activeMakerOrders *bbgo.LocalActiveOrderBook activeMakerOrders *bbgo.LocalActiveOrderBook
@ -119,6 +123,10 @@ func (s *Strategy) ID() string {
return ID return ID
} }
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
}
func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) { func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
sourceSession, ok := sessions[s.SourceExchange] sourceSession, ok := sessions[s.SourceExchange]
if !ok { if !ok {
@ -270,7 +278,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
// 1. place bid orders when we already bought too much // 1. place bid orders when we already bought too much
// 2. place ask orders when we already sold too much // 2. place ask orders when we already sold too much
if s.MaxExposurePosition.Sign() > 0 { if s.MaxExposurePosition.Sign() > 0 {
pos := s.state.Position.GetBase() pos := s.Position.GetBase()
if pos.Compare(s.MaxExposurePosition.Neg()) > 0 { if pos.Compare(s.MaxExposurePosition.Neg()) > 0 {
// stop sell if we over-sell // stop sell if we over-sell
@ -565,9 +573,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
// if it's selling, than we should add positive position // if it's selling, than we should add positive position
if side == types.SideTypeSell { if side == types.SideTypeSell {
s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity) s.CoveredPosition = s.CoveredPosition.Add(quantity)
} else { } else {
s.state.CoveredPosition = s.state.CoveredPosition.Add(quantity.Neg()) s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
} }
s.orderStore.Add(returnOrders...) s.orderStore.Add(returnOrders...)
@ -593,39 +601,10 @@ func (s *Strategy) LoadState() error {
var state State var state State
// load position // load position
if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err != nil { if err := s.Persistence.Load(&state, ID, s.Symbol, stateKey); err == nil {
if err != service.ErrPersistenceNotExists {
return err
}
s.state = &State{}
} else {
s.state = &state s.state = &state
} }
// if position is nil, we need to allocate a new position for calculation
if s.state.Position == nil {
s.state.Position = types.NewPositionFromMarket(s.makerMarket)
}
s.state.Position.Market = s.makerMarket
s.state.ProfitStats.Symbol = s.makerMarket.Symbol
s.state.ProfitStats.BaseCurrency = s.makerMarket.BaseCurrency
s.state.ProfitStats.QuoteCurrency = s.makerMarket.QuoteCurrency
s.state.ProfitStats.MakerExchange = s.makerSession.ExchangeName
if s.state.ProfitStats.AccumulatedSince == 0 {
s.state.ProfitStats.AccumulatedSince = time.Now().Unix()
}
return nil
}
func (s *Strategy) SaveState() error {
if err := s.Persistence.Save(s.state, ID, s.Symbol, stateKey); err != nil {
return err
} else {
log.Infof("%s state is saved => %+v", ID, s.state)
}
return nil return nil
} }
@ -708,25 +687,54 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
}, 1.0) }, 1.0)
// restore state // restore state
instanceID := fmt.Sprintf("%s-%s", ID, s.Symbol) instanceID := s.InstanceID()
s.groupID = util.FNV32(instanceID) s.groupID = util.FNV32(instanceID)
log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID) log.Infof("using group id %d from fnv(%s)", s.groupID, instanceID)
if err := s.LoadState(); err != nil { if err := s.LoadState(); err != nil {
return err return err
} else { } else {
s.Notify("xmaker: %s position is restored", s.Symbol, s.state.Position) s.Notify("xmaker: %s position is restored", s.Symbol, s.Position)
}
if s.Position == nil {
if s.state != nil && s.state.Position != nil {
s.Position = s.state.Position
} else {
s.Position = types.NewPositionFromMarket(s.makerMarket)
}
// force update for legacy code
s.Position.Market = s.makerMarket
}
if s.ProfitStats == nil {
if s.state != nil {
p2 := s.state.ProfitStats
s.ProfitStats = &p2
} else {
s.ProfitStats = &ProfitStats{
ProfitStats: types.NewProfitStats(s.makerMarket),
MakerExchange: s.makerSession.ExchangeName,
}
}
}
if s.CoveredPosition.IsZero() {
if s.state != nil && !s.CoveredPosition.IsZero() {
s.CoveredPosition = s.state.CoveredPosition
}
} }
if s.makerSession.MakerFeeRate.Sign() > 0 || s.makerSession.TakerFeeRate.Sign() > 0 { if s.makerSession.MakerFeeRate.Sign() > 0 || s.makerSession.TakerFeeRate.Sign() > 0 {
s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{ s.Position.SetExchangeFeeRate(types.ExchangeName(s.MakerExchange), types.ExchangeFee{
MakerFeeRate: s.makerSession.MakerFeeRate, MakerFeeRate: s.makerSession.MakerFeeRate,
TakerFeeRate: s.makerSession.TakerFeeRate, TakerFeeRate: s.makerSession.TakerFeeRate,
}) })
} }
if s.sourceSession.MakerFeeRate.Sign() > 0 || s.sourceSession.TakerFeeRate.Sign() > 0 { if s.sourceSession.MakerFeeRate.Sign() > 0 || s.sourceSession.TakerFeeRate.Sign() > 0 {
s.state.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{ s.Position.SetExchangeFeeRate(types.ExchangeName(s.SourceExchange), types.ExchangeFee{
MakerFeeRate: s.sourceSession.MakerFeeRate, MakerFeeRate: s.sourceSession.MakerFeeRate,
TakerFeeRate: s.sourceSession.TakerFeeRate, TakerFeeRate: s.sourceSession.TakerFeeRate,
}) })
@ -742,7 +750,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.orderStore.BindStream(s.sourceSession.UserDataStream) s.orderStore.BindStream(s.sourceSession.UserDataStream)
s.orderStore.BindStream(s.makerSession.UserDataStream) s.orderStore.BindStream(s.makerSession.UserDataStream)
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.state.Position, s.orderStore) s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
if s.NotifyTrade { if s.NotifyTrade {
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
@ -753,27 +761,23 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange() c := trade.PositionChange()
if trade.Exchange == s.sourceSession.ExchangeName { if trade.Exchange == s.sourceSession.ExchangeName {
s.state.CoveredPosition = s.state.CoveredPosition.Add(c) s.CoveredPosition = s.CoveredPosition.Add(c)
} }
s.state.ProfitStats.AddTrade(trade) s.ProfitStats.AddTrade(trade)
if profit.Compare(fixedpoint.Zero) == 0 { if profit.Compare(fixedpoint.Zero) == 0 {
s.Environment.RecordPosition(s.state.Position, trade, nil) s.Environment.RecordPosition(s.Position, trade, nil)
} else { } else {
log.Infof("%s generated profit: %v", s.Symbol, profit) log.Infof("%s generated profit: %v", s.Symbol, profit)
p := s.state.Position.NewProfit(trade, profit, netProfit) p := s.Position.NewProfit(trade, profit, netProfit)
p.Strategy = ID p.Strategy = ID
p.StrategyInstanceID = instanceID p.StrategyInstanceID = instanceID
s.Notify(&p) s.Notify(&p)
s.state.ProfitStats.AddProfit(p) s.ProfitStats.AddProfit(p)
s.Environment.RecordPosition(s.state.Position, trade, &p) s.Environment.RecordPosition(s.Position, trade, &p)
}
if err := s.SaveState(); err != nil {
log.WithError(err).Error("save state error")
} }
}) })
@ -824,7 +828,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.updateQuote(ctx, orderExecutionRouter) s.updateQuote(ctx, orderExecutionRouter)
case <-reportTicker.C: case <-reportTicker.C:
s.Notifiability.Notify(&s.state.ProfitStats) s.Notifiability.Notify(&s.ProfitStats)
case <-tradeScanTicker.C: case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval) log.Infof("scanning trades from %s ago...", tradeScanInterval)
@ -846,15 +850,15 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
// uncover position = -5 - -3 (covered position) = -2 // uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process() s.tradeCollector.Process()
position := s.state.Position.GetBase() position := s.Position.GetBase()
uncoverPosition := position.Sub(s.state.CoveredPosition) uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs() absPos := uncoverPosition.Abs()
if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 { if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol, s.Symbol,
position, position,
s.state.CoveredPosition, s.CoveredPosition,
uncoverPosition, uncoverPosition,
) )
@ -879,11 +883,7 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
log.WithError(err).Errorf("graceful cancel error") log.WithError(err).Errorf("graceful cancel error")
} }
if err := s.SaveState(); err != nil { s.Notify("%s: %s position", ID, s.Symbol, s.Position)
log.WithError(err).Errorf("can not save state: %+v", s.state)
} else {
s.Notify("%s: %s position is saved", ID, s.Symbol, s.state.Position)
}
}) })
return nil return nil