add symbol channel router and object channel router for notification

This commit is contained in:
c9s 2020-10-27 08:17:42 +08:00
parent 1d8e0bff5a
commit 955479486a
4 changed files with 114 additions and 65 deletions

View File

@ -3,7 +3,6 @@ package bbgo
import (
"context"
"fmt"
"regexp"
"strings"
"time"
@ -14,7 +13,6 @@ import (
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/store"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
var LoadedExchangeStrategies = make(map[string]SingleExchangeStrategy)
@ -29,52 +27,6 @@ func RegisterCrossExchangeStrategy(key string, configmap CrossExchangeStrategy)
LoadedCrossExchangeStrategies[key] = configmap
}
type TradeReporter struct {
notifier Notifier
channel string
channelRoutes map[*regexp.Regexp]string
}
func NewTradeReporter(notifier Notifier) *TradeReporter {
return &TradeReporter{
notifier: notifier,
channelRoutes: make(map[*regexp.Regexp]string),
}
}
func (reporter *TradeReporter) Channel(channel string) *TradeReporter {
reporter.channel = channel
return reporter
}
func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter {
for pattern, channel := range routes {
reporter.channelRoutes[regexp.MustCompile(pattern)] = channel
}
return reporter
}
func (reporter *TradeReporter) getChannel(symbol string) string {
for pattern, channel := range reporter.channelRoutes {
if pattern.MatchString(symbol) {
return channel
}
}
return reporter.channel
}
func (reporter *TradeReporter) Report(trade types.Trade) {
var channel = reporter.getChannel(trade.Symbol)
var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade)
if err := reporter.notifier.NotifyTo(channel, text, trade); err != nil {
log.WithError(err).Errorf("notifier error, channel=%s", channel)
}
}
// Environment presents the real exchange data layer
type Environment struct {
TradeService *service.TradeService
@ -86,7 +38,6 @@ type Environment struct {
tradeReporter *TradeReporter
}
func NewEnvironment() *Environment {
return &Environment{
// default trade scan time
@ -110,11 +61,6 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s
return session
}
func (environ *Environment) ReportTrade(notifier Notifier) *TradeReporter {
environ.tradeReporter = NewTradeReporter(notifier)
return environ.tradeReporter
}
func (environ *Environment) Init(ctx context.Context) (err error) {
for _, session := range environ.sessions {
var markets types.MarketMap
@ -131,16 +77,6 @@ func (environ *Environment) Init(ctx context.Context) (err error) {
}
session.markets = markets
if session.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
session.tradeReporter.Report(trade)
})
} else if environ.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
environ.tradeReporter.Report(trade)
})
}
}
return nil

View File

@ -1,9 +1,14 @@
package bbgo
import (
"regexp"
"github.com/robfig/cron/v3"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
type PnLReporter interface {
@ -77,3 +82,93 @@ func (reporter *AverageCostPnLReporter) Run() {
}
}
}
type SymbolChannelRouter struct {
routes map[*regexp.Regexp]string
}
func (router *SymbolChannelRouter) RouteSymbols(routes map[string]string) *SymbolChannelRouter {
for pattern, channel := range routes {
router.routes[regexp.MustCompile(pattern)] = channel
}
return router
}
func (router *SymbolChannelRouter) Dispatch(symbol string) (channel string, ok bool) {
for pattern, channel := range router.routes {
if pattern.MatchString(symbol) {
ok = true
return channel, ok
}
}
return channel, ok
}
type ObjectChannelHandler func(obj interface{}) (channel string, ok bool)
type ObjectChannelRouter struct {
routes []ObjectChannelHandler
}
func (router *ObjectChannelRouter) Route(f ObjectChannelHandler) *ObjectChannelRouter {
router.routes = append(router.routes, f)
return router
}
func (router *ObjectChannelRouter) Dispatch(obj interface{}) (channel string, ok bool) {
for _, f := range router.routes {
channel, ok = f(obj)
if ok {
return
}
}
return
}
type TradeReporter struct {
notifier Notifier
channel string
channelRoutes map[*regexp.Regexp]string
}
func NewTradeReporter(notifier Notifier) *TradeReporter {
return &TradeReporter{
notifier: notifier,
channelRoutes: make(map[*regexp.Regexp]string),
}
}
func (reporter *TradeReporter) Channel(channel string) *TradeReporter {
reporter.channel = channel
return reporter
}
func (reporter *TradeReporter) ChannelBySymbol(routes map[string]string) *TradeReporter {
for pattern, channel := range routes {
reporter.channelRoutes[regexp.MustCompile(pattern)] = channel
}
return reporter
}
func (reporter *TradeReporter) getChannel(symbol string) string {
for pattern, channel := range reporter.channelRoutes {
if pattern.MatchString(symbol) {
return channel
}
}
return reporter.channel
}
func (reporter *TradeReporter) Report(trade types.Trade) {
var channel = reporter.getChannel(trade.Symbol)
var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade)
if err := reporter.notifier.NotifyTo(channel, text, trade); err != nil {
logrus.WithError(err).Errorf("notifier error, channel=%s", channel)
}
}

View File

@ -40,6 +40,7 @@ type Trader struct {
crossExchangeStrategies []CrossExchangeStrategy
exchangeStrategies map[string][]SingleExchangeStrategy
tradeReporter *TradeReporter
// reportTimer *time.Timer
// ProfitAndLossCalculator *accounting.ProfitAndLossCalculator
}
@ -51,6 +52,12 @@ func NewTrader(environ *Environment) *Trader {
}
}
func (trader *Trader) ReportTrade() *TradeReporter {
trader.tradeReporter = NewTradeReporter(&trader.Notifiability)
return trader.tradeReporter
}
// AttachStrategyOn attaches the single exchange strategy on an exchange session.
// Single exchange strategy is the default behavior.
func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader {
@ -85,6 +92,16 @@ func (trader *Trader) Run(ctx context.Context) error {
for sessionName, strategies := range trader.exchangeStrategies {
session := trader.environment.sessions[sessionName]
if session.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
session.tradeReporter.Report(trade)
})
} else if trader.tradeReporter != nil {
session.Stream.OnTrade(func(trade types.Trade) {
trader.tradeReporter.Report(trade)
})
}
// We can move this to the exchange session,
// that way we can mount the notification on the exchange with DSL

View File

@ -88,11 +88,12 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
environ := bbgo.NewEnvironment()
environ.SyncTrades(db)
environ.ReportTrade(notifierSet)
trader := bbgo.NewTrader(environ)
trader.AddNotifier(notifierSet)
trader.ReportTrade()
if len(userConfig.Sessions) == 0 {
for _, n := range bbgo.SupportedExchanges {
if viper.IsSet(string(n) + "-api-key") {