pivotshort: pull out GeneralOrderExecutor

This commit is contained in:
c9s 2022-06-18 11:45:24 +08:00
parent 807a3e125c
commit 0326c34013
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 87 additions and 39 deletions

View File

@ -51,6 +51,10 @@ func (c *TradeCollector) Position() *types.Position {
return c.position return c.position
} }
func (c *TradeCollector) SetPosition(position *types.Position) {
c.position = position
}
// QueueTrade sends the trade object to the trade channel, // QueueTrade sends the trade object to the trade channel,
// so that the goroutine can receive the trade and process in the background. // so that the goroutine can receive the trade and process in the background.
func (c *TradeCollector) QueueTrade(trade types.Trade) { func (c *TradeCollector) QueueTrade(trade types.Trade) {
@ -109,7 +113,12 @@ func (c *TradeCollector) Process() bool {
if c.orderStore.Exists(trade.OrderID) { if c.orderStore.Exists(trade.OrderID) {
c.doneTrades[key] = struct{}{} c.doneTrades[key] = struct{}{}
if profit, netProfit, madeProfit := c.position.AddTrade(trade); madeProfit { profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
_ = p
c.EmitTrade(trade, profit, netProfit) c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, profit, netProfit) c.EmitProfit(trade, profit, netProfit)
} else { } else {

View File

@ -142,7 +142,8 @@ type Strategy struct {
orderStore *bbgo.OrderStore orderStore *bbgo.OrderStore
tradeCollector *bbgo.TradeCollector tradeCollector *bbgo.TradeCollector
session *bbgo.ExchangeSession session *bbgo.ExchangeSession
orderExecutor *GeneralOrderExecutor
lastLow fixedpoint.Value lastLow fixedpoint.Value
pivot *indicator.Pivot pivot *indicator.Pivot
@ -244,15 +245,76 @@ func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol) return fmt.Sprintf("%s:%s", ID, s.Symbol)
} }
// GeneralOrderExecutor implements the general order executor for strategy
type GeneralOrderExecutor struct {
session *bbgo.ExchangeSession
symbol string
strategy string
strategyInstanceID string
activeMakerOrders *bbgo.ActiveOrderBook
orderStore *bbgo.OrderStore
tradeCollector *bbgo.TradeCollector
}
func NewGeneralOrderExecutor(session *bbgo.ExchangeSession, symbol, strategy, strategyInstanceID string) *GeneralOrderExecutor {
orderStore := bbgo.NewOrderStore(symbol)
return &GeneralOrderExecutor{
session: session,
symbol: symbol,
strategy: strategy,
strategyInstanceID: strategyInstanceID,
activeMakerOrders: bbgo.NewActiveOrderBook(symbol),
orderStore: orderStore,
tradeCollector: bbgo.NewTradeCollector(symbol, nil, orderStore),
}
}
func (e *GeneralOrderExecutor) Bind(position *types.Position, profitStats *types.ProfitStats, notify func(obj interface{}, args ...interface{})) {
// Always update the position fields
position.Strategy = e.strategy
position.StrategyInstanceID = e.strategyInstanceID
e.activeMakerOrders.BindStream(e.session.UserDataStream)
e.orderStore.BindStream(e.session.UserDataStream)
e.tradeCollector.SetPosition(position)
// trade notify
e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
notify(trade)
})
// profit stats
e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
profitStats.AddTrade(trade)
if profit.IsZero() {
return
}
p := position.NewProfit(trade, profit, netProfit)
p.Strategy = e.strategy
p.StrategyInstanceID = e.strategyInstanceID
profitStats.AddProfit(p)
notify(&profitStats)
})
e.tradeCollector.OnPositionUpdate(func(position *types.Position) {
log.Infof("position changed: %s", position)
notify(position)
})
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
var instanceID = s.InstanceID()
// initial required information // initial required information
s.session = session s.session = session
s.activeMakerOrders = bbgo.NewActiveOrderBook(s.Symbol) s.orderExecutor = NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID)
s.activeMakerOrders.BindStream(session.UserDataStream)
s.orderStore = bbgo.NewOrderStore(s.Symbol) // TODO: migrate this
s.orderStore.BindStream(session.UserDataStream) s.activeMakerOrders = s.orderExecutor.activeMakerOrders
s.orderStore = s.orderExecutor.orderStore
if s.Position == nil { if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market) s.Position = types.NewPositionFromMarket(s.Market)
@ -262,34 +324,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.ProfitStats = types.NewProfitStats(s.Market) s.ProfitStats = types.NewProfitStats(s.Market)
} }
s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify)
if s.TradeStats == nil { if s.TradeStats == nil {
s.TradeStats = &TradeStats{} s.TradeStats = &TradeStats{}
} }
instanceID := s.InstanceID() s.tradeCollector = s.orderExecutor.tradeCollector
// Always update the position fields
s.Position.Strategy = ID
s.Position.StrategyInstanceID = instanceID
s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore)
// trade notify
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
s.Notifiability.Notify(trade)
})
// profit stats
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
s.ProfitStats.AddTrade(trade)
if !profit.IsZero() {
p := s.Position.NewProfit(trade, profit, netProfit)
p.Strategy = ID
p.StrategyInstanceID = instanceID
s.ProfitStats.AddProfit(p)
s.Notify(&s.ProfitStats)
}
})
// trade stats // trade stats
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
@ -312,10 +353,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
} }
}) })
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
log.Infof("position changed: %s", s.Position)
s.Notify(s.Position)
})
s.tradeCollector.BindStream(session.UserDataStream) s.tradeCollector.BindStream(session.UserDataStream)
store, _ := session.MarketDataStore(s.Symbol) store, _ := session.MarketDataStore(s.Symbol)

View File

@ -112,11 +112,13 @@ func (p *Position) NewProfit(trade Trade, profit, netProfit fixedpoint.Value) Pr
Fee: trade.Fee, Fee: trade.Fee,
FeeCurrency: trade.FeeCurrency, FeeCurrency: trade.FeeCurrency,
Exchange: trade.Exchange, Exchange: trade.Exchange,
IsMargin: trade.IsMargin, IsMargin: trade.IsMargin,
IsFutures: trade.IsFutures, IsFutures: trade.IsFutures,
IsIsolated: trade.IsIsolated, IsIsolated: trade.IsIsolated,
TradedAt: trade.Time.Time(), TradedAt: trade.Time.Time(),
Strategy: p.Strategy,
StrategyInstanceID: p.StrategyInstanceID,
} }
} }