move ExchangeOrderExecutor into exchange session

This commit is contained in:
c9s 2021-01-30 20:03:59 +08:00
parent de8e717a41
commit ddcc8ae4ee
3 changed files with 21 additions and 15 deletions

View File

@ -106,6 +106,9 @@ func (environ *Environment) SetDB(db *sqlx.DB) *Environment {
// AddExchangeSession adds the existing exchange session or pre-created exchange session
func (environ *Environment) AddExchangeSession(name string, session *ExchangeSession) *ExchangeSession {
// update Notifiability from the environment
session.Notifiability = environ.Notifiability
environ.sessions[name] = session
return session
}
@ -185,6 +188,7 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
for n := range environ.sessions {
var session = environ.sessions[n]
if err := session.Init(ctx, environ); err != nil {
return err
}

View File

@ -144,6 +144,8 @@ type ExchangeSession struct {
orderStores map[string]*OrderStore
orderExecutor *ExchangeOrderExecutor
usedSymbols map[string]struct{}
initializedSymbols map[string]struct{}
}
@ -170,9 +172,8 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
marketDataStores: make(map[string]*MarketDataStore),
standardIndicatorSets: make(map[string]*StandardIndicatorSet),
orderStores: make(map[string]*OrderStore),
usedSymbols: make(map[string]struct{}),
initializedSymbols: make(map[string]struct{}),
usedSymbols: make(map[string]struct{}),
initializedSymbols: make(map[string]struct{}),
}
}
@ -183,6 +184,17 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
var log = log.WithField("session", session.Name)
var orderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
// forward trade updates and order updates to the order executor
session.Stream.OnTradeUpdate(orderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(orderExecutor.EmitOrderUpdate)
session.orderExecutor = orderExecutor
var markets, err = LoadExchangeMarketsWithCache(ctx, session.Exchange)
if err != nil {
return err

View File

@ -152,25 +152,15 @@ func (trader *Trader) Run(ctx context.Context) error {
for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName]
var baseOrderExecutor = &ExchangeOrderExecutor{
// copy the environment notification system so that we can route
Notifiability: trader.environment.Notifiability,
Session: session,
}
// forward trade updates and order updates to the order executor
session.Stream.OnTradeUpdate(baseOrderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(baseOrderExecutor.EmitOrderUpdate)
// default to base order executor
var orderExecutor OrderExecutor = baseOrderExecutor
var orderExecutor OrderExecutor = session.orderExecutor
// Since the risk controls are loaded from the config file
if riskControls := trader.riskControls; riskControls != nil {
if trader.riskControls.SessionBasedRiskControl != nil {
control, ok := trader.riskControls.SessionBasedRiskControl[sessionName]
if ok {
control.SetBaseOrderExecutor(baseOrderExecutor)
control.SetBaseOrderExecutor(session.orderExecutor)
// pick the order executor
if control.OrderExecutor != nil {