From 4a28843a0a0cfbb6ff6292b9f80f3b1155d7d7f5 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 8 Aug 2023 12:38:23 +0800 Subject: [PATCH] deposit2transfer: fix mutex lock --- config/deposit2transfer.yaml | 2 +- pkg/strategy/deposit2transfer/strategy.go | 19 ++++++++++++------- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/config/deposit2transfer.yaml b/config/deposit2transfer.yaml index 2f4cef67d..94cc742d2 100644 --- a/config/deposit2transfer.yaml +++ b/config/deposit2transfer.yaml @@ -9,4 +9,4 @@ exchangeStrategies: ## assets are the assets you want to transfer into the margin account assets: - - BTC + - USDT diff --git a/pkg/strategy/deposit2transfer/strategy.go b/pkg/strategy/deposit2transfer/strategy.go index 5f60b1ded..b5bf94938 100644 --- a/pkg/strategy/deposit2transfer/strategy.go +++ b/pkg/strategy/deposit2transfer/strategy.go @@ -39,7 +39,7 @@ type Strategy struct { Assets []string `json:"assets"` - Interval types.Interval `json:"interval"` + Interval types.Duration `json:"interval"` marginTransferService marginTransferService depositHistoryService types.ExchangeTransferService @@ -56,8 +56,8 @@ func (s *Strategy) ID() string { func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {} func (s *Strategy) Defaults() error { - if s.Interval == "" { - s.Interval = types.Interval1m + if s.Interval == 0 { + s.Interval = types.Duration(5 * time.Minute) } return nil @@ -87,7 +87,9 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se return errDepositHistoryNotSupport } - go s.tickWatcher(ctx, s.Interval.Duration()) + session.UserDataStream.OnStart(func() { + go s.tickWatcher(ctx, s.Interval.Duration()) + }) return nil } @@ -97,6 +99,7 @@ func (s *Strategy) tickWatcher(ctx context.Context, interval time.Duration) { defer ticker.Stop() s.checkDeposits(ctx) + for { select { case <-ctx.Done(): @@ -121,6 +124,7 @@ func (s *Strategy) checkDeposits(ctx context.Context) { if len(succeededDeposits) == 0 { log.Infof("no %s deposit found", asset) + continue } for _, d := range succeededDeposits { @@ -163,10 +167,10 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio }) s.mu.Lock() - defer s.mu.Lock() + defer s.mu.Unlock() for _, deposit := range deposits { - log.Infof("scanning deposit: %+v", deposit) + log.Infof("checking deposit: %+v", deposit) if deposit.Asset != asset { continue @@ -177,9 +181,10 @@ func (s *Strategy) scanDepositHistory(ctx context.Context, asset string, duratio s.watchingDeposits[deposit.TransactionID] = deposit } else { switch deposit.Status { + case types.DepositSuccess: // ignore all deposits that are already success - log.Infof("ignored succeess deposit: %s", deposit.TransactionID) + log.Infof("ignored succeess deposit: %s %+v", deposit.TransactionID, deposit) continue case types.DepositCredited, types.DepositPending: