mirror of
https://github.com/c9s/bbgo.git
synced 2024-09-20 08:11:08 +00:00
Merge pull request #1709 from c9s/c9s/xmaker/improve-profit-ticker
IMPROVE: [xmaker] improve profit stats ticker and integrate rate limiter
This commit is contained in:
commit
349a3040f3
|
@ -22,9 +22,6 @@ import (
|
||||||
var defaultMargin = fixedpoint.NewFromFloat(0.003)
|
var defaultMargin = fixedpoint.NewFromFloat(0.003)
|
||||||
var Two = fixedpoint.NewFromInt(2)
|
var Two = fixedpoint.NewFromInt(2)
|
||||||
|
|
||||||
// circuitBreakerAlertLimiter is for CircuitBreaker alerts
|
|
||||||
var circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
|
|
||||||
|
|
||||||
const priceUpdateTimeout = 30 * time.Second
|
const priceUpdateTimeout = 30 * time.Second
|
||||||
|
|
||||||
const ID = "xmaker"
|
const ID = "xmaker"
|
||||||
|
@ -124,6 +121,9 @@ type Strategy struct {
|
||||||
groupID uint32
|
groupID uint32
|
||||||
|
|
||||||
stopC chan struct{}
|
stopC chan struct{}
|
||||||
|
|
||||||
|
reportProfitStatsRateLimiter *rate.Limiter
|
||||||
|
circuitBreakerAlertLimiter *rate.Limiter
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Strategy) ID() string {
|
func (s *Strategy) ID() string {
|
||||||
|
@ -198,7 +198,7 @@ func (s *Strategy) updateQuote(ctx context.Context, orderExecutionRouter bbgo.Or
|
||||||
if reason, halted := s.CircuitBreaker.IsHalted(now); halted {
|
if reason, halted := s.CircuitBreaker.IsHalted(now); halted {
|
||||||
log.Warnf("[arbWorker] strategy is halted, reason: %s", reason)
|
log.Warnf("[arbWorker] strategy is halted, reason: %s", reason)
|
||||||
|
|
||||||
if circuitBreakerAlertLimiter.AllowN(now, 1) {
|
if s.circuitBreakerAlertLimiter.AllowN(now, 1) {
|
||||||
bbgo.Notify("Strategy is halted, reason: %s", reason)
|
bbgo.Notify("Strategy is halted, reason: %s", reason)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -587,34 +587,47 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
|
||||||
s.hedgeErrorRateReservation = nil
|
s.hedgeErrorRateReservation = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
|
|
||||||
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
|
bbgo.Notify("Submitting %s hedge order %s %v", s.Symbol, side.String(), quantity)
|
||||||
|
|
||||||
// TODO: improve order executor
|
submitOrders := []types.SubmitOrder{
|
||||||
orderExecutor := &bbgo.ExchangeOrderExecutor{Session: s.sourceSession}
|
{
|
||||||
returnOrders, err := orderExecutor.SubmitOrders(ctx, types.SubmitOrder{
|
Market: s.sourceMarket,
|
||||||
Market: s.sourceMarket,
|
Symbol: s.Symbol,
|
||||||
Symbol: s.Symbol,
|
Type: types.OrderTypeMarket,
|
||||||
Type: types.OrderTypeMarket,
|
Side: side,
|
||||||
Side: side,
|
Quantity: quantity,
|
||||||
Quantity: quantity,
|
MarginSideEffect: types.SideEffectTypeMarginBuy,
|
||||||
MarginSideEffect: types.SideEffectTypeMarginBuy,
|
},
|
||||||
})
|
}
|
||||||
|
|
||||||
|
formattedOrders, err := s.sourceSession.FormatOrders(submitOrders)
|
||||||
|
if err != nil {
|
||||||
|
log.WithError(err).Errorf("unable to format hedge orders")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
orderCreateCallback := func(createdOrder types.Order) {
|
||||||
|
s.orderStore.Add(createdOrder)
|
||||||
|
s.activeMakerOrders.Add(createdOrder)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer s.tradeCollector.Process()
|
||||||
|
|
||||||
|
createdOrders, _, err := bbgo.BatchPlaceOrder(ctx, s.sourceSession.Exchange, orderCreateCallback, formattedOrders...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.hedgeErrorRateReservation = s.hedgeErrorLimiter.Reserve()
|
s.hedgeErrorRateReservation = s.hedgeErrorLimiter.Reserve()
|
||||||
log.WithError(err).Errorf("market order submit error: %s", err.Error())
|
log.WithError(err).Errorf("market order submit error: %s", err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// if it's selling, than we should add positive position
|
log.Infof("submitted hedge orders: %+v", createdOrders)
|
||||||
|
|
||||||
|
// if it's selling, then we should add a positive position
|
||||||
if side == types.SideTypeSell {
|
if side == types.SideTypeSell {
|
||||||
s.CoveredPosition = s.CoveredPosition.Add(quantity)
|
s.CoveredPosition = s.CoveredPosition.Add(quantity)
|
||||||
} else {
|
} else {
|
||||||
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
|
s.CoveredPosition = s.CoveredPosition.Add(quantity.Neg())
|
||||||
}
|
}
|
||||||
|
|
||||||
s.orderStore.Add(returnOrders...)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Strategy) tradeRecover(ctx context.Context) {
|
func (s *Strategy) tradeRecover(ctx context.Context) {
|
||||||
|
@ -656,6 +669,9 @@ func (s *Strategy) Defaults() error {
|
||||||
s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID())
|
s.CircuitBreaker = circuitbreaker.NewBasicCircuitBreaker(ID, s.InstanceID())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// circuitBreakerAlertLimiter is for CircuitBreaker alerts
|
||||||
|
s.circuitBreakerAlertLimiter = rate.NewLimiter(rate.Every(3*time.Minute), 2)
|
||||||
|
s.reportProfitStatsRateLimiter = rate.NewLimiter(rate.Every(5*time.Minute), 1)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -770,9 +786,8 @@ func (s *Strategy) CrossRun(
|
||||||
|
|
||||||
if s.Position == nil {
|
if s.Position == nil {
|
||||||
s.Position = types.NewPositionFromMarket(s.makerMarket)
|
s.Position = types.NewPositionFromMarket(s.makerMarket)
|
||||||
|
s.Position.Strategy = ID
|
||||||
// force update for legacy code
|
s.Position.StrategyInstanceID = instanceID
|
||||||
s.Position.Market = s.makerMarket
|
|
||||||
}
|
}
|
||||||
|
|
||||||
bbgo.Notify("xmaker: %s position is restored", s.Symbol, s.Position)
|
bbgo.Notify("xmaker: %s position is restored", s.Symbol, s.Position)
|
||||||
|
@ -832,29 +847,28 @@ func (s *Strategy) CrossRun(
|
||||||
|
|
||||||
if profit.Compare(fixedpoint.Zero) == 0 {
|
if profit.Compare(fixedpoint.Zero) == 0 {
|
||||||
s.Environment.RecordPosition(s.Position, trade, nil)
|
s.Environment.RecordPosition(s.Position, trade, nil)
|
||||||
} else {
|
|
||||||
log.Infof("%s generated profit: %v", s.Symbol, profit)
|
|
||||||
|
|
||||||
p := s.Position.NewProfit(trade, profit, netProfit)
|
|
||||||
p.Strategy = ID
|
|
||||||
p.StrategyInstanceID = instanceID
|
|
||||||
bbgo.Notify(&p)
|
|
||||||
s.ProfitStats.AddProfit(p)
|
|
||||||
|
|
||||||
s.Environment.RecordPosition(s.Position, trade, &p)
|
|
||||||
|
|
||||||
if s.CircuitBreaker != nil {
|
|
||||||
s.CircuitBreaker.RecordProfit(profit, trade.Time.Time())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
s.tradeCollector.OnProfit(func(trade types.Trade, profit *types.Profit) {
|
||||||
|
if s.CircuitBreaker != nil {
|
||||||
|
s.CircuitBreaker.RecordProfit(profit.Profit, trade.Time.Time())
|
||||||
|
}
|
||||||
|
|
||||||
|
bbgo.Notify(profit)
|
||||||
|
|
||||||
|
s.ProfitStats.AddProfit(*profit)
|
||||||
|
s.Environment.RecordPosition(s.Position, trade, profit)
|
||||||
|
})
|
||||||
|
|
||||||
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
|
s.tradeCollector.OnPositionUpdate(func(position *types.Position) {
|
||||||
bbgo.Notify(position)
|
bbgo.Notify(position)
|
||||||
})
|
})
|
||||||
s.tradeCollector.OnRecover(func(trade types.Trade) {
|
s.tradeCollector.OnRecover(func(trade types.Trade) {
|
||||||
bbgo.Notify("Recovered trade", trade)
|
bbgo.Notify("Recovered trade", trade)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// bind two user data streams so that we can collect the trades together
|
||||||
s.tradeCollector.BindStream(s.sourceSession.UserDataStream)
|
s.tradeCollector.BindStream(s.sourceSession.UserDataStream)
|
||||||
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
|
s.tradeCollector.BindStream(s.makerSession.UserDataStream)
|
||||||
|
|
||||||
|
@ -865,15 +879,12 @@ func (s *Strategy) CrossRun(
|
||||||
}
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
posTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
|
hedgeTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
|
||||||
defer posTicker.Stop()
|
defer hedgeTicker.Stop()
|
||||||
|
|
||||||
quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
|
quoteTicker := time.NewTicker(util.MillisecondsJitter(s.UpdateInterval.Duration(), 200))
|
||||||
defer quoteTicker.Stop()
|
defer quoteTicker.Stop()
|
||||||
|
|
||||||
reportTicker := time.NewTicker(time.Hour)
|
|
||||||
defer reportTicker.Stop()
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil {
|
if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil {
|
||||||
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
|
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
|
||||||
|
@ -894,10 +905,7 @@ func (s *Strategy) CrossRun(
|
||||||
case <-quoteTicker.C:
|
case <-quoteTicker.C:
|
||||||
s.updateQuote(ctx, orderExecutionRouter)
|
s.updateQuote(ctx, orderExecutionRouter)
|
||||||
|
|
||||||
case <-reportTicker.C:
|
case <-hedgeTicker.C:
|
||||||
bbgo.Notify(s.ProfitStats)
|
|
||||||
|
|
||||||
case <-posTicker.C:
|
|
||||||
// For positive position and positive covered position:
|
// For positive position and positive covered position:
|
||||||
// uncover position = +5 - +3 (covered position) = 2
|
// uncover position = +5 - +3 (covered position) = 2
|
||||||
//
|
//
|
||||||
|
@ -924,6 +932,10 @@ func (s *Strategy) CrossRun(
|
||||||
|
|
||||||
s.Hedge(ctx, uncoverPosition.Neg())
|
s.Hedge(ctx, uncoverPosition.Neg())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if s.reportProfitStatsRateLimiter.Allow() {
|
||||||
|
bbgo.Notify(s.ProfitStats)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in New Issue
Block a user