Merge pull request #1540 from c9s/kbearXD/dca2/monitor-metrics

This commit is contained in:
c9s 2024-03-05 10:09:14 +08:00 committed by GitHub
commit 88a55793b5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 454 additions and 183 deletions

View File

@ -1,6 +1,9 @@
package exchange
import "github.com/c9s/bbgo/pkg/types"
import (
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/types"
)
func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsolated bool, isolatedSymbol string) {
if marginExchange, ok := exchange.(types.MarginExchange); ok {
@ -27,3 +30,8 @@ func GetSessionAttributes(exchange types.Exchange) (isMargin, isFutures, isIsola
return isMargin, isFutures, isIsolated, isolatedSymbol
}
func IsMaxExchange(exchange interface{}) bool {
_, res := exchange.(*max.Exchange)
return res
}

100
pkg/strategy/common/sync.go Normal file
View File

@ -0,0 +1,100 @@
package common
import (
"context"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/types"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
)
func SyncActiveOrder(ctx context.Context, ex types.Exchange, orderQueryService types.ExchangeOrderQueryService, activeOrderBook *bbgo.ActiveOrderBook, orderID uint64, syncBefore time.Time) (isOrderUpdated bool, err error) {
isMax := exchange.IsMaxExchange(ex)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: activeOrderBook.Symbol,
OrderID: strconv.FormatUint(orderID, 10),
})
if err != nil {
return isOrderUpdated, err
}
// maxapi.OrderStateFinalizing does not mean the fee is calculated
// we should only consider order state done for MAX
if isMax && updatedOrder.OriginalStatus != string(maxapi.OrderStateDone) {
return isOrderUpdated, nil
}
// should only trigger order update when the updated time is old enough
isOrderUpdated = updatedOrder.UpdateTime.Before(syncBefore)
if isOrderUpdated {
activeOrderBook.Update(*updatedOrder)
}
return isOrderUpdated, nil
}
type SyncActiveOrdersOpts struct {
Logger *logrus.Entry
Exchange types.Exchange
OrderQueryService types.ExchangeOrderQueryService
ActiveOrderBook *bbgo.ActiveOrderBook
OpenOrders []types.Order
}
func SyncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.Logger.Infof("[ActiveOrderRecover] syncActiveOrders")
// only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice
syncBefore := time.Now().Add(-3 * time.Minute)
activeOrders := opts.ActiveOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range opts.OpenOrders {
openOrdersMap[openOrder.OrderID] = openOrder
}
var errs error
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID)
} else {
opts.Logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
isActiveOrderBookUpdated, err := SyncActiveOrder(ctx, opts.Exchange, opts.OrderQueryService, opts.ActiveOrderBook, activeOrder.OrderID, syncBefore)
if err != nil {
opts.Logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue
}
if !isActiveOrderBookUpdated {
opts.Logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID)
}
}
}
// update open orders not in active orders
for _, openOrder := range openOrdersMap {
opts.Logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID)
// we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice.
if openOrder.UpdateTime.After(syncBefore) {
opts.Logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID)
continue
}
opts.ActiveOrderBook.Add(openOrder)
}
return errs
}

View File

