From 955479486ae4e16865cfed20ebacd460c61841e0 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 27 Oct 2020 08:17:42 +0800 Subject: [PATCH] add symbol channel router and object channel router for notification --- pkg/bbgo/environment.go | 64 --------------------------- pkg/bbgo/reporter.go | 95 +++++++++++++++++++++++++++++++++++++++++ pkg/bbgo/trader.go | 17 ++++++++ pkg/cmd/run.go | 3 +- 4 files changed, 114 insertions(+), 65 deletions(-) diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 85c189abd..a3d8eb6ae 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -3,7 +3,6 @@ package bbgo import ( "context" "fmt" - "regexp" "strings" "time" @@ -14,7 +13,6 @@ import ( "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/store" "github.com/c9s/bbgo/pkg/types" - "github.com/c9s/bbgo/pkg/util" ) var LoadedExchangeStrategies = make(map[string]SingleExchangeStrategy) @@ -29,52 +27,6 @@ func RegisterCrossExchangeStrategy(key string, configmap CrossExchangeStrategy) LoadedCrossExchangeStrategies[key] = configmap } -type TradeReporter struct { - notifier Notifier - - channel string - channelRoutes map[*regexp.Regexp]string -} - -func NewTradeReporter(notifier Notifier) *TradeReporter { - return &TradeReporter{ - notifier: notifier, - channelRoutes: make(map[*regexp.Regexp]string), - } -} - -func (reporter *TradeReporter) Channel(channel string) *TradeReporter { - reporter.channel = channel - return reporter -} - -func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter { - for pattern, channel := range routes { - reporter.channelRoutes[regexp.MustCompile(pattern)] = channel - } - - return reporter -} - -func (reporter *TradeReporter) getChannel(symbol string) string { - for pattern, channel := range reporter.channelRoutes { - if pattern.MatchString(symbol) { - return channel - } - } - - return reporter.channel -} - -func (reporter *TradeReporter) Report(trade types.Trade) { - var channel = reporter.getChannel(trade.Symbol) - - var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade) - if err := reporter.notifier.NotifyTo(channel, text, trade); err != nil { - log.WithError(err).Errorf("notifier error, channel=%s", channel) - } -} - // Environment presents the real exchange data layer type Environment struct { TradeService *service.TradeService @@ -86,7 +38,6 @@ type Environment struct { tradeReporter *TradeReporter } - func NewEnvironment() *Environment { return &Environment{ // default trade scan time @@ -110,11 +61,6 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s return session } -func (environ *Environment) ReportTrade(notifier Notifier) *TradeReporter { - environ.tradeReporter = NewTradeReporter(notifier) - return environ.tradeReporter -} - func (environ *Environment) Init(ctx context.Context) (err error) { for _, session := range environ.sessions { var markets types.MarketMap @@ -131,16 +77,6 @@ func (environ *Environment) Init(ctx context.Context) (err error) { } session.markets = markets - - if session.tradeReporter != nil { - session.Stream.OnTrade(func(trade types.Trade) { - session.tradeReporter.Report(trade) - }) - } else if environ.tradeReporter != nil { - session.Stream.OnTrade(func(trade types.Trade) { - environ.tradeReporter.Report(trade) - }) - } } return nil diff --git a/pkg/bbgo/reporter.go b/pkg/bbgo/reporter.go index 43aa8895c..59d954b3a 100644 --- a/pkg/bbgo/reporter.go +++ b/pkg/bbgo/reporter.go @@ -1,9 +1,14 @@ package bbgo import ( + "regexp" + "github.com/robfig/cron/v3" + "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/accounting/pnl" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" ) type PnLReporter interface { @@ -77,3 +82,93 @@ func (reporter *AverageCostPnLReporter) Run() { } } } + +type SymbolChannelRouter struct { + routes map[*regexp.Regexp]string +} + +func (router *SymbolChannelRouter) RouteSymbols(routes map[string]string) *SymbolChannelRouter { + for pattern, channel := range routes { + router.routes[regexp.MustCompile(pattern)] = channel + } + + return router +} + +func (router *SymbolChannelRouter) Dispatch(symbol string) (channel string, ok bool) { + for pattern, channel := range router.routes { + if pattern.MatchString(symbol) { + ok = true + return channel, ok + } + } + + return channel, ok +} + +type ObjectChannelHandler func(obj interface{}) (channel string, ok bool) + +type ObjectChannelRouter struct { + routes []ObjectChannelHandler +} + +func (router *ObjectChannelRouter) Route(f ObjectChannelHandler) *ObjectChannelRouter { + router.routes = append(router.routes, f) + return router +} + +func (router *ObjectChannelRouter) Dispatch(obj interface{}) (channel string, ok bool) { + for _, f := range router.routes { + channel, ok = f(obj) + if ok { + return + } + } + return +} + +type TradeReporter struct { + notifier Notifier + + channel string + channelRoutes map[*regexp.Regexp]string +} + +func NewTradeReporter(notifier Notifier) *TradeReporter { + return &TradeReporter{ + notifier: notifier, + channelRoutes: make(map[*regexp.Regexp]string), + } +} + +func (reporter *TradeReporter) Channel(channel string) *TradeReporter { + reporter.channel = channel + return reporter +} + +func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter { + for pattern, channel := range routes { + reporter.channelRoutes[regexp.MustCompile(pattern)] = channel + } + + return reporter +} + +func (reporter *TradeReporter) getChannel(symbol string) string { + for pattern, channel := range reporter.channelRoutes { + if pattern.MatchString(symbol) { + return channel + } + } + + return reporter.channel +} + +func (reporter *TradeReporter) Report(trade types.Trade) { + var channel = reporter.getChannel(trade.Symbol) + + var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade) + if err := reporter.notifier.NotifyTo(channel, text, trade); err != nil { + logrus.WithError(err).Errorf("notifier error, channel=%s", channel) + } +} diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index 85898917c..9be0a743b 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -40,6 +40,7 @@ type Trader struct { crossExchangeStrategies []CrossExchangeStrategy exchangeStrategies map[string][]SingleExchangeStrategy + tradeReporter *TradeReporter // reportTimer *time.Timer // ProfitAndLossCalculator *accounting.ProfitAndLossCalculator } @@ -51,6 +52,12 @@ func NewTrader(environ *Environment) *Trader { } } +func (trader *Trader) ReportTrade() *TradeReporter { + trader.tradeReporter = NewTradeReporter(&trader.Notifiability) + return trader.tradeReporter +} + + // AttachStrategyOn attaches the single exchange strategy on an exchange session. // Single exchange strategy is the default behavior. func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader { @@ -85,6 +92,16 @@ func (trader *Trader) Run(ctx context.Context) error { for sessionName, strategies := range trader.exchangeStrategies { session := trader.environment.sessions[sessionName] + if session.tradeReporter != nil { + session.Stream.OnTrade(func(trade types.Trade) { + session.tradeReporter.Report(trade) + }) + } else if trader.tradeReporter != nil { + session.Stream.OnTrade(func(trade types.Trade) { + trader.tradeReporter.Report(trade) + }) + } + // We can move this to the exchange session, // that way we can mount the notification on the exchange with DSL diff --git a/pkg/cmd/run.go b/pkg/cmd/run.go index 9bf158903..4338725de 100644 --- a/pkg/cmd/run.go +++ b/pkg/cmd/run.go @@ -88,11 +88,12 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error { environ := bbgo.NewEnvironment() environ.SyncTrades(db) - environ.ReportTrade(notifierSet) trader := bbgo.NewTrader(environ) trader.AddNotifier(notifierSet) + trader.ReportTrade() + if len(userConfig.Sessions) == 0 { for _, n := range bbgo.SupportedExchanges { if viper.IsSet(string(n) + "-api-key") {