slacknotifier: post comments in the liveobject threads

This commit is contained in:
c9s 2024-11-11 15:24:45 +08:00
parent a2e17eab4d
commit c4c6a73774
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 158 additions and 39 deletions

View File

@ -14,3 +14,4 @@ exchangeStrategies:
livenote: livenote:
symbol: BTCUSDT symbol: BTCUSDT
interval: 5m interval: 5m
userID: U12345678

View File

@ -18,14 +18,43 @@ import (
"github.com/slack-go/slack" "github.com/slack-go/slack"
) )
var limiter = rate.NewLimiter(rate.Every(1*time.Second), 3) var limiter = rate.NewLimiter(rate.Every(600*time.Millisecond), 3)
// userIdRegExp matches strings like <@U012AB3CD>
var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`)
// groupIdRegExp matches strings like <!subteam^ID>
var groupIdRegExp = regexp.MustCompile(`^<!subteam\^(.+?)>$`)
var emailRegExp = regexp.MustCompile("`^(?P<name>[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P<domain>[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm")
type notifyTask struct { type notifyTask struct {
Channel string channel string
Opts []slack.MsgOption
isUpdate bool
// messageTs is the message timestamp, this is required when isUpdate is true
messageTs string
// threadTs is the thread timestamp
threadTs string
opts []slack.MsgOption
} }
func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) {
t.opts = append(t.opts, opts...)
}
// Notifier is a slack notifier
//
// To use this notifier, you need to setup the slack app permissions:
// - channels:read
// - chat:write
type Notifier struct { type Notifier struct {
ctx context.Context
cancel context.CancelFunc
client *slack.Client client *slack.Client
channel string channel string
@ -39,11 +68,29 @@ type Notifier struct {
type NotifyOption func(notifier *Notifier) type NotifyOption func(notifier *Notifier)
const defaultQueueSize = 500
func OptionContext(baseCtx context.Context) NotifyOption {
return func(notifier *Notifier) {
ctx, cancel := context.WithCancel(baseCtx)
notifier.ctx = ctx
notifier.cancel = cancel
}
}
func OptionQueueSize(size int) NotifyOption {
return NotifyOption(func(notifier *Notifier) {
notifier.taskC = make(chan notifyTask, size)
})
}
func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier { func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier {
notifier := &Notifier{ notifier := &Notifier{
ctx: context.Background(),
cancel: func() {},
channel: channel, channel: channel,
client: client, client: client,
taskC: make(chan notifyTask, 100), taskC: make(chan notifyTask, defaultQueueSize),
liveNotePool: livenote.NewPool(100), liveNotePool: livenote.NewPool(100),
userIdCache: make(map[string]*slack.User, 30), userIdCache: make(map[string]*slack.User, 30),
groupIdCache: make(map[string]slack.UserGroup, 50), groupIdCache: make(map[string]slack.UserGroup, 50),
@ -53,7 +100,7 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie
o(notifier) o(notifier)
} }
userGroups, err := client.GetUserGroupsContext(context.Background()) userGroups, err := client.GetUserGroupsContext(notifier.ctx)
if err != nil { if err != nil {
log.WithError(err).Error("failed to get the slack user groups") log.WithError(err).Error("failed to get the slack user groups")
} else { } else {
@ -77,39 +124,50 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie
log.Debugf("slack user groups: %+v", notifier.groupIdCache) log.Debugf("slack user groups: %+v", notifier.groupIdCache)
} }
go notifier.worker() go notifier.worker(notifier.ctx)
return notifier return notifier
} }
func (n *Notifier) worker() { func (n *Notifier) worker(ctx context.Context) {
ctx := context.Background() defer n.cancel()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case task := <-n.taskC: case task := <-n.taskC:
// ignore the wait error if err := n.executeTask(ctx, task); err != nil {
_ = limiter.Wait(ctx)
_, _, err := n.client.PostMessageContext(ctx, task.Channel, task.Opts...)
if err != nil {
log.WithError(err). log.WithError(err).
WithField("channel", task.Channel). WithField("channel", task.channel).
Errorf("slack api error: %s", err.Error()) Errorf("slack api error: %s", err.Error())
} }
} }
} }
} }
// userIdRegExp matches strings like <@U012AB3CD> func (n *Notifier) executeTask(ctx context.Context, task notifyTask) error {
var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) // ignore the wait error
if err := limiter.Wait(ctx); err != nil {
log.WithError(err).Warnf("slack rate limiter error")
}
// groupIdRegExp matches strings like <!subteam^ID> if task.threadTs != "" {
var groupIdRegExp = regexp.MustCompile(`^<!subteam\^(.+?)>$`) task.addMsgOption(slack.MsgOptionTS(task.threadTs))
}
var emailRegExp = regexp.MustCompile("`^(?P<name>[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P<domain>[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") if task.isUpdate && task.messageTs != "" {
task.addMsgOption(slack.MsgOptionUpdate(task.messageTs))
}
_, _, err := n.client.PostMessageContext(ctx, task.channel, task.opts...)
if err != nil {
return err
}
return nil
}
func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) {
var tags []string var tags []string
@ -172,7 +230,7 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string,
func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error {
note := n.liveNotePool.Update(obj) note := n.liveNotePool.Update(obj)
ctx := context.Background() ctx := n.ctx
channel := note.ChannelID channel := note.ChannelID
if channel == "" { if channel == "" {
@ -191,15 +249,13 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er
var firstTimeHandles []string var firstTimeHandles []string
var commentHandles []string var commentHandles []string
var mentions []*livenote.OptionOneTimeMention var comments []string
var comments []*livenote.OptionComment
for _, opt := range opts { for _, opt := range opts {
switch val := opt.(type) { switch val := opt.(type) {
case *livenote.OptionOneTimeMention: case *livenote.OptionOneTimeMention:
mentions = append(mentions, val)
firstTimeHandles = append(firstTimeHandles, val.Users...) firstTimeHandles = append(firstTimeHandles, val.Users...)
case *livenote.OptionComment: case *livenote.OptionComment:
comments = append(comments, val) comments = append(comments, val.Text)
commentHandles = append(commentHandles, val.Users...) commentHandles = append(commentHandles, val.Users...)
} }
} }
@ -221,12 +277,27 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er
// If compare is enabled, we need to attach the comments // If compare is enabled, we need to attach the comments
// UpdateMessageContext returns channel, timestamp, text, err // UpdateMessageContext returns channel, timestamp, text, err
_, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) respCh, respTs, _, err := n.client.UpdateMessageContext(ctx, note.ChannelID, note.MessageID, slackOpts...)
if err != nil { if err != nil {
return err return err
} }
_ = commentTags if len(comments) > 0 {
var text string
if len(commentTags) > 0 {
text = joinTags(commentTags) + " "
}
text += joinComments(comments)
n.queueTask(context.Background(), notifyTask{
channel: respCh,
threadTs: respTs,
opts: []slack.MsgOption{
slack.MsgOptionText(text, false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
}
} else { } else {
respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...)
@ -240,11 +311,32 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er
note.SetChannelID(respCh) note.SetChannelID(respCh)
note.SetMessageID(respTs) note.SetMessageID(respTs)
_, _, err = n.client.PostMessageContext(ctx, channel, if len(firstTimeTags) > 0 {
slack.MsgOptionText(joinTags(firstTimeTags), false), n.queueTask(context.Background(), notifyTask{
slack.MsgOptionTS(respTs)) channel: respCh,
if err != nil { threadTs: respTs,
return err opts: []slack.MsgOption{
slack.MsgOptionText(joinTags(firstTimeTags), false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
}
if len(comments) > 0 {
var text string
if len(commentTags) > 0 {
text = joinTags(commentTags) + " "
}
text += joinComments(comments)
n.queueTask(context.Background(), notifyTask{
channel: respCh,
threadTs: respTs,
opts: []slack.MsgOption{
slack.MsgOptionText(text, false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
} }
} }
@ -331,13 +423,20 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
} }
n.queueTask(context.Background(), notifyTask{
channel: channel,
opts: opts,
}, 100*time.Millisecond)
}
func (n *Notifier) queueTask(ctx context.Context, task notifyTask, timeout time.Duration) {
select { select {
case n.taskC <- notifyTask{ case <-ctx.Done():
Channel: channel,
Opts: opts,
}:
case <-time.After(50 * time.Millisecond):
return return
case <-time.After(timeout):
log.Warnf("slack notify task is dropped due to timeout %s", timeout)
return
case n.taskC <- task:
} }
} }
@ -360,3 +459,7 @@ func toSubteamHandle(id string) string {
func joinTags(tags []string) string { func joinTags(tags []string) string {
return strings.Join(tags, " ") return strings.Join(tags, " ")
} }
func joinComments(comments []string) string {
return strings.Join(comments, "\n")
}

View File

@ -7,6 +7,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/livenote"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -32,7 +33,10 @@ func init() {
// Strategy is a struct that contains the settings of your strategy. // Strategy is a struct that contains the settings of your strategy.
// These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically. // These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically.
type Strategy struct { type Strategy struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
UserID string `json:"userID"`
} }
func (s *Strategy) ID() string { func (s *Strategy) ID() string {
@ -43,15 +47,26 @@ func (s *Strategy) InstanceID() string {
return ID + "-" + s.Symbol return ID + "-" + s.Symbol
} }
func (s *Strategy) Defaults() error {
if s.Interval == "" {
s.Interval = types.Interval1m
}
return nil
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
} }
// This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed // This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
callback := func(k types.KLine) { callback := func(k types.KLine) {
log.Info(k) log.Info(k)
bbgo.PostLiveNote(&k) bbgo.PostLiveNote(&k,
livenote.OneTimeMention(s.UserID),
livenote.Comment("please check the deposit", s.UserID),
livenote.CompareObject(true))
} }
// register our kline event handler // register our kline event handler