@ -1,4 +1,4 @@
package grid2
package common
import (
"context"
@ -10,7 +10,7 @@ import (
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/types/mocks"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)
@ -23,34 +23,30 @@ func TestSyncActiveOrders(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
log := logrus.WithField("strategy", "test")
symbol := "ETHUSDT"
labels := prometheus.Labels{
"exchange": "default",
"symbol": symbol,
}
t.Run("all open orders are match with active orderbook", func(t *testing.T) {
mockOrderQueryService := mocks.NewMockExchangeOrderQueryService(mockCtrl)
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
}
order.Symbol = symbol
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
opts := SyncActiveOrdersOpts{
Logger: log,
Exchange: mockExchange,
OrderQueryService: mockOrderQueryService,
ActiveOrderBook: activeOrderbook,
OpenOrders: []types.Order{order},
}
assert.NoError(syncActiveOrders(ctx, opts))
activeOrderbook.Add(order)
assert.NoError(SyncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
@ -64,14 +60,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
@ -82,14 +70,21 @@ func TestSyncActiveOrders(t *testing.T) {
updatedOrder := order
updatedOrder.Status = types.OrderStatusFilled
opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: nil,
}
activeOrderbook.Add(order)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return(nil, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order.OrderID, 10),
}).Return(&updatedOrder, nil)
assert.NoError(syncActiveOrders(ctx, opts))
assert.NoError(SyncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
@ -101,14 +96,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
@ -118,8 +105,14 @@ func TestSyncActiveOrders(t *testing.T) {
CreationTime: types.Time(time.Now()),
}
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order}, nil)
assert.NoError(syncActiveOrders(ctx, opts))
opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: []types.Order{order},
}
assert.NoError(SyncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()
@ -133,14 +126,6 @@ func TestSyncActiveOrders(t *testing.T) {
mockExchange := mocks.NewMockExchange(mockCtrl)
activeOrderbook := bbgo.NewActiveOrderBook(symbol)
opts := SyncActiveOrdersOpts{
logger: log,
metricsLabels: labels,
activeOrderBook: activeOrderbook,
orderQueryService: mockOrderQueryService,
exchange: mockExchange,
}
order1 := types.Order{
OrderID: 1,
Status: types.OrderStatusNew,
@ -158,14 +143,21 @@ func TestSyncActiveOrders(t *testing.T) {
},
}
opts := SyncActiveOrdersOpts{
Logger: log,
ActiveOrderBook: activeOrderbook,
OrderQueryService: mockOrderQueryService,
Exchange: mockExchange,
OpenOrders: []types.Order{order2},
}
activeOrderbook.Add(order1)
mockExchange.EXPECT().QueryOpenOrders(ctx, symbol).Return([]types.Order{order2}, nil)
mockOrderQueryService.EXPECT().QueryOrder(ctx, types.OrderQuery{
Symbol: symbol,
OrderID: strconv.FormatUint(order1.OrderID, 10),
}).Return(&updatedOrder1, nil)
assert.NoError(syncActiveOrders(ctx, opts))
assert.NoError(SyncActiveOrders(ctx, opts))
// verify active orderbook
activeOrders := activeOrderbook.Orders()

View File

@ -0,0 +1,57 @@
package dca2
import (
"context"
"time"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/util"
)
func (s *Strategy) recoverPeriodically(ctx context.Context) {
s.logger.Info("[DCA] monitor and recover periodically")
interval := util.MillisecondsJitter(10*time.Minute, 5*60*1000)
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := s.recoverActiveOrders(ctx); err != nil {
s.logger.WithError(err).Warn(err, "failed to recover active orders")
}
}
}
}
func (s *Strategy) recoverActiveOrders(ctx context.Context) error {
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Warn("failed to query open orders")
return err
}
activeOrders := s.OrderExecutor.ActiveMakerOrders().Orders()
// update num of open orders metrics
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders)))
}
// update num of active orders metrics
if metricsNumOfActiveOrders != nil {
metricsNumOfActiveOrders.With(baseLabels).Set(float64(len(activeOrders)))
}
opts := common.SyncActiveOrdersOpts{
Logger: s.logger,
Exchange: s.ExchangeSession.Exchange,
ActiveOrderBook: s.OrderExecutor.ActiveMakerOrders(),
OpenOrders: openOrders,
}
return common.SyncActiveOrders(ctx, opts)
}

View File

