Merge pull request #49 from c9s/enhancement/notification-routing

enhancement: improve notification system for session-based and symbol-based routing rules
This commit is contained in:
Yo-An Lin 2020-10-31 18:51:18 +08:00 committed by GitHub
commit 1d69b2dc10
8 changed files with 205 additions and 120 deletions

View File

@ -36,7 +36,7 @@ type NotificationRouting struct {
PnL string `json:"pnL,omitempty" yaml:"pnL,omitempty"` PnL string `json:"pnL,omitempty" yaml:"pnL,omitempty"`
} }
type Notifications struct { type NotificationConfig struct {
Slack *SlackNotification `json:"slack,omitempty" yaml:"slack,omitempty"` Slack *SlackNotification `json:"slack,omitempty" yaml:"slack,omitempty"`
SymbolChannels map[string]string `json:"symbolChannels,omitempty" yaml:"symbolChannels,omitempty"` SymbolChannels map[string]string `json:"symbolChannels,omitempty" yaml:"symbolChannels,omitempty"`
@ -53,7 +53,7 @@ type Session struct {
type Config struct { type Config struct {
Imports []string `json:"imports" yaml:"imports"` Imports []string `json:"imports" yaml:"imports"`
Notifications *Notifications `json:"notifications,omitempty" yaml:"notifications,omitempty"` Notifications *NotificationConfig `json:"notifications,omitempty" yaml:"notifications,omitempty"`
Sessions map[string]Session `json:"sessions,omitempty" yaml:"sessions,omitempty"` Sessions map[string]Session `json:"sessions,omitempty" yaml:"sessions,omitempty"`

View File

@ -10,8 +10,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
) )
var LoadedExchangeStrategies = make(map[string]SingleExchangeStrategy) var LoadedExchangeStrategies = make(map[string]SingleExchangeStrategy)
@ -29,6 +31,10 @@ func RegisterStrategy(key string, s interface{}) {
// Environment presents the real exchange data layer // Environment presents the real exchange data layer
type Environment struct { type Environment struct {
// Notifiability here for environment is for the streaming data notification
// note that, for back tests, we don't need notification.
Notifiability
TradeService *service.TradeService TradeService *service.TradeService
TradeSync *service.TradeSync TradeSync *service.TradeSync
@ -174,6 +180,146 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
return nil return nil
} }
// configure notification rules
// for symbol-based routes, we should register the same symbol rules for each session.
// for session-based routes, we should set the fixed callbacks for each session
func (environ *Environment) ConfigureNotification(conf *NotificationConfig) {
// configure routing here
if conf.SymbolChannels != nil {
environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
}
if conf.SessionChannels != nil {
environ.SessionChannelRouter.AddRoute(conf.SessionChannels)
}
if conf.Routing != nil {
// configure passive object notification routing
switch conf.Routing.Trade {
case "$session":
defaultTradeUpdateHandler := func(trade types.Trade) {
text := util.Render(TemplateTradeReport, trade)
environ.Notify(text, &trade)
}
for name := range environ.sessions {
session := environ.sessions[name]
// if we can route session name to channel successfully...
channel, ok := environ.SessionChannelRouter.Route(name)
if ok {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
text := util.Render(TemplateTradeReport, trade)
environ.NotifyTo(channel, text, &trade)
})
} else {
session.Stream.OnTradeUpdate(defaultTradeUpdateHandler)
}
}
case "$symbol":
// configure object routes for Trade
environ.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
trade, matched := obj.(*types.Trade)
if !matched {
return
}
channel, ok = environ.SymbolChannelRouter.Route(trade.Symbol)
return
})
// use same handler for each session
handler := func(trade types.Trade) {
text := util.Render(TemplateTradeReport, trade)
channel, ok := environ.RouteObject(&trade)
if ok {
environ.NotifyTo(channel, text, &trade)
} else {
environ.Notify(text, &trade)
}
}
for _, session := range environ.sessions {
session.Stream.OnTradeUpdate(handler)
}
}
switch conf.Routing.Order {
case "$session":
defaultOrderUpdateHandler := func(order types.Order) {
text := util.Render(TemplateOrderReport, order)
environ.Notify(text, &order)
}
for name := range environ.sessions {
session := environ.sessions[name]
// if we can route session name to channel successfully...
channel, ok := environ.SessionChannelRouter.Route(name)
if ok {
session.Stream.OnOrderUpdate(func(order types.Order) {
text := util.Render(TemplateOrderReport, order)
environ.NotifyTo(channel, text, &order)
})
} else {
session.Stream.OnOrderUpdate(defaultOrderUpdateHandler)
}
}
case "$symbol":
// add object route
environ.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
order, matched := obj.(*types.Order)
if !matched {
return
}
channel, ok = environ.SymbolChannelRouter.Route(order.Symbol)
return
})
// use same handler for each session
handler := func(order types.Order) {
text := util.Render(TemplateOrderReport, order)
channel, ok := environ.RouteObject(&order)
if ok {
environ.NotifyTo(channel, text, &order)
} else {
environ.Notify(text, &order)
}
}
for _, session := range environ.sessions {
session.Stream.OnOrderUpdate(handler)
}
}
switch conf.Routing.SubmitOrder {
case "$symbol":
// add object route
environ.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
order, matched := obj.(*types.SubmitOrder)
if !matched {
return
}
channel, ok = environ.SymbolChannelRouter.Route(order.Symbol)
return
})
}
// currently not used
switch conf.Routing.PnL {
case "$symbol":
environ.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
report, matched := obj.(*pnl.AverageCostPnlReport)
if !matched {
return
}
channel, ok = environ.SymbolChannelRouter.Route(report.Symbol)
return
})
}
}
}
// SyncTradesFrom overrides the default trade scan time (-7 days) // SyncTradesFrom overrides the default trade scan time (-7 days)
func (environ *Environment) SyncTradesFrom(t time.Time) *Environment { func (environ *Environment) SyncTradesFrom(t time.Time) *Environment {
environ.tradeScanTime = t environ.tradeScanTime = t

View File

@ -18,18 +18,22 @@ type Notifiability struct {
ObjectChannelRouter *ObjectChannelRouter ObjectChannelRouter *ObjectChannelRouter
} }
// RouteSession routes symbol name to channel
func (m *Notifiability) RouteSymbol(symbol string) (channel string, ok bool) { func (m *Notifiability) RouteSymbol(symbol string) (channel string, ok bool) {
return m.SymbolChannelRouter.Route(symbol) return m.SymbolChannelRouter.Route(symbol)
} }
// RouteSession routes session name to channel
func (m *Notifiability) RouteSession(session string) (channel string, ok bool) { func (m *Notifiability) RouteSession(session string) (channel string, ok bool) {
return m.SessionChannelRouter.Route(session) return m.SessionChannelRouter.Route(session)
} }
// RouteObject routes object to channel
func (m *Notifiability) RouteObject(obj interface{}) (channel string, ok bool) { func (m *Notifiability) RouteObject(obj interface{}) (channel string, ok bool) {
return m.ObjectChannelRouter.Route(obj) return m.ObjectChannelRouter.Route(obj)
} }
// AddNotifier adds the notifier that implements the Notifier interface.
func (m *Notifiability) AddNotifier(notifier Notifier) { func (m *Notifiability) AddNotifier(notifier Notifier) {
m.notifiers = append(m.notifiers, notifier) m.notifiers = append(m.notifiers, notifier)
} }

View File

@ -39,13 +39,35 @@ type ExchangeOrderExecutor struct {
session *ExchangeSession `json:"-"` session *ExchangeSession `json:"-"`
} }
func (e *ExchangeOrderExecutor) notifySubmitOrders(orders ...types.SubmitOrder) {
for _, order := range orders {
// pass submit order as an interface object.
channel, ok := e.RouteObject(&order)
if ok {
e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, &order)
} else {
e.Notify(":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, &order)
}
}
}
func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) { func (e *ExchangeOrderExecutor) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) ([]types.Order, error) {
formattedOrders, err := formatOrders(orders, e.session) formattedOrders, err := formatOrders(orders, e.session)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// e.Notify(":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order) for _, order := range formattedOrders {
// pass submit order as an interface object.
channel, ok := e.RouteObject(&order)
if ok {
e.NotifyTo(channel, ":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order)
} else {
e.Notify(":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order)
}
}
e.notifySubmitOrders(formattedOrders...)
return e.session.Exchange.SubmitOrders(ctx, formattedOrders...) return e.session.Exchange.SubmitOrders(ctx, formattedOrders...)
} }
@ -69,7 +91,6 @@ func (e *BasicRiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders
market := order.Market market := order.Market
quantity := order.Quantity quantity := order.Quantity
balances := e.session.Account.Balances() balances := e.session.Account.Balances()
switch order.Side { switch order.Side {
@ -140,10 +161,10 @@ func (e *BasicRiskControlOrderExecutor) SubmitOrders(ctx context.Context, orders
} }
formattedOrders = append(formattedOrders, o) formattedOrders = append(formattedOrders, o)
e.Notify(":memo: Submitting %s %s %s order with quantity %s @ %s", o.Symbol, o.Side, o.Type, o.QuantityString, o.PriceString, &o)
} }
e.notifySubmitOrders(formattedOrders...)
return e.session.Exchange.SubmitOrders(ctx, formattedOrders...) return e.session.Exchange.SubmitOrders(ctx, formattedOrders...)
} }

