Merge pull request #1156 from c9s/refactor/order-executor

REFACTOR: pull out Fast* methods to FastOrderExecutor
This commit is contained in:
Yo-An Lin 2023-05-16 17:00:32 +08:00 committed by GitHub
commit 0e23a5cda5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 87 additions and 61 deletions

View File

@ -0,0 +1,67 @@
package bbgo
import (
"context"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/types"
)
// FastOrderExecutor provides shorter submit order / cancel order round-trip time
// for strategies that need to response more faster, e.g. 1s kline or market trades related strategies.
type FastOrderExecutor struct {
*GeneralOrderExecutor
}
func NewFastOrderExecutor(session *ExchangeSession, symbol, strategy, strategyInstanceID string, position *types.Position) *FastOrderExecutor {
oe := NewGeneralOrderExecutor(session, symbol, strategy, strategyInstanceID, position)
return &FastOrderExecutor{
GeneralOrderExecutor: oe,
}
}
// SubmitOrders sends []types.SubmitOrder directly to the exchange without blocking wait on the status update.
// This is a faster version of GeneralOrderExecutor.SubmitOrders(). Created orders will be consumed in newly created goroutine (in non-backteset session).
// @param ctx: golang context type.
// @param submitOrders: Lists of types.SubmitOrder to be sent to the exchange.
// @return *types.SubmitOrder: SubmitOrder with calculated quantity and price.
// @return error: Error message.
func (e *FastOrderExecutor) SubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := e.session.FormatOrders(submitOrders)
if err != nil {
return nil, err
}
createdOrders, errIdx, err := BatchPlaceOrder(ctx, e.session.Exchange, nil, formattedOrders...)
if len(errIdx) > 0 {
return nil, err
}
if IsBackTesting {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
} else {
go func() {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
}()
}
return createdOrders, err
}
// Cancel cancels all active maker orders if orders is not given, otherwise cancel the given orders
func (e *FastOrderExecutor) Cancel(ctx context.Context, orders ...types.Order) error {
if e.activeMakerOrders.NumOfOrders() == 0 {
return nil
}
if err := e.activeMakerOrders.FastCancel(ctx, e.session.Exchange, orders...); err != nil {
return errors.Wrap(err, "fast cancel order error")
}
return nil
}

View File

@ -190,38 +190,6 @@ func (e *GeneralOrderExecutor) CancelOrders(ctx context.Context, orders ...types
return err
}
// FastSubmitOrders send []types.SubmitOrder directly to the exchange without blocking wait on the status update.
// This is a faster version of SubmitOrders(). Created orders will be consumed in newly created goroutine (in non-backteset session).
// @param ctx: golang context type.
// @param submitOrders: Lists of types.SubmitOrder to be sent to the exchange.
// @return *types.SubmitOrder: SubmitOrder with calculated quantity and price.
// @return error: Error message.
func (e *GeneralOrderExecutor) FastSubmitOrders(ctx context.Context, submitOrders ...types.SubmitOrder) (types.OrderSlice, error) {
formattedOrders, err := e.session.FormatOrders(submitOrders)
if err != nil {
return nil, err
}
createdOrders, errIdx, err := BatchPlaceOrder(ctx, e.session.Exchange, nil, formattedOrders...)
if len(errIdx) > 0 {
return nil, err
}
if IsBackTesting {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
} else {
go func() {
e.orderStore.Add(createdOrders...)
e.activeMakerOrders.Add(createdOrders...)
e.tradeCollector.Process()
}()
}
return createdOrders, err
}
func (e *GeneralOrderExecutor) SetLogger(logger log.FieldLogger) {
e.logger = logger
}
@ -467,19 +435,6 @@ func (e *GeneralOrderExecutor) GracefulCancel(ctx context.Context, orders ...typ
return nil
}
// FastCancel cancels all active maker orders if orders is not given, otherwise cancel the given orders
func (e *GeneralOrderExecutor) FastCancel(ctx context.Context, orders ...types.Order) error {
if e.activeMakerOrders.NumOfOrders() == 0 {
return nil
}
if err := e.activeMakerOrders.FastCancel(ctx, e.session.Exchange, orders...); err != nil {
return errors.Wrap(err, "fast cancel order error")
}
return nil
}
// ClosePosition closes the current position by a percentage.
// percentage 0.1 means close 10% position
// tag is the order tag you want to attach, you may pass multiple tags, the tags will be combined into one tag string by commas.

View File

