pivotshort: refactor and redesign order executor

Signed-off-by: c9s <yoanlin93@gmail.com>
This commit is contained in:
c9s 2022-06-18 12:30:42 +08:00
parent 0326c34013
commit 47e76a9eb5
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
6 changed files with 80 additions and 95 deletions

View File

@ -44,7 +44,7 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi
return nil, fmt.Errorf("exchange session %s not found", session) return nil, fmt.Errorf("exchange session %s not found", session)
} }
formattedOrders, err := formatOrders(es, orders) formattedOrders, err := es.FormatOrders(orders)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -93,7 +93,7 @@ func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder)
} }
func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) { func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := formatOrders(e.Session, orders) formattedOrders, err := e.Session.FormatOrders(orders)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -314,18 +314,6 @@ func (c *BasicRiskController) ProcessOrders(session *ExchangeSession, orders ...
return outOrders, nil return outOrders, nil
} }
func formatOrders(session *ExchangeSession, orders []types.SubmitOrder) (formattedOrders []types.SubmitOrder, err error) {
for _, order := range orders {
o, err := session.FormatOrder(order)
if err != nil {
return formattedOrders, err
}
formattedOrders = append(formattedOrders, o)
}
return formattedOrders, err
}
func max(a, b int64) int64 { func max(a, b int64) int64 {
if a > b { if a > b {
return a return a

View File

@ -32,7 +32,7 @@ func (e *RiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders ...t
} }
} }
formattedOrders, err := formatOrders(e.Session, orders) formattedOrders, err := e.Session.FormatOrders(orders)
if err != nil { if err != nil {
return retOrders, err return retOrders, err
} }

View File

@ -973,3 +973,15 @@ func (session *ExchangeSession) SlackAttachment() slack.Attachment {
Footer: util.Render("update time {{ . }}", time.Now().Format(time.RFC822)), Footer: util.Render("update time {{ . }}", time.Now().Format(time.RFC822)),
} }
} }
func (session *ExchangeSession) FormatOrders(orders []types.SubmitOrder) (formattedOrders []types.SubmitOrder, err error) {
for _, order := range orders {
o, err := session.FormatOrder(order)
if err != nil {
return formattedOrders, err
}
formattedOrders = append(formattedOrders, o)
}
return formattedOrders, err
}

View File

@ -22,10 +22,12 @@ type TradeCollector struct {
orderStore *OrderStore orderStore *OrderStore
doneTrades map[types.TradeKey]struct{} doneTrades map[types.TradeKey]struct{}
recoverCallbacks []func(trade types.Trade) recoverCallbacks []func(trade types.Trade)
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)
positionUpdateCallbacks []func(position *types.Position) positionUpdateCallbacks []func(position *types.Position)
profitCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value) profitCallbacks []func(trade types.Trade, profit *types.Profit)
} }
func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector { func NewTradeCollector(symbol string, position *types.Position, orderStore *OrderStore) *TradeCollector {
@ -114,15 +116,13 @@ func (c *TradeCollector) Process() bool {
if c.orderStore.Exists(trade.OrderID) { if c.orderStore.Exists(trade.OrderID) {
c.doneTrades[key] = struct{}{} c.doneTrades[key] = struct{}{}
profit, netProfit, madeProfit := c.position.AddTrade(trade) profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit { if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit) p := c.position.NewProfit(trade, profit, netProfit)
_ = p
c.EmitTrade(trade, profit, netProfit) c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, profit, netProfit) c.EmitProfit(trade, &p)
} else { } else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
} }
positionChanged = true positionChanged = true
return true return true
@ -149,11 +149,14 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
return false return false
} }
if profit, netProfit, madeProfit := c.position.AddTrade(trade); madeProfit { profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit) c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, profit, netProfit) c.EmitProfit(trade, &p)
} else { } else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
} }
c.EmitPositionUpdate(c.position) c.EmitPositionUpdate(c.position)
c.doneTrades[key] = struct{}{} c.doneTrades[key] = struct{}{}

View File

@ -37,12 +37,12 @@ func (c *TradeCollector) EmitPositionUpdate(position *types.Position) {
} }
} }
func (c *TradeCollector) OnProfit(cb func(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value)) { func (c *TradeCollector) OnProfit(cb func(trade types.Trade, profit *types.Profit)) {
c.profitCallbacks = append(c.profitCallbacks, cb) c.profitCallbacks = append(c.profitCallbacks, cb)
} }
func (c *TradeCollector) EmitProfit(trade types.Trade, profit fixedpoint.Value, netProfit fixedpoint.Value) { func (c *TradeCollector) EmitProfit(trade types.Trade, profit *types.Profit) {
for _, cb := range c.profitCallbacks { for _, cb := range c.profitCallbacks {
cb(trade, profit, netProfit) cb(trade, profit)
} }
} }

View File

