diff --git a/cmd/run.go b/cmd/run.go index 0155a6abc..2a972c294 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -56,8 +56,8 @@ var runCmd = &cobra.Command{ log.AddHook(slacklog.NewLogHook(slackToken, viper.GetString("slack-error-channel"))) - var notifier = slacknotifier.New(slackToken) - _ = notifier + // TODO: load channel from config file + var notifier = slacknotifier.New(slackToken, viper.GetString("slack-channel")) db, err := cmdutil.ConnectMySQL() if err != nil { @@ -65,6 +65,8 @@ var runCmd = &cobra.Command{ } environ := bbgo.NewDefaultEnvironment(db) + environ.ReportTrade(notifier) + trader := bbgo.NewTrader(environ) for _, entry := range userConfig.ExchangeStrategies { diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index 5037e54a2..9a46565cd 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -72,7 +72,7 @@ func (reporter *TradeReporter) Report(trade types.Trade) { var channel = reporter.getChannel(trade.Symbol) var text = util.Render(`:handshake: {{ .Symbol }} {{ .Side }} Trade Execution @ {{ .Price }}`, trade) - if err := reporter.notifier.Notify(channel, text, trade); err != nil { + if err := reporter.notifier.NotifyTo(channel, text, trade); err != nil { log.WithError(err).Error("notifier error") } } @@ -84,6 +84,8 @@ type Environment struct { tradeScanTime time.Time sessions map[string]*ExchangeSession + + tradeReporter *TradeReporter } // NewDefaultEnvironment prepares the exchange sessions from the viper settings. @@ -122,6 +124,11 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s return session } +func (environ *Environment) ReportTrade(notifier Notifier) *TradeReporter { + environ.tradeReporter = NewTradeReporter(notifier) + return environ.tradeReporter +} + func (environ *Environment) Init(ctx context.Context) (err error) { for _, session := range environ.sessions { var markets types.MarketMap @@ -138,6 +145,12 @@ func (environ *Environment) Init(ctx context.Context) (err error) { } session.markets = markets + + if environ.tradeReporter != nil { + session.Stream.OnTrade(func(trade types.Trade) { + environ.tradeReporter.Report(trade) + }) + } } return nil diff --git a/pkg/bbgo/notifier.go b/pkg/bbgo/notifier.go index e6285bd80..5c01b9406 100644 --- a/pkg/bbgo/notifier.go +++ b/pkg/bbgo/notifier.go @@ -1,11 +1,16 @@ package bbgo type Notifier interface { - Notify(channel, format string, args ...interface{}) error + NotifyTo(channel, format string, args ...interface{}) error + Notify(format string, args ...interface{}) error } type NullNotifier struct{} +func (n *NullNotifier) NotifyTo(channel, format string, args ...interface{}) error { + return nil +} + func (n *NullNotifier) Notify(format string, args ...interface{}) error { return nil } diff --git a/pkg/bbgo/trader.go b/pkg/bbgo/trader.go index a2ea87055..073463a45 100644 --- a/pkg/bbgo/trader.go +++ b/pkg/bbgo/trader.go @@ -41,7 +41,7 @@ func (m *Notifiability) AddNotifier(notifier Notifier) { func (m *Notifiability) Notify(msg string, args ...interface{}) { for _, n := range m.notifiers { - n.Notify("", msg, args...) + n.NotifyTo("", msg, args...) } } @@ -164,7 +164,7 @@ func (trader *OrderExecutor) RunStrategyWithHotReload(ctx context.Context, strat log.WithError(err).Error("error load config file") } - trader.Notify("config reloaded, restarting trader") + trader.NotifyTo("config reloaded, restarting trader") traderDone, err = trader.RunStrategy(strategyContext, strategy) if err != nil { @@ -232,22 +232,6 @@ func (trader *Trader) reportPnL() { } */ -/* -func (trader *Trader) NotifyPnL(report *accounting.ProfitAndLossReport) { - for _, n := range trader.notifiers { - n.NotifyPnL(report) - } -} -*/ - -/* -func (trader *Trader) NotifyTrade(trade *types.Trade) { - for _, n := range trader.notifiers { - n.NotifyTrade(trade) - } -} -*/ - func (trader *Trader) SubmitOrder(ctx context.Context, order types.SubmitOrder) { trader.Notify(":memo: Submitting %s %s %s order with quantity: %s", order.Symbol, order.Type, order.Side, order.QuantityString, order) diff --git a/pkg/exchange/binance/stream.go b/pkg/exchange/binance/stream.go index 8ad958ff9..bf9b9bd0e 100644 --- a/pkg/exchange/binance/stream.go +++ b/pkg/exchange/binance/stream.go @@ -282,7 +282,7 @@ func (s *Stream) read(ctx context.Context) { continue } - // log.Notify("[binance] event: %+v", e) + // log.NotifyTo("[binance] event: %+v", e) switch e := e.(type) { case *OutboundAccountInfoEvent: diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 81cd7068b..3e9306892 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -13,16 +13,18 @@ type SlackAttachmentCreator interface { } type Notifier struct { - client *slack.Client + client *slack.Client + channel string } type NotifyOption func(notifier *Notifier) -func New(token string, options ...NotifyOption) *Notifier { +func New(token, channel string, options ...NotifyOption) *Notifier { var client = slack.New(token, slack.OptionDebug(true)) notifier := &Notifier{ - client: client, + channel: channel, + client: client, } for _, o := range options { @@ -32,7 +34,11 @@ func New(token string, options ...NotifyOption) *Notifier { return notifier } -func (n *Notifier) Notify(channel, format string, args ...interface{}) error { +func (n *Notifier) Notify(format string, args ...interface{}) error { + return n.NotifyTo(n.channel, format, args...) +} + +func (n *Notifier) NotifyTo(channel, format string, args ...interface{}) error { var slackAttachments []slack.Attachment var slackArgsOffset = -1 @@ -84,7 +90,7 @@ func (n *Notifier) NotifyTrade(trade *types.Trade) { logrus.WithError(err).Error("slack send error") } } - */ +*/ /* func (n *Notifier) NotifyPnL(report *pnl.AverageCostPnlReport) {