Merge pull request #1826 from c9s/c9s/xmaker/isolated-market-data-stream

FEATURE: [xmaker] allocate isolated market data stream
This commit is contained in:
c9s 2024-11-16 14:21:46 +08:00 committed by GitHub
commit b4a677a612
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -259,11 +259,6 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
panic(fmt.Errorf("source session %s is not defined", s.SourceExchange)) panic(fmt.Errorf("source session %s is not defined", s.SourceExchange))
} }
sourceSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
// TODO: fix depth20 stream for binance
// Depth: s.SourceDepthLevel,
})
sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) sourceSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
makerSession, ok := sessions[s.MakerExchange] makerSession, ok := sessions[s.MakerExchange]
@ -273,12 +268,6 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
if s.EnableArbitrage {
makerSession.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
Depth: types.DepthLevelMedium,
})
}
for _, sig := range s.SignalConfigList { for _, sig := range s.SignalConfigList {
if sig.TradeVolumeWindowSignal != nil { if sig.TradeVolumeWindowSignal != nil {
sourceSession.Subscribe(types.MarketTradeChannel, s.Symbol, types.SubscribeOptions{}) sourceSession.Subscribe(types.MarketTradeChannel, s.Symbol, types.SubscribeOptions{})
@ -1046,6 +1035,10 @@ func aggregatePriceVolumeSliceWithPriceFilter(
// tryArbitrage tries to arbitrage between the source and maker exchange // tryArbitrage tries to arbitrage between the source and maker exchange
func (s *Strategy) tryArbitrage(ctx context.Context, quote *Quote, makerBalances, hedgeBalances types.BalanceMap) (bool, error) { func (s *Strategy) tryArbitrage(ctx context.Context, quote *Quote, makerBalances, hedgeBalances types.BalanceMap) (bool, error) {
if s.makerBook == nil {
return false, nil
}
marginBidPrice := quote.BestBidPrice.Mul(fixedpoint.One.Sub(quote.BidMargin)) marginBidPrice := quote.BestBidPrice.Mul(fixedpoint.One.Sub(quote.BidMargin))
marginAskPrice := quote.BestAskPrice.Mul(fixedpoint.One.Add(quote.AskMargin)) marginAskPrice := quote.BestAskPrice.Mul(fixedpoint.One.Add(quote.AskMargin))
@ -1730,11 +1723,35 @@ func (s *Strategy) CrossRun(
s.ProfitStats.ProfitStats = profitStats s.ProfitStats.ProfitStats = profitStats
} }
if s.EnableArbitrage {
makerMarketStream := s.makerSession.Exchange.NewStream()
makerMarketStream.SetPublicOnly()
makerMarketStream.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
Depth: types.DepthLevelFull,
Speed: types.SpeedLow,
})
s.makerBook = types.NewStreamBook(s.Symbol, s.makerSession.ExchangeName) s.makerBook = types.NewStreamBook(s.Symbol, s.makerSession.ExchangeName)
s.makerBook.BindStream(s.makerSession.MarketDataStream) s.makerBook.BindStream(s.makerSession.MarketDataStream)
if err := makerMarketStream.Connect(ctx); err != nil {
return err
}
}
sourceMarketStream := s.sourceSession.Exchange.NewStream()
sourceMarketStream.SetPublicOnly()
sourceMarketStream.Subscribe(types.BookChannel, s.Symbol, types.SubscribeOptions{
Depth: types.DepthLevelFull,
Speed: types.SpeedLow,
})
s.sourceBook = types.NewStreamBook(s.Symbol, s.sourceSession.ExchangeName) s.sourceBook = types.NewStreamBook(s.Symbol, s.sourceSession.ExchangeName)
s.sourceBook.BindStream(s.sourceSession.MarketDataStream) s.sourceBook.BindStream(sourceMarketStream)
if err := sourceMarketStream.Connect(ctx); err != nil {
return err
}
if s.EnableSignalMargin { if s.EnableSignalMargin {
s.logger.Infof("signal margin is enabled") s.logger.Infof("signal margin is enabled")