@ -138,10 +138,6 @@ type Strategy struct {
Entry Entry `json:"entry"` Entry Entry `json:"entry"`
Exit Exit `json:"exit"` Exit Exit `json:"exit"`
activeMakerOrders *bbgo.ActiveOrderBook
orderStore *bbgo.OrderStore
tradeCollector *bbgo.TradeCollector
session *bbgo.ExchangeSession session *bbgo.ExchangeSession
orderExecutor *GeneralOrderExecutor orderExecutor *GeneralOrderExecutor
@ -171,17 +167,6 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
} }
} }
func (s *Strategy) submitOrders(ctx context.Context, orderExecutor bbgo.OrderExecutor, submitOrders ...types.SubmitOrder) {
createdOrders, err := orderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil {
log.WithError(err).Errorf("can not place orders")
}
s.orderStore.Add(createdOrders...)
s.activeMakerOrders.Add(createdOrders...)
s.tradeCollector.Process()
}
func (s *Strategy) useQuantityOrBaseBalance(quantity fixedpoint.Value) fixedpoint.Value { func (s *Strategy) useQuantityOrBaseBalance(quantity fixedpoint.Value) fixedpoint.Value {
if quantity.IsZero() { if quantity.IsZero() {
if balance, ok := s.session.Account.Balance(s.Market.BaseCurrency); ok { if balance, ok := s.session.Account.Balance(s.Market.BaseCurrency); ok {
@ -197,8 +182,8 @@ func (s *Strategy) useQuantityOrBaseBalance(quantity fixedpoint.Value) fixedpoin
return quantity return quantity
} }
func (s *Strategy) placeLimitSell(ctx context.Context, orderExecutor bbgo.OrderExecutor, price, quantity fixedpoint.Value) { func (s *Strategy) placeLimitSell(ctx context.Context, price, quantity fixedpoint.Value) {
s.submitOrders(ctx, orderExecutor, types.SubmitOrder{ _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol, Symbol: s.Symbol,
Price: price, Price: price,
Side: types.SideTypeSell, Side: types.SideTypeSell,
@ -208,8 +193,8 @@ func (s *Strategy) placeLimitSell(ctx context.Context, orderExecutor bbgo.OrderE
}) })
} }
func (s *Strategy) placeMarketSell(ctx context.Context, orderExecutor bbgo.OrderExecutor, quantity fixedpoint.Value) { func (s *Strategy) placeMarketSell(ctx context.Context, quantity fixedpoint.Value) {
s.submitOrders(ctx, orderExecutor, types.SubmitOrder{ _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol, Symbol: s.Symbol,
Side: types.SideTypeSell, Side: types.SideTypeSell,
Type: types.OrderTypeMarket, Type: types.OrderTypeMarket,
@ -229,16 +214,7 @@ func (s *Strategy) ClosePosition(ctx context.Context, percentage fixedpoint.Valu
} }
s.Notify("Closing %s position by %f", s.Symbol, percentage.Float64()) s.Notify("Closing %s position by %f", s.Symbol, percentage.Float64())
return s.orderExecutor.SubmitOrders(ctx, *submitOrder)
createdOrders, err := s.session.Exchange.SubmitOrders(ctx, *submitOrder)
if err != nil {
log.WithError(err).Errorf("can not place position close order")
}
s.orderStore.Add(createdOrders...)
s.activeMakerOrders.Add(createdOrders...)
s.tradeCollector.Process()
return err
} }
func (s *Strategy) InstanceID() string { func (s *Strategy) InstanceID() string {
@ -302,6 +278,34 @@ func (e *GeneralOrderExecutor) Bind(position *types.Position, profitStats *types
log.Infof("position changed: %s", position) log.Infof("position changed: %s", position)
notify(position) notify(position)
}) })
e.tradeCollector.BindStream(e.session.UserDataStream)
}
func (e *GeneralOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) error {
formattedOrders, err := e.session.FormatOrders(submitOrders)
if err != nil {
return err
}
createdOrders, err := e.session.Exchange.SubmitOrders(ctx, formattedOrders...)
if err != nil {
log.WithError(err).Errorf("can not place orders")
}
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
return err
}
func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context) error {
if err := e.activeMakerOrders.GracefulCancel(ctx, e.session.Exchange); err != nil {
log.WithError(err).Errorf("graceful cancel order error")
return err
}
return nil
} }
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
@ -312,10 +316,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.orderExecutor = NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID) s.orderExecutor = NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID)
// TODO: migrate this
s.activeMakerOrders = s.orderExecutor.activeMakerOrders
s.orderStore = s.orderExecutor.orderStore
if s.Position == nil { if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market) s.Position = types.NewPositionFromMarket(s.Market)
} }
@ -324,36 +324,24 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.ProfitStats = types.NewProfitStats(s.Market) s.ProfitStats = types.NewProfitStats(s.Market)
} }
s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify) // position recorder
s.orderExecutor.tradeCollector.OnProfit(func(trade types.Trade, profit *types.Profit) {
s.Environment.RecordPosition(s.Position, trade, profit)
// s.Notify(&p)
})
// trade stats
if s.TradeStats == nil { if s.TradeStats == nil {
s.TradeStats = &TradeStats{} s.TradeStats = &TradeStats{}
} }
s.tradeCollector = s.orderExecutor.tradeCollector s.orderExecutor.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
// trade stats
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
if profit.IsZero() { if profit.IsZero() {
s.TradeStats.Add(profit) s.TradeStats.Add(profit)
} }
}) })
// position recorder s.orderExecutor.Bind(s.Position, s.ProfitStats, s.Notifiability.Notify)
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
if profit.IsZero() {
s.Environment.RecordPosition(s.Position, trade, nil)
} else {
log.Infof("%s generated profit: %v", s.Symbol, profit)
p := s.Position.NewProfit(trade, profit, netProfit)
p.Strategy = ID
p.StrategyInstanceID = instanceID
s.Notify(&p)
s.Environment.RecordPosition(s.Position, trade, &p)
}
})
s.tradeCollector.BindStream(session.UserDataStream)
store, _ := session.MarketDataStore(s.Symbol) store, _ := session.MarketDataStore(s.Symbol)
@ -395,9 +383,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
if resistancePrice.Compare(s.currentBounceShortPrice) != 0 { if resistancePrice.Compare(s.currentBounceShortPrice) != 0 {
log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices) log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices)
if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { _ = s.orderExecutor.GracefulCancel(ctx)
log.WithError(err).Errorf("graceful cancel order error")
}
s.currentBounceShortPrice = resistancePrice s.currentBounceShortPrice = resistancePrice
s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor) s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor)
} }
@ -498,17 +485,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
return return
} }
if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { _ = s.orderExecutor.GracefulCancel(ctx)
log.WithError(err).Errorf("graceful cancel order error")
}
quantity := s.useQuantityOrBaseBalance(s.BreakLow.Quantity) quantity := s.useQuantityOrBaseBalance(s.BreakLow.Quantity)
if s.BreakLow.MarketOrder { if s.BreakLow.MarketOrder {
s.Notify("%s price %f breaks the previous low %f with ratio %f, submitting market sell to open a short position", s.Symbol, kline.Close.Float64(), previousLow.Float64(), s.BreakLow.Ratio.Float64()) s.Notify("%s price %f breaks the previous low %f with ratio %f, submitting market sell to open a short position", s.Symbol, kline.Close.Float64(), previousLow.Float64(), s.BreakLow.Ratio.Float64())
s.placeMarketSell(ctx, orderExecutor, quantity) s.placeMarketSell(ctx, quantity)
} else { } else {
sellPrice := kline.Close.Mul(fixedpoint.One.Add(s.BreakLow.BounceRatio)) sellPrice := kline.Close.Mul(fixedpoint.One.Add(s.BreakLow.BounceRatio))
s.placeLimitSell(ctx, orderExecutor, sellPrice, quantity) s.placeLimitSell(ctx, sellPrice, quantity)
} }
}) })
@ -532,9 +517,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
if resistancePrice.Compare(s.currentBounceShortPrice) != 0 { if resistancePrice.Compare(s.currentBounceShortPrice) != 0 {
log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices) log.Infof("updating resistance price... possible resistance prices: %+v", s.resistancePrices)
if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { _ = s.orderExecutor.GracefulCancel(ctx)
log.WithError(err).Errorf("graceful cancel order error")
}
s.currentBounceShortPrice = resistancePrice s.currentBounceShortPrice = resistancePrice
s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor) s.placeBounceSellOrders(ctx, s.currentBounceShortPrice, orderExecutor)
} }
@ -567,9 +551,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
} }
func (s *Strategy) closePosition(ctx context.Context) { func (s *Strategy) closePosition(ctx context.Context) {
if err := s.activeMakerOrders.GracefulCancel(ctx, s.session.Exchange); err != nil { _ = s.orderExecutor.GracefulCancel(ctx)
log.WithError(err).Errorf("graceful cancel order error")
}
if err := s.ClosePosition(ctx, fixedpoint.One); err != nil { if err := s.ClosePosition(ctx, fixedpoint.One); err != nil {
log.WithError(err).Errorf("close position error") log.WithError(err).Errorf("close position error")
@ -634,7 +616,7 @@ func (s *Strategy) placeOrder(ctx context.Context, price fixedpoint.Value, quant
Price: price, Price: price,
Quantity: quantity, Quantity: quantity,
} }
s.submitOrders(ctx, orderExecutor, submitOrder) _ = s.orderExecutor.SubmitOrders(ctx, submitOrder)
} }
func (s *Strategy) preloadPivot(pivot *indicator.Pivot, store *bbgo.MarketDataStore) *types.KLine { func (s *Strategy) preloadPivot(pivot *indicator.Pivot, store *bbgo.MarketDataStore) *types.KLine {