@ -0,0 +1,116 @@
package dca2
import (
"strconv"
"github.com/prometheus/client_golang/prometheus"
)
var (
metricsState *prometheus.GaugeVec
metricsNumOfActiveOrders *prometheus.GaugeVec
metricsNumOfOpenOrders *prometheus.GaugeVec
metricsProfit *prometheus.GaugeVec
)
func labelKeys(labels prometheus.Labels) []string {
var keys []string
for k := range labels {
keys = append(keys, k)
}
return keys
}
func mergeLabels(a, b prometheus.Labels) prometheus.Labels {
labels := prometheus.Labels{}
for k, v := range a {
labels[k] = v
}
for k, v := range b {
labels[k] = v
}
return labels
}
func initMetrics(extendedLabels []string) {
if metricsState == nil {
metricsState = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "bbgo_dca2_state",
Help: "state of this DCA2 strategy",
},
append([]string{
"exchange",
"symbol",
}, extendedLabels...),
)
}
if metricsNumOfActiveOrders == nil {
metricsNumOfActiveOrders = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "bbgo_dca2_num_of_active_orders",
Help: "number of active orders",
},
append([]string{
"exchange",
"symbol",
}, extendedLabels...),
)
}
if metricsNumOfOpenOrders == nil {
metricsNumOfOpenOrders = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "bbgo_dca2_num_of_open_orders",
Help: "number of open orders",
},
append([]string{
"exchange",
"symbol",
}, extendedLabels...),
)
}
if metricsProfit == nil {
metricsProfit = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: "bbgo_dca2_profit",
Help: "profit of this DCA@ strategy",
},
append([]string{
"exchange",
"symbol",
"round",
}, extendedLabels...),
)
}
}
var metricsRegistered = false
func registerMetrics() {
if metricsRegistered {
return
}
initMetrics(nil)
prometheus.MustRegister(
metricsState,
metricsNumOfActiveOrders,
metricsNumOfOpenOrders,
metricsProfit,
)
metricsRegistered = true
}
func updateProfitMetrics(round int64, profit float64) {
labels := mergeLabels(baseLabels, prometheus.Labels{
"round": strconv.FormatInt(round, 10),
})
metricsProfit.With(labels).Set(profit)
}

View File

@ -16,7 +16,7 @@ type cancelOrdersByGroupIDApi interface {
func (s *Strategy) placeOpenPositionOrders(ctx context.Context) error {
s.logger.Infof("[DCA] start placing open position orders")
price, err := getBestPriceUntilSuccess(ctx, s.Session.Exchange, s.Symbol)
price, err := getBestPriceUntilSuccess(ctx, s.ExchangeSession.Exchange, s.Symbol)
if err != nil {
return err
}

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
@ -25,9 +24,9 @@ type RecoverApiQueryService interface {
func (s *Strategy) recover(ctx context.Context) error {
s.logger.Info("[DCA] recover")
queryService, ok := s.Session.Exchange.(RecoverApiQueryService)
queryService, ok := s.ExchangeSession.Exchange.(RecoverApiQueryService)
if !ok {
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.Session.ExchangeName)
return fmt.Errorf("[DCA] exchange %s doesn't support queryAPI interface", s.ExchangeSession.ExchangeName)
}
openOrders, err := queryService.QueryOpenOrders(ctx, s.Symbol)
@ -63,18 +62,18 @@ func (s *Strategy) recover(ctx context.Context) error {
s.startTimeOfNextRound = startTimeOfNextRound
// recover state
state, err := recoverState(ctx, s.ProfitStats.QuoteInvestment, int(s.MaxOrderCount), currentRound, s.OrderExecutor)
state, err := recoverState(ctx, int(s.MaxOrderCount), currentRound, s.OrderExecutor)
if err != nil {
return err
}
s.state = state
s.updateState(state)
s.logger.Info("recover stats DONE")
return nil
}
// recover state
func recoverState(ctx context.Context, quoteInvestment fixedpoint.Value, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) {
func recoverState(ctx context.Context, maxOrderCount int, currentRound Round, orderExecutor *bbgo.GeneralOrderExecutor) (State, error) {
activeOrderBook := orderExecutor.ActiveMakerOrders()
orderStore := orderExecutor.OrderStore()

View File

@ -7,7 +7,6 @@ import (
"time"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/stretchr/testify/assert"
)
@ -95,13 +94,12 @@ func (m *MockQueryOrders) QueryClosedOrdersDesc(ctx context.Context, symbol stri
func Test_RecoverState(t *testing.T) {
strategy := newTestStrategy()
quoteInvestment := fixedpoint.MustNewFromString("1000")
t.Run("new strategy", func(t *testing.T) {
currentRound := Round{}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state)
})
@ -119,7 +117,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, OpenPositionReady, state)
})
@ -137,7 +135,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrderFilled, state)
})
@ -155,7 +153,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelling, state)
})
@ -173,7 +171,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, OpenPositionOrdersCancelling, state)
})
@ -192,7 +190,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, TakeProfitReady, state)
})
@ -211,7 +209,7 @@ func Test_RecoverState(t *testing.T) {
}
position := types.NewPositionFromMarket(strategy.Market)
orderExecutor := bbgo.NewGeneralOrderExecutor(nil, strategy.Symbol, ID, "", position)
state, err := recoverState(context.Background(), quoteInvestment, 5, currentRound, orderExecutor)
state, err := recoverState(context.Background(), 5, currentRound, orderExecutor)
assert.NoError(t, err)
assert.Equal(t, WaitToOpenPosition, state)
})

View File

@ -46,6 +46,13 @@ func (s *Strategy) initializeNextStateC() bool {
return isInitialize
}
func (s *Strategy) updateState(state State) {
s.state = state
s.logger.Infof("[state] update state to %d", state)
metricsState.With(baseLabels).Set(float64(s.state))
}
func (s *Strategy) emitNextState(nextState State) {
select {
case s.nextStateC <- nextState:
@ -63,17 +70,22 @@ func (s *Strategy) emitNextState(nextState State) {
// TakeProfitReady -> the takeProfit order filled ->
func (s *Strategy) runState(ctx context.Context) {
s.logger.Info("[DCA] runState")
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
stateTriggerTicker := time.NewTicker(5 * time.Second)
defer stateTriggerTicker.Stop()
monitorTicker := time.NewTicker(10 * time.Minute)
defer monitorTicker.Stop()
for {
select {
case <-ctx.Done():
s.logger.Info("[DCA] runState DONE")
return
case <-ticker.C:
s.logger.Infof("[DCA] triggerNextState current state: %d", s.state)
case <-stateTriggerTicker.C:
// s.logger.Infof("[DCA] triggerNextState current state: %d", s.state)
s.triggerNextState()
case <-monitorTicker.C:
s.updateNumOfOrdersMetrics(ctx)
case nextState := <-s.nextStateC:
s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState)
@ -131,7 +143,7 @@ func (s *Strategy) runWaitToOpenPositionState(ctx context.Context, next State) {
return
}
s.state = PositionOpening
s.updateState(PositionOpening)
s.logger.Info("[State] WaitToOpenPosition -> PositionOpening")
}
@ -141,17 +153,17 @@ func (s *Strategy) runPositionOpening(ctx context.Context, next State) {
s.logger.WithError(err).Error("failed to place dca orders, please check it.")
return
}
s.state = OpenPositionReady
s.updateState(OpenPositionReady)
s.logger.Info("[State] PositionOpening -> OpenPositionReady")
}
func (s *Strategy) runOpenPositionReady(_ context.Context, next State) {
s.state = OpenPositionOrderFilled
s.updateState(OpenPositionOrderFilled)
s.logger.Info("[State] OpenPositionReady -> OpenPositionOrderFilled")
}
func (s *Strategy) runOpenPositionOrderFilled(_ context.Context, next State) {
s.state = OpenPositionOrdersCancelling
s.updateState(OpenPositionOrdersCancelling)
s.logger.Info("[State] OpenPositionOrderFilled -> OpenPositionOrdersCancelling")
// after open position cancelling, immediately trigger open position cancelled to cancel the other orders
@ -164,7 +176,7 @@ func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next Sta
s.logger.WithError(err).Error("failed to cancel maker orders")
return
}
s.state = OpenPositionOrdersCancelled
s.updateState(OpenPositionOrdersCancelled)
s.logger.Info("[State] OpenPositionOrdersCancelling -> OpenPositionOrdersCancelled")
// after open position cancelled, immediately trigger take profit ready to open take-profit order
@ -177,7 +189,7 @@ func (s *Strategy) runOpenPositionOrdersCancelled(ctx context.Context, next Stat
s.logger.WithError(err).Error("failed to open take profit orders")
return
}
s.state = TakeProfitReady
s.updateState(TakeProfitReady)
s.logger.Info("[State] OpenPositionOrdersCancelled -> TakeProfitReady")
}
@ -200,6 +212,6 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) {
// set the start time of the next round
s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration())
s.state = WaitToOpenPosition
s.updateState(WaitToOpenPosition)
s.logger.Info("[State] TakeProfitReady -> WaitToOpenPosition")
}

