xmaker: integrate CircuitBreaker

This commit is contained in:
c9s 2024-08-24 11:58:09 +08:00
parent 5f65e87e89
commit 6ef8aa62e5
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 36 additions and 7 deletions

View File

@ -59,6 +59,13 @@ crossExchangeStrategies:
# 0.1 pip is 0.01, here we use 10, so we will get 18000.00, 18001.00 and # 0.1 pip is 0.01, here we use 10, so we will get 18000.00, 18001.00 and
# 18002.00 # 18002.00
pips: 10 pips: 10
persistence: circuitBreaker:
type: redis maximumConsecutiveTotalLoss: 36.0
maximumConsecutiveLossTimes: 10
maximumLossPerRound: 15.0
maximumTotalLoss: 80.0
ignoreConsecutiveDustLoss: true
consecutiveDustLossThreshold: 0.003
haltDuration: "30m"
maximumHaltTimes: 2
maximumHaltTimesExceededPanic: true

View File

@ -223,21 +223,21 @@ func (b *BasicCircuitBreaker) reset() {
b.updateMetrics() b.updateMetrics()
} }
func (b *BasicCircuitBreaker) IsHalted(now time.Time) bool { func (b *BasicCircuitBreaker) IsHalted(now time.Time) (string, bool) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock() defer b.mu.Unlock()
if !b.halted { if !b.halted {
return false return "", false
} }
// check if it's an expired halt // check if it's an expired halt
if now.After(b.haltTo) { if now.After(b.haltTo) {
b.reset() b.reset()
return false return "", false
} }
return true return b.haltReason, true
} }
func (b *BasicCircuitBreaker) halt(now time.Time, reason string) { func (b *BasicCircuitBreaker) halt(now time.Time, reason string) {

View File

@ -22,6 +22,9 @@ import (
var defaultMargin = fixedpoint.NewFromFloat(0.003) var defaultMargin = fixedpoint.NewFromFloat(0.003)
var Two = fixedpoint.NewFromInt(2) var Two = fixedpoint.NewFromInt(2)
// circuitBreakerAlertLimiter is for CircuitBreaker alerts
var circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
const priceUpdateTimeout = 30 * time.Second const priceUpdateTimeout = 30 * time.Second
const ID = "xmaker" const ID = "xmaker"
@ -190,6 +193,19 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
return return
} }
if s.CircuitBreaker != nil {
now := time.Now()
if reason, halted := s.CircuitBreaker.IsHalted(now); halted {
log.Warnf("[arbWorker] strategy is halted, reason: %s", reason)
if circuitBreakerAlertLimiter.AllowN(now, 1) {
bbgo.Notify("Strategy is halted, reason: %s", reason)
}
return
}
}
bestBid, bestAsk, hasPrice := s.book.BestBidAndAsk() bestBid, bestAsk, hasPrice := s.book.BestBidAndAsk()
if !hasPrice { if !hasPrice {
return return
@ -573,6 +589,8 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity) bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
// TODO: improve order executor
orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession} orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession}
returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{ returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
Market: s.sourceMarket, Market: s.sourceMarket,
@ -824,6 +842,10 @@ func (s *Strategy) CrossRun(
s.ProfitStats.AddProfit(p) s.ProfitStats.AddProfit(p)
s.Environment.RecordPosition(s.Position, trade, &p) s.Environment.RecordPosition(s.Position, trade, &p)
if s.CircuitBreaker != nil {
s.CircuitBreaker.RecordProfit(profit, trade.Time.Time())
}
} }
}) })