xdepthmaker: split quote worker and hedge worker

This commit is contained in:
c9s 2024-09-26 17:57:12 +08:00
parent 0c842e0eb5
commit 431d6964d5
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -388,6 +388,115 @@ func (s *Strategy) Defaults() error {
return nil
}
func (s *Strategy) quoteWorker(ctx context.Context) {
updateTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
defer updateTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()
// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Errorf("error cleaning up open orders")
}
s.updateQuote(ctx, 0)
lastOrderReplenishTime := time.Now()
for {
select {
case <-ctx.Done():
return
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-fullReplenishTicker.C:
s.updateQuote(ctx, 0)
lastOrderReplenishTime = time.Now()
case sig, ok := <-s.sourceBook.C:
// when any book change event happened
if !ok {
return
}
if time.Since(lastOrderReplenishTime) < 10*time.Second {
continue
}
switch sig.Type {
case types.BookSignalSnapshot:
s.updateQuote(ctx, 0)
case types.BookSignalUpdate:
s.updateQuote(ctx, 5)
}
lastOrderReplenishTime = time.Now()
}
}
}
func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Warnf("maker goroutine stopped, due to context canceled")
return
case <-s.stopC:
s.logger.Warnf("maker goroutine stopped, due to the stop signal")
return
case <-ticker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
// For positive position and negative covered position:
// uncover position = +5 - (-3) (covered position) = 8
//
// meaning we bought 5 on MAX and sent buy order with 3 on binance
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()
position := s.Position.GetBase()
coveredPosition := s.CoveredPosition.Get()
uncoverPosition := position.Sub(coveredPosition)
absPos := uncoverPosition.Abs()
if !s.hedgeMarket.IsDustQuantity(absPos, s.lastSourcePrice.Get()) {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
coveredPosition,
uncoverPosition,
)
if !s.DisableHedge {
if err := s.Hedge(ctx, uncoverPosition.Neg()); err != nil {
//goland:noinspection GoDirectComparisonOfErrors
switch err {
case ErrZeroQuantity, ErrDustQuantity:
default:
s.logger.WithError(err).Errorf("unable to hedge position")
}
}
}
}
}
}
}
func (s *Strategy) CrossRun(
ctx context.Context, _ bbgo.OrderExecutionRouter,
sessions map[string]*bbgo.ExchangeSession,
@ -474,96 +583,8 @@ func (s *Strategy) CrossRun(
log.Infof("user data stream authenticated, start placing orders...")
posTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer posTicker.Stop()
fullReplenishTicker := time.NewTicker(util.MillisecondsJitter(s.FullReplenishInterval.Duration(), 200))
defer fullReplenishTicker.Stop()
// clean up the previous open orders
if err := s.cleanUpOpenOrders(ctx, s.makerSession); err != nil {
log.WithError(err).Errorf("error cleaning up open orders")
}
s.updateQuote(ctx, 0)
lastOrderReplenishTime := time.Now()
for {
select {
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
return
case <-fullReplenishTicker.C:
s.updateQuote(ctx, 0)
lastOrderReplenishTime = time.Now()
case sig, ok := <-s.sourceBook.C:
// when any book change event happened
if !ok {
return
}
if time.Since(lastOrderReplenishTime) < 10*time.Second {
continue
}
switch sig.Type {
case types.BookSignalSnapshot:
s.updateQuote(ctx, 0)
case types.BookSignalUpdate:
s.updateQuote(ctx, 5)
}
lastOrderReplenishTime = time.Now()
case <-posTicker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
// For positive position and negative covered position:
// uncover position = +5 - (-3) (covered position) = 8
//
// meaning we bought 5 on MAX and sent buy order with 3 on binance
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.HedgeOrderExecutor.TradeCollector().Process()
s.MakerOrderExecutor.TradeCollector().Process()
position := s.Position.GetBase()
coveredPosition := s.CoveredPosition.Get()
uncoverPosition := position.Sub(coveredPosition)
absPos := uncoverPosition.Abs()
if absPos.Compare(s.hedgeMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
coveredPosition,
uncoverPosition,
)
if !s.DisableHedge {
if err := s.Hedge(ctx, uncoverPosition.Neg()); err != nil {
//goland:noinspection GoDirectComparisonOfErrors
switch err {
case ErrZeroQuantity, ErrDustQuantity:
default:
s.logger.WithError(err).Errorf("unable to hedge position")
}
}
}
}
}
}
go s.hedgeWorker(ctx)
go s.quoteWorker(ctx)
}()
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {