Merge pull request #1770 from c9s/c9s/xmaker/improvements2

IMPROVE: [xmaker] add more stability improvements and refactoring
This commit is contained in:
c9s 2024-10-09 12:59:30 +08:00 committed by GitHub
commit f554d5f594
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 124 additions and 48 deletions

View File

@ -295,7 +295,7 @@ func (rule *SlideRule) Scale() (Scale, error) {
return rule.QuadraticScale, nil
}
return nil, errors.New("no any scale is defined")
return nil, fmt.Errorf("no any scale is defined, avaiable scales: log, exp, linear, quadratic")
}
// LayerScale defines the scale DSL for maker layers, e.g.,

View File

@ -9,7 +9,7 @@ import (
"github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/c9s/bbgo/pkg/util"
"github.com/c9s/bbgo/pkg/profile/timeprofile"
)
var log = logrus.WithField("component", "batch")
@ -56,7 +56,7 @@ func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, s
log.Debugf("batch querying %T: %v <=> %v", q.Type, startTime, endTime)
queryProfiler := util.StartTimeProfile("remoteQuery")
queryProfiler := timeprofile.Start("remoteQuery")
sliceInf, err := q.Q(startTime, endTime)
if err != nil {

View File

@ -1,34 +1,43 @@
package util
package timeprofile
import (
"time"
)
type logFunction func(format string, args ...interface{})
type TimeProfile struct {
Name string
StartTime, EndTime time.Time
Duration time.Duration
}
func StartTimeProfile(args ...string) TimeProfile {
func Start(args ...string) TimeProfile {
name := ""
if len(args) > 0 {
name = args[0]
}
return TimeProfile{StartTime: time.Now(), Name: name}
}
// TilNow returns the duration from the start time to now
func (p *TimeProfile) TilNow() time.Duration {
return time.Since(p.StartTime)
}
// Stop stops the time profile, set the end time and returns the duration
func (p *TimeProfile) Stop() time.Duration {
p.EndTime = time.Now()
p.Duration = p.EndTime.Sub(p.StartTime)
return p.Duration
}
type logFunction func(format string, args ...interface{})
// Do runs the function f and stops the time profile
func (p *TimeProfile) Do(f func()) {
defer p.Stop()
f()
}
func (p *TimeProfile) StopAndLog(f logFunction) {
duration := p.Stop()

View File

@ -18,10 +18,10 @@ import (
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/profile/timeprofile"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/style"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
//go:generate bash symbols.sh
@ -675,7 +675,7 @@ func (s *Strategy) iocOrderExecution(
}
func (s *Strategy) waitWebSocketOrderDone(ctx context.Context, orderID uint64, timeoutDuration time.Duration) (*types.Order, error) {
prof := util.StartTimeProfile("waitWebSocketOrderDone")
prof := timeprofile.Start("waitWebSocketOrderDone")
defer prof.StopAndLog(log.Infof)
if order, ok := s.orderStore.Get(orderID); ok {
@ -869,7 +869,7 @@ func (s *Strategy) calculateRanks(minRatio float64, method func(p *Path) float64
func waitForOrderFilled(
ctx context.Context, ex types.ExchangeOrderQueryService, order types.Order, timeout time.Duration,
) (*types.Order, error) {
prof := util.StartTimeProfile("waitForOrderFilled")
prof := timeprofile.Start("waitForOrderFilled")
defer prof.StopAndLog(log.Infof)
timeoutC := time.After(timeout)
@ -880,7 +880,7 @@ func waitForOrderFilled(
return nil, fmt.Errorf("order wait timeout %s", timeout)
default:
p := util.StartTimeProfile("queryOrder")
p := timeprofile.Start("queryOrder")
remoteOrder, err2 := ex.QueryOrder(ctx, types.OrderQuery{
Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),

View File

@ -2,6 +2,18 @@ package xmaker
import "github.com/prometheus/client_golang/prometheus"
var cancelOrderDurationMetrics = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmaker_cancel_order_duration_milliseconds",
Help: "cancel order duration in milliseconds",
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
var makerOrderPlacementDurationMetrics = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "xmaker_maker_order_placement_duration_milliseconds",
Help: "maker order placement duration in milliseconds",
}, []string{"strategy_type", "strategy_id", "exchange", "symbol"})
var openOrderBidExposureInUsdMetrics = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "xmaker_open_order_bid_exposure_in_usd",
@ -77,6 +89,8 @@ func init() {
bidMarginMetrics,
askMarginMetrics,
aggregatedSignalMetrics,
cancelOrderDurationMetrics,
makerOrderPlacementDurationMetrics,
configNumOfLayersMetrics,
configMaxExposureMetrics,
configBidMarginMetrics,

View File

@ -21,6 +21,10 @@ func init() {
prometheus.MustRegister(orderBookSignalMetrics)
}
type StreamBookSetter interface {
SetStreamBook(book *types.StreamOrderBook)
}
type OrderBookBestPriceVolumeSignal struct {
RatioThreshold fixedpoint.Value `json:"ratioThreshold"`
MinVolume fixedpoint.Value `json:"minVolume"`
@ -29,7 +33,7 @@ type OrderBookBestPriceVolumeSignal struct {
book *types.StreamOrderBook
}
func (s *OrderBookBestPriceVolumeSignal) BindStreamBook(book *types.StreamOrderBook) {
func (s *OrderBookBestPriceVolumeSignal) SetStreamBook(book *types.StreamOrderBook) {
s.book = book
}
@ -65,7 +69,13 @@ func (s *OrderBookBestPriceVolumeSignal) CalculateSignal(ctx context.Context) (f
signal = -numerator.Div(denominator).Float64()
}
log.Infof("[OrderBookBestPriceVolumeSignal] %f bid/ask = %f/%f", signal, bid.Volume.Float64(), ask.Volume.Float64())
log.Infof("[OrderBookBestPriceVolumeSignal] %f bid/ask = %f/%f, bid ratio = %f, ratio threshold = %f",
signal,
bid.Volume.Float64(),
ask.Volume.Float64(),
bidRatio.Float64(),
s.RatioThreshold.Float64(),
)
orderBookSignalMetrics.WithLabelValues(s.symbol).Set(signal)
return signal, nil

View File

@ -31,7 +31,7 @@ type DepthRatioSignal struct {
book *types.StreamOrderBook
}
func (s *DepthRatioSignal) BindStreamBook(book *types.StreamOrderBook) {
func (s *DepthRatioSignal) SetStreamBook(book *types.StreamOrderBook) {
s.book = book
}

View File

@ -17,6 +17,7 @@ import (
"github.com/c9s/bbgo/pkg/fixedpoint"
indicatorv2 "github.com/c9s/bbgo/pkg/indicator/v2"
"github.com/c9s/bbgo/pkg/pricesolver"
"github.com/c9s/bbgo/pkg/profile/timeprofile"
"github.com/c9s/bbgo/pkg/risk/circuitbreaker"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
@ -34,6 +35,24 @@ const ID = "xmaker"
var log = logrus.WithField("strategy", ID)
type MutexFloat64 struct {
value float64
mu sync.Mutex
}
func (m *MutexFloat64) Set(v float64) {
m.mu.Lock()
m.value = v
m.mu.Unlock()
}
func (m *MutexFloat64) Get() float64 {
m.mu.Lock()
v := m.value
m.mu.Unlock()
return v
}
type Quote struct {
BestBidPrice, BestAskPrice fixedpoint.Value
@ -71,6 +90,20 @@ type SignalConfig struct {
TradeVolumeWindowSignal *TradeVolumeWindowSignal `json:"tradeVolumeWindow,omitempty"`
}
func (c *SignalConfig) Get() SignalProvider {
if c.OrderBookBestPriceSignal != nil {
return c.OrderBookBestPriceSignal
} else if c.DepthRatioSignal != nil {
return c.DepthRatioSignal
} else if c.BollingerBandTrendSignal != nil {
return c.BollingerBandTrendSignal
} else if c.TradeVolumeWindowSignal != nil {
return c.TradeVolumeWindowSignal
}
panic(fmt.Errorf("no valid signal provider found, please check your config"))
}
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
}
@ -193,6 +226,10 @@ type Strategy struct {
metricsLabels prometheus.Labels
connectivityGroup *types.ConnectivityGroup
// lastAggregatedSignal stores the last aggregated signal with mutex
// TODO: use float64 series instead, so that we can store history signal values
lastAggregatedSignal MutexFloat64
}
func (s *Strategy) ID() string {
@ -304,6 +341,7 @@ func (s *Strategy) applySignalMargin(ctx context.Context, quote *Quote) error {
return err
}
s.lastAggregatedSignal.Set(signal)
s.logger.Infof("aggregated signal: %f", signal)
if signal == 0.0 {
@ -398,18 +436,9 @@ func (s *Strategy) applyBollingerMargin(
func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) {
sum := 0.0
voters := 0.0
for _, signal := range s.SignalConfigList {
var sig float64
var err error
if signal.OrderBookBestPriceSignal != nil {
sig, err = signal.OrderBookBestPriceSignal.CalculateSignal(ctx)
} else if signal.DepthRatioSignal != nil {
sig, err = signal.DepthRatioSignal.CalculateSignal(ctx)
} else if signal.BollingerBandTrendSignal != nil {
sig, err = signal.BollingerBandTrendSignal.CalculateSignal(ctx)
} else if signal.TradeVolumeWindowSignal != nil {
sig, err = signal.TradeVolumeWindowSignal.CalculateSignal(ctx)
}
for _, signalConfig := range s.SignalConfigList {
signalProvider := signalConfig.Get()
sig, err := signalProvider.CalculateSignal(ctx)
if err != nil {
return 0, err
@ -417,9 +446,9 @@ func (s *Strategy) aggregateSignal(ctx context.Context) (float64, error) {
continue
}
if signal.Weight > 0.0 {
sum += sig * signal.Weight
voters += signal.Weight
if signalConfig.Weight > 0.0 {
sum += sig * signalConfig.Weight
voters += signalConfig.Weight
} else {
sum += sig
voters++
@ -577,6 +606,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect()
return err
}
@ -584,6 +616,9 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
s.logger.WithError(err).Errorf("quote update error, %s price not updating, order book last update: %s ago",
s.Symbol,
time.Since(bookLastUpdateTime))
s.sourceBook.Reset()
s.sourceSession.MarketDataStream.Reconnect()
return err
}
@ -914,12 +949,15 @@ func (s *Strategy) updateQuote(ctx context.Context) error {
defer s.tradeCollector.Process()
makerOrderPlacementProfile := timeprofile.Start("makerOrderPlacement")
createdOrders, errIdx, err := bbgo.BatchPlaceOrder(ctx, s.makerSession.Exchange, s.makerOrderCreateCallback, formattedOrders...)
if err != nil {
log.WithError(err).Errorf("unable to place maker orders: %+v", formattedOrders)
return err
}
makerOrderPlacementDurationMetrics.With(s.metricsLabels).Observe(float64(makerOrderPlacementProfile.Stop().Milliseconds()))
openOrderBidExposureInUsdMetrics.With(s.metricsLabels).Set(bidExposureInUsd.Float64())
openOrderAskExposureInUsdMetrics.With(s.metricsLabels).Set(askExposureInUsd.Float64())
@ -1568,32 +1606,36 @@ func (s *Strategy) CrossRun(
s.sourceBook.BindStream(s.sourceSession.MarketDataStream)
if s.EnableSignalMargin {
s.logger.Infof("signal margin is enabled")
scale, err := s.SignalMarginScale.Scale()
if err != nil {
return err
}
if solveErr := scale.Solve(); solveErr != nil {
return solveErr
}
minAdditionalMargin := scale.Call(0.0)
middleAdditionalMargin := scale.Call(1.0)
maxAdditionalMargin := scale.Call(2.0)
s.logger.Infof("signal margin range: %.2f%% @ 0.0 ~ %.2f%% @ 1.0 ~ %.2f%% @ 2.0",
minAdditionalMargin*100.0,
middleAdditionalMargin*100.0,
maxAdditionalMargin*100.0)
}
for _, signalConfig := range s.SignalConfigList {
if signalConfig.OrderBookBestPriceSignal != nil {
signalConfig.OrderBookBestPriceSignal.book = s.sourceBook
if err := signalConfig.OrderBookBestPriceSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err
}
} else if signalConfig.DepthRatioSignal != nil {
signalConfig.DepthRatioSignal.book = s.sourceBook
if err := signalConfig.DepthRatioSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err
}
} else if signalConfig.BollingerBandTrendSignal != nil {
if err := signalConfig.BollingerBandTrendSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err
}
} else if signalConfig.TradeVolumeWindowSignal != nil {
if err := signalConfig.TradeVolumeWindowSignal.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
signal := signalConfig.Get()
if setter, ok := signal.(StreamBookSetter); ok {
s.logger.Infof("setting stream book on signal %T", signal)
setter.SetStreamBook(s.sourceBook)
}
if binder, ok := signal.(SessionBinder); ok {
s.logger.Infof("binding session on signal %T", signal)
if err := binder.Bind(ctx, s.sourceSession, s.Symbol); err != nil {
return err
}
}
@ -1660,11 +1702,8 @@ func (s *Strategy) CrossRun(
s.connectivityGroup = types.NewConnectivityGroup(sourceConnectivity)
if s.RecoverTrade {
go s.tradeRecover(ctx)
}
go func() {
s.logger.Infof("waiting for authentication connections to be ready...")
select {
case <-ctx.Done():
case <-s.connectivityGroup.AllAuthedC(ctx, 15*time.Second):
@ -1675,6 +1714,10 @@ func (s *Strategy) CrossRun(
go s.accountUpdater(ctx)
go s.hedgeWorker(ctx)
go s.quoteWorker(ctx)
if s.RecoverTrade {
go s.tradeRecover(ctx)
}
}()
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {