diff --git a/config/example/livenote.yaml b/config/example/livenote.yaml index 662aab4ac..b83142aba 100644 --- a/config/example/livenote.yaml +++ b/config/example/livenote.yaml @@ -14,3 +14,4 @@ exchangeStrategies: livenote: symbol: BTCUSDT interval: 5m + userID: U12345678 diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 7bb43772d..dad711459 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -18,14 +18,43 @@ import ( "github.com/slack-go/slack" ) -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 3) +var limiter = rate.NewLimiter(rate.Every(600*time.Millisecond), 3) + +// userIdRegExp matches strings like <@U012AB3CD> +var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) + +// groupIdRegExp matches strings like +var groupIdRegExp = regexp.MustCompile(`^$`) + +var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") type notifyTask struct { - Channel string - Opts []slack.MsgOption + channel string + + isUpdate bool + + // messageTs is the message timestamp, this is required when isUpdate is true + messageTs string + + // threadTs is the thread timestamp + threadTs string + + opts []slack.MsgOption } +func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) { + t.opts = append(t.opts, opts...) +} + +// Notifier is a slack notifier +// +// To use this notifier, you need to setup the slack app permissions: +// - channels:read +// - chat:write type Notifier struct { + ctx context.Context + cancel context.CancelFunc + client *slack.Client channel string @@ -39,11 +68,29 @@ type Notifier struct { type NotifyOption func(notifier *Notifier) +const defaultQueueSize = 500 + +func OptionContext(baseCtx context.Context) NotifyOption { + return func(notifier *Notifier) { + ctx, cancel := context.WithCancel(baseCtx) + notifier.ctx = ctx + notifier.cancel = cancel + } +} + +func OptionQueueSize(size int) NotifyOption { + return NotifyOption(func(notifier *Notifier) { + notifier.taskC = make(chan notifyTask, size) + }) +} + func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier { notifier := &Notifier{ + ctx: context.Background(), + cancel: func() {}, channel: channel, client: client, - taskC: make(chan notifyTask, 100), + taskC: make(chan notifyTask, defaultQueueSize), liveNotePool: livenote.NewPool(100), userIdCache: make(map[string]*slack.User, 30), groupIdCache: make(map[string]slack.UserGroup, 50), @@ -53,7 +100,7 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie o(notifier) } - userGroups, err := client.GetUserGroupsContext(context.Background()) + userGroups, err := client.GetUserGroupsContext(notifier.ctx) if err != nil { log.WithError(err).Error("failed to get the slack user groups") } else { @@ -77,39 +124,50 @@ func New(client *slack.Client, channel string, options ...NotifyOption) *Notifie log.Debugf("slack user groups: %+v", notifier.groupIdCache) } - go notifier.worker() + go notifier.worker(notifier.ctx) return notifier } -func (n *Notifier) worker() { - ctx := context.Background() +func (n *Notifier) worker(ctx context.Context) { + defer n.cancel() + for { select { case <-ctx.Done(): return case task := <-n.taskC: - // ignore the wait error - _ = limiter.Wait(ctx) - - _, _, err := n.client.PostMessageContext(ctx, task.Channel, task.Opts...) - if err != nil { + if err := n.executeTask(ctx, task); err != nil { log.WithError(err). - WithField("channel", task.Channel). + WithField("channel", task.channel). Errorf("slack api error: %s", err.Error()) } } } } -// userIdRegExp matches strings like <@U012AB3CD> -var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) +func (n *Notifier) executeTask(ctx context.Context, task notifyTask) error { + // ignore the wait error + if err := limiter.Wait(ctx); err != nil { + log.WithError(err).Warnf("slack rate limiter error") + } -// groupIdRegExp matches strings like -var groupIdRegExp = regexp.MustCompile(`^$`) + if task.threadTs != "" { + task.addMsgOption(slack.MsgOptionTS(task.threadTs)) + } -var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") + if task.isUpdate && task.messageTs != "" { + task.addMsgOption(slack.MsgOptionUpdate(task.messageTs)) + } + + _, _, err := n.client.PostMessageContext(ctx, task.channel, task.opts...) + if err != nil { + return err + } + + return nil +} func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { var tags []string @@ -172,7 +230,7 @@ func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { note := n.liveNotePool.Update(obj) - ctx := context.Background() + ctx := n.ctx channel := note.ChannelID if channel == "" { @@ -191,15 +249,13 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er var firstTimeHandles []string var commentHandles []string - var mentions []*livenote.OptionOneTimeMention - var comments []*livenote.OptionComment + var comments []string for _, opt := range opts { switch val := opt.(type) { case *livenote.OptionOneTimeMention: - mentions = append(mentions, val) firstTimeHandles = append(firstTimeHandles, val.Users...) case *livenote.OptionComment: - comments = append(comments, val) + comments = append(comments, val.Text) commentHandles = append(commentHandles, val.Users...) } } @@ -221,12 +277,27 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er // If compare is enabled, we need to attach the comments // UpdateMessageContext returns channel, timestamp, text, err - _, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) + respCh, respTs, _, err := n.client.UpdateMessageContext(ctx, note.ChannelID, note.MessageID, slackOpts...) if err != nil { return err } - _ = commentTags + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } } else { respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) @@ -240,11 +311,32 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetChannelID(respCh) note.SetMessageID(respTs) - _, _, err = n.client.PostMessageContext(ctx, channel, - slack.MsgOptionText(joinTags(firstTimeTags), false), - slack.MsgOptionTS(respTs)) - if err != nil { - return err + if len(firstTimeTags) > 0 { + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(joinTags(firstTimeTags), false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } + + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) } } @@ -331,13 +423,20 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } + n.queueTask(context.Background(), notifyTask{ + channel: channel, + opts: opts, + }, 100*time.Millisecond) +} + +func (n *Notifier) queueTask(ctx context.Context, task notifyTask, timeout time.Duration) { select { - case n.taskC <- notifyTask{ - Channel: channel, - Opts: opts, - }: - case <-time.After(50 * time.Millisecond): + case <-ctx.Done(): return + case <-time.After(timeout): + log.Warnf("slack notify task is dropped due to timeout %s", timeout) + return + case n.taskC <- task: } } @@ -360,3 +459,7 @@ func toSubteamHandle(id string) string { func joinTags(tags []string) string { return strings.Join(tags, " ") } + +func joinComments(comments []string) string { + return strings.Join(comments, "\n") +} diff --git a/pkg/strategy/example/livenote/strategy.go b/pkg/strategy/example/livenote/strategy.go index ab4489e56..341c906e2 100644 --- a/pkg/strategy/example/livenote/strategy.go +++ b/pkg/strategy/example/livenote/strategy.go @@ -7,6 +7,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" ) @@ -32,7 +33,10 @@ func init() { // Strategy is a struct that contains the settings of your strategy. // These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically. type Strategy struct { - Symbol string `json:"symbol"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + + UserID string `json:"userID"` } func (s *Strategy) ID() string { @@ -43,15 +47,26 @@ func (s *Strategy) InstanceID() string { return ID + "-" + s.Symbol } +func (s *Strategy) Defaults() error { + if s.Interval == "" { + s.Interval = types.Interval1m + } + + return nil +} + func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { - session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) + session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } // This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { callback := func(k types.KLine) { log.Info(k) - bbgo.PostLiveNote(&k) + bbgo.PostLiveNote(&k, + livenote.OneTimeMention(s.UserID), + livenote.Comment("please check the deposit", s.UserID), + livenote.CompareObject(true)) } // register our kline event handler