Merge pull request #485 from zenixls2/feature/backtest_sig

feature: add CancelOrders and CancelOrdersTo to executor
This commit is contained in:
Yo-An Lin 2022-03-16 21:22:32 +08:00 committed by GitHub
commit fae4f181b5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 63 additions and 33 deletions

View File

@ -13,6 +13,7 @@ import (
type OrderExecutor interface { type OrderExecutor interface {
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
CancelOrders(ctx context.Context, orders ...types.Order) error
OnTradeUpdate(cb func(trade types.Trade)) OnTradeUpdate(cb func(trade types.Trade))
OnOrderUpdate(cb func(order types.Order)) OnOrderUpdate(cb func(order types.Order))
@ -23,6 +24,7 @@ type OrderExecutor interface {
type OrderExecutionRouter interface { type OrderExecutionRouter interface {
// SubmitOrdersTo submit order to a specific exchange Session // SubmitOrdersTo submit order to a specific exchange Session
SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
CancelOrdersTo(ctx context.Context, session string, orders ...types.Order) error
} }
type ExchangeOrderExecutionRouter struct { type ExchangeOrderExecutionRouter struct {
@ -50,6 +52,18 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi
return es.Exchange.SubmitOrders(ctx, formattedOrders...) return es.Exchange.SubmitOrders(ctx, formattedOrders...)
} }
func (e *ExchangeOrderExecutionRouter) CancelOrdersTo(ctx context.Context, session string, orders ...types.Order) error {
if executor, ok := e.executors[session]; ok {
return executor.CancelOrders(ctx, orders...)
}
es, ok := e.sessions[session]
if !ok {
return fmt.Errorf("exchange session %s not found", session)
}
return es.Exchange.CancelOrders(ctx, orders...)
}
// ExchangeOrderExecutor is an order executor wrapper for single exchange instance. // ExchangeOrderExecutor is an order executor wrapper for single exchange instance.
//go:generate callbackgen -type ExchangeOrderExecutor //go:generate callbackgen -type ExchangeOrderExecutor
type ExchangeOrderExecutor struct { type ExchangeOrderExecutor struct {
@ -101,6 +115,13 @@ func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...type
return e.Session.Exchange.SubmitOrders(ctx, formattedOrders...) return e.Session.Exchange.SubmitOrders(ctx, formattedOrders...)
} }
func (e *ExchangeOrderExecutor) CancelOrders(ctx context.Context, orders ...types.Order) error {
for _, order := range orders {
log.Infof("cancelling order: %s", order)
}
return e.Session.Exchange.CancelOrders(ctx, orders...)
}
type BasicRiskController struct { type BasicRiskController struct {
Logger *log.Logger Logger *log.Logger

View File

@ -9,6 +9,7 @@ import (
"os" "os"
"path/filepath" "path/filepath"
"strings" "strings"
"syscall"
"time" "time"
"github.com/fatih/color" "github.com/fatih/color"
@ -324,28 +325,35 @@ var BacktestCmd = &cobra.Command{
klineChans = append(klineChans, KChanEx{KChan: c, Exchange: exchange}) klineChans = append(klineChans, KChanEx{KChan: c, Exchange: exchange})
} }
for { runCtx, cancelRun := context.WithCancel(ctx)
count := len(klineChans) go func() {
for _, kchanex := range klineChans { defer cancelRun()
kLine, more := <-kchanex.KChan for {
if more { count := len(klineChans)
kchanex.Exchange.ConsumeKLine(kLine) for _, kchanex := range klineChans {
} else { kLine, more := <-kchanex.KChan
if err := kchanex.Exchange.CloseMarketData(); err != nil { if more {
return err kchanex.Exchange.ConsumeKLine(kLine)
} else {
if err := kchanex.Exchange.CloseMarketData(); err != nil {
log.Errorf("%v", err)
return
}
count--
} }
count-- }
if count == 0 {
break
} }
} }
if count == 0 { }()
break
} cmdutil.WaitForSignal(runCtx, syscall.SIGINT, syscall.SIGTERM)
}
log.Infof("shutting down trader...") log.Infof("shutting down trader...")
shutdownCtx, cancel := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) shutdownCtx, cancelShutdown := context.WithDeadline(runCtx, time.Now().Add(10*time.Second))
trader.Graceful.Shutdown(shutdownCtx) trader.Graceful.Shutdown(shutdownCtx)
cancel() cancelShutdown()
// put the logger back to print the pnl // put the logger back to print the pnl
log.SetLevel(log.InfoLevel) log.SetLevel(log.InfoLevel)

View File

@ -272,7 +272,7 @@ func (s *Strategy) placeGridOrders(orderExecutor bbgo.OrderExecutor, session *bb
} }
func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) { func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
if err := session.Exchange.CancelOrders(context.Background(), s.activeOrders.Orders()...); err != nil { if err := orderExecutor.CancelOrders(context.Background(), s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error") log.WithError(err).Errorf("cancel order error")
} }
@ -359,13 +359,13 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
defer wg.Done() defer wg.Done()
log.Infof("canceling active orders...") log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { if err := orderExecutor.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error") log.WithError(err).Errorf("cancel order error")
} }
if s.CancelProfitOrdersOnShutdown { if s.CancelProfitOrdersOnShutdown {
log.Infof("canceling profit orders...") log.Infof("canceling profit orders...")
err := session.Exchange.CancelOrders(ctx, s.profitOrders.Orders()...) err := orderExecutor.CancelOrders(ctx, s.profitOrders.Orders()...)
if err != nil { if err != nil {
log.WithError(err).Errorf("cancel profit order error") log.WithError(err).Errorf("cancel profit order error")

View File

@ -86,9 +86,9 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
targetSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()}) targetSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval.String()})
} }
func (s *Strategy) clear(ctx context.Context, session *bbgo.ExchangeSession) { func (s *Strategy) clear(ctx context.Context, orderExecutor bbgo.OrderExecutor) {
if s.order.OrderID > 0 { if s.order.OrderID > 0 {
if err := session.Exchange.CancelOrders(ctx, s.order); err != nil { if err := orderExecutor.CancelOrders(ctx, s.order); err != nil {
log.WithError(err).Errorf("can not cancel trailingstop order: %+v", s.order) log.WithError(err).Errorf("can not cancel trailingstop order: %+v", s.order)
} }
@ -217,14 +217,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
closePrice := kline.Close closePrice := kline.Close
// ok, it's our call, we need to cancel the stop limit order first // ok, it's our call, we need to cancel the stop limit order first
s.clear(ctx, session) s.clear(ctx, orderExecutor)
s.place(ctx, orderExecutor, session, indicator, closePrice) s.place(ctx, orderExecutor, session, indicator, closePrice)
}) })
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
log.Infof("canceling trailingstop order...") log.Infof("canceling trailingstop order...")
s.clear(ctx, session) s.clear(ctx, orderExecutor)
}) })
if lastPrice, ok := session.LastPrice(s.Symbol); ok { if lastPrice, ok := session.LastPrice(s.Symbol); ok {
@ -261,14 +261,14 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
closePrice := kline.Close closePrice := kline.Close
// ok, it's our call, we need to cancel the stop limit order first // ok, it's our call, we need to cancel the stop limit order first
s.clear(ctx, session) s.clear(ctx, &orderExecutor)
s.place(ctx, &orderExecutor, session, indicator, closePrice) s.place(ctx, &orderExecutor, session, indicator, closePrice)
}) })
s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) { s.Graceful.OnShutdown(func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
log.Infof("canceling trailingstop order...") log.Infof("canceling trailingstop order...")
s.clear(ctx, session) s.clear(ctx, &orderExecutor)
}) })
if lastPrice, ok := session.LastPrice(s.Symbol); ok { if lastPrice, ok := session.LastPrice(s.Symbol); ok {

View File

@ -62,7 +62,7 @@ func (s *Strategy) ID() string {
} }
func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) { func (s *Strategy) updateOrders(orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) {
if err := session.Exchange.CancelOrders(context.Background(), s.activeOrders.Bids.Orders()...); err != nil { if err := orderExecutor.CancelOrders(context.Background(), s.activeOrders.Bids.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error") log.WithError(err).Errorf("cancel order error")
} }
@ -119,7 +119,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
log.Infof("canceling active orders...") log.Infof("canceling active orders...")
if err := session.Exchange.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil { if err := orderExecutor.CancelOrders(ctx, s.activeOrders.Orders()...); err != nil {
log.WithError(err).Errorf("cancel order error") log.WithError(err).Errorf("cancel order error")
} }
}) })

