refactor trade report and move trade reporter to the environment layer

This commit is contained in:
c9s 2020-10-31 04:36:45 +08:00
parent 8867ceb951
commit ec9b5230aa
5 changed files with 32 additions and 50 deletions

View File

@ -29,10 +29,15 @@ func RegisterStrategy(key string, s interface{}) {
// Environment presents the real exchange data layer
type Environment struct {
// Notifiability here for environment is for the streaming data notification
// note that, for back tests, we don't need notification.
Notifiability
TradeService *service.TradeService
TradeSync *service.TradeSync
tradeScanTime time.Time
tradeReporter *TradeReporter
sessions map[string]*ExchangeSession
}
@ -53,6 +58,12 @@ func (environ *Environment) SyncTrades(db *sqlx.DB) *Environment {
return environ
}
func (environ *Environment) ReportTrade() *TradeReporter {
environ.tradeReporter = NewTradeReporter(&environ.Notifiability)
return environ.tradeReporter
}
func (environ *Environment) AddExchange(name string, exchange types.Exchange) (session *ExchangeSession) {
session = NewExchangeSession(name, exchange)
environ.sessions[name] = session
@ -158,6 +169,13 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
session.marketDataStores[kline.Symbol].AddKLine(kline)
})
// session based trade reporter
if environ.tradeReporter != nil {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
environ.tradeReporter.Report(trade)
})
}
session.Stream.OnTradeUpdate(func(trade types.Trade) {
// append trades
session.Trades[trade.Symbol] = append(session.Trades[trade.Symbol], trade)

View File

@ -143,44 +143,21 @@ func (router *ObjectChannelRouter) Route(obj interface{}) (channel string, ok bo
type TradeReporter struct {
*Notifiability
channel string
channelRoutes map[*regexp.Regexp]string
}
func NewTradeReporter(notifiability *Notifiability) *TradeReporter {
return &TradeReporter{
Notifiability: notifiability,
channelRoutes: make(map[*regexp.Regexp]string),
Notifiability: notifiability,
}
}
func (reporter *TradeReporter) Channel(channel string) *TradeReporter {
reporter.channel = channel
return reporter
}
func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter {
for pattern, channel := range routes {
reporter.channelRoutes[regexp.MustCompile(pattern)] = channel
}
return reporter
}
func (reporter *TradeReporter) getChannel(symbol string) string {
for pattern, channel := range reporter.channelRoutes {
if pattern.MatchString(symbol) {
return channel
}
}
return reporter.channel
}
func (reporter *TradeReporter) Report(trade types.Trade) {
var channel = reporter.getChannel(trade.Symbol)
text := util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade)
var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade)
reporter.NotifyTo(channel, text, trade)
channel, ok := reporter.RouteSymbol(trade.Symbol)
if ok {
reporter.NotifyTo(channel, text, trade)
} else {
reporter.Notify(text, trade)
}
}

View File

@ -87,6 +87,7 @@ func (set *StandardIndicatorSet) GetEWMA(iw types.IntervalWindow) *indicator.EWM
// It also maintains and collects the data returned from the stream.
type ExchangeSession struct {
// exchange session based notification system
// we make it as a value field so that we can configure it separately
Notifiability
// Exchange session name

View File

@ -44,8 +44,6 @@ type Trader struct {
crossExchangeStrategies []CrossExchangeStrategy
exchangeStrategies map[string][]SingleExchangeStrategy
tradeReporter *TradeReporter
}
func NewTrader(environ *Environment) *Trader {
@ -55,11 +53,6 @@ func NewTrader(environ *Environment) *Trader {
}
}
func (trader *Trader) ReportTrade() *TradeReporter {
trader.tradeReporter = NewTradeReporter(&trader.Notifiability)
return trader.tradeReporter
}
// AttachStrategyOn attaches the single exchange strategy on an exchange session.
// Single exchange strategy is the default behavior.
func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader {
@ -102,16 +95,6 @@ func (trader *Trader) Run(ctx context.Context) error {
return err
}
// session based trade reporter
for sessionName := range trader.environment.sessions {
var session = trader.environment.sessions[sessionName]
if trader.tradeReporter != nil {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
trader.tradeReporter.Report(trade)
})
}
}
// load and run session strategies
for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName]

View File

@ -97,6 +97,8 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
environ.AddExchange(sessionName, exchange)
}
}
environ.ReportTrade()
trader := bbgo.NewTrader(environ)
@ -122,7 +124,9 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
}
}
// configure rules
// configure notification rules
// for symbol-based routes, we should register the same symbol rules for each session.
// for session-based routes, we should set the fixed callbacks for each session
if conf := userConfig.Notifications; conf != nil {
// configure routing here
if conf.SymbolChannels != nil {
@ -168,7 +172,6 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
}
}
trader.ReportTrade()
if userConfig.RiskControls != nil {
trader.SetRiskControls(userConfig.RiskControls)