diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 6a815ce7e..f647f7ec6 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -590,31 +590,45 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) - // TODO: improve order executor - orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession} - returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ - Market: s.sourceMarket, - Symbol: s.Symbol, - Type: types.OrderTypeMarket, - Side: side, - Quantity: quantity, - MarginSideEffect: types.SideEffectTypeMarginBuy, - }) + submitOrders := []types.SubmitOrder{ + { + Market: s.sourceMarket, + Symbol: s.Symbol, + Type: types.OrderTypeMarket, + Side: side, + Quantity: quantity, + MarginSideEffect: types.SideEffectTypeMarginBuy, + }, + } + formattedOrders, err := s.sourceSession.FormatOrders(submitOrders) + if err != nil { + log.WithError(err).Errorf("unable to format orders") + return + } + + orderCreateCallback := func(createdOrder types.Order) { + s.orderStore.Add(createdOrder) + s.activeMakerOrders.Add(createdOrder) + } + + defer s.tradeCollector.Process() + + createdOrders, _, err := bbgo.BatchPlaceOrder(ctx, s.sourceSession.Exchange, orderCreateCallback, formattedOrders...) if err != nil { s.hedgeErrorRateReservation = s.hedgeErrorLimiter.Reserve() log.WithError(err).Errorf("market order submit error: %s", err.Error()) return } - // if it's selling, than we should add positive position + log.Infof("submitted hedge orders: %+v", createdOrders) + + // if it's selling, then we should add a positive position if side == types.SideTypeSell { s.CoveredPosition = s.CoveredPosition.Add(quantity) } else { s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg()) } - - s.orderStore.Add(returnOrders...) } func (s *Strategy) tradeRecover(ctx context.Context) { @@ -831,29 +845,28 @@ func (s *Strategy) CrossRun( if profit.Compare(fixedpoint.Zero) == 0 { s.Environment.RecordPosition(s.Position, trade, nil) - } else { - log.Infof("%s generated profit: %v", s.Symbol, profit) - - p := s.Position.NewProfit(trade, profit, netProfit) - p.Strategy = ID - p.StrategyInstanceID = instanceID - bbgo.Notify(&p) - s.ProfitStats.AddProfit(p) - - s.Environment.RecordPosition(s.Position, trade, &p) - - if s.CircuitBreaker != nil { - s.CircuitBreaker.RecordProfit(profit, trade.Time.Time()) - } } }) + s.tradeCollector.OnProfit(func(trade types.Trade, profit *types.Profit) { + if s.CircuitBreaker != nil { + s.CircuitBreaker.RecordProfit(profit.Profit, trade.Time.Time()) + } + + bbgo.Notify(profit) + + s.ProfitStats.AddProfit(*profit) + s.Environment.RecordPosition(s.Position, trade, profit) + }) + s.tradeCollector.OnPositionUpdate(func(position *types.Position) { bbgo.Notify(position) }) s.tradeCollector.OnRecover(func(trade types.Trade) { bbgo.Notify("Recovered trade", trade) }) + + // bind two user data streams so that we can collect the trades together s.tradeCollector.BindStream(s.sourceSession.UserDataStream) s.tradeCollector.BindStream(s.makerSession.UserDataStream)