View File

@ -303,7 +303,7 @@ func (s *Strategy) submitOrders(ctx context.Context, orderExecutor bbgo.OrderExe
} }
// Cancel order // Cancel order
func (s *Strategy) cancelOrder(orderID uint64, ctx context.Context, session *bbgo.ExchangeSession) error { func (s *Strategy) cancelOrder(orderID uint64, ctx context.Context, orderExecutor bbgo.OrderExecutor) error {
// Cancel the original order // Cancel the original order
order, ok := s.orderStore.Get(orderID) order, ok := s.orderStore.Get(orderID)
if ok { if ok {
@ -311,7 +311,7 @@ func (s *Strategy) cancelOrder(orderID uint64, ctx context.Context, session *bbg
case types.OrderStatusCanceled, types.OrderStatusRejected, types.OrderStatusFilled: case types.OrderStatusCanceled, types.OrderStatusRejected, types.OrderStatusFilled:
// Do nothing // Do nothing
default: default:
if err := session.Exchange.CancelOrders(ctx, order); err != nil { if err := orderExecutor.CancelOrders(ctx, order); err != nil {
return err return err
} }
} }
@ -454,7 +454,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.tradeCollector.OnPositionUpdate(func(position *types.Position) { s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
if position.Base.Compare(s.Market.MinQuantity) > 0 { // Update order if we have a position if position.Base.Compare(s.Market.MinQuantity) > 0 { // Update order if we have a position
// Cancel the original order // Cancel the original order
if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, session); err != nil { if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, orderExecutor); err != nil {
log.WithError(err).Errorf("Can not cancel the original trailing stop order!") log.WithError(err).Errorf("Can not cancel the original trailing stop order!")
} }
s.trailingStopControl.OrderID = 0 s.trailingStopControl.OrderID = 0
@ -517,7 +517,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.trailingStopControl.CurrentHighestPrice = highPrice s.trailingStopControl.CurrentHighestPrice = highPrice
// Cancel the original order // Cancel the original order
if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, session); err != nil { if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, orderExecutor); err != nil {
log.WithError(err).Errorf("Can not cancel the original trailing stop order!") log.WithError(err).Errorf("Can not cancel the original trailing stop order!")
} }
s.trailingStopControl.OrderID = 0 s.trailingStopControl.OrderID = 0
@ -681,7 +681,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// Cancel trailing stop order // Cancel trailing stop order
if s.TrailingStopTarget.TrailingStopCallbackRatio.Sign() > 0 { if s.TrailingStopTarget.TrailingStopCallbackRatio.Sign() > 0 {
if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, session); err != nil { if err := s.cancelOrder(s.trailingStopControl.OrderID, ctx, orderExecutor); err != nil {
log.WithError(err).Errorf("Can not cancel the trailing stop order!") log.WithError(err).Errorf("Can not cancel the trailing stop order!")
} }
s.trailingStopControl.OrderID = 0 s.trailingStopControl.OrderID = 0

View File

@ -804,7 +804,8 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
defer tradeScanTicker.Stop() defer tradeScanTicker.Stop()
defer func() { defer func() {
if err := s.makerSession.Exchange.CancelOrders(context.Background(), s.activeMakerOrders.Orders()...); err != nil { if err := s.activeMakerOrders.GracefulCancel(context.Background(),
s.makerSession.Exchange); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol) log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
} }
}() }()