@ -121,7 +121,8 @@ type Strategy struct {
ExitMethods bbgo.ExitMethodSet `json:"exits"`
Session *bbgo.ExchangeSession
*bbgo.GeneralOrderExecutor
*bbgo.FastOrderExecutor
getLastPrice func() fixedpoint.Value
}
@ -283,7 +284,7 @@ func (s *Strategy) initIndicators(store *bbgo.SerialMarketDataStore) error {
}
func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64, syscounter int) (int, error) {
nonTraded := s.GeneralOrderExecutor.ActiveMakerOrders().Orders()
nonTraded := s.FastOrderExecutor.ActiveMakerOrders().Orders()
if len(nonTraded) > 0 {
if len(nonTraded) > 1 {
log.Errorf("should only have one order to cancel, got %d", len(nonTraded))
@ -316,7 +317,7 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef, atr float64, syscoun
}
}
if toCancel {
err := s.GeneralOrderExecutor.FastCancel(ctx)
err := s.FastOrderExecutor.Cancel(ctx)
s.pendingLock.Lock()
counters := s.orderPendingCounter
s.orderPendingCounter = make(map[uint64]int)
@ -424,7 +425,7 @@ func (s *Strategy) Rebalance(ctx context.Context) {
if math.Abs(beta) > s.RebalanceFilter && math.Abs(s.beta) > s.RebalanceFilter || math.Abs(s.beta) < s.RebalanceFilter && math.Abs(beta) < s.RebalanceFilter {
return
}
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
balances := s.FastOrderExecutor.Session().GetAccount().Balances()
baseBalance := balances[s.Market.BaseCurrency].Total()
quoteBalance := balances[s.Market.QuoteCurrency].Total()
total := baseBalance.Add(quoteBalance.Div(price))
@ -578,7 +579,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
}
if s.Debug {
balances := s.GeneralOrderExecutor.Session().GetAccount().Balances()
balances := s.FastOrderExecutor.Session().GetAccount().Balances()
bbgo.Notify("source: %.4f, price: %.4f, drift[0]: %.4f, ddrift[0]: %.4f, lowf %.4f, highf: %.4f lowest: %.4f highest: %.4f sp %.4f bp %.4f",
sourcef, pricef, drift[0], ddrift[0], atr, lowf, highf, s.lowestPrice, s.highestPrice, s.sellPrice, s.buyPrice)
// Notify will parse args to strings and process separately
@ -640,7 +641,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
opt.Price = source
opt.Tags = []string{"long"}
submitOrder, err := s.GeneralOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
submitOrder, err := s.FastOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
if err != nil {
errs := filterErrors(multierr.Errors(err))
if len(errs) > 0 {
@ -690,7 +691,7 @@ func (s *Strategy) klineHandler(ctx context.Context, kline types.KLine, counter
}
opt.Price = source
opt.Tags = []string{"short"}
submitOrder, err := s.GeneralOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
submitOrder, err := s.FastOrderExecutor.NewOrderFromOpenPosition(ctx, &opt)
if err != nil {
errs := filterErrors(multierr.Errors(err))
if len(errs) > 0 {
@ -745,11 +746,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.Status = types.StrategyStatusRunning
s.OnSuspend(func() {
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
_ = s.FastOrderExecutor.GracefulCancel(ctx)
})
s.OnEmergencyStop(func() {
_ = s.GeneralOrderExecutor.GracefulCancel(ctx)
_ = s.FastOrderExecutor.GracefulCancel(ctx)
_ = s.ClosePosition(ctx, fixedpoint.One)
})
@ -766,14 +767,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
}
}
s.GeneralOrderExecutor = bbgo.NewGeneralOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.GeneralOrderExecutor.DisableNotify()
orderStore := s.GeneralOrderExecutor.OrderStore()
s.FastOrderExecutor = bbgo.NewFastOrderExecutor(session, s.Symbol, ID, instanceID, s.Position)
s.FastOrderExecutor.DisableNotify()
orderStore := s.FastOrderExecutor.OrderStore()
orderStore.AddOrderUpdate = true
orderStore.RemoveCancelled = true
orderStore.RemoveFilled = true
activeOrders := s.GeneralOrderExecutor.ActiveMakerOrders()
tradeCollector := s.GeneralOrderExecutor.TradeCollector()
activeOrders := s.FastOrderExecutor.ActiveMakerOrders()
tradeCollector := s.FastOrderExecutor.TradeCollector()
tradeStore := tradeCollector.TradeStore()
syscounter := 0

View File

@ -211,7 +211,7 @@ func (s *Strategy) smartCancel(ctx context.Context, pricef float64) int {
panic("not supported side for the order")
}
if toCancel {
err := s.GeneralOrderExecutor.FastCancel(ctx, order)
err := s.GeneralOrderExecutor.CancelOrders(ctx, order)
if err == nil {
delete(s.orderPendingCounter, order.OrderID)
} else {

View File

@ -343,7 +343,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
diffQty := targetBase.Sub(s.Position.Base)
log.Info(alphaNrr.Float64(), s.Position.Base, diffQty.Float64())
s.orderExecutor.FastCancel(ctx)
if err := s.orderExecutor.CancelOrders(ctx); err != nil {
log.WithError(err).Errorf("cancel order error")
}
if diffQty.Sign() > 0 {
_, _ = s.orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Symbol: s.Symbol,