diff --git a/pkg/notifier/telegramnotifier/telegram.go b/pkg/notifier/telegramnotifier/telegram.go index b13c3c1a0..e2018b4ca 100644 --- a/pkg/notifier/telegramnotifier/telegram.go +++ b/pkg/notifier/telegramnotifier/telegram.go @@ -2,19 +2,29 @@ package telegramnotifier import ( "bytes" + "context" "fmt" "reflect" "strconv" "time" "github.com/sirupsen/logrus" + "golang.org/x/time/rate" "gopkg.in/tucnak/telebot.v2" "github.com/c9s/bbgo/pkg/types" ) +var apiLimiter = rate.NewLimiter(rate.Every(1*time.Second), 1) + var log = logrus.WithField("service", "telegram") +type notifyTask struct { + message string + texts []string + photoBuffer *bytes.Buffer +} + type Notifier struct { bot *telebot.Bot @@ -25,6 +35,8 @@ type Notifier struct { Chats map[int64]*telebot.Chat `json:"chats"` broadcast bool + + taskC chan notifyTask } type Option func(notifier *Notifier) @@ -41,15 +53,84 @@ func New(bot *telebot.Bot, options ...Option) *Notifier { bot: bot, Chats: make(map[int64]*telebot.Chat), Subscribers: make(map[int64]time.Time), + taskC: make(chan notifyTask, 100), } for _, o := range options { o(notifier) } + go notifier.worker() + return notifier } +func (n *Notifier) worker() { + ctx := context.Background() + for { + select { + case <-ctx.Done(): + return + case task := <-n.taskC: + apiLimiter.Wait(ctx) + n.consume(task) + } + } +} + +func (n *Notifier) consume(task notifyTask) { + if n.broadcast { + if n.Subscribers == nil { + return + } + if task.message != "" { + n.Broadcast(task.message) + } + for _, text := range task.texts { + n.Broadcast(text) + } + if task.photoBuffer == nil { + return + } + + for chatID := range n.Subscribers { + chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10)) + if err != nil { + log.WithError(err).Error("can not get chat by ID") + continue + } + album := telebot.Album{ + photoFromBuffer(task.photoBuffer), + } + if _, err := n.bot.SendAlbum(chat, album); err != nil { + log.WithError(err).Error("failed to send message") + } + } + } else if n.Chats != nil { + for _, chat := range n.Chats { + if task.message != "" { + if _, err := n.bot.Send(chat, task.message); err != nil { + log.WithError(err).Error("telegram send error") + } + } + + for _, text := range task.texts { + if _, err := n.bot.Send(chat, text); err != nil { + log.WithError(err).Error("telegram send error") + } + } + if task.photoBuffer != nil { + album := telebot.Album{ + photoFromBuffer(task.photoBuffer), + } + if _, err := n.bot.SendAlbum(chat, album); err != nil { + log.WithError(err).Error("telegram send error") + } + } + } + } +} + func (n *Notifier) Notify(obj interface{}, args ...interface{}) { n.NotifyTo("", obj, args...) } @@ -110,23 +191,13 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } - if n.broadcast { - n.Broadcast(message) - for _, text := range texts { - n.Broadcast(text) - } - } else if n.Chats != nil { - for _, chat := range n.Chats { - if _, err := n.bot.Send(chat, message); err != nil { - log.WithError(err).Error("telegram send error") - } - - for _, text := range texts { - if _, err := n.bot.Send(chat, text); err != nil { - log.WithError(err).Error("telegram send error") - } - } - } + select { + case n.taskC <- notifyTask{ + texts: texts, + message: message, + }: + default: + log.Error("[telegram] cannot send task to notify") } } @@ -142,33 +213,12 @@ func photoFromBuffer(buffer *bytes.Buffer) telebot.InputMedia { } func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { - if n.broadcast { - if n.Subscribers == nil { - return - } - - for chatID := range n.Subscribers { - chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10)) - if err != nil { - log.WithError(err).Error("can not get chat by ID") - continue - } - album := telebot.Album{ - photoFromBuffer(buffer), - } - if _, err := n.bot.SendAlbum(chat, album); err != nil { - log.WithError(err).Error("failed to send message") - } - } - } else if n.Chats != nil { - for _, chat := range n.Chats { - album := telebot.Album{ - photoFromBuffer(buffer), - } - if _, err := n.bot.SendAlbum(chat, album); err != nil { - log.WithError(err).Error("telegram send error") - } - } + select { + case n.taskC <- notifyTask{ + photoBuffer: buffer, + }: + case <-time.After(50 * time.Millisecond): + return } }