Merge pull request #1807 from c9s/c9s/xalign/improvements
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

FEATURE: [livenote] object comparison, comments and mentions
This commit is contained in:
c9s 2024-11-11 17:05:16 +08:00 committed by GitHub
commit 2eca15bb61
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 521 additions and 47 deletions

View File

@ -14,3 +14,4 @@ exchangeStrategies:
livenote: livenote:
symbol: BTCUSDT symbol: BTCUSDT
interval: 5m interval: 5m
userID: U12345678

View File

@ -8,6 +8,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -94,6 +95,52 @@ func Test_Compare(t *testing.T) {
}, },
}, },
}, },
{
name: "kline",
wantErr: assert.NoError,
a: types.KLine{
Open: Number(60000),
High: Number(61000),
Low: Number(59500),
Close: Number(60100),
},
b: types.KLine{
Open: Number(60000),
High: Number(61000),
Low: Number(59500),
Close: Number(60200),
},
want: []Diff{
{
Field: "Close",
Before: "60200",
After: "60100",
},
},
},
{
name: "kline ptr",
wantErr: assert.NoError,
a: &types.KLine{
Open: Number(60000),
High: Number(61000),
Low: Number(59500),
Close: Number(60100),
},
b: &types.KLine{
Open: Number(60000),
High: Number(61000),
Low: Number(59500),
Close: Number(60200),
},
want: []Diff{
{
Field: "Close",
Before: "60200",
After: "60100",
},
},
},
{ {
name: "deposit and order", name: "deposit and order",
wantErr: assert.NoError, wantErr: assert.NoError,

View File

@ -1,6 +1,9 @@
package livenote package livenote
import "sync" import (
"sync"
"time"
)
type Object interface { type Object interface {
ObjectID() string ObjectID() string
@ -13,9 +16,15 @@ type LiveNote struct {
ChannelID string `json:"channelId"` ChannelID string `json:"channelId"`
Pin bool `json:"pin"`
TimeToLive time.Duration `json:"timeToLive"`
Object Object Object Object
cachedObjID string cachedObjID string
postedTime time.Time
} }
func NewLiveNote(object Object) *LiveNote { func NewLiveNote(object Object) *LiveNote {
@ -33,6 +42,14 @@ func (n *LiveNote) ObjectID() string {
return n.cachedObjID return n.cachedObjID
} }
func (n *LiveNote) SetTimeToLive(du time.Duration) {
n.TimeToLive = du
}
func (n *LiveNote) SetPostedTime(tt time.Time) {
n.postedTime = tt
}
func (n *LiveNote) SetObject(object Object) { func (n *LiveNote) SetObject(object Object) {
n.Object = object n.Object = object
} }
@ -45,6 +62,19 @@ func (n *LiveNote) SetChannelID(channelID string) {
n.ChannelID = channelID n.ChannelID = channelID
} }
func (n *LiveNote) SetPin(enabled bool) {
n.Pin = enabled
}
func (n *LiveNote) IsExpired(now time.Time) bool {
if n.postedTime.IsZero() || n.TimeToLive == 0 {
return false
}
expiryTime := n.postedTime.Add(n.TimeToLive)
return now.After(expiryTime)
}
type Pool struct { type Pool struct {
notes map[string]*LiveNote notes map[string]*LiveNote
mu sync.Mutex mu sync.Mutex
@ -56,6 +86,25 @@ func NewPool(size int64) *Pool {
} }
} }
func (p *Pool) Get(obj Object) *LiveNote {
objID := obj.ObjectID()
p.mu.Lock()
defer p.mu.Unlock()
for _, note := range p.notes {
if note.ObjectID() == objID {
if note.IsExpired(time.Now()) {
return nil
}
return note
}
}
return nil
}
func (p *Pool) Update(obj Object) *LiveNote { func (p *Pool) Update(obj Object) *LiveNote {
objID := obj.ObjectID() objID := obj.ObjectID()
@ -64,6 +113,10 @@ func (p *Pool) Update(obj Object) *LiveNote {
for _, note := range p.notes { for _, note := range p.notes {
if note.ObjectID() == objID { if note.ObjectID() == objID {
if note.IsExpired(time.Now()) {
break
}
// update the object inside the note // update the object inside the note
note.SetObject(obj) note.SetObject(obj)
return note return note

View File

@ -1,12 +1,61 @@
package livenote package livenote
import "time"
type Option interface{} type Option interface{}
type Mention struct { type OptionChannel struct {
User string Channel string
} }
type Comment struct { func Channel(channel string) *OptionChannel {
return &OptionChannel{
Channel: channel,
}
}
type OptionCompare struct {
Value bool
}
func CompareObject(value bool) *OptionCompare {
return &OptionCompare{Value: value}
}
type OptionOneTimeMention struct {
Users []string
}
func OneTimeMention(users ...string) *OptionOneTimeMention {
return &OptionOneTimeMention{Users: users}
}
type OptionComment struct {
Text string Text string
Users []string Users []string
} }
func Comment(text string, users ...string) *OptionComment {
return &OptionComment{
Text: text,
Users: users,
}
}
type OptionTimeToLive struct {
Duration time.Duration
}
func TimeToLive(du time.Duration) *OptionTimeToLive {
return &OptionTimeToLive{Duration: du}
}
type OptionPin struct {
Value bool
}
func Pin(value bool) *OptionPin {
return &OptionPin{
Value: value,
}
}

View File

@ -3,11 +3,15 @@ package slacknotifier
import ( import (
"bytes" "bytes"
"context" "context"
"errors"
"fmt" "fmt"
"regexp"
"strings"
"time" "time"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/dynamic"
"github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/livenote"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -15,69 +19,279 @@ import (
"github.com/slack-go/slack" "github.com/slack-go/slack"
) )
var limiter = rate.NewLimiter(rate.Every(1*time.Second), 3) var limiter = rate.NewLimiter(rate.Every(600*time.Millisecond), 3)
// userIdRegExp matches strings like <@U012AB3CD>
var userIdRegExp = regexp.MustCompile(`^<@(.+?)>$`)
// groupIdRegExp matches strings like <!subteam^ID>
var groupIdRegExp = regexp.MustCompile(`^<!subteam\^(.+?)>$`)
var emailRegExp = regexp.MustCompile("`^(?P<name>[a-zA-Z0-9.!#$%&'*+/=?^_ \\x60{|}~-]+)@(?P<domain>[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?(?:\\.[a-zA-Z0-9](?:[a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)*)$`gm")
type notifyTask struct { type notifyTask struct {
Channel string channel string
Opts []slack.MsgOption
isUpdate bool
// messageTs is the message timestamp, this is required when isUpdate is true
messageTs string
// threadTs is the thread timestamp
threadTs string
opts []slack.MsgOption
} }
func (t *notifyTask) addMsgOption(opts ...slack.MsgOption) {
t.opts = append(t.opts, opts...)
}
// Notifier is a slack notifier
//
// To use this notifier, you need to setup the slack app permissions:
// - channels:read
// - chat:write
//
// When using "pins", you will need permission: "pins:write"
type Notifier struct { type Notifier struct {
ctx context.Context
cancel context.CancelFunc
client *slack.Client client *slack.Client
channel string channel string
taskC chan notifyTask taskC chan notifyTask
liveNotePool *livenote.Pool liveNotePool *livenote.Pool
userIdCache map[string]*slack.User
groupIdCache map[string]slack.UserGroup
} }
type NotifyOption func(notifier *Notifier) type NotifyOption func(notifier *Notifier)
const defaultQueueSize = 500
func OptionContext(baseCtx context.Context) NotifyOption {
return func(notifier *Notifier) {
ctx, cancel := context.WithCancel(baseCtx)
notifier.ctx = ctx
notifier.cancel = cancel
}
}
func OptionQueueSize(size int) NotifyOption {
return NotifyOption(func(notifier *Notifier) {
notifier.taskC = make(chan notifyTask, size)
})
}
func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier { func New(client *slack.Client, channel string, options ...NotifyOption) *Notifier {
notifier := &Notifier{ notifier := &Notifier{
ctx: context.Background(),
cancel: func() {},
channel: channel, channel: channel,
client: client, client: client,
taskC: make(chan notifyTask, 100), taskC: make(chan notifyTask, defaultQueueSize),
liveNotePool: livenote.NewPool(100), liveNotePool: livenote.NewPool(100),
userIdCache: make(map[string]*slack.User, 30),
groupIdCache: make(map[string]slack.UserGroup, 50),
} }
for _, o := range options { for _, o := range options {
o(notifier) o(notifier)
} }
go notifier.worker() userGroups, err := client.GetUserGroupsContext(notifier.ctx)
if err != nil {
log.WithError(err).Error("failed to get the slack user groups")
} else {
for _, group := range userGroups {
notifier.groupIdCache[group.Name] = group
}
// user groups: map[
// Development Team:{
// ID:S08004CQYQK
// TeamID:T036FASR3
// IsUserGroup:true
// Name:Development Team
// Description:dev
// Handle:dev
// IsExternal:false
// DateCreate:"Fri Nov 8"
// DateUpdate:"Fri Nov 8" DateDelete:"Thu Jan 1"
// AutoType: CreatedBy:U036FASR5 UpdatedBy:U12345678 DeletedBy:
// Prefs:{Channels:[] Groups:[]} UserCount:1 Users:[]}]
log.Debugf("slack user groups: %+v", notifier.groupIdCache)
}
go notifier.worker(notifier.ctx)
return notifier return notifier
} }
func (n *Notifier) worker() { func (n *Notifier) worker(ctx context.Context) {
ctx := context.Background() defer n.cancel()
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case task := <-n.taskC: case task := <-n.taskC:
// ignore the wait error if err := n.executeTask(ctx, task); err != nil {
_ = limiter.Wait(ctx)
_, _, err := n.client.PostMessageContext(ctx, task.Channel, task.Opts...)
if err != nil {
log.WithError(err). log.WithError(err).
WithField("channel", task.Channel). WithField("channel", task.channel).
Errorf("slack api error: %s", err.Error()) Errorf("slack api error: %s", err.Error())
} }
} }
} }
} }
func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error { func (n *Notifier) executeTask(ctx context.Context, task notifyTask) error {
note := n.liveNotePool.Update(obj) // ignore the wait error
ctx := context.Background() if err := limiter.Wait(ctx); err != nil {
log.WithError(err).Warnf("slack rate limiter error")
}
channel := note.ChannelID if task.threadTs != "" {
if channel == "" { task.addMsgOption(slack.MsgOptionTS(task.threadTs))
channel = n.channel }
if task.isUpdate && task.messageTs != "" {
task.addMsgOption(slack.MsgOptionUpdate(task.messageTs))
}
_, _, err := n.client.PostMessageContext(ctx, task.channel, task.opts...)
if err != nil {
return err
}
return nil
}
func (n *Notifier) translateHandles(ctx context.Context, handles []string) ([]string, error) {
var tags []string
for _, handle := range handles {
tag, err := n.translateHandle(ctx, handle)
if err != nil {
return nil, err
}
tags = append(tags, tag)
}
return tags, nil
}
func (n *Notifier) translateHandle(ctx context.Context, handle string) (string, error) {
if handle == "" {
return "", errors.New("handle is empty")
}
// trim the prefix '@' if we get a string like '@username'
if handle[0] == '@' {
handle = handle[1:]
}
// if the given handle is already in slack user id format, we don't need to look up
if userIdRegExp.MatchString(handle) || groupIdRegExp.MatchString(handle) {
return handle, nil
}
if user, exists := n.userIdCache[handle]; exists {
return toUserHandle(user.ID), nil
}
if group, exists := n.groupIdCache[handle]; exists {
return toSubteamHandle(group.ID), nil
}
var slackUser *slack.User
var err error
if emailRegExp.MatchString(handle) {
slackUser, err = n.client.GetUserByEmailContext(ctx, handle)
if err != nil {
return "", fmt.Errorf("user %s not found: %v", handle, err)
}
} else {
slackUser, err = n.client.GetUserInfoContext(ctx, handle)
if err != nil {
return "", fmt.Errorf("user handle %s not found: %v", handle, err)
}
}
if slackUser != nil {
n.userIdCache[handle] = slackUser
return toUserHandle(slackUser.ID), nil
}
return "", fmt.Errorf("handle %s not found", handle)
}
func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) error {
var slackOpts []slack.MsgOption
var firstTimeHandles []string
var commentHandles []string
var comments []string
var shouldCompare bool
var shouldPin bool
var ttl time.Duration = 0
// load the default channel
channel := n.channel
for _, opt := range opts {
switch val := opt.(type) {
case *livenote.OptionOneTimeMention:
firstTimeHandles = append(firstTimeHandles, val.Users...)
case *livenote.OptionComment:
comments = append(comments, val.Text)
commentHandles = append(commentHandles, val.Users...)
case *livenote.OptionCompare:
shouldCompare = val.Value
case *livenote.OptionPin:
shouldPin = val.Value
case *livenote.OptionTimeToLive:
ttl = val.Duration
case *livenote.OptionChannel:
if val.Channel != "" {
channel = val.Channel
}
}
}
var ctx = n.ctx
var curObj, prevObj any
if shouldCompare {
if prevNote := n.liveNotePool.Get(obj); prevNote != nil {
prevObj = prevNote.Object
}
}
note := n.liveNotePool.Update(obj)
curObj = note.Object
if ttl > 0 {
note.SetTimeToLive(ttl)
}
if shouldCompare && prevObj != nil {
diffs, err := dynamic.Compare(curObj, prevObj)
if err != nil {
log.WithError(err).Warnf("unable to compare objects: %T and %T", curObj, prevObj)
} else {
if comment := diffsToComment(curObj, diffs); len(comment) > 0 {
comments = append(comments, comment)
}
}
}
if note.ChannelID != "" {
channel = note.ChannelID
} }
var attachment slack.Attachment var attachment slack.Attachment
@ -87,30 +301,44 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er
return fmt.Errorf("livenote object does not support types.SlackAttachmentCreator interface") return fmt.Errorf("livenote object does not support types.SlackAttachmentCreator interface")
} }
var slackOpts []slack.MsgOption
slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment)) slackOpts = append(slackOpts, slack.MsgOptionAttachments(attachment))
var userIds []string firstTimeTags, err := n.translateHandles(n.ctx, firstTimeHandles)
var mentions []*livenote.Mention if err != nil {
var comments []*livenote.Comment return err
for _, opt := range opts { }
switch val := opt.(type) {
case *livenote.Mention: commentTags, err := n.translateHandles(n.ctx, commentHandles)
mentions = append(mentions, val) if err != nil {
userIds = append(userIds, val.User) return err
case *livenote.Comment:
comments = append(comments, val)
userIds = append(userIds, val.Users...)
}
} }
if note.MessageID != "" { if note.MessageID != "" {
// If compare is enabled, we need to attach the comments
// UpdateMessageContext returns channel, timestamp, text, err // UpdateMessageContext returns channel, timestamp, text, err
_, _, _, err := n.client.UpdateMessageContext(ctx, channel, note.MessageID, slackOpts...) respCh, respTs, _, err := n.client.UpdateMessageContext(ctx, note.ChannelID, note.MessageID, slackOpts...)
if err != nil { if err != nil {
return err return err
} }
if len(comments) > 0 {
var text string
if len(commentTags) > 0 {
text = joinTags(commentTags) + " "
}
text += joinComments(comments)
n.queueTask(context.Background(), notifyTask{
channel: respCh,
threadTs: respTs,
opts: []slack.MsgOption{
slack.MsgOptionText(text, false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
}
} else { } else {
respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...) respCh, respTs, err := n.client.PostMessageContext(ctx, channel, slackOpts...)
if err != nil { if err != nil {
@ -122,6 +350,46 @@ func (n *Notifier) PostLiveNote(obj livenote.Object, opts ...livenote.Option) er
note.SetChannelID(respCh) note.SetChannelID(respCh)
note.SetMessageID(respTs) note.SetMessageID(respTs)
note.SetPostedTime(time.Now())
if shouldPin {
note.SetPin(true)
if err := n.client.AddPinContext(ctx, respCh, slack.ItemRef{
Channel: respCh,
Timestamp: respTs,
}); err != nil {
log.WithError(err).Warnf("unable to pin the slack message: %s", respTs)
}
}
if len(firstTimeTags) > 0 {
n.queueTask(n.ctx, notifyTask{
channel: respCh,
threadTs: respTs,
opts: []slack.MsgOption{
slack.MsgOptionText(joinTags(firstTimeTags), false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
}
if len(comments) > 0 {
var text string
if len(commentTags) > 0 {
text = joinTags(commentTags) + " "
}
text += joinComments(comments)
n.queueTask(n.ctx, notifyTask{
channel: respCh,
threadTs: respTs,
opts: []slack.MsgOption{
slack.MsgOptionText(text, false),
slack.MsgOptionTS(respTs),
},
}, 100*time.Millisecond)
}
} }
return nil return nil
@ -207,13 +475,20 @@ func (n *Notifier) NotifyTo(channel string, obj interface{}, args ...interface{}
} }
n.queueTask(context.Background(), notifyTask{
channel: channel,
opts: opts,
}, 100*time.Millisecond)
}
func (n *Notifier) queueTask(ctx context.Context, task notifyTask, timeout time.Duration) {
select { select {
case n.taskC <- notifyTask{ case <-ctx.Done():
Channel: channel,
Opts: opts,
}:
case <-time.After(50 * time.Millisecond):
return return
case <-time.After(timeout):
log.Warnf("slack notify task is dropped due to timeout %s", timeout)
return
case n.taskC <- task:
} }
} }
@ -224,3 +499,33 @@ func (n *Notifier) SendPhoto(buffer *bytes.Buffer) {
func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) { func (n *Notifier) SendPhotoTo(channel string, buffer *bytes.Buffer) {
// TODO // TODO
} }
func toUserHandle(id string) string {
return fmt.Sprintf("<@%s>", id)
}
func toSubteamHandle(id string) string {
return fmt.Sprintf("<!subteam^%s>", id)
}
func joinTags(tags []string) string {
return strings.Join(tags, " ")
}
func joinComments(comments []string) string {
return strings.Join(comments, "\n")
}
func diffsToComment(obj any, diffs []dynamic.Diff) (text string) {
if len(diffs) == 0 {
return text
}
text += fmt.Sprintf("%T updated\n", obj)
for _, diff := range diffs {
text += fmt.Sprintf("- %s: `%s` transited to `%s`\n", diff.Field, diff.Before, diff.After)
}
return text
}

View File

@ -2,11 +2,13 @@ package livenote
import ( import (
"context" "context"
"time"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/livenote"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
@ -32,7 +34,10 @@ func init() {
// Strategy is a struct that contains the settings of your strategy. // Strategy is a struct that contains the settings of your strategy.
// These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically. // These settings will be loaded from the BBGO YAML config file "bbgo.yaml" automatically.
type Strategy struct { type Strategy struct {
Symbol string `json:"symbol"` Symbol string `json:"symbol"`
Interval types.Interval `json:"interval"`
UserID string `json:"userID"`
} }
func (s *Strategy) ID() string { func (s *Strategy) ID() string {
@ -43,15 +48,29 @@ func (s *Strategy) InstanceID() string {
return ID + "-" + s.Symbol return ID + "-" + s.Symbol
} }
func (s *Strategy) Defaults() error {
if s.Interval == "" {
s.Interval = types.Interval1m
}
return nil
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
} }
// This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed // This strategy simply spent all available quote currency to buy the symbol whenever kline gets closed
func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
callback := func(k types.KLine) { callback := func(k types.KLine) {
log.Info(k) log.Info(k)
bbgo.PostLiveNote(&k) bbgo.PostLiveNote(&k,
livenote.OneTimeMention(s.UserID),
livenote.Comment("please check the deposit"),
livenote.CompareObject(true),
livenote.TimeToLive(time.Minute),
// livenote.Pin(true),
)
} }
// register our kline event handler // register our kline event handler

View File

@ -655,8 +655,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.Symbol, s.Symbol,
time.Since(bookLastUpdateTime)) time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect() s.sourceSession.MarketDataStream.Reconnect()
s.sourceBook.Reset()
return err return err
} }
@ -665,8 +665,8 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.Symbol, s.Symbol,
time.Since(bookLastUpdateTime)) time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect() s.sourceSession.MarketDataStream.Reconnect()
s.sourceBook.Reset()
return err return err
} }