Merge pull request #1509 from c9s/kbearXD/dca2/profit-stats-and-recover

[dca2] fix dca2 bug
This commit is contained in:
c9s 2024-01-24 15:50:09 +08:00 committed by GitHub
commit 884b8f2b45
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 126 additions and 68 deletions

View File

@ -201,7 +201,6 @@ func (e *GeneralOrderExecutor) Bind() {
}) })
e.tradeCollector.OnPositionUpdate(func(position *types.Position) { e.tradeCollector.OnPositionUpdate(func(position *types.Position) {
log.Infof("position changed: %s", position)
Notify(position) Notify(position)
}) })
} }

View File

@ -102,7 +102,7 @@ func calculateNotionalAndNumOrders(market types.Market, quoteInvestment fixedpoi
continue continue
} }
return notional, num return market.TruncatePrice(notional), num
} }
return fixedpoint.Zero, 0 return fixedpoint.Zero, 0

View File

@ -65,8 +65,8 @@ func (s *ProfitStats) AddTrade(trade types.Trade) {
s.TotalProfit = s.TotalProfit.Add(quoteQuantity) s.TotalProfit = s.TotalProfit.Add(quoteQuantity)
if s.Market.QuoteCurrency == trade.FeeCurrency { if s.Market.QuoteCurrency == trade.FeeCurrency {
s.CurrentRoundProfit.Sub(trade.Fee) s.CurrentRoundProfit = s.CurrentRoundProfit.Sub(trade.Fee)
s.TotalProfit.Sub(trade.Fee) s.TotalProfit = s.TotalProfit.Sub(trade.Fee)
} }
} }

View File

@ -34,7 +34,7 @@ func (s *Strategy) recover(ctx context.Context) error {
return err return err
} }
closedOrders, err := queryService.QueryClosedOrdersDesc(ctx, s.Symbol, time.Date(2024, time.January, 1, 0, 0, 0, 0, time.Local), time.Now(), 0) closedOrders, err := queryService.QueryClosedOrdersDesc(ctx, s.Symbol, time.Date(2024, time.January, 12, 14, 0, 0, 0, time.Local), time.Now(), 0)
if err != nil { if err != nil {
return err return err
} }
@ -50,14 +50,17 @@ func (s *Strategy) recover(ctx context.Context) error {
if err != nil { if err != nil {
return err return err
} }
s.logger.Info("recover stats DONE")
// recover position // recover position
if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil { if err := recoverPosition(ctx, s.Position, queryService, currentRound); err != nil {
return err return err
} }
s.logger.Info("recover position DONE")
// recover profit stats // recover profit stats
recoverProfitStats(ctx, s) recoverProfitStats(ctx, s)
s.logger.Info("recover profit stats DONE")
// recover startTimeOfNextRound // recover startTimeOfNextRound
startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval) startTimeOfNextRound := recoverStartTimeOfNextRound(ctx, currentRound, s.CoolDownInterval)
@ -194,7 +197,7 @@ func recoverProfitStats(ctx context.Context, strategy *Strategy) error {
return fmt.Errorf("profit stats is nil, please check it") return fmt.Errorf("profit stats is nil, please check it")
} }
strategy.CalculateProfitOfCurrentRound(ctx) strategy.CalculateAndEmitProfit(ctx)
return nil return nil
} }

View File

