From 6fb6467d593cfc4d331c0fd676a85bee9b9826a1 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 27 Aug 2024 14:48:30 +0800 Subject: [PATCH] xmaker: refactor hedge worker and quote worker --- pkg/strategy/xmaker/strategy.go | 197 +++++++++++++++++--------------- 1 file changed, 106 insertions(+), 91 deletions(-) diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 7abd6b42d..b8b8c5c8e 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -192,7 +192,7 @@ func (s *Strategy) Initialize() error { return nil } -func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter) { +func (s *Strategy) updateQuote(ctx context.Context) { if err := s.activeMakerOrders.GracefulCancel(ctx, s.makerSession.Exchange); err != nil { log.Warnf("there are some %s orders not canceled, skipping placing maker orders", s.Symbol) s.activeMakerOrders.Print() @@ -706,35 +706,6 @@ func (s *Strategy) tradeRecover(ctx context.Context) { } func (s *Strategy) Defaults() error { - if s.CircuitBreaker == nil { - s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID()) - } - - // circuitBreakerAlertLimiter is for CircuitBreaker alerts - s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2) - s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(5*time.Minute), 1) - return nil -} - -func (s *Strategy) Validate() error { - if s.Quantity.IsZero() || s.QuantityScale == nil { - return errors.New("quantity or quantityScale can not be empty") - } - - if !s.QuantityMultiplier.IsZero() && s.QuantityMultiplier.Sign() < 0 { - return errors.New("quantityMultiplier can not be a negative number") - } - - if len(s.Symbol) == 0 { - return errors.New("symbol is required") - } - - return nil -} - -func (s *Strategy) CrossRun( - ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession, -) error { if s.BollBandInterval == "" { s.BollBandInterval = types.Interval1m } @@ -775,6 +746,108 @@ func (s *Strategy) CrossRun( } } + if s.CircuitBreaker == nil { + s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID()) + } + + // circuitBreakerAlertLimiter is for CircuitBreaker alerts + s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2) + s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(5*time.Minute), 1) + return nil +} + +func (s *Strategy) Validate() error { + if s.Quantity.IsZero() || s.QuantityScale == nil { + return errors.New("quantity or quantityScale can not be empty") + } + + if !s.QuantityMultiplier.IsZero() && s.QuantityMultiplier.Sign() < 0 { + return errors.New("quantityMultiplier can not be a negative number") + } + + if len(s.Symbol) == 0 { + return errors.New("symbol is required") + } + + return nil +} + +func (s *Strategy) quoteWorker(ctx context.Context) { + quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200)) + defer quoteTicker.Stop() + + defer func() { + if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil { + log.WithError(err).Errorf("can not cancel %s orders", s.Symbol) + } + }() + + for { + select { + + case <-s.stopC: + log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol) + return + + case <-ctx.Done(): + log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol) + return + + case <-quoteTicker.C: + s.updateQuote(ctx) + + } + } +} + +func (s *Strategy) hedgeWorker(ctx context.Context) { + ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-ticker.C: + // For positive position and positive covered position: + // uncover position = +5 - +3 (covered position) = 2 + // + // For positive position and negative covered position: + // uncover position = +5 - (-3) (covered position) = 8 + // + // meaning we bought 5 on MAX and sent buy order with 3 on binance + // + // For negative position: + // uncover position = -5 - -3 (covered position) = -2 + s.tradeCollector.Process() + + position := s.Position.GetBase() + + uncoverPosition := position.Sub(s.CoveredPosition) + absPos := uncoverPosition.Abs() + if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 { + log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", + s.Symbol, + position, + s.CoveredPosition, + uncoverPosition, + ) + + s.Hedge(ctx, uncoverPosition.Neg()) + } + + if s.reportProfitStatsRateLimiter.Allow() { + bbgo.Notify(s.ProfitStats) + } + } + } +} + +func (s *Strategy) CrossRun( + ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession, +) error { + s.hedgeErrorLimiter = rate.NewLimiter(rate.Every(1*time.Minute), 1) // configure sessions @@ -945,6 +1018,7 @@ func (s *Strategy) CrossRun( s.tradeCollector.OnPositionUpdate(func(position *types.Position) { bbgo.Notify(position) }) + s.tradeCollector.OnRecover(func(trade types.Trade) { bbgo.Notify("Recovered trade", trade) }) @@ -959,67 +1033,8 @@ func (s *Strategy) CrossRun( go s.tradeRecover(ctx) } - go func() { - hedgeTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) - defer hedgeTicker.Stop() - - quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200)) - defer quoteTicker.Stop() - - defer func() { - if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil { - log.WithError(err).Errorf("can not cancel %s orders", s.Symbol) - } - }() - - for { - select { - - case <-s.stopC: - log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol) - return - - case <-ctx.Done(): - log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol) - return - - case <-quoteTicker.C: - s.updateQuote(ctx, orderExecutionRouter) - - case <-hedgeTicker.C: - // For positive position and positive covered position: - // uncover position = +5 - +3 (covered position) = 2 - // - // For positive position and negative covered position: - // uncover position = +5 - (-3) (covered position) = 8 - // - // meaning we bought 5 on MAX and sent buy order with 3 on binance - // - // For negative position: - // uncover position = -5 - -3 (covered position) = -2 - s.tradeCollector.Process() - - position := s.Position.GetBase() - - uncoverPosition := position.Sub(s.CoveredPosition) - absPos := uncoverPosition.Abs() - if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 { - log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v", - s.Symbol, - position, - s.CoveredPosition, - uncoverPosition, - ) - - s.Hedge(ctx, uncoverPosition.Neg()) - } - - if s.reportProfitStatsRateLimiter.Allow() { - bbgo.Notify(s.ProfitStats) - } - } - } - }() + go s.hedgeWorker(ctx) + go s.quoteWorker(ctx) bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { defer wg.Done()