Add pricealert strategy for demonstrating notification

This commit is contained in:
c9s 2020-10-27 13:54:39 +08:00
parent ab43de3efd
commit b3eaf832af
7 changed files with 105 additions and 29 deletions

View File

@ -6,7 +6,7 @@ imports:
notifications: notifications:
slack: slack:
defaultChannel: "#dev-bbgo" defaultChannel: "#dev-bbgo"
errorChannel: "#error" errorChannel: "#bbgo-error"
# if you want to route channel by symbol # if you want to route channel by symbol
symbolChannels: symbolChannels:

29
config/pricealert.yaml Normal file
View File

@ -0,0 +1,29 @@
---
notifications:
slack:
defaultChannel: "#dev-bbgo"
errorChannel: "#bbgo-error"
# if you want to route channel by symbol
symbolChannels:
"^BTC": "#btc"
"^ETH": "#eth"
# object routing rules
routing:
trade: "$symbol"
order: "$symbol"
submitOrder: "$session" # not supported yet
pnL: "#bbgo-pnl"
sessions:
binance:
exchange: binance
envVarPrefix: binance
exchangeStrategies:
- on: binance
pricealert:
symbol: "BTCUSDT"
interval: "1m"
minChange: 0.01

View File

@ -97,12 +97,14 @@ func NewPatternChannelRouter(routes map[string]string) *PatternChannelRouter {
return router return router
} }
func (router *PatternChannelRouter) AddRoute(routes map[string]string) *PatternChannelRouter { func (router *PatternChannelRouter) AddRoute(routes map[string]string) {
if routes == nil {
return
}
for pattern, channel := range routes { for pattern, channel := range routes {
router.routes[regexp.MustCompile(pattern)] = channel router.routes[regexp.MustCompile(pattern)] = channel
} }
return router
} }
func (router *PatternChannelRouter) Route(text string) (channel string, ok bool) { func (router *PatternChannelRouter) Route(text string) (channel string, ok bool) {
@ -126,9 +128,8 @@ func NewObjectChannelRouter() *ObjectChannelRouter {
return &ObjectChannelRouter{} return &ObjectChannelRouter{}
} }
func (router *ObjectChannelRouter) AddRoute(f ObjectChannelHandler) *ObjectChannelRouter { func (router *ObjectChannelRouter) AddRoute(f ObjectChannelHandler) {
router.routes = append(router.routes, f) router.routes = append(router.routes, f)
return router
} }
func (router *ObjectChannelRouter) Route(obj interface{}) (channel string, ok bool) { func (router *ObjectChannelRouter) Route(obj interface{}) (channel string, ok bool) {

View File

@ -2,6 +2,7 @@ package bbgo
import ( import (
"context" "context"
"reflect"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -57,7 +58,6 @@ func (trader *Trader) ReportTrade() *TradeReporter {
return trader.tradeReporter return trader.tradeReporter
} }
// AttachStrategyOn attaches the single exchange strategy on an exchange session. // AttachStrategyOn attaches the single exchange strategy on an exchange session.
// Single exchange strategy is the default behavior. // Single exchange strategy is the default behavior.
func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader { func (trader *Trader) AttachStrategyOn(session string, strategies ...SingleExchangeStrategy) *Trader {
@ -113,7 +113,7 @@ func (trader *Trader) Run(ctx context.Context) error {
var orderExecutor OrderExecutor = baseOrderExecutor var orderExecutor OrderExecutor = baseOrderExecutor
// Since the risk controls are loaded from the config file // Since the risk controls are loaded from the config file
if riskControls := trader.riskControls ; riskControls != nil { if riskControls := trader.riskControls; riskControls != nil {
if trader.riskControls.SessionBasedRiskControl != nil { if trader.riskControls.SessionBasedRiskControl != nil {
control, ok := trader.riskControls.SessionBasedRiskControl[sessionName] control, ok := trader.riskControls.SessionBasedRiskControl[sessionName]
if ok { if ok {
@ -128,6 +128,19 @@ func (trader *Trader) Run(ctx context.Context) error {
} }
for _, strategy := range strategies { for _, strategy := range strategies {
rs := reflect.ValueOf(strategy)
if rs.Elem().Kind() == reflect.Struct {
rs = rs.Elem()
field := rs.FieldByName("Notifiability")
if field.IsValid() {
log.Infof("found Notifiability in strategy %T, configuring...", strategy)
if !field.CanSet() {
log.Panicf("strategy %T field Notifiability can not be set", strategy)
}
field.Set(reflect.ValueOf(trader.Notifiability))
}
}
err := strategy.Run(ctx, orderExecutor, session) err := strategy.Run(ctx, orderExecutor, session)
if err != nil { if err != nil {
return err return err
@ -262,8 +275,8 @@ func (trader *Trader) reportPnL() {
*/ */
// ReportPnL configure and set the PnLReporter with the given notifier // ReportPnL configure and set the PnLReporter with the given notifier
func (trader *Trader) ReportPnL(notifier Notifier) *PnLReporterManager { func (trader *Trader) ReportPnL() *PnLReporterManager {
return NewPnLReporter(notifier) return NewPnLReporter(&trader.Notifiability)
} }
type OrderExecutor interface { type OrderExecutor interface {

View File

@ -27,6 +27,7 @@ import (
// import built-in strategies // import built-in strategies
_ "github.com/c9s/bbgo/pkg/strategy/buyandhold" _ "github.com/c9s/bbgo/pkg/strategy/buyandhold"
_ "github.com/c9s/bbgo/pkg/strategy/pricealert"
_ "github.com/c9s/bbgo/pkg/strategy/xpuremaker" _ "github.com/c9s/bbgo/pkg/strategy/xpuremaker"
) )
@ -105,7 +106,7 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
trader := bbgo.NewTrader(environ) trader := bbgo.NewTrader(environ)
// configure notifiers // configure notifiers
notifierSet := &bbgo.Notifiability{ trader.Notifiability = bbgo.Notifiability{
SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil), SymbolChannelRouter: bbgo.NewPatternChannelRouter(nil),
SessionChannelRouter: bbgo.NewPatternChannelRouter(nil), SessionChannelRouter: bbgo.NewPatternChannelRouter(nil),
ObjectChannelRouter: bbgo.NewObjectChannelRouter(), ObjectChannelRouter: bbgo.NewObjectChannelRouter(),
@ -122,7 +123,7 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
log.Infof("adding slack notifier...") log.Infof("adding slack notifier...")
var notifier = slacknotifier.New(slackToken, conf.DefaultChannel) var notifier = slacknotifier.New(slackToken, conf.DefaultChannel)
notifierSet.AddNotifier(notifier) trader.AddNotifier(notifier)
} }
} }
@ -130,50 +131,48 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
if conf := userConfig.Notifications; conf != nil { if conf := userConfig.Notifications; conf != nil {
// configure routing here // configure routing here
if conf.SymbolChannels != nil { if conf.SymbolChannels != nil {
notifierSet.SymbolChannelRouter.AddRoute(conf.SymbolChannels) trader.SymbolChannelRouter.AddRoute(conf.SymbolChannels)
} }
if conf.SessionChannels != nil { if conf.SessionChannels != nil {
notifierSet.SessionChannelRouter.AddRoute(conf.SessionChannels) trader.SessionChannelRouter.AddRoute(conf.SessionChannels)
} }
if conf.Routing != nil { if conf.Routing != nil {
if conf.Routing.Trade == "$symbol" { if conf.Routing.Trade == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) { trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
trade, matched := obj.(*types.Trade) trade, matched := obj.(*types.Trade)
if !matched { if !matched {
return return
} }
channel, ok = notifierSet.SymbolChannelRouter.Route(trade.Symbol) channel, ok = trader.SymbolChannelRouter.Route(trade.Symbol)
return return
}) })
} }
if conf.Routing.Order == "$symbol" { if conf.Routing.Order == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) { trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
order, matched := obj.(*types.Order) order, matched := obj.(*types.Order)
if !matched { if !matched {
return return
} }
channel, ok = notifierSet.SymbolChannelRouter.Route(order.Symbol) channel, ok = trader.SymbolChannelRouter.Route(order.Symbol)
return return
}) })
} }
if conf.Routing.PnL == "$symbol" { if conf.Routing.PnL == "$symbol" {
notifierSet.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) { trader.ObjectChannelRouter.Route(func(obj interface{}) (channel string, ok bool) {
report, matched := obj.(*pnl.AverageCostPnlReport) report, matched := obj.(*pnl.AverageCostPnlReport)
if !matched { if !matched {
return return
} }
channel, ok = notifierSet.SymbolChannelRouter.Route(report.Symbol) channel, ok = trader.SymbolChannelRouter.Route(report.Symbol)
return return
}) })
} }
} }
} }
trader.AddNotifier(notifierSet)
trader.ReportTrade() trader.ReportTrade()
if userConfig.RiskControls != nil { if userConfig.RiskControls != nil {
@ -196,7 +195,7 @@ func runConfig(ctx context.Context, userConfig *bbgo.Config) error {
if len(report.AverageCostBySymbols) > 0 { if len(report.AverageCostBySymbols) > 0 {
log.Infof("setting up average cost pnl reporter on symbols: %v", report.AverageCostBySymbols) log.Infof("setting up average cost pnl reporter on symbols: %v", report.AverageCostBySymbols)
trader.ReportPnL(notifierSet). trader.ReportPnL().
AverageCostBySymbols(report.AverageCostBySymbols...). AverageCostBySymbols(report.AverageCostBySymbols...).
Of(report.Of...). Of(report.Of...).
When(report.When...) When(report.When...)

View File

@ -0,0 +1,40 @@
package pricealert
import (
"context"
"math"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/types"
)
func init() {
bbgo.RegisterExchangeStrategy("pricealert", &Strategy{})
}
type Strategy struct {
bbgo.Notifiability
Symbol string `json:"symbol"`
Interval string `json:"interval"`
MinChange float64 `json:"minChange"`
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
session.Stream.OnKLine(func(kline types.KLine) {
market, ok := session.Market(kline.Symbol)
if !ok {
return
}
if math.Abs(kline.GetChange()) > s.MinChange {
if channel, ok := s.RouteSymbol(s.Symbol); ok {
_ = s.NotifyTo(channel, "%s hit price %s, change %f", s.Symbol, market.FormatPrice(kline.Close), kline.GetChange())
} else {
_ = s.Notify("%s hit price %s, change %f", s.Symbol, market.FormatPrice(kline.Close), kline.GetChange())
}
}
})
return nil
}

View File

@ -17,12 +17,6 @@ type Strategy struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
} }
func New(symbol string) *Strategy {
return &Strategy{
Symbol: symbol,
}
}
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
session.Stream.OnKLineClosed(func(kline types.KLine) { session.Stream.OnKLineClosed(func(kline types.KLine) {