xfunding: fix critical section for usedQuoteInvestment

This commit is contained in:
c9s 2023-03-25 02:53:55 +08:00
parent 0f49f9fbe5
commit 300506f9f9
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -332,7 +332,9 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
} }
}) })
// s.futuresSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {})) s.futuresSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(kline types.KLine) {
s.queryAndDetectPremiumIndex(ctx, binanceFutures)
}))
if binanceStream, ok := s.futuresSession.UserDataStream.(*binance.Stream); ok { if binanceStream, ok := s.futuresSession.UserDataStream.(*binance.Stream); ok {
binanceStream.OnAccountUpdateEvent(func(e *binance.AccountUpdateEvent) { binanceStream.OnAccountUpdateEvent(func(e *binance.AccountUpdateEvent) {
@ -359,17 +361,25 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
return return
case <-ticker.C: case <-ticker.C:
s.queryAndDetectPremiumIndex(ctx, binanceFutures) s.syncSpotAccount(ctx)
s.sync(ctx)
} }
} }
}() }()
// TODO: use go routine and time.Ticker to trigger spot sync and futures sync go func() {
/* ticker := time.NewTicker(10 * time.Second)
s.spotSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) { defer ticker.Stop()
}))
*/ for {
select {
case <-ctx.Done():
return
case <-ticker.C:
s.syncFuturesAccount(ctx)
}
}
}()
return nil return nil
} }
@ -388,14 +398,21 @@ func (s *Strategy) queryAndDetectPremiumIndex(ctx context.Context, binanceFuture
} }
} }
func (s *Strategy) sync(ctx context.Context) { func (s *Strategy) syncSpotAccount(ctx context.Context) {
switch s.getPositionState() { switch s.getPositionState() {
case PositionOpening: case PositionOpening:
s.increaseSpotPosition(ctx) s.increaseSpotPosition(ctx)
case PositionClosing:
s.syncSpotPosition(ctx)
}
}
func (s *Strategy) syncFuturesAccount(ctx context.Context) {
switch s.getPositionState() {
case PositionOpening:
s.syncFuturesPosition(ctx) s.syncFuturesPosition(ctx)
case PositionClosing: case PositionClosing:
s.reduceFuturesPosition(ctx) s.reduceFuturesPosition(ctx)
s.syncSpotPosition(ctx)
} }
} }
@ -635,8 +652,10 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) {
} }
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() usedQuoteInvestment := s.State.UsedQuoteInvestment
if s.State.UsedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 { s.mu.Unlock()
if usedQuoteInvestment.Compare(s.QuoteInvestment) >= 0 {
// stop increase the position // stop increase the position
s.setPositionState(PositionReady) s.setPositionState(PositionReady)
@ -653,7 +672,7 @@ func (s *Strategy) increaseSpotPosition(ctx context.Context) {
return return
} }
leftQuota := s.QuoteInvestment.Sub(s.State.UsedQuoteInvestment) leftQuota := s.QuoteInvestment.Sub(usedQuoteInvestment)
orderPrice := ticker.Buy orderPrice := ticker.Buy
orderQuantity := fixedpoint.Min(s.IncrementalQuoteQuantity, leftQuota).Div(orderPrice) orderQuantity := fixedpoint.Min(s.IncrementalQuoteQuantity, leftQuota).Div(orderPrice)