diff --git a/pkg/bbgo/notification.go b/pkg/bbgo/notification.go index db63d7448..e1807a3d1 100644 --- a/pkg/bbgo/notification.go +++ b/pkg/bbgo/notification.go @@ -1,6 +1,8 @@ package bbgo import ( + "bytes" + "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/util" @@ -12,6 +14,10 @@ var Notification = &Notifiability{ ObjectChannelRouter: NewObjectChannelRouter(), } +func RegisterCommand(application, command string, handler func(string)) { + Notification.RegisterCommand(application, command, handler) +} + func Notify(obj interface{}, args ...interface{}) { Notification.Notify(obj, args...) } @@ -20,9 +26,21 @@ func NotifyTo(channel string, obj interface{}, args ...interface{}) { Notification.NotifyTo(channel, obj, args...) } +func SendPhoto(buffer *bytes.Buffer) { + Notification.SendPhoto(buffer) +} + +func SendPhotoTo(channel string, buffer *bytes.Buffer) { + Notification.SendPhotoTo(channel, buffer) +} + type Notifier interface { NotifyTo(channel string, obj interface{}, args ...interface{}) Notify(obj interface{}, args ...interface{}) + SendPhotoTo(channel string, buffer *bytes.Buffer) + SendPhoto(buffer *bytes.Buffer) + RegisterCommand(command string, handler func(string)) + ID() string } type NullNotifier struct{} @@ -31,6 +49,16 @@ func (n *NullNotifier) NotifyTo(channel string, obj interface{}, args ...interfa func (n *NullNotifier) Notify(obj interface{}, args ...interface{}) {} +func (n *NullNotifier) SendPhoto(buffer *bytes.Buffer) {} + +func (n *NullNotifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {} + +func (n *NullNotifier) RegisterCommand(command string, handler func(string)) {} + +func (n *NullNotifier) ID() string { + return "null" +} + type Notifiability struct { notifiers []Notifier SessionChannelRouter *PatternChannelRouter `json:"-"` @@ -83,3 +111,23 @@ func (m *Notifiability) NotifyTo(channel string, obj interface{}, args ...interf n.NotifyTo(channel, obj, args...) } } + +func (m *Notifiability) SendPhoto(buffer *bytes.Buffer) { + for _, n := range m.notifiers { + n.SendPhoto(buffer) + } +} + +func (m *Notifiability) SendPhotoTo(channel string, buffer *bytes.Buffer) { + for _, n := range m.notifiers { + n.SendPhotoTo(channel, buffer) + } +} + +func (m *Notifiability) RegisterCommand(application, command string, handler func(string)) { + for _, n := range m.notifiers { + if application == n.ID() { + n.RegisterCommand(command, handler) + } + } +} diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 69e229257..529d214a7 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -1,6 +1,7 @@ package slacknotifier import ( + "bytes" "context" "fmt" "time" @@ -150,6 +151,22 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } } +func (n *Notifier) SendPhoto(buffer *bytes.Buffer) { + n.SendPhotoTo(n.channel, buffer) +} + +func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { + // TODO +} + +func (n *Notifier) ID() string { + return "slack" +} + +func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) { + // TODO +} + /* func (n *Notifier) NotifyTrade(trade *types.Trade) { _, _, err := n.client.PostMessageContext(context.Background(), n.TradeChannel, diff --git a/pkg/notifier/telegramnotifier/telegram.go b/pkg/notifier/telegramnotifier/telegram.go index 36927319e..a456bb30c 100644 --- a/pkg/notifier/telegramnotifier/telegram.go +++ b/pkg/notifier/telegramnotifier/telegram.go @@ -1,6 +1,7 @@ package telegramnotifier import ( + "bytes" "fmt" "reflect" "strconv" @@ -49,6 +50,17 @@ func New(bot *telebot.Bot, options ...Option) *Notifier { return notifier } +func (n *Notifier) ID() string { + return "telegram" +} + +func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) { + handler := func(msg *telebot.Message) { + simplehandler(msg.Text) + } + n.bot.Handle(command, handler) +} + func (n *Notifier) Notify(obj interface{}, args ...interface{}) { n.NotifyTo("", obj, args...) } @@ -129,6 +141,48 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } } +func (n *Notifier) SendPhoto(buffer *bytes.Buffer) { + n.SendPhotoTo("", buffer) +} + +func photoFromBuffer(buffer *bytes.Buffer) telebot.InputMedia { + reader := bytes.NewReader(buffer.Bytes()) + return &telebot.Photo{ + File: telebot.FromReader(reader), + } +} + +func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { + if n.broadcast { + if n.Subscribers == nil { + return + } + + for chatID := range n.Subscribers { + chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10)) + if err != nil { + log.WithError(err).Error("can not get chat by ID") + continue + } + album := telebot.Album{ + photoFromBuffer(buffer), + } + if _, err := n.bot.SendAlbum(chat, album); err != nil { + log.WithError(err).Error("failed to send message") + } + } + } else if n.Chats != nil { + for _, chat := range n.Chats { + album := telebot.Album{ + photoFromBuffer(buffer), + } + if _, err := n.bot.SendAlbum(chat, album); err != nil { + log.WithError(err).Error("telegram send error") + } + } + } +} + func (n *Notifier) AddChat(c *telebot.Chat) { if n.Chats == nil { n.Chats = make(map[int64]*telebot.Chat) diff --git a/pkg/strategy/drift/strategy.go b/pkg/strategy/drift/strategy.go index 02634741e..611851c96 100644 --- a/pkg/strategy/drift/strategy.go +++ b/pkg/strategy/drift/strategy.go @@ -2,6 +2,7 @@ package drift import ( "bufio" + "bytes" "context" "encoding/json" "errors" @@ -283,7 +284,7 @@ func (s *DriftMA) TestUpdate(v float64) *DriftMA { return out } -func (s *Strategy) initIndicators() error { +func (s *Strategy) initIndicators(kline *types.KLine, priceLines *types.Queue) error { s.ma = &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} s.stdevHigh = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} s.stdevLow = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} @@ -327,6 +328,10 @@ func (s *Strategy) initIndicators() error { s.drift.Update(source) s.trendLine.Update(source) s.atr.PushK(kline) + priceLines.Update(source) + } + if kline != nil && klines != nil { + kline.Set(&(*klines)[len(*klines)-1]) } klines, ok = store.KLinesOfInterval(types.Interval1m) if !ok { @@ -491,12 +496,13 @@ func (s *Strategy) initTickerFunctions(ctx context.Context) { } -func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) { +func (s *Strategy) DrawIndicators(time types.Time, priceLine types.SeriesExtend, zeroPoints types.Series) *types.Canvas { canvas := types.NewCanvas(s.InstanceID(), s.Interval) Length := priceLine.Length() if Length > 300 { Length = 300 } + log.Infof("draw indicators with %d data", Length) mean := priceLine.Mean(Length) highestPrice := priceLine.Minus(mean).Abs().Highest(Length) highestDrift := s.drift.Abs().Highest(Length) @@ -510,6 +516,31 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty canvas.Plot("zero", types.NumberSeries(mean), time, Length) canvas.Plot("price", priceLine, time, Length) canvas.Plot("zeroPoint", zeroPoints, time, Length) + return canvas +} + +func (s *Strategy) DrawPNL(profit types.Series) *types.Canvas { + canvas := types.NewCanvas(s.InstanceID()) + if s.GraphPNLDeductFee { + canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length()) + } else { + canvas.PlotRaw("pnl %", profit, profit.Length()) + } + return canvas +} + +func (s *Strategy) DrawCumPNL(cumProfit types.Series) *types.Canvas { + canvas := types.NewCanvas(s.InstanceID()) + if s.GraphPNLDeductFee { + canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length()) + } else { + canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length()) + } + return canvas +} + +func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) { + canvas := s.DrawIndicators(time, priceLine, zeroPoints) f, err := os.Create(s.CanvasPath) if err != nil { log.WithError(err).Errorf("cannot create on %s", s.CanvasPath) @@ -520,12 +551,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty log.WithError(err).Errorf("cannot render in drift") } - canvas = types.NewCanvas(s.InstanceID()) - if s.GraphPNLDeductFee { - canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length()) - } else { - canvas.PlotRaw("pnl %", profit, profit.Length()) - } + canvas = s.DrawPNL(profit) f, err = os.Create(s.GraphPNLPath) if err != nil { log.WithError(err).Errorf("open pnl") @@ -536,12 +562,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty log.WithError(err).Errorf("render pnl") } - canvas = types.NewCanvas(s.InstanceID()) - if s.GraphPNLDeductFee { - canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length()) - } else { - canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length()) - } + canvas = s.DrawCumPNL(cumProfit) f, err = os.Create(s.GraphCumPNLPath) if err != nil { log.WithError(err).Errorf("open cumpnl") @@ -662,6 +683,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se if s.Position == nil { s.Position = types.NewPositionFromMarket(s.Market) s.p = types.NewPositionFromMarket(s.Market) + } else { + s.p = types.NewPositionFromMarket(s.Market) + s.p.Base = s.Position.Base + s.p.Quote = s.Position.Quote + s.p.AverageCost = s.Position.AverageCost } if s.ProfitStats == nil { s.ProfitStats = types.NewProfitStats(s.Market) @@ -818,14 +844,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.lowestPrice = s.sellPrice }) - if err := s.initIndicators(); err != nil { + dynamicKLine := &types.KLine{} + priceLine := types.NewQueue(300) + if err := s.initIndicators(dynamicKLine, priceLine); err != nil { log.WithError(err).Errorf("initIndicator failed") return nil } s.initTickerFunctions(ctx) - dynamicKLine := &types.KLine{} - priceLine := types.NewQueue(300) zeroPoints := types.NewQueue(300) stoploss := s.StopLoss.Float64() // default value: use 1m kline @@ -833,6 +859,36 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se s.TrailingStopLossType = "kline" } + bbgo.RegisterCommand("telegram", "/draw", func(msg string) { + canvas := s.DrawIndicators(dynamicKLine.StartTime, priceLine, zeroPoints) + var buffer bytes.Buffer + if err := canvas.Render(chart.PNG, &buffer); err != nil { + log.WithError(err).Errorf("cannot render indicators in drift") + return + } + bbgo.SendPhoto(&buffer) + }) + + bbgo.RegisterCommand("telegram", "/pnl", func(msg string) { + canvas := s.DrawPNL(&profit) + var buffer bytes.Buffer + if err := canvas.Render(chart.PNG, &buffer); err != nil { + log.WithError(err).Errorf("cannot render pnl in drift") + return + } + bbgo.SendPhoto(&buffer) + }) + + bbgo.RegisterCommand("telegram", "/cumpnl", func(msg string) { + canvas := s.DrawCumPNL(&cumProfit) + var buffer bytes.Buffer + if err := canvas.Render(chart.PNG, &buffer); err != nil { + log.WithError(err).Errorf("cannot render cumpnl in drift") + return + } + bbgo.SendPhoto(&buffer) + }) + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { if s.Status != types.StrategyStatusRunning { return diff --git a/pkg/types/indicator.go b/pkg/types/indicator.go index 5c67fbc21..6c92cc738 100644 --- a/pkg/types/indicator.go +++ b/pkg/types/indicator.go @@ -1192,16 +1192,31 @@ func NewCanvas(title string, intervals ...Interval) *Canvas { return out } +func expand(a []float64, length int, defaultVal float64) []float64 { + l := len(a) + if l >= length { + return a + } + for i := 0; i < length-l; i++ { + a = append(a, defaultVal) + } + return a +} + func (canvas *Canvas) Plot(tag string, a Series, endTime Time, length int) { var timeline []time.Time e := endTime.Time() + if a.Length() == 0 { + return + } + oldest := a.Index(a.Length() - 1) for i := length - 1; i >= 0; i-- { shiftedT := e.Add(-time.Duration(i*canvas.Interval.Minutes()) * time.Minute) timeline = append(timeline, shiftedT) } canvas.Series = append(canvas.Series, chart.TimeSeries{ Name: tag, - YValues: Reverse(a, length), + YValues: expand(Reverse(a, length), length, oldest), XValues: timeline, }) } @@ -1211,10 +1226,14 @@ func (canvas *Canvas) PlotRaw(tag string, a Series, length int) { for i := 0; i < length; i++ { x = append(x, float64(i)) } + if a.Length() == 0 { + return + } + oldest := a.Index(a.Length() - 1) canvas.Series = append(canvas.Series, chart.ContinuousSeries{ Name: tag, XValues: x, - YValues: Reverse(a, length), + YValues: expand(Reverse(a, length), length, oldest), }) }