Merge pull request #236 from c9s/fix/cross-exchange-risk-control

fix cross exchange order executor for the basic risk control
This commit is contained in:
Yo-An Lin 2021-05-12 19:04:57 +08:00 committed by GitHub
commit 0a3987b616
5 changed files with 29 additions and 15 deletions

View File

@ -268,7 +268,7 @@ func InitExchangeSession(name string, session *ExchangeSession) error {
session.positions = make(map[string]*Position)
session.standardIndicatorSets = make(map[string]*StandardIndicatorSet)
session.orderStores = make(map[string]*OrderStore)
session.orderExecutor = &ExchangeOrderExecutor{
session.OrderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,

View File

@ -17,10 +17,12 @@ type OrderExecutor interface {
OnTradeUpdate(cb func(trade types.Trade))
OnOrderUpdate(cb func(order types.Order))
EmitTradeUpdate(trade types.Trade)
EmitOrderUpdate(order types.Order)
}
type OrderExecutionRouter interface {
// SubmitOrderTo submit order to a specific exchange Session
// SubmitOrdersTo submit order to a specific exchange Session
SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
}
@ -28,9 +30,14 @@ type ExchangeOrderExecutionRouter struct {
Notifiability
sessions map[string]*ExchangeSession
executors map[string]OrderExecutor
}
func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, session string, orders ...types.SubmitOrder) (types.OrderSlice, error) {
if executor, ok := e.executors[session] ; ok {
return executor.SubmitOrders(ctx, orders...)
}
es, ok := e.sessions[session]
if !ok {
return nil, fmt.Errorf("exchange session %s not found", session)
@ -47,9 +54,11 @@ func (e *ExchangeOrderExecutionRouter) SubmitOrdersTo(ctx context.Context, sessi
// ExchangeOrderExecutor is an order executor wrapper for single exchange instance.
//go:generate callbackgen -type ExchangeOrderExecutor
type ExchangeOrderExecutor struct {
Notifiability `json:"-"`
// MinQuoteBalance fixedpoint.Value `json:"minQuoteBalance,omitempty" yaml:"minQuoteBalance,omitempty"`
Session *ExchangeSession
Notifiability `json:"-" yaml:"-"`
Session *ExchangeSession `json:"-" yaml:"-"`
// private trade update callbacks
tradeUpdateCallbacks []func(trade types.Trade)

View File

@ -132,6 +132,8 @@ type ExchangeSession struct {
IsInitialized bool `json:"-" yaml:"-"`
OrderExecutor *ExchangeOrderExecutor `json:"orderExecutor,omitempty" yaml:"orderExecutor,omitempty"`
// Stream is the connection stream of the exchange
Stream types.Stream `json:"-" yaml:"-"`
@ -162,7 +164,6 @@ type ExchangeSession struct {
orderStores map[string]*OrderStore
orderExecutor *ExchangeOrderExecutor
usedSymbols map[string]struct{}
initializedSymbols map[string]struct{}
@ -197,7 +198,7 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
logger: log.WithField("session", name),
}
session.orderExecutor = &ExchangeOrderExecutor{
session.OrderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
@ -248,8 +249,8 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
session.Account.UpdateBalances(balances)
// forward trade updates and order updates to the order executor
session.Stream.OnTradeUpdate(session.orderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(session.orderExecutor.EmitOrderUpdate)
session.Stream.OnTradeUpdate(session.OrderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(session.OrderExecutor.EmitOrderUpdate)
session.Account.BindStream(session.Stream)
// insert trade into db right before everything

View File

@ -260,12 +260,12 @@ func (trader *Trader) getSessionOrderExecutor(sessionName string) OrderExecutor
var session = trader.environment.sessions[sessionName]
// default to base order executor
var orderExecutor OrderExecutor = session.orderExecutor
var orderExecutor OrderExecutor = session.OrderExecutor
// Since the risk controls are loaded from the config file
if trader.riskControls != nil && trader.riskControls.SessionBasedRiskControl != nil {
if control, ok := trader.riskControls.SessionBasedRiskControl[sessionName]; ok {
control.SetBaseOrderExecutor(session.orderExecutor)
control.SetBaseOrderExecutor(session.OrderExecutor)
// pick the wrapped order executor
if control.OrderExecutor != nil {
@ -306,6 +306,11 @@ func (trader *Trader) Run(ctx context.Context) error {
router := &ExchangeOrderExecutionRouter{
Notifiability: trader.environment.Notifiability,
sessions: trader.environment.sessions,
executors: make(map[string]OrderExecutor),
}
for sessionID := range trader.environment.sessions {
var orderExecutor = trader.getSessionOrderExecutor(sessionID)
router.executors[sessionID] = orderExecutor
}
for _, strategy := range trader.crossExchangeStrategies {

View File

@ -154,7 +154,7 @@ func aggregatePrice(pvs types.PriceVolumeSlice, requiredQuantity fixedpoint.Valu
return price
}
func (s *Strategy) updateQuote(ctx context.Context) {
func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter) {
if err := s.makerSession.Exchange.CancelOrders(ctx, s.activeMakerOrders.Orders()...); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
return
@ -358,8 +358,7 @@ func (s *Strategy) updateQuote(ctx context.Context) {
return
}
makerOrderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.makerSession}
makerOrders, err := makerOrderExecutor.SubmitOrders(ctx, submitOrders...)
makerOrders, err := orderExecutionRouter.SubmitOrdersTo(ctx, s.MakerExchange, submitOrders...)
if err != nil {
log.WithError(err).Errorf("order error: %s", err.Error())
return
@ -527,7 +526,7 @@ func (s *Strategy) Validate() error {
return nil
}
func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession) error {
// configure default values
if s.UpdateInterval == 0 {
s.UpdateInterval = types.Duration(time.Second)
@ -657,7 +656,7 @@ func (s *Strategy) CrossRun(ctx context.Context, _ bbgo.OrderExecutionRouter, se
return
case <-quoteTicker.C:
s.updateQuote(ctx)
s.updateQuote(ctx, orderExecutionRouter)
case <-posTicker.C:
position := s.state.HedgePosition.AtomicLoad()