View File

@ -21,11 +21,15 @@ import (
"github.com/c9s/bbgo/pkg/util/tradingutil"
)
const ID = "dca2"
const (
ID = "dca2"
orderTag = "dca2"
)
const orderTag = "dca2"
var log = logrus.WithField("strategy", ID)
var (
log = logrus.WithField("strategy", ID)
baseLabels prometheus.Labels
)
func init() {
bbgo.RegisterStrategy(ID, &Strategy{})
@ -42,10 +46,10 @@ type Strategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"`
ProfitStats *ProfitStats `json:"profitStats,omitempty" persistence:"profit_stats"`
Environment *bbgo.Environment
Session *bbgo.ExchangeSession
OrderExecutor *bbgo.GeneralOrderExecutor
Market types.Market
Environment *bbgo.Environment
ExchangeSession *bbgo.ExchangeSession
OrderExecutor *bbgo.GeneralOrderExecutor
Market types.Market
Symbol string `json:"symbol"`
@ -119,6 +123,7 @@ func (s *Strategy) Defaults() error {
s.LogFields["symbol"] = s.Symbol
s.LogFields["strategy"] = ID
return nil
}
@ -135,9 +140,26 @@ func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: types.Interval1m})
}
func (s *Strategy) newPrometheusLabels() prometheus.Labels {
labels := prometheus.Labels{
"exchange": "default",
"symbol": s.Symbol,
}
if s.ExchangeSession != nil {
labels["exchange"] = s.ExchangeSession.Name
}
if s.PrometheusLabels == nil {
return labels
}
return mergeLabels(s.PrometheusLabels, labels)
}
func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.ExchangeSession) error {
instanceID := s.InstanceID()
s.Session = session
s.ExchangeSession = session
if s.ProfitStats == nil {
s.ProfitStats = newProfitStats(s.Market, s.QuoteInvestment)
}
@ -146,6 +168,15 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.Position = types.NewPositionFromMarket(s.Market)
}
// prometheus
if s.PrometheusLabels != nil {
initMetrics(labelKeys(s.PrometheusLabels))
}
registerMetrics()
// prometheus labels
baseLabels = s.newPrometheusLabels()
// if dev mode is on and it's not a new strategy
if s.DevMode != nil && s.DevMode.Enabled && !s.DevMode.IsNewAccount {
s.ProfitStats = newProfitStats(s.Market, s.QuoteInvestment)
@ -192,6 +223,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
default:
s.logger.Infof("[DCA] unsupported side (%s) of order: %s", o.Side, o)
}
// update metrics when filled
s.updateNumOfOrdersMetrics(ctx)
})
session.MarketDataStream.OnKLine(func(kline types.KLine) {
@ -218,7 +252,7 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
// 1. recoverWhenStart is false
// 2. dev mode is on and it's not new strategy
if !s.RecoverWhenStart || (s.DevMode != nil && s.DevMode.Enabled && !s.DevMode.IsNewAccount) {
s.state = WaitToOpenPosition
s.updateState(WaitToOpenPosition)
} else {
// recover
if err := s.recover(ctx); err != nil {
@ -286,7 +320,7 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
_ = s.Initialize()
defer s.EmitClosed()
session := s.Session
session := s.ExchangeSession
if session == nil {
return fmt.Errorf("Session is nil, please check it")
}
@ -324,14 +358,14 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
}
func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
historyService, ok := s.Session.Exchange.(types.ExchangeTradeHistoryService)
historyService, ok := s.ExchangeSession.Exchange.(types.ExchangeTradeHistoryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.Session.Exchange.Name())
return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.ExchangeSession.Exchange.Name())
}
queryService, ok := s.Session.Exchange.(types.ExchangeOrderQueryService)
queryService, ok := s.ExchangeSession.Exchange.(types.ExchangeOrderQueryService)
if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.Session.Exchange.Name())
return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.ExchangeSession.Exchange.Name())
}
// TODO: pagination for it
@ -400,9 +434,23 @@ func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
// emit profit
s.EmitProfit(s.ProfitStats)
updateProfitMetrics(s.ProfitStats.Round, s.ProfitStats.CurrentRoundProfit.Float64())
s.ProfitStats.NewRound()
}
return nil
}
func (s *Strategy) updateNumOfOrdersMetrics(ctx context.Context) {
// update open orders metrics
openOrders, err := s.ExchangeSession.Exchange.QueryOpenOrders(ctx, s.Symbol)
if err != nil {
s.logger.WithError(err).Warn("failed to query open orders to update num of the orders metrics")
} else {
metricsNumOfOpenOrders.With(baseLabels).Set(float64(len(openOrders)))
}
// update active orders metrics
metricsNumOfActiveOrders.With(baseLabels).Set(float64(s.OrderExecutor.ActiveMakerOrders().NumOfOrders()))
}

