Merge pull request #1780 from c9s/c9s/xmaker/improvements2
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

FEATURE: [xmaker] support delayed hedge
This commit is contained in:
c9s 2024-10-16 11:56:25 +08:00 committed by GitHub
commit 770f981895
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -136,6 +136,12 @@ type Strategy struct {
DepthQuantity fixedpoint.Value `json:"depthQuantity"`
SourceDepthLevel types.Depth `json:"sourceDepthLevel"`
// EnableDelayHedge enables the delay hedge feature
EnableDelayHedge bool `json:"enableDelayHedge"`
// MaxHedgeDelayDuration is the maximum delay duration to hedge the position
MaxDelayHedgeDuration types.Duration `json:"maxHedgeDelayDuration"`
DelayHedgeSignalThreshold float64 `json:"delayHedgeSignalThreshold"`
EnableBollBandMargin bool `json:"enableBollBandMargin"`
BollBandInterval types.Interval `json:"bollBandInterval"`
BollBandMargin fixedpoint.Value `json:"bollBandMargin"`
@ -165,7 +171,7 @@ type Strategy struct {
NotifyTrade bool `json:"notifyTrade"`
EnableArbitrage bool `json:"arbitrage"`
EnableArbitrage bool `json:"enableArbitrage"`
// RecoverTrade tries to find the missing trades via the REStful API
RecoverTrade bool `json:"recoverTrade"`
@ -196,9 +202,9 @@ type Strategy struct {
CircuitBreaker *circuitbreaker.BasicCircuitBreaker `json:"circuitBreaker"`
// persistence fields
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.Value `json:"coveredPosition,omitempty" persistence:"covered_position"`
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
CoveredPosition fixedpoint.MutexValue `json:"coveredPosition,omitempty" persistence:"covered_position"`
sourceBook, makerBook *types.StreamOrderBook
activeMakerOrders *bbgo.ActiveOrderBook
@ -213,7 +219,7 @@ type Strategy struct {
accountValueCalculator *bbgo.AccountValueCalculator
lastPrice fixedpoint.Value
lastPrice fixedpoint.MutexValue
groupID uint32
stopC chan struct{}
@ -230,6 +236,9 @@ type Strategy struct {
// lastAggregatedSignal stores the last aggregated signal with mutex
// TODO: use float64 series instead, so that we can store history signal values
lastAggregatedSignal MutexFloat64
positionStartedAt *time.Time
positionStartedAtMutex sync.Mutex
}
func (s *Strategy) ID() string {
@ -335,6 +344,34 @@ func (s *Strategy) getBollingerTrend(quote *Quote) int {
}
}
// setPositionStartTime sets the position start time only if it's not set
func (s *Strategy) setPositionStartTime(now time.Time) {
s.positionStartedAtMutex.Lock()
if s.positionStartedAt == nil {
s.positionStartedAt = &now
}
s.positionStartedAtMutex.Unlock()
}
func (s *Strategy) resetPositionStartTime() {
s.positionStartedAtMutex.Lock()
s.positionStartedAt = nil
s.positionStartedAtMutex.Unlock()
}
func (s *Strategy) getPositionHoldingPeriod(now time.Time) (time.Duration, bool) {
s.positionStartedAtMutex.Lock()
startedAt := s.positionStartedAt
s.positionStartedAtMutex.Unlock()
if startedAt == nil || startedAt.IsZero() {
return 0, false
}
return now.Sub(*startedAt), true
}
func (s *Strategy) applySignalMargin(ctx context.Context, quote *Quote) error {
signal, err := s.aggregateSignal(ctx)
if err != nil {
@ -600,9 +637,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
}
// use mid-price for the last price
s.lastPrice = bestBid.Price.Add(bestAsk.Price).Div(two)
s.priceSolver.Update(s.Symbol, s.lastPrice)
midPrice := bestBid.Price.Add(bestAsk.Price).Div(two)
s.lastPrice.Set(midPrice)
s.priceSolver.Update(s.Symbol, midPrice)
bookLastUpdateTime := s.sourceBook.LastUpdateTime()
@ -644,7 +681,7 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
makerQuota := &bbgo.QuotaTransaction{}
if b, ok := makerBalances[s.makerMarket.BaseCurrency]; ok {
if s.makerMarket.IsDustQuantity(b.Available, s.lastPrice) {
if s.makerMarket.IsDustQuantity(b.Available, s.lastPrice.Get()) {
disableMakerAsk = true
s.logger.Infof("%s maker ask disabled: insufficient base balance %s", s.Symbol, b.String())
} else {
@ -1027,7 +1064,7 @@ func (s *Strategy) tryArbitrage(ctx context.Context, quote *Quote, makerBalances
return false, nil
}
if qty.IsZero() {
if qty.IsZero() || s.makerMarket.IsDustQuantity(qty, sumPv.Price) {
return false, nil
}
@ -1057,7 +1094,7 @@ func (s *Strategy) tryArbitrage(ctx context.Context, quote *Quote, makerBalances
return false, nil
}
if qty.IsZero() {
if qty.IsZero() || s.makerMarket.IsDustQuantity(qty, sumPv.Price) {
return false, nil
}
@ -1129,6 +1166,27 @@ func AdjustHedgeQuantityWithAvailableBalance(
return market.TruncateQuantity(quantity)
}
func (s *Strategy) canDelayHedge(side types.SideType, pos fixedpoint.Value) bool {
if !s.EnableDelayHedge {
return false
}
signal := s.lastAggregatedSignal.Get()
// if the signal is strong enough, we can delay the hedge and wait for the next tick
if math.Abs(signal) > s.DelayHedgeSignalThreshold {
period, ok := s.getPositionHoldingPeriod(time.Now())
if ok && (signal > 0 && side == types.SideTypeSell) || (signal < 0 && side == types.SideTypeBuy) {
if period < s.MaxDelayHedgeDuration.Duration() {
s.logger.Infof("delay hedge enabled, signal %f is strong enough, waiting for the next tick to hedge %s quantity (max period %s)", signal, pos, s.MaxDelayHedgeDuration.Duration().String())
return true
}
}
}
return false
}
func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
side := types.SideTypeBuy
if pos.IsZero() {
@ -1141,7 +1199,11 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
side = types.SideTypeSell
}
lastPrice := s.lastPrice
if s.canDelayHedge(side, pos) {
return
}
lastPrice := s.lastPrice.Get()
sourceBook := s.sourceBook.CopyDepth(1)
switch side {
@ -1160,7 +1222,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
if s.sourceSession.Margin {
// check the margin level
if !s.MinMarginLevel.IsZero() && !account.MarginLevel.IsZero() && account.MarginLevel.Compare(s.MinMarginLevel) < 0 {
log.Errorf("margin level %f is too low (< %f), skip hedge", account.MarginLevel.Float64(), s.MinMarginLevel.Float64())
s.logger.Errorf("margin level %f is too low (< %f), skip hedge", account.MarginLevel.Float64(), s.MinMarginLevel.Float64())
return
}
} else {
@ -1172,7 +1234,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
quantity = s.sourceMarket.TruncateQuantity(quantity)
if s.sourceMarket.IsDustQuantity(quantity, lastPrice) {
log.Warnf("skip dust quantity: %s @ price %f", quantity.String(), lastPrice.Float64())
s.logger.Warnf("skip dust quantity: %s @ price %f", quantity.String(), lastPrice.Float64())
return
}
@ -1180,6 +1242,7 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
if !s.hedgeErrorRateReservation.OK() {
return
}
bbgo.Notify("Hit hedge error rate limit, waiting...")
time.Sleep(s.hedgeErrorRateReservation.Delay())
s.hedgeErrorRateReservation = nil
@ -1221,9 +1284,9 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
// if it's selling, then we should add a positive position
if side == types.SideTypeSell {
s.CoveredPosition = s.CoveredPosition.Add(quantity)
s.CoveredPosition.Add(quantity)
} else {
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
s.CoveredPosition.Add(quantity.Neg())
}
}
@ -1266,6 +1329,14 @@ func (s *Strategy) Defaults() error {
s.BollBandInterval = types.Interval1m
}
if s.MaxDelayHedgeDuration == 0 {
s.MaxDelayHedgeDuration = types.Duration(10 * time.Second)
}
if s.DelayHedgeSignalThreshold == 0.0 {
s.DelayHedgeSignalThreshold = 0.5
}
if s.SourceDepthLevel == "" {
s.SourceDepthLevel = types.DepthLevelMedium
}
@ -1355,15 +1426,17 @@ func (s *Strategy) quoteWorker(ctx context.Context) {
select {
case <-s.stopC:
log.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
s.logger.Warnf("%s maker goroutine stopped, due to the stop signal", s.Symbol)
return
case <-ctx.Done():
log.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
s.logger.Warnf("%s maker goroutine stopped, due to the cancelled context", s.Symbol)
return
case <-ticker.C:
s.updateQuote(ctx)
if err := s.updateQuote(ctx); err != nil {
s.logger.WithError(err).Errorf("unable to place maker orders")
}
}
}
@ -1379,11 +1452,11 @@ func (s *Strategy) accountUpdater(ctx context.Context) {
case <-ticker.C:
if _, err := s.sourceSession.UpdateAccount(ctx); err != nil {
log.WithError(err).Errorf("unable to update account")
s.logger.WithError(err).Errorf("unable to update account")
}
if err := s.accountValueCalculator.UpdatePrices(ctx); err != nil {
log.WithError(err).Errorf("unable to update account value with prices")
s.logger.WithError(err).Errorf("unable to update account value with prices")
return
}
@ -1405,7 +1478,7 @@ func (s *Strategy) hedgeWorker(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
case tt := <-ticker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2
//
@ -1420,20 +1493,34 @@ func (s *Strategy) hedgeWorker(ctx context.Context) {
position := s.Position.GetBase()
uncoverPosition := position.Sub(s.CoveredPosition)
absPos := uncoverPosition.Abs()
if !s.DisableHedge && absPos.Compare(s.sourceMarket.MinQuantity) > 0 {
log.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
s.CoveredPosition,
uncoverPosition,
)
s.Hedge(ctx, uncoverPosition.Neg())
profitChanged = true
if position.IsZero() || s.Position.IsDust() {
s.resetPositionStartTime()
} else {
s.setPositionStartTime(tt)
}
coveredPosition := s.CoveredPosition.Get()
uncoverPosition := position.Sub(coveredPosition)
absPos := uncoverPosition.Abs()
if s.sourceMarket.IsDustQuantity(absPos, s.lastPrice.Get()) {
continue
}
if !s.DisableHedge {
continue
}
s.logger.Infof("%s base position %v coveredPosition: %v uncoverPosition: %v",
s.Symbol,
position,
coveredPosition,
uncoverPosition,
)
s.Hedge(ctx, uncoverPosition.Neg())
profitChanged = true
case <-reportTicker.C:
if profitChanged {
if s.reportProfitStatsRateLimiter.Allow() {
@ -1529,12 +1616,6 @@ func (s *Strategy) CrossRun(
}
}
if s.CoveredPosition.IsZero() {
if s.state != nil && !s.CoveredPosition.IsZero() {
s.CoveredPosition = s.state.CoveredPosition
}
}
s.priceSolver = pricesolver.NewSimplePriceResolver(sourceMarkets)
s.priceSolver.BindStream(s.sourceSession.MarketDataStream)
s.sourceSession.UserDataStream.OnTradeUpdate(s.priceSolver.UpdateFromTrade)
@ -1663,7 +1744,7 @@ func (s *Strategy) CrossRun(
s.tradeCollector.OnTrade(func(trade types.Trade, profit, netProfit fixedpoint.Value) {
c := trade.PositionChange()
if trade.Exchange == s.sourceSession.ExchangeName {
s.CoveredPosition = s.CoveredPosition.Add(c)
s.CoveredPosition.Add(c)
}
s.ProfitStats.AddTrade(trade)