Merge pull request #1717 from c9s/c9s/xmaker/worker-method

REFACTOR: [xmaker] refactor hedge worker and quote worker
This commit is contained in:
c9s 2024-08-27 15:32:56 +08:00 committed by GitHub
commit 5edf5a763f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -192,7 +192,7 @@ func (s *Strategy) Initialize() error {
return nil
}
func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter) {
func (s *Strategy) updateQuote(ctx context.Context) {
if err := s.activeMakerOrders.GracefulCancel(ctx, s.makerSession.Exchange); err != nil {
log.Warnf("there are some %s orders not canceled, skipping placing maker orders", s.Symbol)
s.activeMakerOrders.Print()
@ -706,35 +706,6 @@ func (s *Strategy) tradeRecover(ctx context.Context) {
}
func (s *Strategy) Defaults() error {
if s.CircuitBreaker == nil {
s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID())
}
// circuitBreakerAlertLimiter is for CircuitBreaker alerts
s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(5*time.Minute), 1)
return nil
}
func (s *Strategy) Validate() error {
if s.Quantity.IsZero() || s.QuantityScale == nil {
return errors.New("quantity or quantityScale can not be empty")
}
if !s.QuantityMultiplier.IsZero() && s.QuantityMultiplier.Sign() < 0 {
return errors.New("quantityMultiplier can not be a negative number")
}
if len(s.Symbol) == 0 {
return errors.New("symbol is required")
}
return nil
}
func (s *Strategy) CrossRun(
ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession,
) error {
if s.BollBandInterval == "" {
s.BollBandInterval = types.Interval1m
}
@ -775,6 +746,108 @@ func (s *Strategy) CrossRun(
}
}
if s.CircuitBreaker == nil {
s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID())
}
// circuitBreakerAlertLimiter is for CircuitBreaker alerts
s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(5*time.Minute), 1)
return nil
}
func (s *Strategy) Validate() error {
if s.Quantity.IsZero() || s.QuantityScale == nil {
return errors.New("quantity or quantityScale can not be empty")
}
if !s.QuantityMultiplier.IsZero() && s.QuantityMultiplier.Sign() < 0 {
return errors.New("quantityMultiplier can not be a negative number")
}
if len(s.Symbol) == 0 {
return errors.New("symbol is required")
}
return nil
}
func (s *Strategy) quoteWorker(ctx context.Context) {
quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
defer quoteTicker.Stop()
defer func() {
if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
}
}()
for {
select {
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
return
case <-quoteTicker.C:
s.updateQuote(ctx)
}
}
}
func (s *Strategy) hedgeWorker(ctx context.Context) {
ticker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
// For positive position and negative covered position:
// uncover position = +5 - (-3) (covered position) = 8
//
// meaning we bought 5 on MAX and sent buy order with 3 on binance
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
s.CoveredPosition,
uncoverPosition,
)
s.Hedge(ctx, uncoverPosition.Neg())
}
if s.reportProfitStatsRateLimiter.Allow() {
bbgo.Notify(s.ProfitStats)
}
}
}
}
func (s *Strategy) CrossRun(
ctx context.Context, orderExecutionRouter bbgo.OrderExecutionRouter, sessions map[string]*bbgo.ExchangeSession,
) error {
s.hedgeErrorLimiter = rate.NewLimiter(rate.Every(1*time.Minute), 1)
// configure sessions
@ -945,6 +1018,7 @@ func (s *Strategy) CrossRun(
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
bbgo.Notify(position)
})
s.tradeCollector.OnRecover(func(trade types.Trade) {
bbgo.Notify("Recovered trade", trade)
})
@ -959,67 +1033,8 @@ func (s *Strategy) CrossRun(
go s.tradeRecover(ctx)
}
go func() {
hedgeTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer hedgeTicker.Stop()
quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
defer quoteTicker.Stop()
defer func() {
if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
}
}()
for {
select {
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
return
case <-quoteTicker.C:
s.updateQuote(ctx, orderExecutionRouter)
case <-hedgeTicker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
// For positive position and negative covered position:
// uncover position = +5 - (-3) (covered position) = 8
//
// meaning we bought 5 on MAX and sent buy order with 3 on binance
//
// For negative position:
// uncover position = -5 - -3 (covered position) = -2
s.tradeCollector.Process()
position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
s.CoveredPosition,
uncoverPosition,
)
s.Hedge(ctx, uncoverPosition.Neg())
}
if s.reportProfitStatsRateLimiter.Allow() {
bbgo.Notify(s.ProfitStats)
}
}
}
}()
go s.hedgeWorker(ctx)
go s.quoteWorker(ctx)
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done()