diff --git a/pkg/bbgo/tradecollector.go b/pkg/bbgo/tradecollector.go index c648ec48c..1d4e8d541 100644 --- a/pkg/bbgo/tradecollector.go +++ b/pkg/bbgo/tradecollector.go @@ -51,6 +51,10 @@ func (c *TradeCollector) Position() *types.Position { return c.position } +func (c *TradeCollector) SetPosition(position *types.Position) { + c.position = position +} + // QueueTrade sends the trade object to the trade channel, // so that the goroutine can receive the trade and process in the background. func (c *TradeCollector) QueueTrade(trade types.Trade) { @@ -109,7 +113,12 @@ func (c *TradeCollector) Process() bool { if c.orderStore.Exists(trade.OrderID) { c.doneTrades[key] = struct{}{} - if profit, netProfit, madeProfit := c.position.AddTrade(trade); madeProfit { + profit, netProfit, madeProfit := c.position.AddTrade(trade) + + if madeProfit { + p := c.position.NewProfit(trade, profit, netProfit) + _ = p + c.EmitTrade(trade, profit, netProfit) c.EmitProfit(trade, profit, netProfit) } else { diff --git a/pkg/strategy/pivotshort/strategy.go b/pkg/strategy/pivotshort/strategy.go index 9d28babe3..4c5567670 100644 --- a/pkg/strategy/pivotshort/strategy.go +++ b/pkg/strategy/pivotshort/strategy.go @@ -142,7 +142,8 @@ type Strategy struct { orderStore *bbgo.OrderStore tradeCollector *bbgo.TradeCollector - session *bbgo.ExchangeSession + session *bbgo.ExchangeSession + orderExecutor *GeneralOrderExecutor lastLow fixedpoint.Value pivot *indicator.Pivot @@ -244,15 +245,76 @@ func (s *Strategy) InstanceID() string { return fmt.Sprintf("%s:%s", ID, s.Symbol) } +// GeneralOrderExecutor implements the general order executor for strategy +type GeneralOrderExecutor struct { + session *bbgo.ExchangeSession + symbol string + strategy string + strategyInstanceID string + activeMakerOrders *bbgo.ActiveOrderBook + orderStore *bbgo.OrderStore + tradeCollector *bbgo.TradeCollector +} + +func NewGeneralOrderExecutor(session *bbgo.ExchangeSession, symbol, strategy, strategyInstanceID string) *GeneralOrderExecutor { + orderStore := bbgo.NewOrderStore(symbol) + return &GeneralOrderExecutor{ + session: session, + symbol: symbol, + strategy: strategy, + strategyInstanceID: strategyInstanceID, + activeMakerOrders: bbgo.NewActiveOrderBook(symbol), + orderStore: orderStore, + tradeCollector: bbgo.NewTradeCollector(symbol, nil, orderStore), + } +} + +func (e *GeneralOrderExecutor) Bind(position *types.Position, profitStats *types.ProfitStats, notify func(obj interface{}, args ...interface{})) { + // Always update the position fields + position.Strategy = e.strategy + position.StrategyInstanceID = e.strategyInstanceID + + e.activeMakerOrders.BindStream(e.session.UserDataStream) + e.orderStore.BindStream(e.session.UserDataStream) + e.tradeCollector.SetPosition(position) + + // trade notify + e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { + notify(trade) + }) + + // profit stats + e.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { + profitStats.AddTrade(trade) + + if profit.IsZero() { + return + } + + p := position.NewProfit(trade, profit, netProfit) + p.Strategy = e.strategy + p.StrategyInstanceID = e.strategyInstanceID + profitStats.AddProfit(p) + notify(&profitStats) + }) + + e.tradeCollector.OnPositionUpdate(func(position *types.Position) { + log.Infof("position changed: %s", position) + notify(position) + }) +} + func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { + var instanceID = s.InstanceID() + // initial required information s.session = session - s.activeMakerOrders = bbgo.NewActiveOrderBook(s.Symbol) - s.activeMakerOrders.BindStream(session.UserDataStream) + s.orderExecutor = NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID) - s.orderStore = bbgo.NewOrderStore(s.Symbol) - s.orderStore.BindStream(session.UserDataStream) + // TODO: migrate this + s.activeMakerOrders = s.orderExecutor.activeMakerOrders + s.orderStore = s.orderExecutor.orderStore if s.Position == nil { s.Position = types.NewPositionFromMarket(s.Market) @@ -262,34 +324,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.ProfitStats = types.NewProfitStats(s.Market) } + s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify) + if s.TradeStats == nil { s.TradeStats = &TradeStats{} } - instanceID := s.InstanceID() - - // Always update the position fields - s.Position.Strategy = ID - s.Position.StrategyInstanceID = instanceID - - s.tradeCollector = bbgo.NewTradeCollector(s.Symbol, s.Position, s.orderStore) - - // trade notify - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { - s.Notifiability.Notify(trade) - }) - - // profit stats - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { - s.ProfitStats.AddTrade(trade) - if !profit.IsZero() { - p := s.Position.NewProfit(trade, profit, netProfit) - p.Strategy = ID - p.StrategyInstanceID = instanceID - s.ProfitStats.AddProfit(p) - s.Notify(&s.ProfitStats) - } - }) + s.tradeCollector = s.orderExecutor.tradeCollector // trade stats s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { @@ -312,10 +353,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se } }) - s.tradeCollector.OnPositionUpdate(func(position *types.Position) { - log.Infof("position changed: %s", s.Position) - s.Notify(s.Position) - }) s.tradeCollector.BindStream(session.UserDataStream) store, _ := session.MarketDataStore(s.Symbol) diff --git a/pkg/types/position.go b/pkg/types/position.go index f18daa63c..9a576d73b 100644 --- a/pkg/types/position.go +++ b/pkg/types/position.go @@ -112,11 +112,13 @@ func (p *Position) NewProfit(trade Trade, profit, netProfit fixedpoint.Value) Pr Fee: trade.Fee, FeeCurrency: trade.FeeCurrency, - Exchange: trade.Exchange, - IsMargin: trade.IsMargin, - IsFutures: trade.IsFutures, - IsIsolated: trade.IsIsolated, - TradedAt: trade.Time.Time(), + Exchange: trade.Exchange, + IsMargin: trade.IsMargin, + IsFutures: trade.IsFutures, + IsIsolated: trade.IsIsolated, + TradedAt: trade.Time.Time(), + Strategy: p.Strategy, + StrategyInstanceID: p.StrategyInstanceID, } }