From 300506f9f956b6a90eb6ac234b4ccb324e0f2a78 Mon Sep 17 00:00:00 2001 From: c9s Date: Sat, 25 Mar 2023 02:53:55 +0800 Subject: [PATCH] xfunding: fix critical section for usedQuoteInvestment --- pkg/strategy/xfunding/strategy.go | 45 ++++++++++++++++++++++--------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/pkg/strategy/xfunding/strategy.go b/pkg/strategy/xfunding/strategy.go index 78648cac2..01f6f4b99 100644 --- a/pkg/strategy/xfunding/strategy.go +++ b/pkg/strategy/xfunding/strategy.go @@ -332,7 +332,9 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order } }) - // s.futuresSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {})) + s.futuresSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) { + s.queryAndDetectPremiumIndex(ctx, binanceFutures) + })) if binanceStream, ok := s.futuresSession.UserDataStream.(*binance.Stream); ok { binanceStream.OnAccountUpdateEvent(func(e *binance.AccountUpdateEvent) { @@ -359,17 +361,25 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order return case <-ticker.C: - s.queryAndDetectPremiumIndex(ctx, binanceFutures) - s.sync(ctx) + s.syncSpotAccount(ctx) } } }() - // TODO: use go routine and time.Ticker to trigger spot sync and futures sync - /* - s.spotSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) { - })) - */ + go func() { + ticker := time.NewTicker(10 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + s.syncFuturesAccount(ctx) + } + } + }() return nil } @@ -388,14 +398,21 @@ func (s *Strategy) queryAndDetectPremiumIndex(ctx context.Context, binanceFuture } } -func (s *Strategy) sync(ctx context.Context) { +func (s *Strategy) syncSpotAccount(ctx context.Context) { switch s.getPositionState() { case PositionOpening: s.increaseSpotPosition(ctx) + case PositionClosing: + s.syncSpotPosition(ctx) + } +} + +func (s *Strategy) syncFuturesAccount(ctx context.Context) { + switch s.getPositionState() { + case PositionOpening: s.syncFuturesPosition(ctx) case PositionClosing: s.reduceFuturesPosition(ctx) - s.syncSpotPosition(ctx) } } @@ -635,8 +652,10 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) { } s.mu.Lock() - defer s.mu.Unlock() - if s.State.UsedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 { + usedQuoteInvestment := s.State.UsedQuoteInvestment + s.mu.Unlock() + + if usedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 { // stop increase the position s.setPositionState(PositionReady) @@ -653,7 +672,7 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) { return } - leftQuota := s.QuoteInvestment.Sub(s.State.UsedQuoteInvestment) + leftQuota := s.QuoteInvestment.Sub(usedQuoteInvestment) orderPrice := ticker.Buy orderQuantity := fixedpoint.Min(s.IncrementalQuoteQuantity, leftQuota).Div(orderPrice)