xdepthmaker: support countery party 5 hedge and force full replenish

This commit is contained in:
c9s 2024-09-27 18:29:44 +08:00
parent 5dcd375279
commit e8c063c09b
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -16,6 +16,8 @@ import (
"github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry" "github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/pricesolver"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/strategy/xmaker" "github.com/c9s/bbgo/pkg/strategy/xmaker"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
@ -185,6 +187,8 @@ type HedgeStrategy string
const ( const (
HedgeStrategyMarket HedgeStrategy = "market" HedgeStrategyMarket HedgeStrategy = "market"
HedgeStrategyBboCounterParty1 HedgeStrategy = "bbo-counter-party-1" HedgeStrategyBboCounterParty1 HedgeStrategy = "bbo-counter-party-1"
HedgeStrategyBboCounterParty3 HedgeStrategy = "bbo-counter-party-3"
HedgeStrategyBboCounterParty5 HedgeStrategy = "bbo-counter-party-5"
HedgeStrategyBboQueue1 HedgeStrategy = "bbo-queue-1" HedgeStrategyBboQueue1 HedgeStrategy = "bbo-queue-1"
) )
@ -269,10 +273,14 @@ type Strategy struct {
lastSourcePrice fixedpoint.MutexValue lastSourcePrice fixedpoint.MutexValue
stopC chan struct{} stopC chan struct{}
fullReplenishTriggerC sigchan.Chan
logger logrus.FieldLogger logger logrus.FieldLogger
makerConnectivity, hedgerConnectivity *types.Connectivity
connectivityGroup *types.ConnectivityGroup connectivityGroup *types.ConnectivityGroup
priceSolver *pricesolver.SimplePriceSolver
} }
func (s *Strategy) ID() string { func (s *Strategy) ID() string {
@ -318,8 +326,10 @@ func (s *Strategy) CrossSubscribe(sessions map[string]*bbgo.ExchangeSession) {
}) })
hedgeSession.Subscribe(types.KLineChannel, s.HedgeSymbol, types.SubscribeOptions{Interval: "1m"}) hedgeSession.Subscribe(types.KLineChannel, s.HedgeSymbol, types.SubscribeOptions{Interval: "1m"})
hedgeSession.Subscribe(types.KLineChannel, hedgeSession.Exchange.PlatformFeeCurrency()+"USDT", types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"}) makerSession.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: "1m"})
makerSession.Subscribe(types.KLineChannel, makerSession.Exchange.PlatformFeeCurrency()+"USDT", types.SubscribeOptions{Interval: "1m"})
} }
func (s *Strategy) Validate() error { func (s *Strategy) Validate() error {
@ -396,6 +406,7 @@ func (s *Strategy) Defaults() error {
} }
func (s *Strategy) quoteWorker(ctx context.Context) { func (s *Strategy) quoteWorker(ctx context.Context) {
updateTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200)) updateTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
defer updateTicker.Stop() defer updateTicker.Stop()
@ -421,6 +432,10 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol) log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return return
case <-s.fullReplenishTriggerC:
// force trigger full replenish
s.updateQuote(ctx, 0)
case <-fullReplenishTicker.C: case <-fullReplenishTicker.C:
s.updateQuote(ctx, 0) s.updateQuote(ctx, 0)
lastOrderReplenishTime = time.Now() lastOrderReplenishTime = time.Now()
@ -431,7 +446,7 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
return return
} }
if time.Since(lastOrderReplenishTime) < 15*time.Second { if time.Since(lastOrderReplenishTime) < 10*time.Second {
continue continue
} }
@ -565,15 +580,37 @@ func (s *Strategy) CrossRun(
s.sourceBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName) s.sourceBook = types.NewStreamBook(s.HedgeSymbol, s.hedgeSession.ExchangeName)
s.sourceBook.BindStream(s.hedgeSession.MarketDataStream) s.sourceBook.BindStream(s.hedgeSession.MarketDataStream)
s.priceSolver = pricesolver.NewSimplePriceResolver(s.makerSession.Markets())
s.priceSolver.BindStream(s.hedgeSession.MarketDataStream)
s.priceSolver.BindStream(s.makerSession.MarketDataStream)
if err := s.priceSolver.UpdateFromTickers(ctx, s.makerSession.Exchange,
s.Symbol, s.makerSession.Exchange.PlatformFeeCurrency()+"USDT"); err != nil {
return err
}
if err := s.priceSolver.UpdateFromTickers(ctx, s.hedgeSession.Exchange, s.HedgeSymbol); err != nil {
return err
}
s.makerSession.MarketDataStream.OnKLineClosed(types.KLineWith(s.Symbol, types.Interval1m, func(k types.KLine) {
s.priceSolver.Update(k.Symbol, k.Close)
feeToken := s.makerSession.Exchange.PlatformFeeCurrency()
if feePrice, ok := s.priceSolver.ResolvePrice(feeToken, "USDT"); ok {
s.Position.SetFeeAverageCost(feeToken, feePrice)
}
}))
s.stopC = make(chan struct{}) s.stopC = make(chan struct{})
s.fullReplenishTriggerC = sigchan.New(1)
makerConnectivity := types.NewConnectivity() s.makerConnectivity = types.NewConnectivity()
makerConnectivity.Bind(s.makerSession.UserDataStream) s.makerConnectivity.Bind(s.makerSession.UserDataStream)
hedgerConnectivity := types.NewConnectivity() s.hedgerConnectivity = types.NewConnectivity()
hedgerConnectivity.Bind(s.hedgeSession.UserDataStream) s.hedgerConnectivity.Bind(s.hedgeSession.UserDataStream)
connGroup := types.NewConnectivityGroup(makerConnectivity, hedgerConnectivity) connGroup := types.NewConnectivityGroup(s.makerConnectivity, s.hedgerConnectivity)
s.connectivityGroup = connGroup s.connectivityGroup = connGroup
if s.RecoverTrade { if s.RecoverTrade {
@ -598,6 +635,8 @@ func (s *Strategy) CrossRun(
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
bbgo.Notify("Shutting down %s: %s", ID, s.Symbol)
close(s.stopC) close(s.stopC)
// wait for the quoter to stop // wait for the quoter to stop
@ -615,8 +654,13 @@ func (s *Strategy) CrossRun(
log.WithError(err).Errorf("unable to cancel all orders") log.WithError(err).Errorf("unable to cancel all orders")
} }
// process collected trades
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()
bbgo.Sync(ctx, s) bbgo.Sync(ctx, s)
bbgo.Notify("%s: %s position", ID, s.Symbol, s.Position)
bbgo.Notify("Shutdown %s: %s position", ID, s.Symbol, s.Position)
}) })
return nil return nil
@ -640,11 +684,19 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) error {
quantity = fixedpoint.Min(s.HedgeMaxOrderQuantity, quantity) quantity = fixedpoint.Min(s.HedgeMaxOrderQuantity, quantity)
} }
defer func() {
s.fullReplenishTriggerC.Emit()
}()
switch s.HedgeStrategy { switch s.HedgeStrategy {
case HedgeStrategyMarket: case HedgeStrategyMarket:
return s.executeHedgeMarket(ctx, side, quantity) return s.executeHedgeMarket(ctx, side, quantity)
case HedgeStrategyBboCounterParty1: case HedgeStrategyBboCounterParty1:
return s.executeHedgeBboCounterParty1(ctx, side, quantity) return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 1, quantity)
case HedgeStrategyBboCounterParty3:
return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 3, quantity)
case HedgeStrategyBboCounterParty5:
return s.executeHedgeBboCounterPartyWithIndex(ctx, side, 5, quantity)
case HedgeStrategyBboQueue1: case HedgeStrategyBboQueue1:
return s.executeHedgeBboQueue1(ctx, side, quantity) return s.executeHedgeBboQueue1(ctx, side, quantity)
default: default:
@ -652,14 +704,17 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) error {
} }
} }
func (s *Strategy) executeHedgeBboCounterParty1( func (s *Strategy) executeHedgeBboCounterPartyWithIndex(
ctx context.Context, ctx context.Context,
side types.SideType, side types.SideType,
idx int,
quantity fixedpoint.Value, quantity fixedpoint.Value,
) error { ) error {
price := s.lastSourcePrice.Get() price := s.lastSourcePrice.Get()
if sourcePrice := s.getSourceBboPrice(side.Reverse()); sourcePrice.Sign() > 0 {
price = sourcePrice sideBook := s.sourceBook.SideBook(side.Reverse())
if pv, ok := sideBook.ElemOrLast(idx); ok {
price = pv.Price
} }
if price.IsZero() { if price.IsZero() {
@ -1073,6 +1128,15 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
return return
} }
// if it's disconnected or context is canceled, then return
select {
case <-ctx.Done():
return
case <-s.makerConnectivity.DisconnectedC():
return
default:
}
bestBid, bestAsk, hasPrice := s.sourceBook.BestBidAndAsk() bestBid, bestAsk, hasPrice := s.sourceBook.BestBidAndAsk()
if !hasPrice { if !hasPrice {
return return
@ -1131,7 +1195,7 @@ func (s *Strategy) updateQuote(ctx context.Context, maxLayer int) {
_, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...) _, err = s.MakerOrderExecutor.SubmitOrders(ctx, submitOrders...)
if err != nil { if err != nil {
s.logger.WithError(err).Errorf("order error: %s", err.Error()) s.logger.WithError(err).Errorf("submit order error: %s", err.Error())
return return
} }
} }
@ -1146,10 +1210,7 @@ func (s *Strategy) cleanUpOpenOrders(ctx context.Context, session *bbgo.Exchange
return nil return nil
} }
log.Infof("found existing open orders:") return tradingutil.UniversalCancelAllOrders(ctx, session.Exchange, s.Symbol, openOrders)
types.OrderSlice(openOrders).Print()
return session.Exchange.CancelOrders(ctx, openOrders...)
} }
func selectSessions2( func selectSessions2(