feature: telegram notify to become async

This commit is contained in:
zenix 2022-10-17 18:38:03 +09:00
parent 763bb45842
commit 8a66e5b218

View File

@ -2,19 +2,29 @@ package telegramnotifier
import (
"bytes"
"context"
"fmt"
"reflect"
"strconv"
"time"
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"gopkg.in/tucnak/telebot.v2"
"github.com/c9s/bbgo/pkg/types"
)
var apiLimiter = rate.NewLimiter(rate.Every(1*time.Second), 1)
var log = logrus.WithField("service", "telegram")
type notifyTask struct {
message string
texts []string
photoBuffer *bytes.Buffer
}
type Notifier struct {
bot *telebot.Bot
@ -25,6 +35,8 @@ type Notifier struct {
Chats map[int64]*telebot.Chat `json:"chats"`
broadcast bool
taskC chan notifyTask
}
type Option func(notifier *Notifier)
@ -41,15 +53,84 @@ func New(bot *telebot.Bot, options ...Option) *Notifier {
bot: bot,
Chats: make(map[int64]*telebot.Chat),
Subscribers: make(map[int64]time.Time),
taskC: make(chan notifyTask, 100),
}
for _, o := range options {
o(notifier)
}
go notifier.worker()
return notifier
}
func (n *Notifier) worker() {
ctx := context.Background()
for {
select {
case <-ctx.Done():
return
case task := <-n.taskC:
apiLimiter.Wait(ctx)
n.consume(task)
}
}
}
func (n *Notifier) consume(task notifyTask) {
if n.broadcast {
if n.Subscribers == nil {
return
}
if task.message != "" {
n.Broadcast(task.message)
}
for _, text := range task.texts {
n.Broadcast(text)
}
if task.photoBuffer == nil {
return
}
for chatID := range n.Subscribers {
chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10))
if err != nil {
log.WithError(err).Error("can not get chat by ID")
continue
}
album := telebot.Album{
photoFromBuffer(task.photoBuffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("failed to send message")
}
}
} else if n.Chats != nil {
for _, chat := range n.Chats {
if task.message != "" {
if _, err := n.bot.Send(chat, task.message); err != nil {
log.WithError(err).Error("telegram send error")
}
}
for _, text := range task.texts {
if _, err := n.bot.Send(chat, text); err != nil {
log.WithError(err).Error("telegram send error")
}
}
if task.photoBuffer != nil {
album := telebot.Album{
photoFromBuffer(task.photoBuffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("telegram send error")
}
}
}
}
}
func (n *Notifier) Notify(obj interface{}, args ...interface{}) {
n.NotifyTo("", obj, args...)
}
@ -110,23 +191,13 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
}
if n.broadcast {
n.Broadcast(message)
for _, text := range texts {
n.Broadcast(text)
}
} else if n.Chats != nil {
for _, chat := range n.Chats {
if _, err := n.bot.Send(chat, message); err != nil {
log.WithError(err).Error("telegram send error")
}
for _, text := range texts {
if _, err := n.bot.Send(chat, text); err != nil {
log.WithError(err).Error("telegram send error")
}
}
}
select {
case n.taskC <- notifyTask{
texts: texts,
message: message,
}:
default:
log.Error("[telegram] cannot send task to notify")
}
}
@ -142,33 +213,12 @@ func photoFromBuffer(buffer *bytes.Buffer) telebot.InputMedia {
}
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
if n.broadcast {
if n.Subscribers == nil {
return
}
for chatID := range n.Subscribers {
chat, err := n.bot.ChatByID(strconv.FormatInt(chatID, 10))
if err != nil {
log.WithError(err).Error("can not get chat by ID")
continue
}
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("failed to send message")
}
}
} else if n.Chats != nil {
for _, chat := range n.Chats {
album := telebot.Album{
photoFromBuffer(buffer),
}
if _, err := n.bot.SendAlbum(chat, album); err != nil {
log.WithError(err).Error("telegram send error")
}
}
select {
case n.taskC <- notifyTask{
photoBuffer: buffer,
}:
case <-time.After(50 * time.Millisecond):
return
}
}