Merge pull request #1836 from c9s/c9s/xmaker-optimize
Some checks are pending
Go / build (1.21, 6.2) (push) Waiting to run
golang-lint / lint (push) Waiting to run

IMPROVE: [xmaker] add more improvements
This commit is contained in:
c9s 2024-11-21 13:56:54 +08:00 committed by GitHub
commit 7b391b15f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 225 additions and 39 deletions

View File

@ -64,7 +64,7 @@ func TestDepthBuffer_CorruptedUpdateAtTheBeginning(t *testing.T) {
{Price: itov(99), Volume: itov(1)}, {Price: itov(99), Volume: itov(1)},
}, },
}, snapshotFinalID, nil }, snapshotFinalID, nil
}) }, time.Millisecond*5)
resetC := make(chan struct{}, 1) resetC := make(chan struct{}, 1)
@ -104,7 +104,7 @@ func TestDepthBuffer_ConcurrentRun(t *testing.T) {
{Price: itov(99), Volume: itov(1)}, {Price: itov(99), Volume: itov(1)},
}, },
}, snapshotFinalID, nil }, snapshotFinalID, nil
}) }, time.Millisecond*5)
readyCnt := 0 readyCnt := 0
resetCnt := 0 resetCnt := 0

View File

@ -1,12 +1,15 @@
package xmaker package xmaker
import ( import (
"bytes"
"context" "context"
"fmt" "fmt"
"io"
"math" "math"
"sync" "sync"
"time" "time"
"github.com/jedib0t/go-pretty/v6/table"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
@ -14,12 +17,14 @@ import (
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/core" "github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/dynamic"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2" indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/pricesolver" "github.com/c9s/bbgo/pkg/pricesolver"
"github.com/c9s/bbgo/pkg/profile/timeprofile" "github.com/c9s/bbgo/pkg/profile/timeprofile"
"github.com/c9s/bbgo/pkg/risk/circuitbreaker" "github.com/c9s/bbgo/pkg/risk/circuitbreaker"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/style"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/util/timejitter" "github.com/c9s/bbgo/pkg/util/timejitter"
@ -109,6 +114,29 @@ func init() {
bbgo.RegisterStrategy(ID, &Strategy{}) bbgo.RegisterStrategy(ID, &Strategy{})
} }
type SignalMargin struct {
Enabled bool `json:"enabled"`
Scale *bbgo.SlideRule `json:"scale,omitempty"`
Threshold float64 `json:"threshold,omitempty"`
}
type DelayedHedge struct {
// EnableDelayHedge enables the delay hedge feature
Enabled bool `json:"enabled"`
// MaxDelayDuration is the maximum delay duration to hedge the position
MaxDelayDuration types.Duration `json:"maxDelay"`
// FixedDelayDuration is the fixed delay duration
FixedDelayDuration types.Duration `json:"fixedDelay"`
// SignalThreshold is the signal threshold to trigger the delay hedge
SignalThreshold float64 `json:"signalThreshold"`
// DynamicDelayScale is the dynamic delay scale
DynamicDelayScale *bbgo.SlideRule `json:"dynamicDelayScale,omitempty"`
}
type Strategy struct { type Strategy struct {
Environment *bbgo.Environment Environment *bbgo.Environment
@ -128,11 +156,18 @@ type Strategy struct {
EnableSignalMargin bool `json:"enableSignalMargin"` EnableSignalMargin bool `json:"enableSignalMargin"`
SignalConfigList []SignalConfig `json:"signals"` SignalConfigList []SignalConfig `json:"signals"`
SignalMarginScale *bbgo.SlideRule `json:"signalMarginScale,omitempty"`
SignalReverseSideMargin *SignalMargin `json:"signalReverseSideMargin,omitempty"`
SignalTrendSideMarginDiscount *SignalMargin `json:"signalTrendSideMarginDiscount,omitempty"`
// Margin is the default margin for the quote
Margin fixedpoint.Value `json:"margin"` Margin fixedpoint.Value `json:"margin"`
BidMargin fixedpoint.Value `json:"bidMargin"` BidMargin fixedpoint.Value `json:"bidMargin"`
AskMargin fixedpoint.Value `json:"askMargin"` AskMargin fixedpoint.Value `json:"askMargin"`
// MinMargin is the minimum margin protection for signal margin
MinMargin *fixedpoint.Value `json:"minMargin"`
UseDepthPrice bool `json:"useDepthPrice"` UseDepthPrice bool `json:"useDepthPrice"`
DepthQuantity fixedpoint.Value `json:"depthQuantity"` DepthQuantity fixedpoint.Value `json:"depthQuantity"`
SourceDepthLevel types.Depth `json:"sourceDepthLevel"` SourceDepthLevel types.Depth `json:"sourceDepthLevel"`
@ -143,6 +178,8 @@ type Strategy struct {
MaxDelayHedgeDuration types.Duration `json:"maxHedgeDelayDuration"` MaxDelayHedgeDuration types.Duration `json:"maxHedgeDelayDuration"`
DelayHedgeSignalThreshold float64 `json:"delayHedgeSignalThreshold"` DelayHedgeSignalThreshold float64 `json:"delayHedgeSignalThreshold"`
DelayedHedge *DelayedHedge `json:"delayedHedge,omitempty"`
EnableBollBandMargin bool `json:"enableBollBandMargin"` EnableBollBandMargin bool `json:"enableBollBandMargin"`
BollBandInterval types.Interval `json:"bollBandInterval"` BollBandInterval types.Interval `json:"bollBandInterval"`
BollBandMargin fixedpoint.Value `json:"bollBandMargin"` BollBandMargin fixedpoint.Value `json:"bollBandMargin"`
@ -179,6 +216,8 @@ type Strategy struct {
RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"` RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"`
MaxQuoteQuotaRatio fixedpoint.Value `json:"maxQuoteQuotaRatio,omitempty"`
NumLayers int `json:"numLayers"` NumLayers int `json:"numLayers"`
// Pips is the pips of the layer prices // Pips is the pips of the layer prices
@ -317,9 +356,49 @@ func (s *Strategy) Initialize() error {
"exchange": s.MakerExchange, "exchange": s.MakerExchange,
"symbol": s.Symbol, "symbol": s.Symbol,
} }
if s.SignalReverseSideMargin != nil && s.SignalReverseSideMargin.Scale != nil {
scale, err := s.SignalReverseSideMargin.Scale.Scale()
if err != nil {
return err
}
if solveErr := scale.Solve(); solveErr != nil {
return solveErr
}
}
if s.SignalTrendSideMarginDiscount != nil && s.SignalTrendSideMarginDiscount.Scale != nil {
scale, err := s.SignalTrendSideMarginDiscount.Scale.Scale()
if err != nil {
return err
}
if solveErr := scale.Solve(); solveErr != nil {
return solveErr
}
}
if s.DelayedHedge != nil && s.DelayedHedge.DynamicDelayScale != nil {
if scale, _ := s.DelayedHedge.DynamicDelayScale.Scale(); scale != nil {
if err := scale.Solve(); err != nil {
return err
}
}
}
return nil return nil
} }
func (s *Strategy) PrintConfig(f io.Writer, pretty bool, withColor ...bool) {
var tableStyle *table.Style
if pretty {
tableStyle = style.NewDefaultTableStyle()
}
dynamic.PrintConfig(s, f, tableStyle, len(withColor) > 0 && withColor[0], dynamic.DefaultWhiteList()...)
}
// getBollingerTrend returns -1 when the price is in the downtrend, 1 when the price is in the uptrend, 0 when the price is in the band // getBollingerTrend returns -1 when the price is in the downtrend, 1 when the price is in the uptrend, 0 when the price is in the band
func (s *Strategy) getBollingerTrend(quote *Quote) int { func (s *Strategy) getBollingerTrend(quote *Quote) int {
// when bid price is lower than the down band, then it's in the downtrend // when bid price is lower than the down band, then it's in the downtrend
@ -377,32 +456,65 @@ func (s *Strategy) applySignalMargin(ctx context.Context, quote *Quote) error {
return nil return nil
} }
scale, err := s.SignalMarginScale.Scale() signalAbs := math.Abs(signal)
var trendSideMarginDiscount, reverseSideMargin float64
var trendSideMarginDiscountFp, reverseSideMarginFp fixedpoint.Value
if s.SignalTrendSideMarginDiscount != nil && s.SignalTrendSideMarginDiscount.Enabled {
trendSideMarginScale, err := s.SignalTrendSideMarginDiscount.Scale.Scale()
if err != nil { if err != nil {
return err return err
} }
margin := scale.Call(math.Abs(signal)) if signalAbs > s.SignalTrendSideMarginDiscount.Threshold {
// trendSideMarginDiscount is the discount for the trend side margin
trendSideMarginDiscount = trendSideMarginScale.Call(math.Abs(signal))
trendSideMarginDiscountFp = fixedpoint.NewFromFloat(trendSideMarginDiscount)
s.logger.Infof("signal margin: %f", margin) if signal > 0.0 {
quote.BidMargin = quote.BidMargin.Sub(trendSideMarginDiscountFp)
} else if signal < 0.0 {
quote.AskMargin = quote.AskMargin.Sub(trendSideMarginDiscountFp)
}
}
}
marginFp := fixedpoint.NewFromFloat(margin) if s.SignalReverseSideMargin != nil && s.SignalReverseSideMargin.Enabled {
reverseSideMarginScale, err := s.SignalReverseSideMargin.Scale.Scale()
if err != nil {
return err
}
if signalAbs > s.SignalReverseSideMargin.Threshold {
reverseSideMargin = reverseSideMarginScale.Call(math.Abs(signal))
reverseSideMarginFp = fixedpoint.NewFromFloat(reverseSideMargin)
if signal < 0.0 { if signal < 0.0 {
quote.BidMargin = quote.BidMargin.Add(marginFp) quote.BidMargin = quote.BidMargin.Add(reverseSideMarginFp)
if signal <= -2.0 {
// quote.BidMargin = fixedpoint.Zero
}
s.logger.Infof("adjusted bid margin: %f", quote.BidMargin.Float64())
} else if signal > 0.0 { } else if signal > 0.0 {
quote.AskMargin = quote.AskMargin.Add(marginFp) quote.AskMargin = quote.AskMargin.Add(reverseSideMarginFp)
if signal >= 2.0 { }
// quote.AskMargin = fixedpoint.Zero }
} }
s.logger.Infof("adjusted ask margin: %f", quote.AskMargin.Float64()) s.logger.Infof("signal margin params: signal = %f, reverseSideMargin = %f, trendSideMarginDiscount = %f", signal, reverseSideMargin, trendSideMarginDiscount)
s.logger.Infof("calculated signal margin: signal = %f, askMargin = %s, bidMargin = %s",
signal,
quote.AskMargin,
quote.BidMargin,
)
if s.MinMargin != nil {
quote.AskMargin = fixedpoint.Max(*s.MinMargin, quote.AskMargin)
quote.BidMargin = fixedpoint.Max(*s.MinMargin, quote.BidMargin)
} }
s.logger.Infof("final signal margin: signal = %f, askMargin = %s, bidMargin = %s",
signal,
quote.AskMargin,
quote.BidMargin,
)
return nil return nil
} }
@ -696,7 +808,14 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
if b, ok := makerBalances[s.makerMarket.QuoteCurrency]; ok { if b, ok := makerBalances[s.makerMarket.QuoteCurrency]; ok {
if b.Available.Compare(s.makerMarket.MinNotional) > 0 { if b.Available.Compare(s.makerMarket.MinNotional) > 0 {
if s.MaxQuoteQuotaRatio.Sign() > 0 {
quoteAvailable := b.Available.Mul(s.MaxQuoteQuotaRatio)
makerQuota.QuoteAsset.Add(quoteAvailable)
} else {
// use all quote balances as much as possible
makerQuota.QuoteAsset.Add(b.Available) makerQuota.QuoteAsset.Add(b.Available)
}
} else { } else {
disableMakerBid = true disableMakerBid = true
s.logger.Infof("%s maker bid disabled: insufficient quote balance %s", s.Symbol, b.String()) s.logger.Infof("%s maker bid disabled: insufficient quote balance %s", s.Symbol, b.String())
@ -835,7 +954,7 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
} }
if disableMakerAsk && disableMakerBid { if disableMakerAsk && disableMakerBid {
log.Warnf("%s bid/ask maker is disabled due to insufficient balances", s.Symbol) log.Warnf("%s bid/ask maker is disabled", s.Symbol)
return nil return nil
} }
@ -1178,14 +1297,16 @@ func AdjustHedgeQuantityWithAvailableBalance(
return market.TruncateQuantity(quantity) return market.TruncateQuantity(quantity)
} }
func (s *Strategy) canDelayHedge(side types.SideType, pos fixedpoint.Value) bool { // canDelayHedge returns true if the hedge can be delayed
if !s.EnableDelayHedge { func (s *Strategy) canDelayHedge(hedgeSide types.SideType, pos fixedpoint.Value) bool {
if s.DelayedHedge == nil || !s.DelayedHedge.Enabled {
return false return false
} }
signal := s.lastAggregatedSignal.Get() signal := s.lastAggregatedSignal.Get()
if math.Abs(signal) < s.DelayHedgeSignalThreshold { signalAbs := math.Abs(signal)
if signalAbs < s.DelayedHedge.SignalThreshold {
return false return false
} }
@ -1195,8 +1316,21 @@ func (s *Strategy) canDelayHedge(side types.SideType, pos fixedpoint.Value) bool
return false return false
} }
if (signal > 0 && side == types.SideTypeSell) || (signal < 0 && side == types.SideTypeBuy) { var maxDelay = s.DelayedHedge.MaxDelayDuration.Duration()
if period < s.MaxDelayHedgeDuration.Duration() { var delay = s.DelayedHedge.FixedDelayDuration.Duration()
if s.DelayedHedge.DynamicDelayScale != nil {
if scale, _ := s.DelayedHedge.DynamicDelayScale.Scale(); scale != nil {
delay = time.Duration(scale.Call(signalAbs)) * time.Millisecond
}
}
if delay > maxDelay {
delay = maxDelay
}
if (signal > 0 && hedgeSide == types.SideTypeSell) || (signal < 0 && hedgeSide == types.SideTypeBuy) {
if period < delay {
s.logger.Infof("delay hedge enabled, signal %f is strong enough, waiting for the next tick to hedge %s quantity (max period %s)", signal, pos, s.MaxDelayHedgeDuration.Duration().String()) s.logger.Infof("delay hedge enabled, signal %f is strong enough, waiting for the next tick to hedge %s quantity (max period %s)", signal, pos, s.MaxDelayHedgeDuration.Duration().String())
delayHedgeCounterMetrics.With(s.metricsLabels).Inc() delayHedgeCounterMetrics.With(s.metricsLabels).Inc()
@ -1363,6 +1497,7 @@ func (s *Strategy) Defaults() error {
if s.BollBandMarginFactor.IsZero() { if s.BollBandMarginFactor.IsZero() {
s.BollBandMarginFactor = fixedpoint.One s.BollBandMarginFactor = fixedpoint.One
} }
if s.BollBandMargin.IsZero() { if s.BollBandMargin.IsZero() {
s.BollBandMargin = fixedpoint.NewFromFloat(0.001) s.BollBandMargin = fixedpoint.NewFromFloat(0.001)
} }
@ -1410,6 +1545,42 @@ func (s *Strategy) Defaults() error {
s.CircuitBreaker.SetMetricsInfo(ID, s.InstanceID(), s.Symbol) s.CircuitBreaker.SetMetricsInfo(ID, s.InstanceID(), s.Symbol)
} }
if s.EnableSignalMargin {
if s.SignalReverseSideMargin.Scale == nil {
s.SignalReverseSideMargin.Scale = &bbgo.SlideRule{
ExpScale: &bbgo.ExponentialScale{
Domain: [2]float64{0, 2.0},
Range: [2]float64{0.00010, 0.00500},
},
QuadraticScale: nil,
}
}
if s.SignalTrendSideMarginDiscount.Scale == nil {
s.SignalTrendSideMarginDiscount.Scale = &bbgo.SlideRule{
ExpScale: &bbgo.ExponentialScale{
Domain: [2]float64{0, 2.0},
Range: [2]float64{0.00010, 0.00500},
},
}
}
if s.SignalTrendSideMarginDiscount.Threshold == 0.0 {
s.SignalTrendSideMarginDiscount.Threshold = 1.0
}
}
if s.DelayedHedge != nil {
// default value protection for delayed hedge
if s.DelayedHedge.MaxDelayDuration == 0 {
s.DelayedHedge.MaxDelayDuration = types.Duration(3 * time.Second)
}
if s.DelayedHedge.SignalThreshold == 0.0 {
s.DelayedHedge.SignalThreshold = 0.5
}
}
// circuitBreakerAlertLimiter is for CircuitBreaker alerts // circuitBreakerAlertLimiter is for CircuitBreaker alerts
s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2) s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 1) s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 1)
@ -1579,6 +1750,10 @@ func (s *Strategy) CrossRun(
) error { ) error {
instanceID := s.InstanceID() instanceID := s.InstanceID()
configWriter := bytes.NewBuffer(nil)
s.PrintConfig(configWriter, true, false)
s.logger.Infof("config: %s", configWriter.String())
// configure sessions // configure sessions
sourceSession, ok := sessions[s.SourceExchange] sourceSession, ok := sessions[s.SourceExchange]
if !ok { if !ok {
@ -1761,13 +1936,17 @@ func (s *Strategy) CrossRun(
if s.EnableSignalMargin { if s.EnableSignalMargin {
s.logger.Infof("signal margin is enabled") s.logger.Infof("signal margin is enabled")
scale, err := s.SignalMarginScale.Scale() if s.SignalReverseSideMargin == nil || s.SignalReverseSideMargin.Scale == nil {
if err != nil { return errors.New("signalReverseSideMarginScale can not be nil when signal margin is enabled")
return err
} }
if solveErr := scale.Solve(); solveErr != nil { if s.SignalTrendSideMarginDiscount == nil || s.SignalTrendSideMarginDiscount.Scale == nil {
return solveErr return errors.New("signalTrendSideMarginScale can not be nil when signal margin is enabled")
}
scale, err := s.SignalReverseSideMargin.Scale.Scale()
if err != nil {
return err
} }
minAdditionalMargin := scale.Call(0.0) minAdditionalMargin := scale.Call(0.0)

View File

@ -81,6 +81,9 @@ func bindMockMarketDataStream(mockStream *mocks.MockStream, stream *types.Standa
mockStream.EXPECT().OnConnect(Catch(func(x any) { mockStream.EXPECT().OnConnect(Catch(func(x any) {
stream.OnConnect(x.(func())) stream.OnConnect(x.(func()))
})).AnyTimes() })).AnyTimes()
mockStream.EXPECT().OnDisconnect(Catch(func(x any) {
stream.OnDisconnect(x.(func()))
})).AnyTimes()
} }
func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) { func bindMockUserDataStream(mockStream *mocks.MockStream, stream *types.StandardStream) {

View File

@ -212,6 +212,10 @@ func (sb *StreamOrderBook) updateMetrics(t time.Time) {
} }
func (sb *StreamOrderBook) BindStream(stream Stream) { func (sb *StreamOrderBook) BindStream(stream Stream) {
stream.OnDisconnect(func() {
sb.Reset()
})
stream.OnBookSnapshot(func(book SliceOrderBook) { stream.OnBookSnapshot(func(book SliceOrderBook) {
if sb.MutexOrderBook.Symbol != book.Symbol { if sb.MutexOrderBook.Symbol != book.Symbol {
return return