refactor notification configuration

This commit is contained in:
c9s 2021-02-21 16:52:47 +08:00
parent fa4e813729
commit 21b092037e
5 changed files with 183 additions and 189 deletions

View File

@ -58,7 +58,7 @@ type SlackNotification struct {
ErrorChannel string `json:"errorChannel,omitempty" yaml:"errorChannel,omitempty"` ErrorChannel string `json:"errorChannel,omitempty" yaml:"errorChannel,omitempty"`
} }
type NotificationRouting struct { type SlackNotificationRouting struct {
Trade string `json:"trade,omitempty" yaml:"trade,omitempty"` Trade string `json:"trade,omitempty" yaml:"trade,omitempty"`
Order string `json:"order,omitempty" yaml:"order,omitempty"` Order string `json:"order,omitempty" yaml:"order,omitempty"`
SubmitOrder string `json:"submitOrder,omitempty" yaml:"submitOrder,omitempty"` SubmitOrder string `json:"submitOrder,omitempty" yaml:"submitOrder,omitempty"`
@ -71,7 +71,7 @@ type NotificationConfig struct {
SymbolChannels map[string]string `json:"symbolChannels,omitempty" yaml:"symbolChannels,omitempty"` SymbolChannels map[string]string `json:"symbolChannels,omitempty" yaml:"symbolChannels,omitempty"`
SessionChannels map[string]string `json:"sessionChannels,omitempty" yaml:"sessionChannels,omitempty"` SessionChannels map[string]string `json:"sessionChannels,omitempty" yaml:"sessionChannels,omitempty"`
Routing *NotificationRouting `json:"routing,omitempty" yaml:"routing,omitempty"` Routing *SlackNotificationRouting `json:"routing,omitempty" yaml:"routing,omitempty"`
} }
type Session struct { type Session struct {

View File

@ -1,19 +1,29 @@
package bbgo package bbgo
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"image/png"
"io/ioutil"
"os" "os"
"strings"
"sync" "sync"
"time" "time"
"github.com/codingconcepts/env" "github.com/codingconcepts/env"
"github.com/pkg/errors"
"github.com/pquerna/otp"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/viper" "github.com/spf13/viper"
"gopkg.in/tucnak/telebot.v2"
"github.com/c9s/bbgo/pkg/accounting/pnl" "github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
"github.com/c9s/bbgo/pkg/notifier/telegramnotifier"
"github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/slack/slacklog"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
) )
@ -159,7 +169,7 @@ func (environ *Environment) AddExchange(name string, exchange types.Exchange) (s
return environ.AddExchangeSession(name, session) return environ.AddExchangeSession(name, session)
} }
func (environ *Environment) AddExchangesFromConfig(userConfig *Config) error { func (environ *Environment) ConfigureExchangeSessions(userConfig *Config) error {
if len(userConfig.Sessions) == 0 { if len(userConfig.Sessions) == 0 {
return environ.AddExchangesByViperKeys() return environ.AddExchangesByViperKeys()
} }
@ -285,7 +295,7 @@ func (environ *Environment) ConfigurePersistence(conf *PersistenceConfig) error
// configure notification rules // configure notification rules
// for symbol-based routes, we should register the same symbol rules for each session. // for symbol-based routes, we should register the same symbol rules for each session.
// for session-based routes, we should set the fixed callbacks for each session // for session-based routes, we should set the fixed callbacks for each session
func (environ *Environment) ConfigureNotification(conf *NotificationConfig) error { func (environ *Environment) ConfigureNotificationRouting(conf *NotificationConfig) error {
// configure routing here // configure routing here
if conf.SymbolChannels != nil { if conf.SymbolChannels != nil {
environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels) environ.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
@ -524,3 +534,165 @@ func getSessionSymbols(session *ExchangeSession, defaultSymbols ...string) ([]st
return session.FindPossibleSymbols() return session.FindPossibleSymbols()
} }
func (environ *Environment) ConfigureNotificationSystem(userConfig *Config) error {
// Configure persistence service, by default we will use memory service
var persistence service.PersistenceService = environ.PersistenceServiceFacade.Memory
if environ.PersistenceServiceFacade.Redis != nil {
persistence = environ.PersistenceServiceFacade.Redis
}
environ.Notifiability = Notifiability{
SymbolChannelRouter: NewPatternChannelRouter(nil),
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
}
slackToken := viper.GetString("slack-token")
if len(slackToken) > 0 && userConfig.Notifications != nil {
if conf := userConfig.Notifications.Slack; conf != nil {
if conf.ErrorChannel != "" {
log.Infof("found slack configured, setting up log hook...")
log.AddHook(slacklog.NewLogHook(slackToken, conf.ErrorChannel))
}
log.Infof("adding slack notifier with default channel: %s", conf.DefaultChannel)
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
environ.AddNotifier(notifier)
}
}
telegramBotToken := viper.GetString("telegram-bot-token")
if len(telegramBotToken) > 0 {
tt := strings.Split(telegramBotToken, ":")
telegramID := tt[0]
bot, err := telebot.NewBot(telebot.Settings{
// You can also set custom API URL.
// If field is empty it equals to "https://api.telegram.org".
// URL: "http://195.129.111.17:8012",
Token: telegramBotToken,
Poller: &telebot.LongPoller{Timeout: 10 * time.Second},
})
if err != nil {
return err
}
// allocate a store, so that we can save the chatID for the owner
var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID)
var interaction = telegramnotifier.NewInteraction(bot, sessionStore)
authToken := viper.GetString("telegram-bot-auth-token")
if len(authToken) > 0 {
interaction.SetAuthToken(authToken)
log.Info("telegram bot auth token is set, using fixed token for authorization...")
printTelegramAuthTokenGuide(authToken)
}
var session telegramnotifier.Session
if err := sessionStore.Load(&session); err != nil || session.Owner == nil {
log.Warnf("telegram session not found, generating new one-time password key for new telegram session...")
qrcodeImagePath := fmt.Sprintf("otp-%s.png", telegramID)
key, err := setupNewOTPKey(qrcodeImagePath)
if err != nil {
return errors.Wrapf(err, "failed to setup totp (time-based one time password) key")
}
session = telegramnotifier.NewSession(key)
if err := sessionStore.Save(&session); err != nil {
return errors.Wrap(err, "failed to save session")
}
}
go interaction.Start(session)
var notifier = telegramnotifier.New(interaction)
environ.Notifiability.AddNotifier(notifier)
}
if userConfig.Notifications != nil {
if err := environ.ConfigureNotificationRouting(userConfig.Notifications); err != nil {
return err
}
}
return nil
}
func writeOTPKeyAsQRCodePNG(key *otp.Key, imagePath string) error {
// Convert TOTP key into a PNG
var buf bytes.Buffer
img, err := key.Image(512, 512)
if err != nil {
return err
}
if err := png.Encode(&buf, img); err != nil {
return err
}
if err := ioutil.WriteFile(imagePath, buf.Bytes(), 0644); err != nil {
return err
}
return nil
}
// setupNewOTPKey generates a new otp key and save the secret as a qrcode image
func setupNewOTPKey(qrcodeImagePath string) (*otp.Key, error) {
key, err := service.NewDefaultTotpKey()
if err != nil {
return nil, errors.Wrapf(err, "failed to setup totp (time-based one time password) key")
}
printOtpKey(key)
if err := writeOTPKeyAsQRCodePNG(key, qrcodeImagePath); err != nil {
return nil, err
}
printTelegramOtpAuthGuide(qrcodeImagePath)
return key, nil
}
func printOtpKey(key *otp.Key) {
fmt.Println("")
fmt.Println("====================PLEASE STORE YOUR OTP KEY=======================")
fmt.Println("")
fmt.Printf("Issuer: %s\n", key.Issuer())
fmt.Printf("AccountName: %s\n", key.AccountName())
fmt.Printf("Secret: %s\n", key.Secret())
fmt.Printf("Key URL: %s\n", key.URL())
fmt.Println("")
fmt.Println("====================================================================")
fmt.Println("")
}
func printTelegramOtpAuthGuide(qrcodeImagePath string) {
log.Infof("To scan your OTP QR code, please run the following command:")
log.Infof("")
log.Infof("")
log.Infof(" open %s", qrcodeImagePath)
log.Infof("")
log.Infof("")
log.Infof("send the auth command with the generated one-time password to the bbgo bot you created to enable the notification")
log.Infof("")
log.Infof("")
log.Infof(" /auth {code}")
log.Infof("")
log.Infof("")
}
func printTelegramAuthTokenGuide(token string) {
fmt.Println("send the following command to the bbgo bot you created to enable the notification")
fmt.Println("")
fmt.Println("")
fmt.Printf(" /auth %s\n", token)
fmt.Println("")
fmt.Println("")
}

View File

@ -66,7 +66,7 @@ var CancelCmd = &cobra.Command{
return err return err
} }
if err := environ.AddExchangesFromConfig(userConfig); err != nil { if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err return err
} }

View File

@ -1,33 +1,22 @@
package cmd package cmd
import ( import (
"bytes"
"context" "context"
"fmt"
"image/png"
"io/ioutil" "io/ioutil"
"os" "os"
"os/exec" "os/exec"
"path/filepath" "path/filepath"
"strings"
"syscall" "syscall"
"time" "time"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/pquerna/otp"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
flag "github.com/spf13/pflag" flag "github.com/spf13/pflag"
"github.com/spf13/viper"
tb "gopkg.in/tucnak/telebot.v2"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil" "github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
"github.com/c9s/bbgo/pkg/notifier/telegramnotifier"
"github.com/c9s/bbgo/pkg/server" "github.com/c9s/bbgo/pkg/server"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/slack/slacklog"
) )
func init() { func init() {
@ -88,116 +77,23 @@ func runSetup(baseCtx context.Context, userConfig *bbgo.Config, enableApiServer
return nil return nil
} }
func newNotificationSystem(userConfig *bbgo.Config, persistence service.PersistenceService) (*bbgo.Notifiability, error) {
notification := &bbgo.Notifiability{
SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil),
SessionChannelRouter: bbgo.NewPatternChannelRouter(nil),
ObjectChannelRouter: bbgo.NewObjectChannelRouter(),
}
slackToken := viper.GetString("slack-token")
if len(slackToken) > 0 && userConfig.Notifications != nil {
if conf := userConfig.Notifications.Slack; conf != nil {
if conf.ErrorChannel != "" {
log.Infof("found slack configured, setting up log hook...")
log.AddHook(slacklog.NewLogHook(slackToken, conf.ErrorChannel))
}
log.Infof("adding slack notifier with default channel: %s", conf.DefaultChannel)
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
notification.AddNotifier(notifier)
}
}
telegramBotToken := viper.GetString("telegram-bot-token")
if len(telegramBotToken) > 0 {
tt := strings.Split(telegramBotToken, ":")
telegramID := tt[0]
bot, err := tb.NewBot(tb.Settings{
// You can also set custom API URL.
// If field is empty it equals to "https://api.telegram.org".
// URL: "http://195.129.111.17:8012",
Token: telegramBotToken,
Poller: &tb.LongPoller{Timeout: 10 * time.Second},
})
if err != nil {
return nil, err
}
// allocate a store, so that we can save the chatID for the owner
var sessionStore = persistence.NewStore("bbgo", "telegram", telegramID)
var interaction = telegramnotifier.NewInteraction(bot, sessionStore)
authToken := viper.GetString("telegram-bot-auth-token")
if len(authToken) > 0 {
interaction.SetAuthToken(authToken)
log.Info("telegram bot auth token is set, using fixed token for authorization...")
printTelegramAuthTokenGuide(authToken)
}
var session telegramnotifier.Session
if err := sessionStore.Load(&session); err != nil || session.Owner == nil {
log.Warnf("telegram session not found, generating new one-time password key for new telegram session...")
qrcodeImagePath := fmt.Sprintf("otp-%s.png", telegramID)
key, err := setupNewOTPKey(qrcodeImagePath)
if err != nil {
return nil, errors.Wrapf(err, "failed to setup totp (time-based one time password) key")
}
session = telegramnotifier.NewSession(key)
if err := sessionStore.Save(&session); err != nil {
return nil, errors.Wrap(err, "failed to save session")
}
}
go interaction.Start(session)
var notifier = telegramnotifier.New(interaction)
notification.AddNotifier(notifier)
}
return notification, nil
}
func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userConfig *bbgo.Config) error { func BootstrapEnvironment(ctx context.Context, environ *bbgo.Environment, userConfig *bbgo.Config) error {
if err := environ.ConfigureDatabase(ctx); err != nil { if err := environ.ConfigureDatabase(ctx); err != nil {
return err return err
} }
if err := environ.AddExchangesFromConfig(userConfig); err != nil { if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err return err
} }
if userConfig.Persistence != nil { if userConfig.Persistence != nil {
if err := environ.ConfigurePersistence(userConfig.Persistence); err != nil { if err := environ.ConfigurePersistence(userConfig.Persistence); err != nil {
return err return errors.Wrap(err, "persistence configuration error")
} }
} }
// configure persistence service, by default we will use memory service if err := environ.ConfigureNotificationSystem(userConfig) ; err != nil {
var persistence service.PersistenceService = service.NewMemoryService() return errors.Wrap(err,"notification configuration error")
if environ.PersistenceServiceFacade != nil {
if environ.PersistenceServiceFacade.Redis != nil {
persistence = environ.PersistenceServiceFacade.Redis
}
}
notification, err := newNotificationSystem(userConfig, persistence)
if err != nil {
return err
}
environ.Notifiability = *notification
if userConfig.Notifications != nil {
if err := environ.ConfigureNotification(userConfig.Notifications); err != nil {
return err
}
} }
return nil return nil
@ -363,77 +259,3 @@ func buildAndRun(ctx context.Context, userConfig *bbgo.Config, args ...string) (
return runCmd, runCmd.Start() return runCmd, runCmd.Start()
} }
func writeOTPKeyAsQRCodePNG(key *otp.Key, imagePath string) error {
// Convert TOTP key into a PNG
var buf bytes.Buffer
img, err := key.Image(512, 512)
if err != nil {
return err
}
if err := png.Encode(&buf, img); err != nil {
return err
}
if err := ioutil.WriteFile(imagePath, buf.Bytes(), 0644); err != nil {
return err
}
return nil
}
// setupNewOTPKey generates a new otp key and save the secret as a qrcode image
func setupNewOTPKey(qrcodeImagePath string) (*otp.Key, error) {
key, err := service.NewDefaultTotpKey()
if err != nil {
return nil, errors.Wrapf(err, "failed to setup totp (time-based one time password) key")
}
printOtpKey(key)
if err := writeOTPKeyAsQRCodePNG(key, qrcodeImagePath) ; err != nil {
return nil, err
}
printTelegramOtpAuthGuide(qrcodeImagePath)
return key, nil
}
func printOtpKey(key *otp.Key) {
fmt.Println("")
fmt.Println("====================PLEASE STORE YOUR OTP KEY=======================")
fmt.Println("")
fmt.Printf("Issuer: %s\n", key.Issuer())
fmt.Printf("AccountName: %s\n", key.AccountName())
fmt.Printf("Secret: %s\n", key.Secret())
fmt.Printf("Key URL: %s\n", key.URL())
fmt.Println("")
fmt.Println("====================================================================")
fmt.Println("")
}
func printTelegramOtpAuthGuide(qrcodeImagePath string) {
log.Infof("To scan your OTP QR code, please run the following command:")
log.Infof("")
log.Infof("")
log.Infof(" open %s", qrcodeImagePath)
log.Infof("")
log.Infof("")
log.Infof("send the auth command with the generated one-time password to the bbgo bot you created to enable the notification")
log.Infof("")
log.Infof("")
log.Infof(" /auth {code}")
log.Infof("")
log.Infof("")
}
func printTelegramAuthTokenGuide(token string) {
fmt.Println("send the following command to the bbgo bot you created to enable the notification")
fmt.Println("")
fmt.Println("")
fmt.Printf(" /auth %s\n", token)
fmt.Println("")
fmt.Println("")
}

View File

@ -54,7 +54,7 @@ var SyncCmd = &cobra.Command{
return err return err
} }
if err := environ.AddExchangesFromConfig(userConfig); err != nil { if err := environ.ConfigureExchangeSessions(userConfig); err != nil {
return err return err
} }