diff --git a/pkg/strategy/dca2/metrics.go b/pkg/strategy/dca2/metrics.go new file mode 100644 index 000000000..7fcad2011 --- /dev/null +++ b/pkg/strategy/dca2/metrics.go @@ -0,0 +1,116 @@ +package dca2 + +import ( + "strconv" + + "github.com/prometheus/client_golang/prometheus" +) + +var ( + metricsState *prometheus.GaugeVec + metricsNumOfActiveOrders *prometheus.GaugeVec + metricsNumOfOpenOrders *prometheus.GaugeVec + metricsProfit *prometheus.GaugeVec +) + +func labelKeys(labels prometheus.Labels) []string { + var keys []string + for k := range labels { + keys = append(keys, k) + } + + return keys +} + +func mergeLabels(a, b prometheus.Labels) prometheus.Labels { + labels := prometheus.Labels{} + for k, v := range a { + labels[k] = v + } + + for k, v := range b { + labels[k] = v + } + return labels +} + +func initMetrics(extendedLabels []string) { + if metricsState == nil { + metricsState = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_dca2_state", + Help: "state of this DCA2 strategy", + }, + append([]string{ + "exchange", + "symbol", + }, extendedLabels...), + ) + } + + if metricsNumOfActiveOrders == nil { + metricsNumOfActiveOrders = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_dca2_num_of_active_orders", + Help: "number of active orders", + }, + append([]string{ + "exchange", + "symbol", + }, extendedLabels...), + ) + } + + if metricsNumOfOpenOrders == nil { + metricsNumOfOpenOrders = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_dca2_num_of_open_orders", + Help: "number of open orders", + }, + append([]string{ + "exchange", + "symbol", + }, extendedLabels...), + ) + } + + if metricsProfit == nil { + metricsProfit = prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "bbgo_dca2_profit", + Help: "profit of this DCA@ strategy", + }, + append([]string{ + "exchange", + "symbol", + "round", + }, extendedLabels...), + ) + } +} + +var metricsRegistered = false + +func registerMetrics() { + if metricsRegistered { + return + } + + initMetrics(nil) + + prometheus.MustRegister( + metricsState, + metricsNumOfActiveOrders, + metricsNumOfOpenOrders, + metricsProfit, + ) + + metricsRegistered = true +} + +func updateProfitMetrics(round int64, profit float64) { + labels := mergeLabels(baseLabels, prometheus.Labels{ + "round": strconv.FormatInt(round, 10), + }) + metricsProfit.With(labels).Set(profit) +} diff --git a/pkg/strategy/dca2/recover.go b/pkg/strategy/dca2/recover.go index d114c40e0..ae192d9f8 100644 --- a/pkg/strategy/dca2/recover.go +++ b/pkg/strategy/dca2/recover.go @@ -67,7 +67,7 @@ func (s *Strategy) recover(ctx context.Context) error { if err != nil { return err } - s.state = state + s.updateState(state) s.logger.Info("recover stats DONE") return nil diff --git a/pkg/strategy/dca2/state.go b/pkg/strategy/dca2/state.go index 227661dc6..939e4481a 100644 --- a/pkg/strategy/dca2/state.go +++ b/pkg/strategy/dca2/state.go @@ -46,6 +46,13 @@ func (s *Strategy) initializeNextStateC() bool { return isInitialize } +func (s *Strategy) updateState(state State) { + s.state = state + + s.logger.Infof("[state] update state to %d", state) + metricsState.With(baseLabels).Set(float64(s.state)) +} + func (s *Strategy) emitNextState(nextState State) { select { case s.nextStateC <- nextState: @@ -63,17 +70,22 @@ func (s *Strategy) emitNextState(nextState State) { // TakeProfitReady -> the takeProfit order filled -> func (s *Strategy) runState(ctx context.Context) { s.logger.Info("[DCA] runState") - ticker := time.NewTicker(5 * time.Second) - defer ticker.Stop() + stateTriggerTicker := time.NewTicker(5 * time.Second) + defer stateTriggerTicker.Stop() + + monitorTicker := time.NewTicker(10 * time.Minute) + defer monitorTicker.Stop() for { select { case <-ctx.Done(): s.logger.Info("[DCA] runState DONE") return - case <-ticker.C: - s.logger.Infof("[DCA] triggerNextState current state: %d", s.state) + case <-stateTriggerTicker.C: + // s.logger.Infof("[DCA] triggerNextState current state: %d", s.state) s.triggerNextState() + case <-monitorTicker.C: + s.updateNumOfOrdersMetrics(ctx) case nextState := <-s.nextStateC: s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState) @@ -131,7 +143,7 @@ func (s *Strategy) runWaitToOpenPositionState(ctx context.Context, next State) { return } - s.state = PositionOpening + s.updateState(PositionOpening) s.logger.Info("[State] WaitToOpenPosition -> PositionOpening") } @@ -141,17 +153,17 @@ func (s *Strategy) runPositionOpening(ctx context.Context, next State) { s.logger.WithError(err).Error("failed to place dca orders, please check it.") return } - s.state = OpenPositionReady + s.updateState(OpenPositionReady) s.logger.Info("[State] PositionOpening -> OpenPositionReady") } func (s *Strategy) runOpenPositionReady(_ context.Context, next State) { - s.state = OpenPositionOrderFilled + s.updateState(OpenPositionOrderFilled) s.logger.Info("[State] OpenPositionReady -> OpenPositionOrderFilled") } func (s *Strategy) runOpenPositionOrderFilled(_ context.Context, next State) { - s.state = OpenPositionOrdersCancelling + s.updateState(OpenPositionOrdersCancelling) s.logger.Info("[State] OpenPositionOrderFilled -> OpenPositionOrdersCancelling") // after open position cancelling, immediately trigger open position cancelled to cancel the other orders @@ -164,7 +176,7 @@ func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next Sta s.logger.WithError(err).Error("failed to cancel maker orders") return } - s.state = OpenPositionOrdersCancelled + s.updateState(OpenPositionOrdersCancelled) s.logger.Info("[State] OpenPositionOrdersCancelling -> OpenPositionOrdersCancelled") // after open position cancelled, immediately trigger take profit ready to open take-profit order @@ -177,7 +189,7 @@ func (s *Strategy) runOpenPositionOrdersCancelled(ctx context.Context, next Stat s.logger.WithError(err).Error("failed to open take profit orders") return } - s.state = TakeProfitReady + s.updateState(TakeProfitReady) s.logger.Info("[State] OpenPositionOrdersCancelled -> TakeProfitReady") } @@ -200,6 +212,6 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) { // set the start time of the next round s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration()) - s.state = WaitToOpenPosition + s.updateState(WaitToOpenPosition) s.logger.Info("[State] TakeProfitReady -> WaitToOpenPosition") } diff --git a/pkg/strategy/dca2/strategy.go b/pkg/strategy/dca2/strategy.go index cf673fa1f..dbbdc77b8 100644 --- a/pkg/strategy/dca2/strategy.go +++ b/pkg/strategy/dca2/strategy.go @@ -21,11 +21,15 @@ import ( "github.com/c9s/bbgo/pkg/util/tradingutil" ) -const ID = "dca2" +const ( + ID = "dca2" + orderTag = "dca2" +) -const orderTag = "dca2" - -var log = logrus.WithField("strategy", ID) +var ( + log = logrus.WithField("strategy", ID) + baseLabels prometheus.Labels +) func init() { bbgo.RegisterStrategy(ID, &Strategy{}) @@ -119,6 +123,7 @@ func (s *Strategy) Defaults() error { s.LogFields["symbol"] = s.Symbol s.LogFields["strategy"] = ID + return nil } @@ -135,6 +140,23 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: types.Interval1m}) } +func (s *Strategy) newPrometheusLabels() prometheus.Labels { + labels := prometheus.Labels{ + "exchange": "default", + "symbol": s.Symbol, + } + + if s.Session != nil { + labels["exchange"] = s.Session.Name + } + + if s.PrometheusLabels == nil { + return labels + } + + return mergeLabels(s.PrometheusLabels, labels) +} + func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error { instanceID := s.InstanceID() s.Session = session @@ -146,6 +168,15 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.Position = types.NewPositionFromMarket(s.Market) } + // prometheus + if s.PrometheusLabels != nil { + initMetrics(labelKeys(s.PrometheusLabels)) + } + registerMetrics() + + // prometheus labels + baseLabels = s.newPrometheusLabels() + // if dev mode is on and it's not a new strategy if s.DevMode != nil && s.DevMode.Enabled && !s.DevMode.IsNewAccount { s.ProfitStats = newProfitStats(s.Market, s.QuoteInvestment) @@ -192,6 +223,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. default: s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o) } + + // update metrics when filled + s.updateNumOfOrdersMetrics(ctx) }) session.MarketDataStream.OnKLine(func(kline types.KLine) { @@ -218,7 +252,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. // 1. recoverWhenStart is false // 2. dev mode is on and it's not new strategy if !s.RecoverWhenStart || (s.DevMode != nil && s.DevMode.Enabled && !s.DevMode.IsNewAccount) { - s.state = WaitToOpenPosition + s.updateState(WaitToOpenPosition) } else { // recover if err := s.recover(ctx); err != nil { @@ -400,9 +434,23 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error { // emit profit s.EmitProfit(s.ProfitStats) + updateProfitMetrics(s.ProfitStats.Round, s.ProfitStats.CurrentRoundProfit.Float64()) s.ProfitStats.NewRound() } return nil } + +func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) { + // update open orders metrics + openOrders, err := s.Session.Exchange.QueryOpenOrders(ctx, s.Symbol) + if err != nil { + s.logger.WithError(err).Warn("failed to query open orders to update num of the orders metrics") + } else { + metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders))) + } + + // update active orders metrics + metricsNumOfActiveOrders.With(baseLabels).Set(float64(s.OrderExecutor.ActiveMakerOrders().NumOfOrders())) +}