diff --git a/config/example/livenote.yaml b/config/example/livenote.yaml index 662aab4ac..b83142aba 100644 --- a/config/example/livenote.yaml +++ b/config/example/livenote.yaml @@ -14,3 +14,4 @@ exchangeStrategies: livenote: symbol: BTCUSDT interval: 5m + userID: U12345678 diff --git a/pkg/dynamic/compare_test.go b/pkg/dynamic/compare_test.go index 36200f545..a4bf3133d 100644 --- a/pkg/dynamic/compare_test.go +++ b/pkg/dynamic/compare_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/c9s/bbgo/pkg/fixedpoint" + . "github.com/c9s/bbgo/pkg/testing/testhelper" "github.com/c9s/bbgo/pkg/types" ) @@ -94,6 +95,52 @@ func Test_Compare(t *testing.T) { }, }, }, + { + name: "kline", + wantErr: assert.NoError, + a: types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60100), + }, + b: types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60200), + }, + want: []Diff{ + { + Field: "Close", + Before: "60200", + After: "60100", + }, + }, + }, + { + name: "kline ptr", + wantErr: assert.NoError, + a: &types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60100), + }, + b: &types.KLine{ + Open: Number(60000), + High: Number(61000), + Low: Number(59500), + Close: Number(60200), + }, + want: []Diff{ + { + Field: "Close", + Before: "60200", + After: "60100", + }, + }, + }, { name: "deposit and order", wantErr: assert.NoError, diff --git a/pkg/livenote/livenote.go b/pkg/livenote/livenote.go index 3cac0c3cc..b9f6a1879 100644 --- a/pkg/livenote/livenote.go +++ b/pkg/livenote/livenote.go @@ -1,6 +1,9 @@ package livenote -import "sync" +import ( + "sync" + "time" +) type Object interface { ObjectID() string @@ -13,9 +16,15 @@ type LiveNote struct { ChannelID string `json:"channelId"` + Pin bool `json:"pin"` + + TimeToLive time.Duration `json:"timeToLive"` + Object Object cachedObjID string + + postedTime time.Time } func NewLiveNote(object Object) *LiveNote { @@ -33,6 +42,14 @@ func (n *LiveNote) ObjectID() string { return n.cachedObjID } +func (n *LiveNote) SetTimeToLive(du time.Duration) { + n.TimeToLive = du +} + +func (n *LiveNote) SetPostedTime(tt time.Time) { + n.postedTime = tt +} + func (n *LiveNote) SetObject(object Object) { n.Object = object } @@ -45,6 +62,19 @@ func (n *LiveNote) SetChannelID(channelID string) { n.ChannelID = channelID } +func (n *LiveNote) SetPin(enabled bool) { + n.Pin = enabled +} + +func (n *LiveNote) IsExpired(now time.Time) bool { + if n.postedTime.IsZero() || n.TimeToLive == 0 { + return false + } + + expiryTime := n.postedTime.Add(n.TimeToLive) + return now.After(expiryTime) +} + type Pool struct { notes map[string]*LiveNote mu sync.Mutex @@ -56,6 +86,25 @@ func NewPool(size int64) *Pool { } } +func (p *Pool) Get(obj Object) *LiveNote { + objID := obj.ObjectID() + + p.mu.Lock() + defer p.mu.Unlock() + + for _, note := range p.notes { + if note.ObjectID() == objID { + if note.IsExpired(time.Now()) { + return nil + } + + return note + } + } + + return nil +} + func (p *Pool) Update(obj Object) *LiveNote { objID := obj.ObjectID() @@ -64,6 +113,10 @@ func (p *Pool) Update(obj Object) *LiveNote { for _, note := range p.notes { if note.ObjectID() == objID { + if note.IsExpired(time.Now()) { + break + } + // update the object inside the note note.SetObject(obj) return note diff --git a/pkg/livenote/options.go b/pkg/livenote/options.go index 216dddec0..ce5713d2c 100644 --- a/pkg/livenote/options.go +++ b/pkg/livenote/options.go @@ -1,12 +1,61 @@ package livenote +import "time" + type Option interface{} -type Mention struct { - User string +type OptionChannel struct { + Channel string } -type Comment struct { +func Channel(channel string) *OptionChannel { + return &OptionChannel{ + Channel: channel, + } +} + +type OptionCompare struct { + Value bool +} + +func CompareObject(value bool) *OptionCompare { + return &OptionCompare{Value: value} +} + +type OptionOneTimeMention struct { + Users []string +} + +func OneTimeMention(users ...string) *OptionOneTimeMention { + return &OptionOneTimeMention{Users: users} +} + +type OptionComment struct { Text string Users []string } + +func Comment(text string, users ...string) *OptionComment { + return &OptionComment{ + Text: text, + Users: users, + } +} + +type OptionTimeToLive struct { + Duration time.Duration +} + +func TimeToLive(du time.Duration) *OptionTimeToLive { + return &OptionTimeToLive{Duration: du} +} + +type OptionPin struct { + Value bool +} + +func Pin(value bool) *OptionPin { + return &OptionPin{ + Value: value, + } +} diff --git a/pkg/notifier/slacknotifier/slack.go b/pkg/notifier/slacknotifier/slack.go index 238a6bca7..ba5f9d06c 100644 --- a/pkg/notifier/slacknotifier/slack.go +++ b/pkg/notifier/slacknotifier/slack.go @@ -3,11 +3,15 @@ package slacknotifier import ( "bytes" "context" + "errors" "fmt" + "regexp" + "strings" "time" "golang.org/x/time/rate" + "github.com/c9s/bbgo/pkg/dynamic" "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" @@ -15,69 +19,279 @@ import ( "github.com/slack-go/slack" ) -var limiter = rate.NewLimiter(rate.Every(1*time.Second), 3) +var limiter = rate.NewLimiter(rate.Every(600*time.Millisecond), 3) + +// userIdRegExp matches strings like <@U012AB3CD> +var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`) + +// groupIdRegExp matches strings like +var groupIdRegExp = regexp.MustCompile(`^$`) + +var emailRegExp = regexp.MustCompile("`^(?P[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm") type notifyTask struct { - Channel string - Opts []slack.MsgOption + channel string + + isUpdate bool + + // messageTs is the message timestamp, this is required when isUpdate is true + messageTs string + + // threadTs is the thread timestamp + threadTs string + + opts []slack.MsgOption } +func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) { + t.opts = append(t.opts, opts...) +} + +// Notifier is a slack notifier +// +// To use this notifier, you need to setup the slack app permissions: +// - channels:read +// - chat:write +// +// When using "pins", you will need permission: "pins:write" type Notifier struct { + ctx context.Context + cancel context.CancelFunc + client *slack.Client channel string taskC chan notifyTask liveNotePool *livenote.Pool + + userIdCache map[string]*slack.User + groupIdCache map[string]slack.UserGroup } type NotifyOption func(notifier *Notifier) +const defaultQueueSize = 500 + +func OptionContext(baseCtx context.Context) NotifyOption { + return func(notifier *Notifier) { + ctx, cancel := context.WithCancel(baseCtx) + notifier.ctx = ctx + notifier.cancel = cancel + } +} + +func OptionQueueSize(size int) NotifyOption { + return NotifyOption(func(notifier *Notifier) { + notifier.taskC = make(chan notifyTask, size) + }) +} + func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier { notifier := &Notifier{ + ctx: context.Background(), + cancel: func() {}, channel: channel, client: client, - taskC: make(chan notifyTask, 100), + taskC: make(chan notifyTask, defaultQueueSize), liveNotePool: livenote.NewPool(100), + userIdCache: make(map[string]*slack.User, 30), + groupIdCache: make(map[string]slack.UserGroup, 50), } for _, o := range options { o(notifier) } - go notifier.worker() + userGroups, err := client.GetUserGroupsContext(notifier.ctx) + if err != nil { + log.WithError(err).Error("failed to get the slack user groups") + } else { + for _, group := range userGroups { + notifier.groupIdCache[group.Name] = group + } + + // user groups: map[ + // Development Team:{ + // ID:S08004CQYQK + // TeamID:T036FASR3 + // IsUserGroup:true + // Name:Development Team + // Description:dev + // Handle:dev + // IsExternal:false + // DateCreate:"Fri Nov 8" + // DateUpdate:"Fri Nov 8" DateDelete:"Thu Jan 1" + // AutoType: CreatedBy:U036FASR5 UpdatedBy:U12345678 DeletedBy: + // Prefs:{Channels:[] Groups:[]} UserCount:1 Users:[]}] + log.Debugf("slack user groups: %+v", notifier.groupIdCache) + } + + go notifier.worker(notifier.ctx) return notifier } -func (n *Notifier) worker() { - ctx := context.Background() +func (n *Notifier) worker(ctx context.Context) { + defer n.cancel() + for { select { case <-ctx.Done(): return case task := <-n.taskC: - // ignore the wait error - _ = limiter.Wait(ctx) - - _, _, err := n.client.PostMessageContext(ctx, task.Channel, task.Opts...) - if err != nil { + if err := n.executeTask(ctx, task); err != nil { log.WithError(err). - WithField("channel", task.Channel). + WithField("channel", task.channel). Errorf("slack api error: %s", err.Error()) } } } } -func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { - note := n.liveNotePool.Update(obj) - ctx := context.Background() +func (n *Notifier) executeTask(ctx context.Context, task notifyTask) error { + // ignore the wait error + if err := limiter.Wait(ctx); err != nil { + log.WithError(err).Warnf("slack rate limiter error") + } - channel := note.ChannelID - if channel == "" { - channel = n.channel + if task.threadTs != "" { + task.addMsgOption(slack.MsgOptionTS(task.threadTs)) + } + + if task.isUpdate && task.messageTs != "" { + task.addMsgOption(slack.MsgOptionUpdate(task.messageTs)) + } + + _, _, err := n.client.PostMessageContext(ctx, task.channel, task.opts...) + if err != nil { + return err + } + + return nil +} + +func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) { + var tags []string + for _, handle := range handles { + tag, err := n.translateHandle(ctx, handle) + if err != nil { + return nil, err + } + + tags = append(tags, tag) + } + + return tags, nil +} + +func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, error) { + if handle == "" { + return "", errors.New("handle is empty") + } + + // trim the prefix '@' if we get a string like '@username' + if handle[0] == '@' { + handle = handle[1:] + } + + // if the given handle is already in slack user id format, we don't need to look up + if userIdRegExp.MatchString(handle) || groupIdRegExp.MatchString(handle) { + return handle, nil + } + + if user, exists := n.userIdCache[handle]; exists { + return toUserHandle(user.ID), nil + } + + if group, exists := n.groupIdCache[handle]; exists { + return toSubteamHandle(group.ID), nil + } + + var slackUser *slack.User + var err error + if emailRegExp.MatchString(handle) { + slackUser, err = n.client.GetUserByEmailContext(ctx, handle) + if err != nil { + return "", fmt.Errorf("user %s not found: %v", handle, err) + } + } else { + slackUser, err = n.client.GetUserInfoContext(ctx, handle) + if err != nil { + return "", fmt.Errorf("user handle %s not found: %v", handle, err) + } + } + + if slackUser != nil { + n.userIdCache[handle] = slackUser + return toUserHandle(slackUser.ID), nil + } + + return "", fmt.Errorf("handle %s not found", handle) +} + +func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { + var slackOpts []slack.MsgOption + + var firstTimeHandles []string + var commentHandles []string + var comments []string + var shouldCompare bool + var shouldPin bool + var ttl time.Duration = 0 + + // load the default channel + channel := n.channel + + for _, opt := range opts { + switch val := opt.(type) { + case *livenote.OptionOneTimeMention: + firstTimeHandles = append(firstTimeHandles, val.Users...) + case *livenote.OptionComment: + comments = append(comments, val.Text) + commentHandles = append(commentHandles, val.Users...) + case *livenote.OptionCompare: + shouldCompare = val.Value + case *livenote.OptionPin: + shouldPin = val.Value + case *livenote.OptionTimeToLive: + ttl = val.Duration + case *livenote.OptionChannel: + if val.Channel != "" { + channel = val.Channel + } + } + } + + var ctx = n.ctx + var curObj, prevObj any + if shouldCompare { + if prevNote := n.liveNotePool.Get(obj); prevNote != nil { + prevObj = prevNote.Object + } + } + + note := n.liveNotePool.Update(obj) + curObj = note.Object + + if ttl > 0 { + note.SetTimeToLive(ttl) + } + + if shouldCompare && prevObj != nil { + diffs, err := dynamic.Compare(curObj, prevObj) + if err != nil { + log.WithError(err).Warnf("unable to compare objects: %T and %T", curObj, prevObj) + } else { + if comment := diffsToComment(curObj, diffs); len(comment) > 0 { + comments = append(comments, comment) + } + } + } + + if note.ChannelID != "" { + channel = note.ChannelID } var attachment slack.Attachment @@ -87,30 +301,44 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er return fmt.Errorf("livenote object does not support types.SlackAttachmentCreator interface") } - var slackOpts []slack.MsgOption slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment)) - var userIds []string - var mentions []*livenote.Mention - var comments []*livenote.Comment - for _, opt := range opts { - switch val := opt.(type) { - case *livenote.Mention: - mentions = append(mentions, val) - userIds = append(userIds, val.User) - case *livenote.Comment: - comments = append(comments, val) - userIds = append(userIds, val.Users...) - } + firstTimeTags, err := n.translateHandles(n.ctx, firstTimeHandles) + if err != nil { + return err + } + + commentTags, err := n.translateHandles(n.ctx, commentHandles) + if err != nil { + return err } if note.MessageID != "" { + // If compare is enabled, we need to attach the comments + // UpdateMessageContext returns channel, timestamp, text, err - _, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) + respCh, respTs, _, err := n.client.UpdateMessageContext(ctx, note.ChannelID, note.MessageID, slackOpts...) if err != nil { return err } + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(context.Background(), notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } + } else { respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) if err != nil { @@ -122,6 +350,46 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er note.SetChannelID(respCh) note.SetMessageID(respTs) + note.SetPostedTime(time.Now()) + + if shouldPin { + note.SetPin(true) + + if err := n.client.AddPinContext(ctx, respCh, slack.ItemRef{ + Channel: respCh, + Timestamp: respTs, + }); err != nil { + log.WithError(err).Warnf("unable to pin the slack message: %s", respTs) + } + } + + if len(firstTimeTags) > 0 { + n.queueTask(n.ctx, notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(joinTags(firstTimeTags), false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } + + if len(comments) > 0 { + var text string + if len(commentTags) > 0 { + text = joinTags(commentTags) + " " + } + + text += joinComments(comments) + n.queueTask(n.ctx, notifyTask{ + channel: respCh, + threadTs: respTs, + opts: []slack.MsgOption{ + slack.MsgOptionText(text, false), + slack.MsgOptionTS(respTs), + }, + }, 100*time.Millisecond) + } } return nil @@ -207,13 +475,20 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{} } + n.queueTask(context.Background(), notifyTask{ + channel: channel, + opts: opts, + }, 100*time.Millisecond) +} + +func (n *Notifier) queueTask(ctx context.Context, task notifyTask, timeout time.Duration) { select { - case n.taskC <- notifyTask{ - Channel: channel, - Opts: opts, - }: - case <-time.After(50 * time.Millisecond): + case <-ctx.Done(): return + case <-time.After(timeout): + log.Warnf("slack notify task is dropped due to timeout %s", timeout) + return + case n.taskC <- task: } } @@ -224,3 +499,33 @@ func (n *Notifier) SendPhoto(buffer *bytes.Buffer) { func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { // TODO } + +func toUserHandle(id string) string { + return fmt.Sprintf("<@%s>", id) +} + +func toSubteamHandle(id string) string { + return fmt.Sprintf("", id) +} + +func joinTags(tags []string) string { + return strings.Join(tags, " ") +} + +func joinComments(comments []string) string { + return strings.Join(comments, "\n") +} + +func diffsToComment(obj any, diffs []dynamic.Diff) (text string) { + if len(diffs) == 0 { + return text + } + + text += fmt.Sprintf("%T updated\n", obj) + + for _, diff := range diffs { + text += fmt.Sprintf("- %s: `%s` transited to `%s`\n", diff.Field, diff.Before, diff.After) + } + + return text +} diff --git a/pkg/strategy/example/livenote/strategy.go b/pkg/strategy/example/livenote/strategy.go index ab4489e56..4e0db4dc3 100644 --- a/pkg/strategy/example/livenote/strategy.go +++ b/pkg/strategy/example/livenote/strategy.go @@ -2,11 +2,13 @@ package livenote import ( "context" + "time" "github.com/sirupsen/logrus" "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" ) @@ -32,7 +34,10 @@ func init() { // Strategy is a struct that contains the settings of your strategy. // These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically. type Strategy struct { - Symbol string `json:"symbol"` + Symbol string `json:"symbol"` + Interval types.Interval `json:"interval"` + + UserID string `json:"userID"` } func (s *Strategy) ID() string { @@ -43,15 +48,29 @@ func (s *Strategy) InstanceID() string { return ID + "-" + s.Symbol } +func (s *Strategy) Defaults() error { + if s.Interval == "" { + s.Interval = types.Interval1m + } + + return nil +} + func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { - session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) + session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) } // This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { callback := func(k types.KLine) { log.Info(k) - bbgo.PostLiveNote(&k) + bbgo.PostLiveNote(&k, + livenote.OneTimeMention(s.UserID), + livenote.Comment("please check the deposit"), + livenote.CompareObject(true), + livenote.TimeToLive(time.Minute), + // livenote.Pin(true), + ) } // register our kline event handler diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index c3b31c78f..af2b3a8fb 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -655,8 +655,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.Symbol, time.Since(bookLastUpdateTime)) - s.sourceBook.Reset() s.sourceSession.MarketDataStream.Reconnect() + s.sourceBook.Reset() return err } @@ -665,8 +665,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error { s.Symbol, time.Since(bookLastUpdateTime)) - s.sourceBook.Reset() s.sourceSession.MarketDataStream.Reconnect() + s.sourceBook.Reset() return err }