@ -72,6 +72,7 @@ func (s *Strategy) runState(ctx context.Context) {
s.logger.Info("[DCA] runState DONE") s.logger.Info("[DCA] runState DONE")
return return
case <-ticker.C: case <-ticker.C:
s.logger.Infof("[DCA] triggerNextState current state: %d", s.state)
s.triggerNextState() s.triggerNextState()
case nextState := <-s.nextStateC: case nextState := <-s.nextStateC:
s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState) s.logger.Infof("[DCA] currenct state: %d, next state: %d", s.state, nextState)
@ -85,6 +86,7 @@ func (s *Strategy) runState(ctx context.Context) {
if nextState != validNextState { if nextState != validNextState {
s.logger.Warnf("[DCA] %d is not valid next state of curreny state %d", nextState, s.state) s.logger.Warnf("[DCA] %d is not valid next state of curreny state %d", nextState, s.state)
continue
} }
// move to next state // move to next state
@ -118,7 +120,7 @@ func (s *Strategy) triggerNextState() {
// only trigger from order filled event // only trigger from order filled event
default: default:
if nextState, ok := stateTransition[s.state]; ok { if nextState, ok := stateTransition[s.state]; ok {
s.nextStateC <- nextState s.emitNextState(nextState)
} }
} }
} }
@ -129,13 +131,6 @@ func (s *Strategy) runWaitToOpenPositionState(ctx context.Context, next State) {
return return
} }
// reset position and open new round for profit stats before position opening
s.Position.Reset()
s.ProfitStats.NewRound()
// store into redis
bbgo.Sync(ctx, s)
s.state = PositionOpening s.state = PositionOpening
s.logger.Info("[State] WaitToOpenPosition -> PositionOpening") s.logger.Info("[State] WaitToOpenPosition -> PositionOpening")
} }
@ -160,7 +155,7 @@ func (s *Strategy) runOpenPositionOrderFilled(_ context.Context, next State) {
s.logger.Info("[State] OpenPositionOrderFilled -> OpenPositionOrdersCancelling") s.logger.Info("[State] OpenPositionOrderFilled -> OpenPositionOrdersCancelling")
// after open position cancelling, immediately trigger open position cancelled to cancel the other orders // after open position cancelling, immediately trigger open position cancelled to cancel the other orders
s.nextStateC <- OpenPositionOrdersCancelled s.emitNextState(OpenPositionOrdersCancelled)
} }
func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next State) { func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next State) {
@ -173,7 +168,7 @@ func (s *Strategy) runOpenPositionOrdersCancelling(ctx context.Context, next Sta
s.logger.Info("[State] OpenPositionOrdersCancelling -> OpenPositionOrdersCancelled") s.logger.Info("[State] OpenPositionOrdersCancelling -> OpenPositionOrdersCancelled")
// after open position cancelled, immediately trigger take profit ready to open take-profit order // after open position cancelled, immediately trigger take profit ready to open take-profit order
s.nextStateC <- TakeProfitReady s.emitNextState(TakeProfitReady)
} }
func (s *Strategy) runOpenPositionOrdersCancelled(ctx context.Context, next State) { func (s *Strategy) runOpenPositionOrdersCancelled(ctx context.Context, next State) {
@ -192,11 +187,16 @@ func (s *Strategy) runTakeProfitReady(ctx context.Context, next State) {
s.logger.Info("[State] TakeProfitReady - start reseting position and calculate quote investment for next round") s.logger.Info("[State] TakeProfitReady - start reseting position and calculate quote investment for next round")
// calculate profit stats // reset position
s.CalculateProfitOfCurrentRound(ctx)
bbgo.Sync(ctx, s)
s.EmitProfit(s.ProfitStats) // calculate profit stats
s.CalculateAndEmitProfit(ctx)
// reset position and open new round for profit stats before position opening
s.Position.Reset()
// store into redis
bbgo.Sync(ctx, s)
// set the start time of the next round // set the start time of the next round
s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration()) s.startTimeOfNextRound = time.Now().Add(s.CoolDownInterval.Duration())

View File

@ -9,12 +9,14 @@ import (
"time" "time"
"github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/exchange/retry"
"github.com/c9s/bbgo/pkg/fixedpoint" "github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/strategy/common" "github.com/c9s/bbgo/pkg/strategy/common"
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"go.uber.org/multierr"
) )
const ID = "dca2" const ID = "dca2"
@ -27,6 +29,12 @@ func init() {
bbgo.RegisterStrategy(ID, &Strategy{}) bbgo.RegisterStrategy(ID, &Strategy{})
} }
type advancedOrderCancelApi interface {
CancelAllOrders(ctx context.Context) ([]types.Order, error)
CancelOrdersBySymbol(ctx context.Context, symbol string) ([]types.Order, error)
CancelOrdersByGroupID(ctx context.Context, groupID uint32) ([]types.Order, error)
}
//go:generate callbackgen -type Strateg //go:generate callbackgen -type Strateg
type Strategy struct { type Strategy struct {
Position *types.Position `json:"position,omitempty" persistence:"position"` Position *types.Position `json:"position,omitempty" persistence:"position"`
@ -55,6 +63,9 @@ type Strategy struct {
// KeepOrdersWhenShutdown option is used for keeping the grid orders when shutting down bbgo // KeepOrdersWhenShutdown option is used for keeping the grid orders when shutting down bbgo
KeepOrdersWhenShutdown bool `json:"keepOrdersWhenShutdown"` KeepOrdersWhenShutdown bool `json:"keepOrdersWhenShutdown"`
// UseCancelAllOrdersApiWhenClose uses a different API to cancel all the orders on the market when closing a grid
UseCancelAllOrdersApiWhenClose bool `json:"useCancelAllOrdersApiWhenClose"`
// log // log
logger *logrus.Entry logger *logrus.Entry
LogFields logrus.Fields `json:"logFields"` LogFields logrus.Fields `json:"logFields"`
@ -197,14 +208,14 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.logger.WithError(err).Error("[DCA] something wrong when state recovering") s.logger.WithError(err).Error("[DCA] something wrong when state recovering")
return return
} }
} else {
s.state = WaitToOpenPosition
}
s.logger.Infof("[DCA] state: %d", s.state) s.logger.Infof("[DCA] state: %d", s.state)
s.logger.Infof("[DCA] position %s", s.Position.String()) s.logger.Infof("[DCA] position %s", s.Position.String())
s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String()) s.logger.Infof("[DCA] profit stats %s", s.ProfitStats.String())
s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound) s.logger.Infof("[DCA] startTimeOfNextRound %s", s.startTimeOfNextRound)
} else {
s.state = WaitToOpenPosition
}
s.updateTakeProfitPrice() s.updateTakeProfitPrice()
@ -220,16 +231,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}) })
}) })
balances, err := session.Exchange.QueryAccountBalances(ctx)
if err != nil {
return err
}
balance := balances[s.Market.QuoteCurrency]
if balance.Available.Compare(s.ProfitStats.QuoteInvestment) < 0 {
return fmt.Errorf("the available balance of %s is %s which is less than quote investment setting %s, please check it", s.Market.QuoteCurrency, balance.Available, s.ProfitStats.QuoteInvestment)
}
bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) { bbgo.OnShutdown(ctx, func(ctx context.Context, wg *sync.WaitGroup) {
defer wg.Done() defer wg.Done()
@ -270,16 +271,45 @@ func (s *Strategy) CleanUp(ctx context.Context) error {
_ = s.Initialize() _ = s.Initialize()
defer s.EmitClosed() defer s.EmitClosed()
err := s.OrderExecutor.GracefulCancel(ctx) session := s.Session
if err != nil { if session == nil {
s.logger.WithError(err).Errorf("[DCA] there are errors when cancelling orders at clean up") return fmt.Errorf("Session is nil, please check it")
} }
bbgo.Sync(ctx, s) service, support := session.Exchange.(advancedOrderCancelApi)
return err if !support {
return fmt.Errorf("advancedOrderCancelApi interface is not implemented, fallback to default graceful cancel, exchange %T", session)
}
var werr error
for {
s.logger.Infof("checking %s open orders...", s.Symbol)
openOrders, err := retry.QueryOpenOrdersUntilSuccessful(ctx, session.Exchange, s.Symbol)
if err != nil {
s.logger.WithError(err).Errorf("CancelOrdersByGroupID api call error")
werr = multierr.Append(werr, err)
}
if len(openOrders) == 0 {
break
}
s.logger.Infof("found %d open orders left, using cancel all orders api", len(openOrders))
s.logger.Infof("using cancal all orders api for canceling grid orders...")
if err := retry.CancelAllOrdersUntilSuccessful(ctx, service); err != nil {
s.logger.WithError(err).Errorf("CancelAllOrders api call error")
werr = multierr.Append(werr, err)
}
time.Sleep(1 * time.Second)
}
return werr
} }
func (s *Strategy) CalculateProfitOfCurrentRound(ctx context.Context) error { func (s *Strategy) CalculateAndEmitProfit(ctx context.Context) error {
historyService, ok := s.Session.Exchange.(types.ExchangeTradeHistoryService) historyService, ok := s.Session.Exchange.(types.ExchangeTradeHistoryService)
if !ok { if !ok {
return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.Session.Exchange.Name()) return fmt.Errorf("exchange %s doesn't support ExchangeTradeHistoryService", s.Session.Exchange.Name())
@ -290,30 +320,47 @@ func (s *Strategy) CalculateProfitOfCurrentRound(ctx context.Context) error {
return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.Session.Exchange.Name()) return fmt.Errorf("exchange %s doesn't support ExchangeOrderQueryService", s.Session.Exchange.Name())
} }
// query the orders of this round // TODO: pagination for it
// query the orders
orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID) orders, err := historyService.QueryClosedOrders(ctx, s.Symbol, time.Time{}, time.Time{}, s.ProfitStats.FromOrderID)
if err != nil { if err != nil {
return err return err
} }
// query the trades of this round var rounds []Round
var round Round
for _, order := range orders { for _, order := range orders {
if order.OrderID > s.ProfitStats.FromOrderID {
s.ProfitStats.FromOrderID = order.OrderID
}
// skip not this strategy order // skip not this strategy order
if order.GroupID != s.OrderGroupID { if order.GroupID != s.OrderGroupID {
continue continue
} }
if order.ExecutedQuantity.Sign() == 0 { switch order.Side {
// skip no trade orders case types.SideTypeBuy:
round.OpenPositionOrders = append(round.OpenPositionOrders, order)
case types.SideTypeSell:
if order.Status != types.OrderStatusFilled {
continue continue
} }
round.TakeProfitOrder = order
rounds = append(rounds, round)
round = Round{}
default:
s.logger.Errorf("there is order with unsupported side")
}
}
for _, round := range rounds {
var roundOrders []types.Order = round.OpenPositionOrders
roundOrders = append(roundOrders, round.TakeProfitOrder)
for _, order := range roundOrders {
s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String()) s.logger.Infof("[DCA] calculate profit stats from order: %s", order.String())
// skip no trade orders
if order.ExecutedQuantity.Sign() == 0 {
continue
}
trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{ trades, err := queryService.QueryOrderTrades(ctx, types.OrderQuery{
Symbol: order.Symbol, Symbol: order.Symbol,
OrderID: strconv.FormatUint(order.OrderID, 10), OrderID: strconv.FormatUint(order.OrderID, 10),
@ -329,8 +376,17 @@ func (s *Strategy) CalculateProfitOfCurrentRound(ctx context.Context) error {
} }
} }
s.ProfitStats.FromOrderID = s.ProfitStats.FromOrderID + 1 s.ProfitStats.FromOrderID = round.TakeProfitOrder.OrderID + 1
s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit) s.ProfitStats.QuoteInvestment = s.ProfitStats.QuoteInvestment.Add(s.ProfitStats.CurrentRoundProfit)
// store into persistence
bbgo.Sync(ctx, s)
// emit profit
s.EmitProfit(s.ProfitStats)
s.ProfitStats.NewRound()
}
return nil return nil
} }