From fc322079ba61b9562a3d724ef475a5d60c9555fd Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 8 Nov 2024 08:37:08 +0800 Subject: [PATCH 1/9] xmaker: reset book after emitting reconnect --- pkg/strategy/xmaker/strategy.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index c3b31c78f..af2b3a8fb 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -655,8 +655,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.Symbol, time.Since(bookLastUpdateTime)) - s.sourceBook.Reset() s.sourceSession.MarketDataStream.Reconnect() + s.sourceBook.Reset() return err } @@ -665,8 +665,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.Symbol, time.Since(bookLastUpdateTime)) - s.sourceBook.Reset() s.sourceSession.MarketDataStream.Reconnect() + s.sourceBook.Reset() return err } From d7eef21a0097ce7fd0061024d4993133fdbd8f6e Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 8 Nov 2024 12:31:25 +0800 Subject: [PATCH 2/9] slacknotifier: cache slack user groups --- pkg/notifier/slacknotifier/slack.go | 74 +++++++++++++++++++++++++++-- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 238a6bca7..09de2ef9a 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -3,7 +3,9 @@ package slacknotifier import ( "bytes" "context" + "errors" "fmt" + "regexp" "time" "golang.org/x/time/rate" @@ -29,6 +31,9 @@ type Notifier struct { taskC chan notifyTask liveNotePool *livenote.Pool + + userIdCache map[string]string + groupIdCache map[string]slack.UserGroup } type NotifyOption func(notifier *Notifier) @@ -39,12 +44,38 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie client: client, taskC: make(chan notifyTask, 100), liveNotePool: livenote.NewPool(100), + userIdCache: make(map[string]string, 30), + groupIdCache: make(map[string]slack.UserGroup, 50), } for _, o := range options { o(notifier) } + userGroups, err := client.GetUserGroupsContext(context.Background()) + if err != nil { + log.WithError(err).Error("failed to get the slack user groups") + } else { + for _, group := range userGroups { + notifier.groupIdCache[group.Name] = group + } + + // user groups: map[ + // Development Team:{ + // ID:S08004CQYQK + // TeamID:T036FASR3 + // IsUserGroup:true + // Name:Development Team + // Description:dev + // Handle:dev + // IsExternal:false + // DateCreate:"Fri Nov 8" + // DateUpdate:"Fri Nov 8" DateDelete:"Thu Jan 1" + // AutoType: CreatedBy:U036FASR5 UpdatedBy:U12345678 DeletedBy: + // Prefs:{Channels:[] Groups:[]} UserCount:1 Users:[]}] + log.Debugf("slack user groups: %+v", notifier.groupIdCache) + } + go notifier.worker() return notifier @@ -71,6 +102,34 @@ func (n *Notifier) worker() { } } +// userIdRegExp matches strings like <@U012AB3CD> +var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) + +// groupIdRegExp matches strings like +var groupIdRegExp = regexp.MustCompile(`^$`) + +func (n *Notifier) lookupUserID(ctx context.Context, username string) (string, error) { + if username == "" { + return "", errors.New("username is empty") + } + + // if the given username is already in slack user id format, we don't need to look up + if userIdRegExp.MatchString(username) { + return username, nil + } + + if id, exists := n.userIdCache[username]; exists { + return id, nil + } + + slackUser, err := n.client.GetUserInfoContext(ctx, username) + if err != nil { + return "", err + } + + return slackUser.ID, nil +} + func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { note := n.liveNotePool.Update(obj) ctx := context.Background() @@ -91,20 +150,25 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment)) var userIds []string - var mentions []*livenote.Mention - var comments []*livenote.Comment + var mentions []*livenote.OptionMention + var comments []*livenote.OptionComment for _, opt := range opts { switch val := opt.(type) { - case *livenote.Mention: + case *livenote.OptionMention: mentions = append(mentions, val) - userIds = append(userIds, val.User) - case *livenote.Comment: + userIds = append(userIds, val.Users...) + case *livenote.OptionComment: comments = append(comments, val) userIds = append(userIds, val.Users...) } } + // format: mention slack user + // <@U012AB3CD> + if note.MessageID != "" { + // If compare is enabled, we need to attach the comments + // UpdateMessageContext returns channel, timestamp, text, err _, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) if err != nil { From 4b24e9a480d1dd1113d33868dbc3b0e387f6ac9c Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 8 Nov 2024 12:40:02 +0800 Subject: [PATCH 3/9] slacknotifier: implement and improve translateHandle --- pkg/notifier/slacknotifier/slack.go | 46 +++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 09de2ef9a..e9050eebc 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -32,7 +32,7 @@ type Notifier struct { liveNotePool *livenote.Pool - userIdCache map[string]string + userIdCache map[string]*slack.User groupIdCache map[string]slack.UserGroup } @@ -44,7 +44,7 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie client: client, taskC: make(chan notifyTask, 100), liveNotePool: livenote.NewPool(100), - userIdCache: make(map[string]string, 30), + userIdCache: make(map[string]*slack.User, 30), groupIdCache: make(map[string]slack.UserGroup, 50), } @@ -108,26 +108,48 @@ var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) // groupIdRegExp matches strings like var groupIdRegExp = regexp.MustCompile(`^$`) -func (n *Notifier) lookupUserID(ctx context.Context, username string) (string, error) { - if username == "" { - return "", errors.New("username is empty") +func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, error) { + if handle == "" { + return "", errors.New("handle is empty") } - // if the given username is already in slack user id format, we don't need to look up - if userIdRegExp.MatchString(username) { - return username, nil + // trim the prefix '@' if we get a string like '@username' + if handle[0] == '@' { + handle = handle[1:] } - if id, exists := n.userIdCache[username]; exists { - return id, nil + // if the given handle is already in slack user id format, we don't need to look up + if userIdRegExp.MatchString(handle) || groupIdRegExp.MatchString(handle) { + return handle, nil } - slackUser, err := n.client.GetUserInfoContext(ctx, username) + if user, exists := n.userIdCache[handle]; exists { + return toUserHandle(user.ID), nil + } + + if group, exists := n.groupIdCache[handle]; exists { + return toSubteamHandle(group.ID), nil + } + + slackUser, err := n.client.GetUserInfoContext(ctx, handle) if err != nil { return "", err } - return slackUser.ID, nil + if slackUser != nil { + n.userIdCache[handle] = slackUser + return toUserHandle(slackUser.ID), nil + } + + return "", fmt.Errorf("handle %s not found", handle) +} + +func toUserHandle(id string) string { + return fmt.Sprintf("<@%s>", id) +} + +func toSubteamHandle(id string) string { + return fmt.Sprintf("", id) } func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { From d2b971d75e41b79fdc49f9305a1a7c0e992a5923 Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 8 Nov 2024 13:12:23 +0800 Subject: [PATCH 4/9] slacknotifier,livenote: fix slack user lookup and add one time mention --- pkg/livenote/options.go | 25 +++++++++-- pkg/notifier/slacknotifier/slack.go | 67 +++++++++++++++++++++++------ 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/pkg/livenote/options.go b/pkg/livenote/options.go index 216dddec0..cf120f46e 100644 --- a/pkg/livenote/options.go +++ b/pkg/livenote/options.go @@ -2,11 +2,30 @@ package livenote type Option interface{} -type Mention struct { - User string +type OptionCompare struct { + Value bool } -type Comment struct { +func CompareObject(value bool) *OptionCompare { + return &OptionCompare{Value: value} +} + +type OptionOneTimeMention struct { + Users []string +} + +func OneTimeMention(users ...string) *OptionOneTimeMention { + return &OptionOneTimeMention{Users: users} +} + +type OptionComment struct { Text string Users []string } + +func Comment(text string, users ...string) *OptionComment { + return &OptionComment{ + Text: text, + Users: users, + } +} diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index e9050eebc..642b05e0a 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "regexp" + "strings" "time" "golang.org/x/time/rate" @@ -108,6 +109,20 @@ var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) // groupIdRegExp matches strings like var groupIdRegExp = regexp.MustCompile(`^$`) +func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { + var tags []string + for _, handle := range handles { + tag, err := n.translateHandle(ctx, handle) + if err != nil { + return nil, err + } + + tags = append(tags, tag) + } + + return tags, nil +} + func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, error) { if handle == "" { return "", errors.New("handle is empty") @@ -133,7 +148,7 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, slackUser, err := n.client.GetUserInfoContext(ctx, handle) if err != nil { - return "", err + return "", fmt.Errorf("user handle %s not found: %v", handle, err) } if slackUser != nil { @@ -144,14 +159,6 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, return "", fmt.Errorf("handle %s not found", handle) } -func toUserHandle(id string) string { - return fmt.Sprintf("<@%s>", id) -} - -func toSubteamHandle(id string) string { - return fmt.Sprintf("", id) -} - func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { note := n.liveNotePool.Update(obj) ctx := context.Background() @@ -171,20 +178,31 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er var slackOpts []slack.MsgOption slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment)) - var userIds []string - var mentions []*livenote.OptionMention + var firstTimeHandles []string + var commentHandles []string + var mentions []*livenote.OptionOneTimeMention var comments []*livenote.OptionComment for _, opt := range opts { switch val := opt.(type) { - case *livenote.OptionMention: + case *livenote.OptionOneTimeMention: mentions = append(mentions, val) - userIds = append(userIds, val.Users...) + firstTimeHandles = append(firstTimeHandles, val.Users...) case *livenote.OptionComment: comments = append(comments, val) - userIds = append(userIds, val.Users...) + commentHandles = append(commentHandles, val.Users...) } } + firstTimeTags, err := n.translateHandles(context.Background(), firstTimeHandles) + if err != nil { + return err + } + + commentTags, err := n.translateHandles(context.Background(), commentHandles) + if err != nil { + return err + } + // format: mention slack user // <@U012AB3CD> @@ -197,6 +215,8 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er return err } + _ = commentTags + } else { respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) if err != nil { @@ -208,6 +228,13 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetChannelID(respCh) note.SetMessageID(respTs) + + _, _, err = n.client.PostMessageContext(ctx, channel, + slack.MsgOptionText(joinTags(firstTimeTags), false), + slack.MsgOptionTS(respTs)) + if err != nil { + return err + } } return nil @@ -310,3 +337,15 @@ func (n *Notifier) SendPhoto(buffer *bytes.Buffer) { func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { // TODO } + +func toUserHandle(id string) string { + return fmt.Sprintf("<@%s>", id) +} + +func toSubteamHandle(id string) string { + return fmt.Sprintf("", id) +} + +func joinTags(tags []string) string { + return strings.Join(tags, " ") +} From a2e17eab4d5153d76975c44e6cea3d18556a2643 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 14:31:12 +0800 Subject: [PATCH 5/9] slacknotifier: lookup user by email --- pkg/notifier/slacknotifier/slack.go | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 642b05e0a..7bb43772d 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -109,6 +109,8 @@ var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) // groupIdRegExp matches strings like var groupIdRegExp = regexp.MustCompile(`^$`) +var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") + func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { var tags []string for _, handle := range handles { @@ -146,9 +148,18 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, return toSubteamHandle(group.ID), nil } - slackUser, err := n.client.GetUserInfoContext(ctx, handle) - if err != nil { - return "", fmt.Errorf("user handle %s not found: %v", handle, err) + var slackUser *slack.User + var err error + if emailRegExp.MatchString(handle) { + slackUser, err = n.client.GetUserByEmailContext(ctx, handle) + if err != nil { + return "", fmt.Errorf("user %s not found: %v", handle, err) + } + } else { + slackUser, err = n.client.GetUserInfoContext(ctx, handle) + if err != nil { + return "", fmt.Errorf("user handle %s not found: %v", handle, err) + } } if slackUser != nil { From c4c6a7377470317ea49e274926f791f5e3efe611 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 15:24:45 +0800 Subject: [PATCH 6/9] slacknotifier: post comments in the liveobject threads --- config/example/livenote.yaml | 1 + pkg/notifier/slacknotifier/slack.go | 175 +++++++++++++++++----- pkg/strategy/example/livenote/strategy.go | 21 ++- 3 files changed, 158 insertions(+), 39 deletions(-) diff --git a/config/example/livenote.yaml b/config/example/livenote.yaml index 662aab4ac..b83142aba 100644 --- a/config/example/livenote.yaml +++ b/config/example/livenote.yaml @@ -14,3 +14,4 @@ exchangeStrategies: livenote: symbol: BTCUSDT interval: 5m + userID: U12345678 diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 7bb43772d..dad711459 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -18,14 +18,43 @@ import ( "github.com/slack-go/slack" ) -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 3) +var limiter = rate.NewLimiter(rate.Every(600*time.Millisecond), 3) + +// userIdRegExp matches strings like <@U012AB3CD> +var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) + +// groupIdRegExp matches strings like +var groupIdRegExp = regexp.MustCompile(`^$`) + +var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") type notifyTask struct { - Channel string - Opts []slack.MsgOption + channel string + + isUpdate bool + + // messageTs is the message timestamp, this is required when isUpdate is true + messageTs string + + // threadTs is the thread timestamp + threadTs string + + opts []slack.MsgOption } +func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) { + t.opts = append(t.opts, opts...) +} + +// Notifier is a slack notifier +// +// To use this notifier, you need to setup the slack app permissions: +// - channels:read +// - chat:write type Notifier struct { + ctx context.Context + cancel context.CancelFunc + client *slack.Client channel string @@ -39,11 +68,29 @@ type Notifier struct { type NotifyOption func(notifier *Notifier) +const defaultQueueSize = 500 + +func OptionContext(baseCtx context.Context) NotifyOption { + return func(notifier *Notifier) { + ctx, cancel := context.WithCancel(baseCtx) + notifier.ctx = ctx + notifier.cancel = cancel + } +} + +func OptionQueueSize(size int) NotifyOption { + return NotifyOption(func(notifier *Notifier) { + notifier.taskC = make(chan notifyTask, size) + }) +} + func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier { notifier := &Notifier{ + ctx: context.Background(), + cancel: func() {}, channel: channel, client: client, - taskC: make(chan notifyTask, 100), + taskC: make(chan notifyTask, defaultQueueSize), liveNotePool: livenote.NewPool(100), userIdCache: make(map[string]*slack.User, 30), groupIdCache: make(map[string]slack.UserGroup, 50), @@ -53,7 +100,7 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie o(notifier) } - userGroups, err := client.GetUserGroupsContext(context.Background()) + userGroups, err := client.GetUserGroupsContext(notifier.ctx) if err != nil { log.WithError(err).Error("failed to get the slack user groups") } else { @@ -77,39 +124,50 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie log.Debugf("slack user groups: %+v", notifier.groupIdCache) } - go notifier.worker() + go notifier.worker(notifier.ctx) return notifier } -func (n *Notifier) worker() { - ctx := context.Background() +func (n *Notifier) worker(ctx context.Context) { + defer n.cancel() + for { select { case <-ctx.Done(): return case task := <-n.taskC: - // ignore the wait error - _ = limiter.Wait(ctx) - - _, _, err := n.client.PostMessageContext(ctx, task.Channel, task.Opts...) - if err != nil { + if err := n.executeTask(ctx, task); err != nil { log.WithError(err). - WithField("channel", task.Channel). + WithField("channel", task.channel). Errorf("slack api error: %s", err.Error()) } } } } -// userIdRegExp matches strings like <@U012AB3CD> -var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) +func (n *Notifier) executeTask(ctx context.Context, task notifyTask) error { + // ignore the wait error + if err := limiter.Wait(ctx); err != nil { + log.WithError(err).Warnf("slack rate limiter error") + } -// groupIdRegExp matches strings like -var groupIdRegExp = regexp.MustCompile(`^$`) + if task.threadTs != "" { + task.addMsgOption(slack.MsgOptionTS(task.threadTs)) + } -var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") + if task.isUpdate && task.messageTs != "" { + task.addMsgOption(slack.MsgOptionUpdate(task.messageTs)) + } + + _, _, err := n.client.PostMessageContext(ctx, task.channel, task.opts...) + if err != nil { + return err + } + + return nil +} func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { var tags []string @@ -172,7 +230,7 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { note := n.liveNotePool.Update(obj) - ctx := context.Background() + ctx := n.ctx channel := note.ChannelID if channel == "" { @@ -191,15 +249,13 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er var firstTimeHandles []string var commentHandles []string - var mentions []*livenote.OptionOneTimeMention - var comments []*livenote.OptionComment + var comments []string for _, opt := range opts { switch val := opt.(type) { case *livenote.OptionOneTimeMention: - mentions = append(mentions, val) firstTimeHandles = append(firstTimeHandles, val.Users...) case *livenote.OptionComment: - comments = append(comments, val) + comments = append(comments, val.Text) commentHandles = append(commentHandles, val.Users...) } } @@ -221,12 +277,27 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er // If compare is enabled, we need to attach the comments // UpdateMessageContext returns channel, timestamp, text, err - _, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) + respCh, respTs, _, err := n.client.UpdateMessageContext(ctx, note.ChannelID, note.MessageID, slackOpts...) if err != nil { return err } - _ = commentTags + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } } else { respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) @@ -240,11 +311,32 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetChannelID(respCh) note.SetMessageID(respTs) - _, _, err = n.client.PostMessageContext(ctx, channel, - slack.MsgOptionText(joinTags(firstTimeTags), false), - slack.MsgOptionTS(respTs)) - if err != nil { - return err + if len(firstTimeTags) > 0 { + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(joinTags(firstTimeTags), false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } + + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) } } @@ -331,13 +423,20 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } + n.queueTask(context.Background(), notifyTask{ + channel: channel, + opts: opts, + }, 100*time.Millisecond) +} + +func (n *Notifier) queueTask(ctx context.Context, task notifyTask, timeout time.Duration) { select { - case n.taskC <- notifyTask{ - Channel: channel, - Opts: opts, - }: - case <-time.After(50 * time.Millisecond): + case <-ctx.Done(): return + case <-time.After(timeout): + log.Warnf("slack notify task is dropped due to timeout %s", timeout) + return + case n.taskC <- task: } } @@ -360,3 +459,7 @@ func toSubteamHandle(id string) string { func joinTags(tags []string) string { return strings.Join(tags, " ") } + +func joinComments(comments []string) string { + return strings.Join(comments, "\n") +} diff --git a/pkg/strategy/example/livenote/strategy.go b/pkg/strategy/example/livenote/strategy.go index ab4489e56..341c906e2 100644 --- a/pkg/strategy/example/livenote/strategy.go +++ b/pkg/strategy/example/livenote/strategy.go @@ -7,6 +7,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" ) @@ -32,7 +33,10 @@ func init() { // Strategy is a struct that contains the settings of your strategy. // These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically. type Strategy struct { - Symbol string `json:"symbol"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + + UserID string `json:"userID"` } func (s *Strategy) ID() string { @@ -43,15 +47,26 @@ func (s *Strategy) InstanceID() string { return ID + "-" + s.Symbol } +func (s *Strategy) Defaults() error { + if s.Interval == "" { + s.Interval = types.Interval1m + } + + return nil +} + func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { - session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) + session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } // This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { callback := func(k types.KLine) { log.Info(k) - bbgo.PostLiveNote(&k) + bbgo.PostLiveNote(&k, + livenote.OneTimeMention(s.UserID), + livenote.Comment("please check the deposit", s.UserID), + livenote.CompareObject(true)) } // register our kline event handler From c289c2daf508580269b9250a754bf8e5eafad0ed Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 16:07:04 +0800 Subject: [PATCH 7/9] livenote: add test and generate diff summary for object change --- pkg/dynamic/compare_test.go | 47 ++++++++++++ pkg/livenote/livenote.go | 15 ++++ pkg/notifier/slacknotifier/slack.go | 89 ++++++++++++++++------- pkg/strategy/example/livenote/strategy.go | 2 +- 4 files changed, 125 insertions(+), 28 deletions(-) diff --git a/pkg/dynamic/compare_test.go b/pkg/dynamic/compare_test.go index 36200f545..a4bf3133d 100644 --- a/pkg/dynamic/compare_test.go +++ b/pkg/dynamic/compare_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/c9s/bbgo/pkg/fixedpoint" + . "github.com/c9s/bbgo/pkg/testing/testhelper" "github.com/c9s/bbgo/pkg/types" ) @@ -94,6 +95,52 @@ func Test_Compare(t *testing.T) { }, }, }, + { + name: "kline", + wantErr: assert.NoError, + a: types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60100), + }, + b: types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60200), + }, + want: []Diff{ + { + Field: "Close", + Before: "60200", + After: "60100", + }, + }, + }, + { + name: "kline ptr", + wantErr: assert.NoError, + a: &types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60100), + }, + b: &types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60200), + }, + want: []Diff{ + { + Field: "Close", + Before: "60200", + After: "60100", + }, + }, + }, { name: "deposit and order", wantErr: assert.NoError, diff --git a/pkg/livenote/livenote.go b/pkg/livenote/livenote.go index 3cac0c3cc..af01cef95 100644 --- a/pkg/livenote/livenote.go +++ b/pkg/livenote/livenote.go @@ -56,6 +56,21 @@ func NewPool(size int64) *Pool { } } +func (p *Pool) Get(obj Object) *LiveNote { + objID := obj.ObjectID() + + p.mu.Lock() + defer p.mu.Unlock() + + for _, note := range p.notes { + if note.ObjectID() == objID { + return note + } + } + + return nil +} + func (p *Pool) Update(obj Object) *LiveNote { objID := obj.ObjectID() diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index dad711459..ef938b05f 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -11,6 +11,7 @@ import ( "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/dynamic" "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" @@ -229,12 +230,49 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, } func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { - note := n.liveNotePool.Update(obj) - ctx := n.ctx + var slackOpts []slack.MsgOption - channel := note.ChannelID - if channel == "" { - channel = n.channel + var firstTimeHandles []string + var commentHandles []string + var comments []string + var shouldCompare bool + for _, opt := range opts { + switch val := opt.(type) { + case *livenote.OptionOneTimeMention: + firstTimeHandles = append(firstTimeHandles, val.Users...) + case *livenote.OptionComment: + comments = append(comments, val.Text) + commentHandles = append(commentHandles, val.Users...) + case *livenote.OptionCompare: + shouldCompare = val.Value + } + } + + var ctx = n.ctx + var curObj, prevObj any + if shouldCompare { + if prevNote := n.liveNotePool.Get(obj); prevNote != nil { + prevObj = prevNote.Object + } + } + + channel := n.channel + note := n.liveNotePool.Update(obj) + curObj = note.Object + + if shouldCompare && prevObj != nil { + diffs, err := dynamic.Compare(curObj, prevObj) + if err != nil { + log.WithError(err).Warnf("unable to compare objects: %T and %T", curObj, prevObj) + } else { + if comment := diffsToComment(curObj, diffs); len(comment) > 0 { + comments = append(comments, comment) + } + } + } + + if note.ChannelID != "" { + channel = note.ChannelID } var attachment slack.Attachment @@ -244,35 +282,18 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er return fmt.Errorf("livenote object does not support types.SlackAttachmentCreator interface") } - var slackOpts []slack.MsgOption slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment)) - var firstTimeHandles []string - var commentHandles []string - var comments []string - for _, opt := range opts { - switch val := opt.(type) { - case *livenote.OptionOneTimeMention: - firstTimeHandles = append(firstTimeHandles, val.Users...) - case *livenote.OptionComment: - comments = append(comments, val.Text) - commentHandles = append(commentHandles, val.Users...) - } - } - - firstTimeTags, err := n.translateHandles(context.Background(), firstTimeHandles) + firstTimeTags, err := n.translateHandles(n.ctx, firstTimeHandles) if err != nil { return err } - commentTags, err := n.translateHandles(context.Background(), commentHandles) + commentTags, err := n.translateHandles(n.ctx, commentHandles) if err != nil { return err } - // format: mention slack user - // <@U012AB3CD> - if note.MessageID != "" { // If compare is enabled, we need to attach the comments @@ -312,7 +333,7 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetMessageID(respTs) if len(firstTimeTags) > 0 { - n.queueTask(context.Background(), notifyTask{ + n.queueTask(n.ctx, notifyTask{ channel: respCh, threadTs: respTs, opts: []slack.MsgOption{ @@ -321,7 +342,7 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er }, }, 100*time.Millisecond) } - + if len(comments) > 0 { var text string if len(commentTags) > 0 { @@ -329,7 +350,7 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er } text += joinComments(comments) - n.queueTask(context.Background(), notifyTask{ + n.queueTask(n.ctx, notifyTask{ channel: respCh, threadTs: respTs, opts: []slack.MsgOption{ @@ -463,3 +484,17 @@ func joinTags(tags []string) string { func joinComments(comments []string) string { return strings.Join(comments, "\n") } + +func diffsToComment(obj any, diffs []dynamic.Diff) (text string) { + if len(diffs) == 0 { + return text + } + + text += fmt.Sprintf("%T updated\n", obj) + + for _, diff := range diffs { + text += fmt.Sprintf("- %s: `%s` transited to `%s`\n", diff.Field, diff.Before, diff.After) + } + + return text +} diff --git a/pkg/strategy/example/livenote/strategy.go b/pkg/strategy/example/livenote/strategy.go index 341c906e2..be0aa868a 100644 --- a/pkg/strategy/example/livenote/strategy.go +++ b/pkg/strategy/example/livenote/strategy.go @@ -65,7 +65,7 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se log.Info(k) bbgo.PostLiveNote(&k, livenote.OneTimeMention(s.UserID), - livenote.Comment("please check the deposit", s.UserID), + livenote.Comment("please check the deposit"), livenote.CompareObject(true)) } From 0b2fdd471eb7b2012784463b0ad36db3627ff9c7 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 16:52:09 +0800 Subject: [PATCH 8/9] livenote: support pin and expiry time --- pkg/livenote/livenote.go | 40 ++++++++++++++++++++++- pkg/livenote/options.go | 20 ++++++++++++ pkg/notifier/slacknotifier/slack.go | 25 ++++++++++++++ pkg/strategy/example/livenote/strategy.go | 6 +++- 4 files changed, 89 insertions(+), 2 deletions(-) diff --git a/pkg/livenote/livenote.go b/pkg/livenote/livenote.go index af01cef95..b9f6a1879 100644 --- a/pkg/livenote/livenote.go +++ b/pkg/livenote/livenote.go @@ -1,6 +1,9 @@ package livenote -import "sync" +import ( + "sync" + "time" +) type Object interface { ObjectID() string @@ -13,9 +16,15 @@ type LiveNote struct { ChannelID string `json:"channelId"` + Pin bool `json:"pin"` + + TimeToLive time.Duration `json:"timeToLive"` + Object Object cachedObjID string + + postedTime time.Time } func NewLiveNote(object Object) *LiveNote { @@ -33,6 +42,14 @@ func (n *LiveNote) ObjectID() string { return n.cachedObjID } +func (n *LiveNote) SetTimeToLive(du time.Duration) { + n.TimeToLive = du +} + +func (n *LiveNote) SetPostedTime(tt time.Time) { + n.postedTime = tt +} + func (n *LiveNote) SetObject(object Object) { n.Object = object } @@ -45,6 +62,19 @@ func (n *LiveNote) SetChannelID(channelID string) { n.ChannelID = channelID } +func (n *LiveNote) SetPin(enabled bool) { + n.Pin = enabled +} + +func (n *LiveNote) IsExpired(now time.Time) bool { + if n.postedTime.IsZero() || n.TimeToLive == 0 { + return false + } + + expiryTime := n.postedTime.Add(n.TimeToLive) + return now.After(expiryTime) +} + type Pool struct { notes map[string]*LiveNote mu sync.Mutex @@ -64,6 +94,10 @@ func (p *Pool) Get(obj Object) *LiveNote { for _, note := range p.notes { if note.ObjectID() == objID { + if note.IsExpired(time.Now()) { + return nil + } + return note } } @@ -79,6 +113,10 @@ func (p *Pool) Update(obj Object) *LiveNote { for _, note := range p.notes { if note.ObjectID() == objID { + if note.IsExpired(time.Now()) { + break + } + // update the object inside the note note.SetObject(obj) return note diff --git a/pkg/livenote/options.go b/pkg/livenote/options.go index cf120f46e..f3ce1736d 100644 --- a/pkg/livenote/options.go +++ b/pkg/livenote/options.go @@ -1,5 +1,7 @@ package livenote +import "time" + type Option interface{} type OptionCompare struct { @@ -29,3 +31,21 @@ func Comment(text string, users ...string) *OptionComment { Users: users, } } + +type OptionTimeToLive struct { + Duration time.Duration +} + +func TimeToLive(du time.Duration) *OptionTimeToLive { + return &OptionTimeToLive{Duration: du} +} + +type OptionPin struct { + Value bool +} + +func Pin(value bool) *OptionPin { + return &OptionPin{ + Value: value, + } +} diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index ef938b05f..f8f17492c 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -52,6 +52,8 @@ func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) { // To use this notifier, you need to setup the slack app permissions: // - channels:read // - chat:write +// +// When using "pins", you will need permission: "pins:write" type Notifier struct { ctx context.Context cancel context.CancelFunc @@ -236,6 +238,9 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er var commentHandles []string var comments []string var shouldCompare bool + var shouldPin bool + var ttl time.Duration = 0 + for _, opt := range opts { switch val := opt.(type) { case *livenote.OptionOneTimeMention: @@ -245,6 +250,10 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er commentHandles = append(commentHandles, val.Users...) case *livenote.OptionCompare: shouldCompare = val.Value + case *livenote.OptionPin: + shouldPin = val.Value + case *livenote.OptionTimeToLive: + ttl = val.Duration } } @@ -260,6 +269,10 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note := n.liveNotePool.Update(obj) curObj = note.Object + if ttl > 0 { + note.SetTimeToLive(ttl) + } + if shouldCompare && prevObj != nil { diffs, err := dynamic.Compare(curObj, prevObj) if err != nil { @@ -331,6 +344,18 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetChannelID(respCh) note.SetMessageID(respTs) + note.SetPostedTime(time.Now()) + + if shouldPin { + note.SetPin(true) + + if err := n.client.AddPinContext(ctx, respCh, slack.ItemRef{ + Channel: respCh, + Timestamp: respTs, + }); err != nil { + log.WithError(err).Warnf("unable to pin the slack message: %s", respTs) + } + } if len(firstTimeTags) > 0 { n.queueTask(n.ctx, notifyTask{ diff --git a/pkg/strategy/example/livenote/strategy.go b/pkg/strategy/example/livenote/strategy.go index be0aa868a..4e0db4dc3 100644 --- a/pkg/strategy/example/livenote/strategy.go +++ b/pkg/strategy/example/livenote/strategy.go @@ -2,6 +2,7 @@ package livenote import ( "context" + "time" "github.com/sirupsen/logrus" @@ -66,7 +67,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se bbgo.PostLiveNote(&k, livenote.OneTimeMention(s.UserID), livenote.Comment("please check the deposit"), - livenote.CompareObject(true)) + livenote.CompareObject(true), + livenote.TimeToLive(time.Minute), + // livenote.Pin(true), + ) } // register our kline event handler From 9b1060e16d26ea7b9e3c18627d800d40afb98771 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 16:56:45 +0800 Subject: [PATCH 9/9] livenote: add option channel --- pkg/livenote/options.go | 10 ++++++++++ pkg/notifier/slacknotifier/slack.go | 8 +++++++- 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/pkg/livenote/options.go b/pkg/livenote/options.go index f3ce1736d..ce5713d2c 100644 --- a/pkg/livenote/options.go +++ b/pkg/livenote/options.go @@ -4,6 +4,16 @@ import "time" type Option interface{} +type OptionChannel struct { + Channel string +} + +func Channel(channel string) *OptionChannel { + return &OptionChannel{ + Channel: channel, + } +} + type OptionCompare struct { Value bool } diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index f8f17492c..ba5f9d06c 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -241,6 +241,9 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er var shouldPin bool var ttl time.Duration = 0 + // load the default channel + channel := n.channel + for _, opt := range opts { switch val := opt.(type) { case *livenote.OptionOneTimeMention: @@ -254,6 +257,10 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er shouldPin = val.Value case *livenote.OptionTimeToLive: ttl = val.Duration + case *livenote.OptionChannel: + if val.Channel != "" { + channel = val.Channel + } } } @@ -265,7 +272,6 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er } } - channel := n.channel note := n.liveNotePool.Update(obj) curObj = note.Object