feature: send photo through telegram, register handler dynamically in strategy, fix canvas rendering

This commit is contained in:
zenix 2022-08-03 15:10:56 +09:00
parent 008814992f
commit 90e596f463
5 changed files with 213 additions and 19 deletions

View File

@ -1,6 +1,8 @@
package bbgo
import (
"bytes"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/util"
@ -12,6 +14,10 @@ var Notification = &Notifiability{
ObjectChannelRouter: NewObjectChannelRouter(),
}
func RegisterCommand(application, command string, handler func(string)) {
Notification.RegisterCommand(application, command, handler)
}
func Notify(obj interface{}, args ...interface{}) {
Notification.Notify(obj, args...)
}
@ -20,9 +26,21 @@ func NotifyTo(channel string, obj interface{}, args ...interface{}) {
Notification.NotifyTo(channel, obj, args...)
}
func SendPhoto(buffer *bytes.Buffer) {
Notification.SendPhoto(buffer)
}
func SendPhotoTo(channel string, buffer *bytes.Buffer) {
Notification.SendPhotoTo(channel, buffer)
}
type Notifier interface {
NotifyTo(channel string, obj interface{}, args ...interface{})
Notify(obj interface{}, args ...interface{})
SendPhotoTo(channel string, buffer *bytes.Buffer)
SendPhoto(buffer *bytes.Buffer)
RegisterCommand(command string, handler func(string))
ID() string
}
type NullNotifier struct{}
@ -31,6 +49,16 @@ func (n *NullNotifier) NotifyTo(channel string, obj interface{}, args ...interfa
func (n *NullNotifier) Notify(obj interface{}, args ...interface{}) {}
func (n *NullNotifier) SendPhoto(buffer *bytes.Buffer) {}
func (n *NullNotifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {}
func (n *NullNotifier) RegisterCommand(command string, handler func(string)) {}
func (n *NullNotifier) ID() string {
return "null"
}
type Notifiability struct {
notifiers []Notifier
SessionChannelRouter *PatternChannelRouter `json:"-"`
@ -83,3 +111,23 @@ func (m *Notifiability) NotifyTo(channel string, obj interface{}, args ...interf
n.NotifyTo(channel, obj, args...)
}
}
func (m *Notifiability) SendPhoto(buffer *bytes.Buffer) {
for _, n := range m.notifiers {
n.SendPhoto(buffer)
}
}
func (m *Notifiability) SendPhotoTo(channel string, buffer *bytes.Buffer) {
for _, n := range m.notifiers {
n.SendPhotoTo(channel, buffer)
}
}
func (m *Notifiability) RegisterCommand(application, command string, handler func(string)) {
for _, n := range m.notifiers {
if application == n.ID() {
n.RegisterCommand(command, handler)
}
}
}

View File

@ -1,6 +1,7 @@
package slacknotifier
import (
"bytes"
"context"
"fmt"
"time"
@ -150,6 +151,22 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
}
}
func (n *Notifier) SendPhoto(buffer *bytes.Buffer) {
n.SendPhotoTo(n.channel, buffer)
}
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
// TODO
}
func (n *Notifier) ID() string {
return "slack"
}
func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) {
// TODO
}
/*
func (n *Notifier) NotifyTrade(trade *types.Trade) {
_, _, err := n.client.PostMessageContext(context.Background(), n.TradeChannel,

View File

@ -1,6 +1,7 @@
package telegramnotifier
import (
"bytes"
"fmt"
"reflect"
"strconv"
@ -49,6 +50,17 @@ func New(bot *telebot.Bot, options ...Option) *Notifier {
return notifier
}
func (n *Notifier) ID() string {
return "telegram"
}
func (n *Notifier) RegisterCommand(command string, simplehandler func(string)) {
handler := func(msg *telebot.Message) {
simplehandler(msg.Text)
}
n.bot.Handle(command, handler)
}
func (n *Notifier) Notify(obj interface{}, args ...interface{}) {
n.NotifyTo("", obj, args...)
}
@ -129,6 +141,48 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
}
}
func (n *Notifier) SendPhoto(buffer *bytes.Buffer) {
n.SendPhotoTo("", buffer)
}
func photoFromBuffer(buffer *bytes.Buffer) telebot.InputMedia {
reader := bytes.NewReader(buffer.Bytes())
return &telebot.Photo{
File: telebot.FromReader(reader),
}
}
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
if n.broadcast {
if n.Subscribers == nil {
return
}
for chatID := range n.Subscribers {
chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10))
if err != nil {
log.WithError(err).Error("can not get chat by ID")
continue
}
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("failed to send message")
}
}
} else if n.Chats != nil {
for _, chat := range n.Chats {
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("telegram send error")
}
}
}
}
func (n *Notifier) AddChat(c *telebot.Chat) {
if n.Chats == nil {
n.Chats = make(map[int64]*telebot.Chat)

View File

@ -2,6 +2,7 @@ package drift
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
@ -283,7 +284,7 @@ func (s *DriftMA) TestUpdate(v float64) *DriftMA {
return out
}
func (s *Strategy) initIndicators() error {
func (s *Strategy) initIndicators(kline *types.KLine, priceLines *types.Queue) error {
s.ma = &indicator.SMA{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevHigh = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
s.stdevLow = &indicator.StdDev{IntervalWindow: types.IntervalWindow{Interval: s.Interval, Window: s.HLRangeWindow}}
@ -327,6 +328,10 @@ func (s *Strategy) initIndicators() error {
s.drift.Update(source)
s.trendLine.Update(source)
s.atr.PushK(kline)
priceLines.Update(source)
}
if kline != nil && klines != nil {
kline.Set(&(*klines)[len(*klines)-1])
}
klines, ok = store.KLinesOfInterval(types.Interval1m)
if !ok {
@ -491,12 +496,13 @@ func (s *Strategy) initTickerFunctions(ctx context.Context) {
}
func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) {
func (s *Strategy) DrawIndicators(time types.Time, priceLine types.SeriesExtend, zeroPoints types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID(), s.Interval)
Length := priceLine.Length()
if Length > 300 {
Length = 300
}
log.Infof("draw indicators with %d data", Length)
mean := priceLine.Mean(Length)
highestPrice := priceLine.Minus(mean).Abs().Highest(Length)
highestDrift := s.drift.Abs().Highest(Length)
@ -510,6 +516,31 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
canvas.Plot("zero", types.NumberSeries(mean), time, Length)
canvas.Plot("price", priceLine, time, Length)
canvas.Plot("zeroPoint", zeroPoints, time, Length)
return canvas
}
func (s *Strategy) DrawPNL(profit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length())
} else {
canvas.PlotRaw("pnl %", profit, profit.Length())
}
return canvas
}
func (s *Strategy) DrawCumPNL(cumProfit types.Series) *types.Canvas {
canvas := types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length())
} else {
canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length())
}
return canvas
}
func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit types.Series, cumProfit types.Series, zeroPoints types.Series) {
canvas := s.DrawIndicators(time, priceLine, zeroPoints)
f, err := os.Create(s.CanvasPath)
if err != nil {
log.WithError(err).Errorf("cannot create on %s", s.CanvasPath)
@ -520,12 +551,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
log.WithError(err).Errorf("cannot render in drift")
}
canvas = types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("pnl % (with Fee Deducted)", profit, profit.Length())
} else {
canvas.PlotRaw("pnl %", profit, profit.Length())
}
canvas = s.DrawPNL(profit)
f, err = os.Create(s.GraphPNLPath)
if err != nil {
log.WithError(err).Errorf("open pnl")
@ -536,12 +562,7 @@ func (s *Strategy) Draw(time types.Time, priceLine types.SeriesExtend, profit ty
log.WithError(err).Errorf("render pnl")
}
canvas = types.NewCanvas(s.InstanceID())
if s.GraphPNLDeductFee {
canvas.PlotRaw("cummulative pnl % (with Fee Deducted)", cumProfit, cumProfit.Length())
} else {
canvas.PlotRaw("cummulative pnl %", cumProfit, cumProfit.Length())
}
canvas = s.DrawCumPNL(cumProfit)
f, err = os.Create(s.GraphCumPNLPath)
if err != nil {
log.WithError(err).Errorf("open cumpnl")
@ -662,6 +683,11 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
if s.Position == nil {
s.Position = types.NewPositionFromMarket(s.Market)
s.p = types.NewPositionFromMarket(s.Market)
} else {
s.p = types.NewPositionFromMarket(s.Market)
s.p.Base = s.Position.Base
s.p.Quote = s.Position.Quote
s.p.AverageCost = s.Position.AverageCost
}
if s.ProfitStats == nil {
s.ProfitStats = types.NewProfitStats(s.Market)
@ -818,14 +844,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.lowestPrice = s.sellPrice
})
if err := s.initIndicators(); err != nil {
dynamicKLine := &types.KLine{}
priceLine := types.NewQueue(300)
if err := s.initIndicators(dynamicKLine, priceLine); err != nil {
log.WithError(err).Errorf("initIndicator failed")
return nil
}
s.initTickerFunctions(ctx)
dynamicKLine := &types.KLine{}
priceLine := types.NewQueue(300)
zeroPoints := types.NewQueue(300)
stoploss := s.StopLoss.Float64()
// default value: use 1m kline
@ -833,6 +859,36 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
s.TrailingStopLossType = "kline"
}
bbgo.RegisterCommand("telegram", "/draw", func(msg string) {
canvas := s.DrawIndicators(dynamicKLine.StartTime, priceLine, zeroPoints)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render indicators in drift")
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("telegram", "/pnl", func(msg string) {
canvas := s.DrawPNL(&profit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render pnl in drift")
return
}
bbgo.SendPhoto(&buffer)
})
bbgo.RegisterCommand("telegram", "/cumpnl", func(msg string) {
canvas := s.DrawCumPNL(&cumProfit)
var buffer bytes.Buffer
if err := canvas.Render(chart.PNG, &buffer); err != nil {
log.WithError(err).Errorf("cannot render cumpnl in drift")
return
}
bbgo.SendPhoto(&buffer)
})
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if s.Status != types.StrategyStatusRunning {
return

View File

@ -1192,16 +1192,31 @@ func NewCanvas(title string, intervals ...Interval) *Canvas {
return out
}
func expand(a []float64, length int, defaultVal float64) []float64 {
l := len(a)
if l >= length {
return a
}
for i := 0; i < length-l; i++ {
a = append(a, defaultVal)
}
return a
}
func (canvas *Canvas) Plot(tag string, a Series, endTime Time, length int) {
var timeline []time.Time
e := endTime.Time()
if a.Length() == 0 {
return
}
oldest := a.Index(a.Length() - 1)
for i := length - 1; i >= 0; i-- {
shiftedT := e.Add(-time.Duration(i*canvas.Interval.Minutes()) * time.Minute)
timeline = append(timeline, shiftedT)
}
canvas.Series = append(canvas.Series, chart.TimeSeries{
Name: tag,
YValues: Reverse(a, length),
YValues: expand(Reverse(a, length), length, oldest),
XValues: timeline,
})
}
@ -1211,10 +1226,14 @@ func (canvas *Canvas) PlotRaw(tag string, a Series, length int) {
for i := 0; i < length; i++ {
x = append(x, float64(i))
}
if a.Length() == 0 {
return
}
oldest := a.Index(a.Length() - 1)
canvas.Series = append(canvas.Series, chart.ContinuousSeries{
Name: tag,
XValues: x,
YValues: Reverse(a, length),
YValues: expand(Reverse(a, length), length, oldest),
})
}