View File

@ -6,8 +6,6 @@ import (
"github.com/robfig/cron/v3" "github.com/robfig/cron/v3"
"github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
) )
type PnLReporter interface { type PnLReporter interface {
@ -101,6 +99,10 @@ func (router *PatternChannelRouter) AddRoute(routes map[string]string) {
return return
} }
if router.routes == nil {
router.routes = make(map[*regexp.Regexp]string)
}
for pattern, channel := range routes { for pattern, channel := range routes {
router.routes[regexp.MustCompile(pattern)] = channel router.routes[regexp.MustCompile(pattern)] = channel
} }
@ -143,44 +145,8 @@ func (router *ObjectChannelRouter) Route(obj interface{}) (channel string, ok bo
type TradeReporter struct { type TradeReporter struct {
*Notifiability *Notifiability
channel string
channelRoutes map[*regexp.Regexp]string
} }
func NewTradeReporter(notifiability *Notifiability) *TradeReporter { const TemplateTradeReport = `:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`
return &TradeReporter{
Notifiability: notifiability,
channelRoutes: make(map[*regexp.Regexp]string),
}
}
func (reporter *TradeReporter) Channel(channel string) *TradeReporter { const TemplateOrderReport = `:handshake: {{ .Symbol }} {{ .Side }} Order Update @ {{ .Price }}`
reporter.channel = channel
return reporter
}
func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter {
for pattern, channel := range routes {
reporter.channelRoutes[regexp.MustCompile(pattern)] = channel
}
return reporter
}
func (reporter *TradeReporter) getChannel(symbol string) string {
for pattern, channel := range reporter.channelRoutes {
if pattern.MatchString(symbol) {
return channel
}
}
return reporter.channel
}
func (reporter *TradeReporter) Report(trade types.Trade) {
var channel = reporter.getChannel(trade.Symbol)
var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade)
reporter.NotifyTo(channel, text, trade)
}

View File

@ -86,6 +86,10 @@ func (set *StandardIndicatorSet) GetEWMA(iw types.IntervalWindow) *indicator.EWM
// ExchangeSession presents the exchange connection session // ExchangeSession presents the exchange connection session
// It also maintains and collects the data returned from the stream. // It also maintains and collects the data returned from the stream.
type ExchangeSession struct { type ExchangeSession struct {
// exchange session based notification system
// we make it as a value field so that we can configure it separately
Notifiability
// Exchange session name // Exchange session name
Name string Name string
@ -119,6 +123,12 @@ type ExchangeSession struct {
func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession { func NewExchangeSession(name string, exchange types.Exchange) *ExchangeSession {
return &ExchangeSession{ return &ExchangeSession{
Notifiability: Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
},
Name: name, Name: name,
Exchange: exchange, Exchange: exchange,
Stream: exchange.NewStream(), Stream: exchange.NewStream(),

View File

@ -36,16 +36,12 @@ type CrossExchangeStrategy interface {
} }
type Trader struct { type Trader struct {
Notifiability
environment *Environment environment *Environment
riskControls *RiskControls riskControls *RiskControls
crossExchangeStrategies []CrossExchangeStrategy crossExchangeStrategies []CrossExchangeStrategy
exchangeStrategies map[string][]SingleExchangeStrategy exchangeStrategies map[string][]SingleExchangeStrategy
tradeReporter *TradeReporter
} }
func NewTrader(environ *Environment) *Trader { func NewTrader(environ *Environment) *Trader {
@ -55,11 +51,6 @@ func NewTrader(environ *Environment) *Trader {
} }
} }
func (trader *Trader) ReportTrade() *TradeReporter {
trader.tradeReporter = NewTradeReporter(&trader.Notifiability)
return trader.tradeReporter
}
// AttachStrategyOn attaches the single exchange strategy on an exchange session. // AttachStrategyOn attaches the single exchange strategy on an exchange session.
// Single exchange strategy is the default behavior. // Single exchange strategy is the default behavior.
func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader { func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader {
@ -102,23 +93,13 @@ func (trader *Trader) Run(ctx context.Context) error {
return err return err
} }
// session based trade reporter
for sessionName := range trader.environment.sessions {
var session = trader.environment.sessions[sessionName]
if trader.tradeReporter != nil {
session.Stream.OnTradeUpdate(func(trade types.Trade) {
trader.tradeReporter.Report(trade)
})
}
}
// load and run session strategies // load and run session strategies
for sessionName, strategies := range trader.exchangeStrategies { for sessionName, strategies := range trader.exchangeStrategies {
var session = trader.environment.sessions[sessionName] var session = trader.environment.sessions[sessionName]
var baseOrderExecutor = &ExchangeOrderExecutor{ var baseOrderExecutor = &ExchangeOrderExecutor{
// copy the parent notifiers and session // copy the environment notification system so that we can route
Notifiability: trader.Notifiability, Notifiability: trader.environment.Notifiability,
session: session, session: session,
} }
@ -146,7 +127,7 @@ func (trader *Trader) Run(ctx context.Context) error {
// get the struct element // get the struct element
rs = rs.Elem() rs = rs.Elem()
if err := injectField(rs, "Notifiability", &trader.Notifiability, false); err != nil { if err := injectField(rs, "Notifiability", &trader.environment.Notifiability, false); err != nil {
log.WithError(err).Errorf("strategy Notifiability injection failed") log.WithError(err).Errorf("strategy Notifiability injection failed")
} }
@ -192,8 +173,7 @@ func (trader *Trader) Run(ctx context.Context) error {
} }
router := &ExchangeOrderExecutionRouter{ router := &ExchangeOrderExecutionRouter{
// copy the parent notifiers Notifiability: trader.environment.Notifiability,
Notifiability: trader.Notifiability,
sessions: trader.environment.sessions, sessions: trader.environment.sessions,
} }
@ -311,7 +291,7 @@ func (trader *OrderExecutor) RunStrategy(ctx context.Context, strategy SingleExc
// ReportPnL configure and set the PnLReporter with the given notifier // ReportPnL configure and set the PnLReporter with the given notifier
func (trader *Trader) ReportPnL() *PnLReporterManager { func (trader *Trader) ReportPnL() *PnLReporterManager {
return NewPnLReporter(&trader.Notifiability) return NewPnLReporter(&trader.environment.Notifiability)
} }
type OrderExecutor interface { type OrderExecutor interface {

View File

@ -18,7 +18,6 @@ import (
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/notifier/slacknotifier" "github.com/c9s/bbgo/pkg/notifier/slacknotifier"
@ -98,10 +97,8 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
} }
} }
trader := bbgo.NewTrader(environ)
// configure notifiers notification := bbgo.Notifiability{
trader.Notifiability = bbgo.Notifiability{
SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil), SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil),
SessionChannelRouter: bbgo.NewPatternChannelRouter(nil), SessionChannelRouter: bbgo.NewPatternChannelRouter(nil),
ObjectChannelRouter: bbgo.NewObjectChannelRouter(), ObjectChannelRouter: bbgo.NewObjectChannelRouter(),
@ -118,57 +115,18 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
log.Infof("adding slack notifier with default channel: %s", conf.DefaultChannel) log.Infof("adding slack notifier with default channel: %s", conf.DefaultChannel)
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel) var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
trader.AddNotifier(notifier) notification.AddNotifier(notifier)
} }
} }
// configure rules environ.Notifiability = notification
if conf := userConfig.Notifications; conf != nil {
// configure routing here if userConfig.Notifications != nil {
if conf.SymbolChannels != nil { environ.ConfigureNotification(userConfig.Notifications)
trader.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
}
if conf.SessionChannels != nil {
trader.SessionChannelRouter.AddRoute(conf.SessionChannels)
} }
if conf.Routing != nil {
if conf.Routing.Trade == "$symbol" {
trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
trade, matched := obj.(*types.Trade)
if !matched {
return
}
channel, ok = trader.SymbolChannelRouter.Route(trade.Symbol)
return
})
}
if conf.Routing.Order == "$symbol" { trader := bbgo.NewTrader(environ)
trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
order, matched := obj.(*types.Order)
if !matched {
return
}
channel, ok = trader.SymbolChannelRouter.Route(order.Symbol)
return
})
}
if conf.Routing.PnL == "$symbol" {
trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
report, matched := obj.(*pnl.AverageCostPnlReport)
if !matched {
return
}
channel, ok = trader.SymbolChannelRouter.Route(report.Symbol)
return
})
}
}
}
trader.ReportTrade()
if userConfig.RiskControls != nil { if userConfig.RiskControls != nil {
trader.SetRiskControls(userConfig.RiskControls) trader.SetRiskControls(userConfig.RiskControls)