feature: send photo through telegram, register handler dynamically in strategy, fix canvas rendering

This commit is contained in:
zenix 2022-08-03 15:10:56 +09:00
parent 214e7259ed
commit 4117a83cd1
5 changed files with 213 additions and 19 deletions

View File

@ -1,6 +1,8 @@
package bbgo package bbgo
import ( import (
"bytes"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
@ -12,6 +14,10 @@ var Notification = &Notifiability{
ObjectChannelRouter: NewObjectChannelRouter(), ObjectChannelRouter: NewObjectChannelRouter(),
} }
func RegisterCommand(application, command string, handler func(string)) {
Notification.RegisterCommand(application, command, handler)
}
func Notify(obj interface{}, args ...interface{}) { func Notify(obj interface{}, args ...interface{}) {
Notification.Notify(obj, args...) Notification.Notify(obj, args...)
} }
@ -20,9 +26,21 @@ func NotifyTo(channel string, obj interface{}, args ...interface{}) {
Notification.NotifyTo(channel, obj, args...) Notification.NotifyTo(channel, obj, args...)
} }
func SendPhoto(buffer *bytes.Buffer) {
Notification.SendPhoto(buffer)
}
func SendPhotoTo(channel string, buffer *bytes.Buffer) {
Notification.SendPhotoTo(channel, buffer)
}
type Notifier interface { type Notifier interface {
NotifyTo(channel string, obj interface{}, args ...interface{}) NotifyTo(channel string, obj interface{}, args ...interface{})
Notify(obj interface{}, args ...interface{}) Notify(obj interface{}, args ...interface{})
SendPhotoTo(channel string, buffer *bytes.Buffer)
SendPhoto(buffer *bytes.Buffer)
RegisterCommand(command string, handler func(string))
ID() string
} }
type NullNotifier struct{} type NullNotifier struct{}
@ -31,6 +49,16 @@ func (n *NullNotifier) NotifyTo(channel string, obj interface{}, args ...interfa
func (n *NullNotifier) Notify(obj interface{}, args ...interface{}) {} func (n *NullNotifier) Notify(obj interface{}, args ...interface{}) {}
func (n *NullNotifier) SendPhoto(buffer *bytes.Buffer) {}
func (n *NullNotifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {}
func (n *NullNotifier) RegisterCommand(command string, handler func(string)) {}
func (n *NullNotifier) ID() string {
return "null"
}
type Notifiability struct { type Notifiability struct {
notifiers []Notifier notifiers []Notifier
SessionChannelRouter *PatternChannelRouter `json:"-"` SessionChannelRouter *PatternChannelRouter `json:"-"`
@ -83,3 +111,23 @@ func (m *Notifiability) NotifyTo(channel string, obj interface{}, args ...interf
n.NotifyTo(channel, obj, args...) n.NotifyTo(channel, obj, args...)
} }
} }
func (m *Notifiability) SendPhoto(buffer *bytes.Buffer) {
for _, n := range m.notifiers {
n.SendPhoto(buffer)
}
}
func (m *Notifiability) SendPhotoTo(channel string, buffer *bytes.Buffer) {
for _, n := range m.notifiers {
n.SendPhotoTo(channel, buffer)
}
}
func (m *Notifiability) RegisterCommand(application, command string, handler func(string)) {
for _, n := range m.notifiers {
if application == n.ID() {
n.RegisterCommand(command, handler)
}
}
}

View File

@ -1,6 +1,7 @@
package slacknotifier package slacknotifier
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"time" "time"
@ -150,6 +151,22 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
} }
} }
func (n *Notifier) SendPhoto(buffer *bytes.Buffer) {
n.SendPhotoTo(n.channel, buffer)
}
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
// TODO
}
func (n *Notifier) ID() string {
return "slack"
}
func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) {
// TODO
}
/* /*
func (n *Notifier) NotifyTrade(trade *types.Trade) { func (n *Notifier) NotifyTrade(trade *types.Trade) {
_, _, err := n.client.PostMessageContext(context.Background(), n.TradeChannel, _, _, err := n.client.PostMessageContext(context.Background(), n.TradeChannel,

View File

@ -1,6 +1,7 @@
package telegramnotifier package telegramnotifier
import ( import (
"bytes"
"fmt" "fmt"
"reflect" "reflect"
"strconv" "strconv"
@ -49,6 +50,17 @@ func New(bot *telebot.Bot, options ...Option) *Notifier {
return notifier return notifier
} }
func (n *Notifier) ID() string {
return "telegram"
}
func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) {
handler := func(msg *telebot.Message) {
simplehandler(msg.Text)
}
n.bot.Handle(command, handler)
}
func (n *Notifier) Notify(obj interface{}, args ...interface{}) { func (n *Notifier) Notify(obj interface{}, args ...interface{}) {
n.NotifyTo("", obj, args...) n.NotifyTo("", obj, args...)
} }
@ -129,6 +141,48 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
} }
} }
func (n *Notifier) SendPhoto(buffer *bytes.Buffer) {
n.SendPhotoTo("", buffer)
}
func photoFromBuffer(buffer *bytes.Buffer) telebot.InputMedia {
reader := bytes.NewReader(buffer.Bytes())
return &telebot.Photo{
File: telebot.FromReader(reader),
}
}
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
if n.broadcast {
if n.Subscribers == nil {
return
}
for chatID := range n.Subscribers {
chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10))
if err != nil {
log.WithError(err).Error("can not get chat by ID")
continue
}
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("failed to send message")
}
}
} else if n.Chats != nil {
for _, chat := range n.Chats {
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("telegram send error")
}
}
}
}
func (n *Notifier) AddChat(c *telebot.Chat) { func (n *Notifier) AddChat(c *telebot.Chat) {
if n.Chats == nil { if n.Chats == nil {
n.Chats = make(map[int64]*telebot.Chat) n.Chats = make(map[int64]*telebot.Chat)

View File

@ -2,6 +2,7 @@ package drift
import ( import (
"bufio" "bufio"
"bytes"
"context" "context"
"encoding/json" "encoding/json"
"errors" "errors"
@ -283,7 +284,7 @@ func (s *DriftMA) TestUpdate(v float64) *DriftMA {
return out return out
} }
func (s *Strategy) initIndicators() error { func (s *Strategy) initIndicators(kline *types.KLine, priceLines *types.Queue) error {
s.ma = &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} s.ma = &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevHigh = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} s.stdevHigh = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevLow = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}} s.stdevLow = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
@ -327,6 +328,10 @@ func (s *Strategy) initIndicators() error {
s.drift.Update(source) s.drift.Update(source)
s.trendLine.Update(source) s.trendLine.Update(source)
s.atr.PushK(kline) s.atr.PushK(kline)
priceLines.Update(source)
}
if kline != nil && klines != nil {
kline.Set(&(*klines)[len(*klines)-1])
} }
klines, ok = store.KLinesOfInterval(types.Interval1m) klines, ok = store.KLinesOfInterval(types.Interval1m)
if !ok { if !ok {
@ -491,12 +496,13 @@ func (s *Strategy) initTickerFunctions(ctx context.Context) {
} }
func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) { func (s *Strategy) DrawIndicators(time types.Time, priceLine types.SeriesExtend, zeroPoints types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID(), s.Interval) canvas := types.NewCanvas(s.InstanceID(), s.Interval)
Length := priceLine.Length() Length := priceLine.Length()
if Length > 300 { if Length > 300 {
Length = 300 Length = 300
} }
log.Infof("draw indicators with %d data", Length)
mean := priceLine.Mean(Length) mean := priceLine.Mean(Length)
highestPrice := priceLine.Minus(mean).Abs().Highest(Length) highestPrice := priceLine.Minus(mean).Abs().Highest(Length)
highestDrift := s.drift.Abs().Highest(Length) highestDrift := s.drift.Abs().Highest(Length)
@ -510,6 +516,31 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
canvas.Plot("zero", types.NumberSeries(mean), time, Length) canvas.Plot("zero", types.NumberSeries(mean), time, Length)
canvas.Plot("price", priceLine, time, Length) canvas.Plot("price", priceLine, time, Length)
canvas.Plot("zeroPoint", zeroPoints, time, Length) canvas.Plot("zeroPoint", zeroPoints, time, Length)
return canvas
}
func (s *Strategy) DrawPNL(profit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length())
} else {
canvas.PlotRaw("pnl %", profit, profit.Length())
}
return canvas
}
func (s *Strategy) DrawCumPNL(cumProfit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length())
} else {
canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length())
}
return canvas
}
func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) {
canvas := s.DrawIndicators(time, priceLine, zeroPoints)
f, err := os.Create(s.CanvasPath) f, err := os.Create(s.CanvasPath)
if err != nil { if err != nil {
log.WithError(err).Errorf("cannot create on %s", s.CanvasPath) log.WithError(err).Errorf("cannot create on %s", s.CanvasPath)
@ -520,12 +551,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
log.WithError(err).Errorf("cannot render in drift") log.WithError(err).Errorf("cannot render in drift")
} }
canvas = types.NewCanvas(s.InstanceID()) canvas = s.DrawPNL(profit)
if s.GraphPNLDeductFee {
canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length())
} else {
canvas.PlotRaw("pnl %", profit, profit.Length())
}
f, err = os.Create(s.GraphPNLPath) f, err = os.Create(s.GraphPNLPath)
if err != nil { if err != nil {
log.WithError(err).Errorf("open pnl") log.WithError(err).Errorf("open pnl")
@ -536,12 +562,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
log.WithError(err).Errorf("render pnl") log.WithError(err).Errorf("render pnl")
} }
canvas = types.NewCanvas(s.InstanceID()) canvas = s.DrawCumPNL(cumProfit)
if s.GraphPNLDeductFee {
canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length())
} else {
canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length())
}
f, err = os.Create(s.GraphCumPNLPath) f, err = os.Create(s.GraphCumPNLPath)
if err != nil { if err != nil {
log.WithError(err).Errorf("open cumpnl") log.WithError(err).Errorf("open cumpnl")
@ -662,6 +683,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
if s.Position == nil { if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market) s.Position = types.NewPositionFromMarket(s.Market)
s.p = types.NewPositionFromMarket(s.Market) s.p = types.NewPositionFromMarket(s.Market)
} else {
s.p = types.NewPositionFromMarket(s.Market)
s.p.Base = s.Position.Base
s.p.Quote = s.Position.Quote
s.p.AverageCost = s.Position.AverageCost
} }
if s.ProfitStats == nil { if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market) s.ProfitStats = types.NewProfitStats(s.Market)
@ -818,14 +844,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.lowestPrice = s.sellPrice s.lowestPrice = s.sellPrice
}) })
if err := s.initIndicators(); err != nil { dynamicKLine := &types.KLine{}
priceLine := types.NewQueue(300)
if err := s.initIndicators(dynamicKLine, priceLine); err != nil {
log.WithError(err).Errorf("initIndicator failed") log.WithError(err).Errorf("initIndicator failed")
return nil return nil
} }
s.initTickerFunctions(ctx) s.initTickerFunctions(ctx)
dynamicKLine := &types.KLine{}
priceLine := types.NewQueue(300)
zeroPoints := types.NewQueue(300) zeroPoints := types.NewQueue(300)
stoploss := s.StopLoss.Float64() stoploss := s.StopLoss.Float64()
// default value: use 1m kline // default value: use 1m kline
@ -833,6 +859,36 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.TrailingStopLossType = "kline" s.TrailingStopLossType = "kline"
} }
bbgo.RegisterCommand("telegram", "/draw", func(msg string) {
canvas := s.DrawIndicators(dynamicKLine.StartTime, priceLine, zeroPoints)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render indicators in drift")
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("telegram", "/pnl", func(msg string) {
canvas := s.DrawPNL(&profit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render pnl in drift")
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("telegram", "/cumpnl", func(msg string) {
canvas := s.DrawCumPNL(&cumProfit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render cumpnl in drift")
return
}
bbgo.SendPhoto(&buffer)
})
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if s.Status != types.StrategyStatusRunning { if s.Status != types.StrategyStatusRunning {
return return

View File

@ -1192,16 +1192,31 @@ func NewCanvas(title string, intervals ...Interval) *Canvas {
return out return out
} }
func expand(a []float64, length int, defaultVal float64) []float64 {
l := len(a)
if l >= length {
return a
}
for i := 0; i < length-l; i++ {
a = append(a, defaultVal)
}
return a
}
func (canvas *Canvas) Plot(tag string, a Series, endTime Time, length int) { func (canvas *Canvas) Plot(tag string, a Series, endTime Time, length int) {
var timeline []time.Time var timeline []time.Time
e := endTime.Time() e := endTime.Time()
if a.Length() == 0 {
return
}
oldest := a.Index(a.Length() - 1)
for i := length - 1; i >= 0; i-- { for i := length - 1; i >= 0; i-- {
shiftedT := e.Add(-time.Duration(i*canvas.Interval.Minutes()) * time.Minute) shiftedT := e.Add(-time.Duration(i*canvas.Interval.Minutes()) * time.Minute)
timeline = append(timeline, shiftedT) timeline = append(timeline, shiftedT)
} }
canvas.Series = append(canvas.Series, chart.TimeSeries{ canvas.Series = append(canvas.Series, chart.TimeSeries{
Name: tag, Name: tag,
YValues: Reverse(a, length), YValues: expand(Reverse(a, length), length, oldest),
XValues: timeline, XValues: timeline,
}) })
} }
@ -1211,10 +1226,14 @@ func (canvas *Canvas) PlotRaw(tag string, a Series, length int) {
for i := 0; i < length; i++ { for i := 0; i < length; i++ {
x = append(x, float64(i)) x = append(x, float64(i))
} }
if a.Length() == 0 {
return
}
oldest := a.Index(a.Length() - 1)
canvas.Series = append(canvas.Series, chart.ContinuousSeries{ canvas.Series = append(canvas.Series, chart.ContinuousSeries{
Name: tag, Name: tag,
XValues: x, XValues: x,
YValues: Reverse(a, length), YValues: expand(Reverse(a, length), length, oldest),
}) })
} }