View File

@ -4,26 +4,12 @@ import (
"context"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
"go.uber.org/multierr"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
type SyncActiveOrdersOpts struct {
logger *logrus.Entry
metricsLabels prometheus.Labels
activeOrderBook *bbgo.ActiveOrderBook
orderQueryService types.ExchangeOrderQueryService
exchange types.Exchange
}
func (s *Strategy) initializeRecoverC() bool {
s.mu.Lock()
defer s.mu.Unlock()
@ -57,17 +43,25 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
// make ticker's interval random in 25 min ~ 35 min
interval := util.MillisecondsJitter(25*time.Minute, 10*60*1000)
s.logger.Infof("[ActiveOrderRecover] interval: %s", interval)
metricsLabel := s.newPrometheusLabels()
orderQueryService, ok := s.session.Exchange.(types.ExchangeOrderQueryService)
if !ok {
s.logger.Errorf("exchange %s doesn't support ExchangeOrderQueryService, please check it", s.session.ExchangeName)
return
}
opts := common.SyncActiveOrdersOpts{
Logger: s.logger,
Exchange: s.session.Exchange,
OrderQueryService: orderQueryService,
ActiveOrderBook: s.orderExecutor.ActiveMakerOrders(),
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
opts := SyncActiveOrdersOpts{
logger: s.logger,
metricsLabels: s.newPrometheusLabels(),
activeOrderBook: s.orderExecutor.ActiveMakerOrders(),
orderQueryService: s.orderQueryService,
exchange: s.session.Exchange,
}
var lastRecoverTime time.Time
for {
@ -82,7 +76,19 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
continue
}
if err := syncActiveOrders(ctx, opts); err != nil {
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, s.session.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
continue
}
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(metricsLabel).Set(float64(len(openOrders)))
}
opts.OpenOrders = openOrders
if err := common.SyncActiveOrders(ctx, opts); err != nil {
log.WithError(err).Errorf("unable to sync active orders")
} else {
lastRecoverTime = time.Now()
@ -90,70 +96,3 @@ func (s *Strategy) recoverActiveOrdersPeriodically(ctx context.Context) {
}
}
}
func isMaxExchange(ex interface{}) bool {
_, yes := ex.(*max.Exchange)
return yes
}
func syncActiveOrders(ctx context.Context, opts SyncActiveOrdersOpts) error {
opts.logger.Infof("[ActiveOrderRecover] syncActiveOrders")
// only sync orders which is updated over 3 min, because we may receive from websocket and handle it twice
syncBefore := time.Now().Add(-3 * time.Minute)
openOrders, err := retry.QueryOpenOrdersUntilSuccessfulLite(ctx, opts.exchange, opts.activeOrderBook.Symbol)
if err != nil {
opts.logger.WithError(err).Error("[ActiveOrderRecover] failed to query open orders, skip this time")
return errors.Wrapf(err, "[ActiveOrderRecover] failed to query open orders, skip this time")
}
if metricsNumOfOpenOrders != nil {
metricsNumOfOpenOrders.With(opts.metricsLabels).Set(float64(len(openOrders)))
}
activeOrders := opts.activeOrderBook.Orders()
openOrdersMap := make(map[uint64]types.Order)
for _, openOrder := range openOrders {
openOrdersMap[openOrder.OrderID] = openOrder
}
var errs error
// update active orders not in open orders
for _, activeOrder := range activeOrders {
if _, exist := openOrdersMap[activeOrder.OrderID]; exist {
// no need to sync active order already in active orderbook, because we only need to know if it filled or not.
delete(openOrdersMap, activeOrder.OrderID)
} else {
opts.logger.Infof("[ActiveOrderRecover] found active order #%d is not in the open orders, updating...", activeOrder.OrderID)
isActiveOrderBookUpdated, err := syncActiveOrder(ctx, opts.activeOrderBook, opts.orderQueryService, activeOrder.OrderID, syncBefore)
if err != nil {
opts.logger.WithError(err).Errorf("[ActiveOrderRecover] unable to query order #%d", activeOrder.OrderID)
errs = multierr.Append(errs, err)
continue
}
if !isActiveOrderBookUpdated {
opts.logger.Infof("[ActiveOrderRecover] active order #%d is updated in 3 min, skip updating...", activeOrder.OrderID)
}
}
}
// update open orders not in active orders
for _, openOrder := range openOrdersMap {
opts.logger.Infof("found open order #%d is not in active orderbook, updating...", openOrder.OrderID)
// we don't add open orders into active orderbook if updated in 3 min, because we may receive message from websocket and add it twice.
if openOrder.UpdateTime.After(syncBefore) {
opts.logger.Infof("open order #%d is updated in 3 min, skip updating...", openOrder.OrderID)
continue
}
opts.activeOrderBook.Add(openOrder)
// opts.activeOrderBook.Update(openOrder)
}
return errs
}

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
@ -19,7 +20,7 @@ func (s *Strategy) recoverByScanningTrades(ctx context.Context, session *bbgo.Ex
defer func() {
s.updateGridNumOfOrdersMetricsWithLock()
}()
isMax := isMaxExchange(session.Exchange)
isMax := exchange.IsMaxExchange(session.Exchange)
s.logger.Infof("isMax: %t", isMax)
historyService, implemented := session.Exchange.(types.ExchangeTradeHistoryService)

View File

@ -9,6 +9,7 @@ import (
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange"
maxapi "github.com/c9s/bbgo/pkg/exchange/max/maxapi"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint"
@ -273,7 +274,7 @@ func syncActiveOrder(
ctx context.Context, activeOrderBook *bbgo.ActiveOrderBook, orderQueryService types.ExchangeOrderQueryService,
orderID uint64, syncBefore time.Time,
) (isOrderUpdated bool, err error) {
isMax := isMaxExchange(orderQueryService)
isMax := exchange.IsMaxExchange(orderQueryService)
updatedOrder, err := retry.QueryOrderUntilSuccessful(ctx, orderQueryService, types.OrderQuery{
Symbol: activeOrderBook.Symbol,