diff --git a/pkg/bbgo/order_execution.go b/pkg/bbgo/order_execution.go index 940b69df5..0b5edf1a4 100644 --- a/pkg/bbgo/order_execution.go +++ b/pkg/bbgo/order_execution.go @@ -44,7 +44,7 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi return nil, fmt.Errorf("exchange session %s not found", session) } - formattedOrders, err := formatOrders(es, orders) + formattedOrders, err := es.FormatOrders(orders) if err != nil { return nil, err } @@ -93,7 +93,7 @@ func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder) } func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) { - formattedOrders, err := formatOrders(e.Session, orders) + formattedOrders, err := e.Session.FormatOrders(orders) if err != nil { return nil, err } @@ -314,18 +314,6 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ... return outOrders, nil } -func formatOrders(session *ExchangeSession, orders []types.SubmitOrder) (formattedOrders []types.SubmitOrder, err error) { - for _, order := range orders { - o, err := session.FormatOrder(order) - if err != nil { - return formattedOrders, err - } - formattedOrders = append(formattedOrders, o) - } - - return formattedOrders, err -} - func max(a, b int64) int64 { if a > b { return a diff --git a/pkg/bbgo/risk_controls.go b/pkg/bbgo/risk_controls.go index 266c0ca01..121a6fc03 100644 --- a/pkg/bbgo/risk_controls.go +++ b/pkg/bbgo/risk_controls.go @@ -32,7 +32,7 @@ func (e *RiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders ...t } } - formattedOrders, err := formatOrders(e.Session, orders) + formattedOrders, err := e.Session.FormatOrders(orders) if err != nil { return retOrders, err } diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 1deaed260..67441e831 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -973,3 +973,15 @@ func (session *ExchangeSession) SlackAttachment() slack.Attachment { Footer: util.Render("update time {{ . }}", time.Now().Format(time.RFC822)), } } + +func (session *ExchangeSession) FormatOrders(orders []types.SubmitOrder) (formattedOrders []types.SubmitOrder, err error) { + for _, order := range orders { + o, err := session.FormatOrder(order) + if err != nil { + return formattedOrders, err + } + formattedOrders = append(formattedOrders, o) + } + + return formattedOrders, err +} diff --git a/pkg/bbgo/tradecollector.go b/pkg/bbgo/tradecollector.go index 1d4e8d541..0c23798f3 100644 --- a/pkg/bbgo/tradecollector.go +++ b/pkg/bbgo/tradecollector.go @@ -22,10 +22,12 @@ type TradeCollector struct { orderStore *OrderStore doneTrades map[types.TradeKey]struct{} - recoverCallbacks []func(trade types.Trade) - tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) + recoverCallbacks []func(trade types.Trade) + + tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) + positionUpdateCallbacks []func(position *types.Position) - profitCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) + profitCallbacks []func(trade types.Trade, profit *types.Profit) } func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { @@ -114,15 +116,13 @@ func (c *TradeCollector) Process() bool { if c.orderStore.Exists(trade.OrderID) { c.doneTrades[key] = struct{}{} profit, netProfit, madeProfit := c.position.AddTrade(trade) - if madeProfit { p := c.position.NewProfit(trade, profit, netProfit) - _ = p - c.EmitTrade(trade, profit, netProfit) - c.EmitProfit(trade, profit, netProfit) + c.EmitProfit(trade, &p) } else { c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + c.EmitProfit(trade, nil) } positionChanged = true return true @@ -149,11 +149,14 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool { return false } - if profit, netProfit, madeProfit := c.position.AddTrade(trade); madeProfit { + profit, netProfit, madeProfit := c.position.AddTrade(trade) + if madeProfit { + p := c.position.NewProfit(trade, profit, netProfit) c.EmitTrade(trade, profit, netProfit) - c.EmitProfit(trade, profit, netProfit) + c.EmitProfit(trade, &p) } else { c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + c.EmitProfit(trade, nil) } c.EmitPositionUpdate(c.position) c.doneTrades[key] = struct{}{} diff --git a/pkg/bbgo/tradecollector_callbacks.go b/pkg/bbgo/tradecollector_callbacks.go index af8bf1bd1..44756224f 100644 --- a/pkg/bbgo/tradecollector_callbacks.go +++ b/pkg/bbgo/tradecollector_callbacks.go @@ -37,12 +37,12 @@ func (c *TradeCollector) EmitPositionUpdate(position *types.Position) { } } -func (c *TradeCollector) OnProfit(cb func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value)) { +func (c *TradeCollector) OnProfit(cb func(trade types.Trade, profit *types.Profit)) { c.profitCallbacks = append(c.profitCallbacks, cb) } -func (c *TradeCollector) EmitProfit(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { +func (c *TradeCollector) EmitProfit(trade types.Trade, profit *types.Profit) { for _, cb := range c.profitCallbacks { - cb(trade, profit, netProfit) + cb(trade, profit) } } diff --git a/pkg/strategy/pivotshort/strategy.go b/pkg/strategy/pivotshort/strategy.go index 4c5567670..efd18c9f6 100644 --- a/pkg/strategy/pivotshort/strategy.go +++ b/pkg/strategy/pivotshort/strategy.go @@ -138,10 +138,6 @@ type Strategy struct { Entry Entry `json:"entry"` Exit Exit `json:"exit"` - activeMakerOrders *bbgo.ActiveOrderBook - orderStore *bbgo.OrderStore - tradeCollector *bbgo.TradeCollector - session *bbgo.ExchangeSession orderExecutor *GeneralOrderExecutor @@ -171,17 +167,6 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { } } -func (s *Strategy) submitOrders(ctx context.Context, orderExecutor bbgo.OrderExecutor, submitOrders ...types.SubmitOrder) { - createdOrders, err := orderExecutor.SubmitOrders(ctx, submitOrders...) - if err != nil { - log.WithError(err).Errorf("can not place orders") - } - - s.orderStore.Add(createdOrders...) - s.activeMakerOrders.Add(createdOrders...) - s.tradeCollector.Process() -} - func (s *Strategy) useQuantityOrBaseBalance(quantity fixedpoint.Value) fixedpoint.Value { if quantity.IsZero() { if balance, ok := s.session.Account.Balance(s.Market.BaseCurrency); ok { @@ -197,8 +182,8 @@ func (s *Strategy) useQuantityOrBaseBalance(quantity fixedpoint.Value) fixedpoin return quantity } -func (s *Strategy) placeLimitSell(ctx context.Context, orderExecutor bbgo.OrderExecutor, price, quantity fixedpoint.Value) { - s.submitOrders(ctx, orderExecutor, types.SubmitOrder{ +func (s *Strategy) placeLimitSell(ctx context.Context, price, quantity fixedpoint.Value) { + _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Symbol: s.Symbol, Price: price, Side: types.SideTypeSell, @@ -208,8 +193,8 @@ func (s *Strategy) placeLimitSell(ctx context.Context, orderExecutor bbgo.OrderE }) } -func (s *Strategy) placeMarketSell(ctx context.Context, orderExecutor bbgo.OrderExecutor, quantity fixedpoint.Value) { - s.submitOrders(ctx, orderExecutor, types.SubmitOrder{ +func (s *Strategy) placeMarketSell(ctx context.Context, quantity fixedpoint.Value) { + _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ Symbol: s.Symbol, Side: types.SideTypeSell, Type: types.OrderTypeMarket, @@ -229,16 +214,7 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu } s.Notify("Closing %s position by %f", s.Symbol, percentage.Float64()) - - createdOrders, err := s.session.Exchange.SubmitOrders(ctx, *submitOrder) - if err != nil { - log.WithError(err).Errorf("can not place position close order") - } - - s.orderStore.Add(createdOrders...) - s.activeMakerOrders.Add(createdOrders...) - s.tradeCollector.Process() - return err + return s.orderExecutor.SubmitOrders(ctx, *submitOrder) } func (s *Strategy) InstanceID() string { @@ -302,6 +278,34 @@ func (e *GeneralOrderExecutor) Bind(position *types.Position, profitStats *types log.Infof("position changed: %s", position) notify(position) }) + + e.tradeCollector.BindStream(e.session.UserDataStream) +} + +func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) error { + formattedOrders, err := e.session.FormatOrders(submitOrders) + if err != nil { + return err + } + + createdOrders, err := e.session.Exchange.SubmitOrders(ctx, formattedOrders...) + if err != nil { + log.WithError(err).Errorf("can not place orders") + } + + e.orderStore.Add(createdOrders...) + e.activeMakerOrders.Add(createdOrders...) + e.tradeCollector.Process() + return err +} + +func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context) error { + if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange); err != nil { + log.WithError(err).Errorf("graceful cancel order error") + return err + } + + return nil } func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { @@ -312,10 +316,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.orderExecutor = NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID) - // TODO: migrate this - s.activeMakerOrders = s.orderExecutor.activeMakerOrders - s.orderStore = s.orderExecutor.orderStore - if s.Position == nil { s.Position = types.NewPositionFromMarket(s.Market) } @@ -324,36 +324,24 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.ProfitStats = types.NewProfitStats(s.Market) } - s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify) + // position recorder + s.orderExecutor.tradeCollector.OnProfit(func(trade types.Trade, profit *types.Profit) { + s.Environment.RecordPosition(s.Position, trade, profit) + // s.Notify(&p) + }) + // trade stats if s.TradeStats == nil { s.TradeStats = &TradeStats{} } - s.tradeCollector = s.orderExecutor.tradeCollector - - // trade stats - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { + s.orderExecutor.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { if profit.IsZero() { s.TradeStats.Add(profit) } }) - // position recorder - s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) { - if profit.IsZero() { - s.Environment.RecordPosition(s.Position, trade, nil) - } else { - log.Infof("%s generated profit: %v", s.Symbol, profit) - p := s.Position.NewProfit(trade, profit, netProfit) - p.Strategy = ID - p.StrategyInstanceID = instanceID - s.Notify(&p) - s.Environment.RecordPosition(s.Position, trade, &p) - } - }) - - s.tradeCollector.BindStream(session.UserDataStream) + s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify) store, _ := session.MarketDataStore(s.Symbol) @@ -395,9 +383,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if resistancePrice.Compare(s.currentBounceShortPrice) != 0 { log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices) - if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { - log.WithError(err).Errorf("graceful cancel order error") - } + _ = s.orderExecutor.GracefulCancel(ctx) + s.currentBounceShortPrice = resistancePrice s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor) } @@ -498,17 +485,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return } - if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { - log.WithError(err).Errorf("graceful cancel order error") - } + _ = s.orderExecutor.GracefulCancel(ctx) quantity := s.useQuantityOrBaseBalance(s.BreakLow.Quantity) if s.BreakLow.MarketOrder { s.Notify("%s price %f breaks the previous low %f with ratio %f, submitting market sell to open a short position", s.Symbol, kline.Close.Float64(), previousLow.Float64(), s.BreakLow.Ratio.Float64()) - s.placeMarketSell(ctx, orderExecutor, quantity) + s.placeMarketSell(ctx, quantity) } else { sellPrice := kline.Close.Mul(fixedpoint.One.Add(s.BreakLow.BounceRatio)) - s.placeLimitSell(ctx, orderExecutor, sellPrice, quantity) + s.placeLimitSell(ctx, sellPrice, quantity) } }) @@ -532,9 +517,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if resistancePrice.Compare(s.currentBounceShortPrice) != 0 { log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices) - if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { - log.WithError(err).Errorf("graceful cancel order error") - } + _ = s.orderExecutor.GracefulCancel(ctx) + s.currentBounceShortPrice = resistancePrice s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor) } @@ -567,9 +551,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se } func (s *Strategy) closePosition(ctx context.Context) { - if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { - log.WithError(err).Errorf("graceful cancel order error") - } + _ = s.orderExecutor.GracefulCancel(ctx) if err := s.ClosePosition(ctx, fixedpoint.One); err != nil { log.WithError(err).Errorf("close position error") @@ -634,7 +616,7 @@ func (s *Strategy) placeOrder(ctx context.Context, price fixedpoint.Value, quant Price: price, Quantity: quantity, } - s.submitOrders(ctx, orderExecutor, submitOrder) + _ = s.orderExecutor.SubmitOrders(ctx, submitOrder) } func (s *Strategy) preloadPivot(pivot *indicator.Pivot, store *bbgo.MarketDataStore) *types.KLine {