refactor exchange session initialization

This commit is contained in:
c9s 2021-05-12 11:59:29 +08:00
parent 8d63647104
commit df11112d64
3 changed files with 73 additions and 58 deletions

View File

@ -210,62 +210,80 @@ func (environ *Environment) AddExchangesByViperKeys() error {
return nil
}
func NewExchangeSessionFromConfig(name string, sessionConfig *ExchangeSession) (*ExchangeSession, error) {
exchangeName, err := types.ValidExchangeName(sessionConfig.ExchangeName)
func InitExchangeSession(name string, session *ExchangeSession) error {
session.Name = name
exchangeName, err := types.ValidExchangeName(session.ExchangeName)
if err != nil {
return nil, err
return err
}
var exchange types.Exchange
if sessionConfig.Key != "" && sessionConfig.Secret != "" {
if !sessionConfig.PublicOnly {
if len(sessionConfig.Key) == 0 || len(sessionConfig.Secret) == 0 {
return nil, fmt.Errorf("can not create exchange %s: empty key or secret", exchangeName)
if session.Key != "" && session.Secret != "" {
if !session.PublicOnly {
if len(session.Key) == 0 || len(session.Secret) == 0 {
return fmt.Errorf("can not create exchange %s: empty key or secret", exchangeName)
}
}
exchange, err = cmdutil.NewExchangeStandard(exchangeName, sessionConfig.Key, sessionConfig.Secret, sessionConfig.SubAccount)
exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, session.SubAccount)
} else {
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, sessionConfig.EnvVarPrefix)
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
}
if err != nil {
return nil, err
return err
}
session.Exchange = exchange
session.Notifiability = Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
}
session.Stream = exchange.NewStream()
// configure exchange
if sessionConfig.Margin {
if session.Margin {
marginExchange, ok := exchange.(types.MarginExchange)
if !ok {
return nil, fmt.Errorf("exchange %s does not support margin", exchangeName)
return fmt.Errorf("exchange %s does not support margin", exchangeName)
}
if sessionConfig.IsolatedMargin {
marginExchange.UseIsolatedMargin(sessionConfig.IsolatedMarginSymbol)
if session.IsolatedMargin {
marginExchange.UseIsolatedMargin(session.IsolatedMarginSymbol)
} else {
marginExchange.UseMargin()
}
}
session := NewExchangeSession(name, exchange)
session.ExchangeName = sessionConfig.ExchangeName
session.EnvVarPrefix = sessionConfig.EnvVarPrefix
session.Key = sessionConfig.Key
session.Secret = sessionConfig.Secret
session.SubAccount = sessionConfig.SubAccount
session.PublicOnly = sessionConfig.PublicOnly
session.Margin = sessionConfig.Margin
session.IsolatedMargin = sessionConfig.IsolatedMargin
session.IsolatedMarginSymbol = sessionConfig.IsolatedMarginSymbol
session.Withdrawal = sessionConfig.Withdrawal
return session, nil
// pointer fields
session.Subscriptions = make(map[types.Subscription]types.Subscription)
session.Account = &types.Account{}
session.Trades = make(map[string]*types.TradeSlice)
session.markets = make(map[string]types.Market)
session.lastPrices = make(map[string]float64)
session.startPrices = make(map[string]float64)
session.marketDataStores = make(map[string]*MarketDataStore)
session.positions = make(map[string]*Position)
session.standardIndicatorSets = make(map[string]*StandardIndicatorSet)
session.orderStores = make(map[string]*OrderStore)
session.orderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
session.usedSymbols = make(map[string]struct{})
session.initializedSymbols = make(map[string]struct{})
session.logger = log.WithField("session", name)
return nil
}
func (environ *Environment) AddExchangesFromSessionConfig(sessions map[string]*ExchangeSession) error {
for sessionName, sessionConfig := range sessions {
session, err := NewExchangeSessionFromConfig(sessionName, sessionConfig)
if err != nil {
for sessionName, session := range sessions {
if err := InitExchangeSession(sessionName, session); err != nil {
return err
}
@ -275,7 +293,6 @@ func (environ *Environment) AddExchangesFromSessionConfig(sessions map[string]*E
return nil
}
// Init prepares the data that will be used by the strategies
func (environ *Environment) Init(ctx context.Context) (err error) {
for n := range environ.sessions {
@ -301,7 +318,6 @@ func (environ *Environment) Start(ctx context.Context) (err error) {
return
}
func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error {
if conf.Redis != nil {
if err := env.Set(conf.Redis); err != nil {

View File

@ -116,7 +116,7 @@ type ExchangeSession struct {
SubAccount string `json:"subAccount,omitempty" yaml:"subAccount,omitempty"`
// Withdrawal is used for enabling withdrawal functions
Withdrawal bool `json:"withdrawal" yaml:"withdrawal"`
Withdrawal bool `json:"withdrawal,omitempty" yaml:"withdrawal,omitempty"`
PublicOnly bool `json:"publicOnly,omitempty" yaml:"publicOnly"`
Margin bool `json:"margin,omitempty" yaml:"margin"`
@ -139,6 +139,10 @@ type ExchangeSession struct {
Exchange types.Exchange `json:"-" yaml:"-"`
// Trades collects the executed trades from the exchange
// map: symbol -> []trade
Trades map[string]*types.TradeSlice `json:"-" yaml:"-"`
// markets defines market configuration of a symbol
markets map[string]types.Market
@ -148,10 +152,6 @@ type ExchangeSession struct {
lastPrices map[string]float64
lastPriceUpdatedAt time.Time
// Trades collects the executed trades from the exchange
// map: symbol -> []trade
Trades map[string]*types.TradeSlice `json:"-" yaml:"-"`
// marketDataStores contains the market data store of each market
marketDataStores map[string]*MarketDataStore
@ -171,7 +171,7 @@ type ExchangeSession struct {
}
func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
return &ExchangeSession{
session := &ExchangeSession{
Notifiability: Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
@ -196,6 +196,14 @@ func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
initializedSymbols: make(map[string]struct{}),
logger: log.WithField("session", name),
}
session.orderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
return session
}
// Init initializes the basic data structure and market information by its exchange.
@ -239,17 +247,9 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
session.Account.UpdateBalances(balances)
var orderExecutor = &ExchangeOrderExecutor{
// copy the notification system so that we can route
Notifiability: session.Notifiability,
Session: session,
}
// forward trade updates and order updates to the order executor
session.Stream.OnTradeUpdate(orderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(orderExecutor.EmitOrderUpdate)
session.orderExecutor = orderExecutor
session.Stream.OnTradeUpdate(session.orderExecutor.EmitTradeUpdate)
session.Stream.OnOrderUpdate(session.orderExecutor.EmitOrderUpdate)
session.Account.BindStream(session.Stream)
// insert trade into db right before everything

View File

@ -78,7 +78,7 @@ func (s *Server) newEngine() *gin.Engine {
r.POST("/api/environment/sync", func(c *gin.Context) {
go func() {
if err := s.Environ.Sync(context.Background()) ; err != nil {
if err := s.Environ.Sync(context.Background()); err != nil {
logrus.WithError(err).Error("sync error")
}
}()
@ -125,15 +125,15 @@ func (s *Server) newEngine() *gin.Engine {
r.GET("/api/trading-volume", s.tradingVolume)
r.POST("/api/sessions/test", func(c *gin.Context) {
var sessionConfig bbgo.ExchangeSession
if err := c.BindJSON(&sessionConfig); err != nil {
var session bbgo.ExchangeSession
if err := c.BindJSON(&session); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
return
}
session, err := bbgo.NewExchangeSessionFromConfig(sessionConfig.ExchangeName, &sessionConfig)
err := bbgo.InitExchangeSession(session.ExchangeName, &session)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
@ -174,16 +174,15 @@ func (s *Server) newEngine() *gin.Engine {
})
r.POST("/api/sessions", func(c *gin.Context) {
var sessionConfig bbgo.ExchangeSession
if err := c.BindJSON(&sessionConfig); err != nil {
var session bbgo.ExchangeSession
if err := c.BindJSON(&session); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
return
}
session, err := bbgo.NewExchangeSessionFromConfig(sessionConfig.ExchangeName, &sessionConfig)
if err != nil {
if err := bbgo.InitExchangeSession(session.ExchangeName, &session); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"error": err.Error(),
})
@ -193,9 +192,9 @@ func (s *Server) newEngine() *gin.Engine {
if s.Config.Sessions == nil {
s.Config.Sessions = make(map[string]*bbgo.ExchangeSession)
}
s.Config.Sessions[sessionConfig.Name] = session
s.Config.Sessions[session.Name] = &session
s.Environ.AddExchangeSession(sessionConfig.Name, session)
s.Environ.AddExchangeSession(session.Name, &session)
if err := session.Init(c, s.Environ); err != nil {
c.JSON(http.StatusInternalServerError, gin.H{