From a9c51e824d027f8e2770d895f8c4dc255bf5dadd Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 17:16:42 +0800 Subject: [PATCH 1/6] bbgo: load slack notifier queue size from config --- pkg/bbgo/config.go | 1 + pkg/bbgo/environment.go | 7 ++++++- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/pkg/bbgo/config.go b/pkg/bbgo/config.go index 697a733e2..35aad9822 100644 --- a/pkg/bbgo/config.go +++ b/pkg/bbgo/config.go @@ -65,6 +65,7 @@ func (m *ExchangeStrategyMount) Map() (map[string]interface{}, error) { type SlackNotification struct { DefaultChannel string `json:"defaultChannel,omitempty" yaml:"defaultChannel,omitempty"` ErrorChannel string `json:"errorChannel,omitempty" yaml:"errorChannel,omitempty"` + QueueSize int `json:"queueSize,omitempty" yaml:"queueSize,omitempty"` } type SlackNotificationRouting struct { diff --git a/pkg/bbgo/environment.go b/pkg/bbgo/environment.go index fc8d245d2..b77322c15 100644 --- a/pkg/bbgo/environment.go +++ b/pkg/bbgo/environment.go @@ -862,7 +862,12 @@ func (environ *Environment) setupSlack(userConfig *Config, slackToken string, pe var client = slack.New(slackToken, slackOpts...) - var notifier = slacknotifier.New(client, conf.DefaultChannel) + var notifierOpts []slacknotifier.NotifyOption + if conf.QueueSize > 0 { + notifierOpts = append(notifierOpts, slacknotifier.OptionQueueSize(conf.QueueSize)) + } + + var notifier = slacknotifier.New(client, conf.DefaultChannel, notifierOpts...) Notification.AddNotifier(notifier) // allocate a store, so that we can save the chatID for the owner From 85cb99802fbef1bdbc4fe167061bbb5346f19f01 Mon Sep 17 00:00:00 2001 From: c9s Date: Mon, 11 Nov 2024 17:41:29 +0800 Subject: [PATCH 2/6] binance: store raw deposit status --- pkg/exchange/binance/exchange.go | 7 ++++++- pkg/types/deposit.go | 2 ++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 132ea5b3f..9fb017dde 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -622,12 +622,16 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, } for _, d := range records { - // 0(0:pending,6: credited but cannot withdraw, 1:success) + // 0(0:pending,6: credited but cannot withdraw, 7=Wrong Deposit,8=Waiting User confirm, 1:success) // set the default status status := types.DepositStatus(fmt.Sprintf("code: %d", d.Status)) // https://www.binance.com/en/support/faq/115003736451 switch d.Status { + + case binanceapi.DepositStatusWrong: + status = types.DepositRejected + case binanceapi.DepositStatusPending: status = types.DepositPending @@ -647,6 +651,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, AddressTag: d.AddressTag, TransactionID: d.TxId, Status: status, + RawStatus: strconv.Itoa(int(d.Status)), UnlockConfirm: d.UnlockConfirm, Confirmation: d.ConfirmTimes, }) diff --git a/pkg/types/deposit.go b/pkg/types/deposit.go index e137fd81a..bb2fa9de3 100644 --- a/pkg/types/deposit.go +++ b/pkg/types/deposit.go @@ -39,6 +39,8 @@ type Deposit struct { TransactionID string `json:"transactionID" db:"txn_id"` Status DepositStatus `json:"status"` + RawStatus string `json:"rawStatus"` + // Required confirm for unlock balance UnlockConfirm int `json:"unlockConfirm"` From 9e2cb4bd7f4bee42ad4cf88c32478b7f981b7b87 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Nov 2024 15:48:17 +0800 Subject: [PATCH 3/6] integrate livenote and slack alert into deposit2transfer --- pkg/exchange/binance/exchange.go | 1 + pkg/strategy/deposit2transfer/strategy.go | 53 +++++++++++++++++++---- pkg/types/deposit.go | 36 +++++++++++++-- 3 files changed, 78 insertions(+), 12 deletions(-) diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index 9fb017dde..320e62830 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -654,6 +654,7 @@ func (e *Exchange) QueryDepositHistory(ctx context.Context, asset string, since, RawStatus: strconv.Itoa(int(d.Status)), UnlockConfirm: d.UnlockConfirm, Confirmation: d.ConfirmTimes, + Network: d.Network, }) } diff --git a/pkg/strategy/deposit2transfer/strategy.go b/pkg/strategy/deposit2transfer/strategy.go index 5b779020b..ae01da938 100644 --- a/pkg/strategy/deposit2transfer/strategy.go +++ b/pkg/strategy/deposit2transfer/strategy.go @@ -14,6 +14,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/livenote" "github.com/c9s/bbgo/pkg/types" ) @@ -40,6 +41,10 @@ func init() { bbgo.RegisterStrategy(ID, &Strategy{}) } +type SlackAlert struct { + Mentions []string `json:"mentions"` +} + type Strategy struct { Environment *bbgo.Environment @@ -48,6 +53,8 @@ type Strategy struct { Interval types.Duration `json:"interval"` TransferDelay types.Duration `json:"transferDelay"` + SlackAlert *SlackAlert `json:"slackAlert"` + marginTransferService marginTransferService depositHistoryService types.ExchangeTransferService @@ -137,7 +144,7 @@ func (s *Strategy) tickWatcher(ctx context.Context, interval time.Duration) { } func (s *Strategy) checkDeposits(ctx context.Context) { - accountLimiter := rate.NewLimiter(rate.Every(3*time.Second), 1) + accountLimiter := rate.NewLimiter(rate.Every(5*time.Second), 1) for _, asset := range s.Assets { logger := s.logger.WithField("asset", asset) @@ -204,16 +211,39 @@ func (s *Strategy) checkDeposits(ctx context.Context) { d.Amount.String(), d.Asset, amount.String(), d.Asset) + if s.SlackAlert != nil { + bbgo.PostLiveNote(&d, + livenote.Comment(fmt.Sprintf("Transferring deposit asset %s %s into the margin account", amount.String(), d.Asset)), + ) + } + err2 := retry.GeneralBackoff(ctx, func() error { return s.marginTransferService.TransferMarginAccountAsset(ctx, d.Asset, amount, types.TransferIn) }) if err2 != nil { logger.WithError(err2).Errorf("unable to transfer deposit asset into the margin account") + + if s.SlackAlert != nil { + bbgo.PostLiveNote(&d, + livenote.Comment(fmt.Sprintf("Margin account transfer error: %+v", err2)), + ) + } } } } } +func (s *Strategy) addWatchingDeposit(deposit types.Deposit) { + s.watchingDeposits[deposit.TransactionID] = deposit + + if s.SlackAlert != nil { + bbgo.PostLiveNote(&deposit, + livenote.CompareObject(true), + livenote.OneTimeMention(s.SlackAlert.Mentions...), + ) + } +} + func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duration time.Duration) ([]types.Deposit, error) { logger := s.logger.WithField("asset", asset) logger.Debugf("scanning %s deposit history...", asset) @@ -239,6 +269,7 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio s.mu.Lock() defer s.mu.Unlock() + // update the watching deposits for _, deposit := range deposits { logger.Debugf("checking deposit: %+v", deposit) @@ -246,18 +277,22 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio continue } + // if the deposit record is already in the watch list, update it if _, ok := s.watchingDeposits[deposit.TransactionID]; ok { - // if the deposit record is in the watch list, update it - s.watchingDeposits[deposit.TransactionID] = deposit + s.addWatchingDeposit(deposit) } else { + // if the deposit record is not in the watch list, we need to check the status + // here the deposit is outside the watching list switch deposit.Status { case types.DepositSuccess: + // if the deposit is in success status, we need to check if it's newer than the latest deposit time + // this usually happens when the deposit is credited to the account very quickly if depositTime, ok := s.lastAssetDepositTimes[asset]; ok { // if it's newer than the latest deposit time, then we just add it the monitoring list if deposit.Time.After(depositTime) { logger.Infof("adding new success deposit: %s", deposit.TransactionID) - s.watchingDeposits[deposit.TransactionID] = deposit + s.addWatchingDeposit(deposit) } } else { // ignore all initial deposits that are already in success status @@ -266,7 +301,7 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio case types.DepositCredited, types.DepositPending: logger.Infof("adding pending deposit: %s", deposit.TransactionID) - s.watchingDeposits[deposit.TransactionID] = deposit + s.addWatchingDeposit(deposit) } } } @@ -281,10 +316,12 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio } var succeededDeposits []types.Deposit - for _, deposit := range s.watchingDeposits { - if deposit.Status == types.DepositSuccess { - logger.Infof("found pending -> success deposit: %+v", deposit) + // find the succeeded deposits + for _, deposit := range s.watchingDeposits { + switch deposit.Status { + case types.DepositSuccess: + logger.Infof("found pending -> success deposit: %+v", deposit) current, required := deposit.GetCurrentConfirmation() if required > 0 && deposit.UnlockConfirm > 0 && current < deposit.UnlockConfirm { logger.Infof("deposit %s unlock confirm %d is not reached, current: %d, required: %d, skip this round", deposit.TransactionID, deposit.UnlockConfirm, current, required) diff --git a/pkg/types/deposit.go b/pkg/types/deposit.go index bb2fa9de3..dcd58e9c6 100644 --- a/pkg/types/deposit.go +++ b/pkg/types/deposit.go @@ -46,6 +46,8 @@ type Deposit struct { // Confirmation format = "current/required", for example: "7/16" Confirmation string `json:"confirmation"` + + Network string `json:"network,omitempty"` } func (d Deposit) GetCurrentConfirmation() (current int, required int) { @@ -109,9 +111,35 @@ func (d *Deposit) SlackAttachment() slack.Attachment { }) } + if len(d.Confirmation) > 0 { + text := d.Confirmation + if d.UnlockConfirm > 0 { + text = fmt.Sprintf("%s (unlock %d)", d.Confirmation, d.UnlockConfirm) + } + fields = append(fields, slack.AttachmentField{ + Title: "Confirmation", + Value: text, + Short: false, + }) + } + + if len(d.Network) > 0 { + fields = append(fields, slack.AttachmentField{ + Title: "Network", + Value: d.Network, + Short: false, + }) + } + + fields = append(fields, slack.AttachmentField{ + Title: "Amount", + Value: d.Amount.String() + " " + d.Asset, + Short: false, + }) + return slack.Attachment{ Color: depositStatusSlackColor(d.Status), - Title: fmt.Sprintf("Deposit %s %s To %s", d.Amount.String(), d.Asset, d.Address), + Title: fmt.Sprintf("Deposit %s %s To %s (%s)", d.Amount.String(), d.Asset, d.Address, d.Exchange), // TitleLink: "", Pretext: "", Text: "", @@ -119,9 +147,9 @@ func (d *Deposit) SlackAttachment() slack.Attachment { // ServiceIcon: "", // FromURL: "", // OriginalURL: "", - Fields: fields, - Footer: fmt.Sprintf("Apply Time: %s", d.Time.Time().Format(time.RFC3339)), - // FooterIcon: "", + Fields: fields, + Footer: fmt.Sprintf("Apply Time: %s", d.Time.Time().Format(time.RFC3339)), + FooterIcon: ExchangeFooterIcon(d.Exchange), } } From f37fab08a01e86fb5d0e7e54fbf4c3d7194962aa Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Nov 2024 15:49:29 +0800 Subject: [PATCH 4/6] deposit2transfer: pass customized channel --- pkg/strategy/deposit2transfer/strategy.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/pkg/strategy/deposit2transfer/strategy.go b/pkg/strategy/deposit2transfer/strategy.go index ae01da938..7a44a7660 100644 --- a/pkg/strategy/deposit2transfer/strategy.go +++ b/pkg/strategy/deposit2transfer/strategy.go @@ -42,6 +42,7 @@ func init() { } type SlackAlert struct { + Channel string `json:"channel"` Mentions []string `json:"mentions"` } @@ -213,6 +214,7 @@ func (s *Strategy) checkDeposits(ctx context.Context) { if s.SlackAlert != nil { bbgo.PostLiveNote(&d, + livenote.Channel(s.SlackAlert.Channel), livenote.Comment(fmt.Sprintf("Transferring deposit asset %s %s into the margin account", amount.String(), d.Asset)), ) } @@ -225,6 +227,7 @@ func (s *Strategy) checkDeposits(ctx context.Context) { if s.SlackAlert != nil { bbgo.PostLiveNote(&d, + livenote.Channel(s.SlackAlert.Channel), livenote.Comment(fmt.Sprintf("Margin account transfer error: %+v", err2)), ) } @@ -238,6 +241,7 @@ func (s *Strategy) addWatchingDeposit(deposit types.Deposit) { if s.SlackAlert != nil { bbgo.PostLiveNote(&deposit, + livenote.Channel(s.SlackAlert.Channel), livenote.CompareObject(true), livenote.OneTimeMention(s.SlackAlert.Mentions...), ) From 41b1991843bef7b3d461a96d67879fb2012931a8 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Nov 2024 15:53:17 +0800 Subject: [PATCH 5/6] deposit2transfer: adjust default scan interval --- pkg/strategy/deposit2transfer/strategy.go | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/pkg/strategy/deposit2transfer/strategy.go b/pkg/strategy/deposit2transfer/strategy.go index 7a44a7660..79547430d 100644 --- a/pkg/strategy/deposit2transfer/strategy.go +++ b/pkg/strategy/deposit2transfer/strategy.go @@ -59,7 +59,8 @@ type Strategy struct { marginTransferService marginTransferService depositHistoryService types.ExchangeTransferService - session *bbgo.ExchangeSession + session *bbgo.ExchangeSession + watchingDeposits map[string]types.Deposit mu sync.Mutex @@ -76,7 +77,7 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {} func (s *Strategy) Defaults() error { if s.Interval == 0 { - s.Interval = types.Duration(5 * time.Minute) + s.Interval = types.Duration(3 * time.Minute) } if s.TransferDelay == 0 { @@ -295,12 +296,12 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio if depositTime, ok := s.lastAssetDepositTimes[asset]; ok { // if it's newer than the latest deposit time, then we just add it the monitoring list if deposit.Time.After(depositTime) { - logger.Infof("adding new success deposit: %s", deposit.TransactionID) + logger.Infof("adding new succeedded deposit: %s", deposit.TransactionID) s.addWatchingDeposit(deposit) } } else { // ignore all initial deposits that are already in success status - logger.Infof("ignored succeess deposit: %s %+v", deposit.TransactionID, deposit) + logger.Infof("ignored expired succeedded deposit: %s %+v", deposit.TransactionID, deposit) } case types.DepositCredited, types.DepositPending: @@ -321,7 +322,7 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio var succeededDeposits []types.Deposit - // find the succeeded deposits + // find and move out succeeded deposits for _, deposit := range s.watchingDeposits { switch deposit.Status { case types.DepositSuccess: From ee7beeced6a55f71fbb5c18f6273c6a92783b8ea Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 12 Nov 2024 16:09:51 +0800 Subject: [PATCH 6/6] deposit2transfer: add pin support --- pkg/strategy/deposit2transfer/strategy.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/strategy/deposit2transfer/strategy.go b/pkg/strategy/deposit2transfer/strategy.go index 79547430d..16ec0e1c9 100644 --- a/pkg/strategy/deposit2transfer/strategy.go +++ b/pkg/strategy/deposit2transfer/strategy.go @@ -44,6 +44,7 @@ func init() { type SlackAlert struct { Channel string `json:"channel"` Mentions []string `json:"mentions"` + Pin bool `json:"pin"` } type Strategy struct { @@ -243,6 +244,7 @@ func (s *Strategy) addWatchingDeposit(deposit types.Deposit) { if s.SlackAlert != nil { bbgo.PostLiveNote(&deposit, livenote.Channel(s.SlackAlert.Channel), + livenote.Pin(s.SlackAlert.Pin), livenote.CompareObject(true), livenote.OneTimeMention(s.SlackAlert.Mentions...), )