configure channel routers

This commit is contained in:
c9s 2020-10-27 09:38:29 +08:00
parent 42f947506c
commit 8453e95300
5 changed files with 100 additions and 83 deletions

View File

@ -5,16 +5,25 @@ imports:
notifications:
slack:
defaultChannel: "dev-bbgo"
errorChannel: "bbgo-error"
defaultChannel: "#dev-bbgo"
errorChannel: "#error"
reportTrades:
channelBySymbol:
"btcusdt": "bbgo-btcusdt"
"ethusdt": "bbgo-ethusdt"
"bnbusdt": "bbgo-bnbusdt"
"sxpusdt": "bbgo-sxpusdt"
"maxusdt": "max-maxusdt"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "#btc"
"^ETH": "#eth"
# if you want to route channel by exchange session
sessionChannels:
max: "#bbgo-max"
binance: "#bbgo-binance"
# routing rules
routing:
trade: "$symbol"
order: "$symbol"
submitOrder: "$session"
pnL: "#bbgo-pnl"
reportPnL:
- averageCostBySymbols:

View File

@ -4,12 +4,6 @@ notifications:
defaultChannel: "bbgo"
errorChannel: "bbgo-error"
reportTrades:
channelBySymbol:
"btcusdt": "bbgo-btcusdt"
"ethusdt": "bbgo-ethusdt"
"bnbusdt": "bbgo-bnbusdt"
reportPnL:
- averageCostBySymbols:
- "BTCUSDT"

View File

@ -4,14 +4,6 @@ notifications:
defaultChannel: "bbgo"
errorChannel: "bbgo-error"
reportTrades:
channelBySymbol:
"btcusdt": "bbgo-btcusdt"
"ethusdt": "bbgo-ethusdt"
"bnbusdt": "bbgo-bnbusdt"
"sxpusdt": "bbgo-sxpusdt"
"maxusdt": "max-maxusdt"
reportPnL:
- averageCostBySymbols:
- "BTCUSDT"

View File

@ -17,21 +17,21 @@ func (n *NullNotifier) Notify(format string, args ...interface{}) error {
type Notifiability struct {
notifiers []Notifier
sessionChannelRouter *PatternChannelRouter
symbolChannelRouter *PatternChannelRouter
objectChannelRouter *ObjectChannelRouter
SessionChannelRouter *PatternChannelRouter
SymbolChannelRouter *PatternChannelRouter
ObjectChannelRouter *ObjectChannelRouter
}
func (m *Notifiability) RouteSymbol(symbol string) (channel string, ok bool) {
return m.symbolChannelRouter.Route(symbol)
return m.SymbolChannelRouter.Route(symbol)
}
func (m *Notifiability) RouteSession(session string) (channel string, ok bool) {
return m.sessionChannelRouter.Route(session)
return m.SessionChannelRouter.Route(session)
}
func (m *Notifiability) RouteObject(obj interface{}) (channel string, ok bool) {
return m.objectChannelRouter.Route(obj)
return m.ObjectChannelRouter.Route(obj)
}
func (m *Notifiability) AddNotifier(notifier Notifier) {

View File

@ -18,6 +18,7 @@ import (
flag "github.com/spf13/pflag"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
@ -67,67 +68,14 @@ func compileRunFile(filepath string, config *bbgo.Config) error {
}
func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
// configure notifiers
notifierSet := &bbgo.Notifiability{}
if conf := userConfig.Notifications; conf != nil {
// configure routing here
var symbolChannelRouter = bbgo.NewPatternChannelRouter(conf.SymbolChannels)
var sessionChannelRouter = bbgo.NewPatternChannelRouter(conf.SessionChannels)
var objectChannelRouter = bbgo.NewObjectChannelRouter()
_ = sessionChannelRouter
if conf.Routing != nil {
switch conf.Routing.Trade {
case "$session":
case "$symbol":
objectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
if trade, matched := obj.(*types.Trade); matched {
if channel, ok = symbolChannelRouter.Route(trade.Symbol); ok {
return
}
}
return
})
default:
}
}
}
// for slack
slackToken := viper.GetString("slack-token")
if len(slackToken) > 0 {
if conf := userConfig.Notifications.Slack; conf != nil {
if conf.ErrorChannel != "" {
log.Infof("found slack configured, setting up log hook...")
log.AddHook(slacklog.NewLogHook(slackToken, conf.ErrorChannel))
}
log.Infof("adding slack notifier...")
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
notifierSet.AddNotifier(notifier)
}
}
environ := bbgo.NewEnvironment()
db, err := cmdutil.ConnectMySQL()
if err != nil {
return err
}
environ := bbgo.NewEnvironment()
environ.SyncTrades(db)
trader := bbgo.NewTrader(environ)
trader.AddNotifier(notifierSet)
trader.ReportTrade()
if len(userConfig.Sessions) == 0 {
for _, n := range bbgo.SupportedExchanges {
if viper.IsSet(string(n) + "-api-key") {
@ -154,6 +102,80 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
}
}
trader := bbgo.NewTrader(environ)
// configure notifiers
notifierSet := &bbgo.Notifiability{
SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil),
SessionChannelRouter: bbgo.NewPatternChannelRouter(nil),
ObjectChannelRouter: bbgo.NewObjectChannelRouter(),
}
// for slack
slackToken := viper.GetString("slack-token")
if len(slackToken) > 0 {
if conf := userConfig.Notifications.Slack; conf != nil {
if conf.ErrorChannel != "" {
log.Infof("found slack configured, setting up log hook...")
log.AddHook(slacklog.NewLogHook(slackToken, conf.ErrorChannel))
}
log.Infof("adding slack notifier...")
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
notifierSet.AddNotifier(notifier)
}
}
// configure rules
if conf := userConfig.Notifications; conf != nil {
// configure routing here
if conf.SymbolChannels != nil {
notifierSet.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
}
if conf.SessionChannels != nil {
notifierSet.SessionChannelRouter.AddRoute(conf.SessionChannels)
}
if conf.Routing != nil {
if conf.Routing.Trade == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
trade, matched := obj.(*types.Trade)
if !matched {
return
}
channel, ok = notifierSet.SymbolChannelRouter.Route(trade.Symbol)
return
})
}
if conf.Routing.Order == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
order, matched := obj.(*types.Order)
if !matched {
return
}
channel, ok = notifierSet.SymbolChannelRouter.Route(order.Symbol)
return
})
}
if conf.Routing.PnL == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
report, matched := obj.(*pnl.AverageCostPnlReport)
if !matched {
return
}
channel, ok = notifierSet.SymbolChannelRouter.Route(report.Symbol)
return
})
}
}
}
trader.AddNotifier(notifierSet)
trader.ReportTrade()
for _, entry := range userConfig.ExchangeStrategies {
for _, mount := range entry.Mounts {
log.Infof("attaching strategy %T on %s...", entry.Strategy, mount)