Merge pull request #1298 from c9s/c9s/grid2-fix-recover-active-orders

FIX: [grid2] fix active order recover, add start process delay
This commit is contained in:
c9s 2023-09-01 18:48:58 +08:00 committed by GitHub
commit 8adb0df656
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -583,7 +583,9 @@ func (s *Strategy) handleOrderFilled(o types.Order) {
s.processFilledOrder(o)
}
func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) {
func (s *Strategy) checkRequiredInvestmentByQuantity(
baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin,
) (requiredBase, requiredQuote fixedpoint.Value, err error) {
// check more investment budget details
requiredBase = fixedpoint.Zero
requiredQuote = fixedpoint.Zero
@ -641,7 +643,9 @@ func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance,
return requiredBase, requiredQuote, nil
}
func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) {
func (s *Strategy) checkRequiredInvestmentByAmount(
baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin,
) (requiredBase, requiredQuote fixedpoint.Value, err error) {
// check more investment budget details
requiredBase = fixedpoint.Zero
@ -702,7 +706,9 @@ func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, am
return requiredBase, requiredQuote, nil
}
func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) {
func (s *Strategy) calculateQuoteInvestmentQuantity(
quoteInvestment, lastPrice fixedpoint.Value, pins []Pin,
) (fixedpoint.Value, error) {
// quoteInvestment = (p1 * q) + (p2 * q) + (p3 * q) + ....
// =>
// quoteInvestment = (p1 + p2 + p3) * q
@ -758,7 +764,9 @@ func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice f
return q, nil
}
func (s *Strategy) calculateBaseQuoteInvestmentQuantity(quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) {
func (s *Strategy) calculateBaseQuoteInvestmentQuantity(
quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin,
) (fixedpoint.Value, error) {
s.logger.Infof("calculating quantity by base/quote investment: %f / %f", baseInvestment.Float64(), quoteInvestment.Float64())
// q_p1 = q_p2 = q_p3 = q_p4
// baseInvestment = q_p1 + q_p2 + q_p3 + q_p4 + ....
@ -1463,7 +1471,9 @@ func (s *Strategy) checkMinimalQuoteInvestment(grid *Grid) error {
return nil
}
func (s *Strategy) recoverGridWithOpenOrders(ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order) error {
func (s *Strategy) recoverGridWithOpenOrders(
ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order,
) error {
grid := s.newGrid()
s.logger.Infof("GRID RECOVER: %s", grid.String())
@ -1622,7 +1632,10 @@ func (s *Strategy) getGrid() *Grid {
// replayOrderHistory queries the closed order history from the API and rebuild the orderbook from the order history.
// startTime, endTime is the time range of the order history.
func (s *Strategy) replayOrderHistory(ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService, startTime, endTime time.Time, lastOrderID uint64) error {
func (s *Strategy) replayOrderHistory(
ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService,
startTime, endTime time.Time, lastOrderID uint64,
) error {
// a simple guard, in reality, this startTime is not possible to exceed the endTime
// because the queries closed orders might still in the range.
orderIdChanged := true
@ -1951,10 +1964,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
}
})
session.UserDataStream.OnConnect(func() {
s.handleConnect(ctx, session)
})
// if TriggerPrice is zero, that means we need to open the grid when start up
if s.TriggerPrice.IsZero() {
// must call the openGrid method inside the OnStart callback because
@ -1966,13 +1975,25 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo.
s.logger.Infof("user data stream started, initializing grid...")
if !bbgo.IsBackTesting {
go s.startProcess(ctx, session)
go time.AfterFunc(3*time.Second, func() {
s.startProcess(ctx, session)
})
} else {
s.startProcess(ctx, session)
}
})
}
session.UserDataStream.OnConnect(func() {
if !bbgo.IsBackTesting {
// callback may block the stream execution, so we spawn the recover function to the background
// add (5 seconds + random <10 seconds jitter) delay
go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() {
s.recoverActiveOrders(ctx, session)
})
}
})
return nil
}
@ -2117,16 +2138,29 @@ func (s *Strategy) newClientOrderID() string {
return ""
}
func (s *Strategy) handleConnect(ctx context.Context, session *bbgo.ExchangeSession) {
func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.ExchangeSession) {
s.logger.Infof("recovering active orders after websocket connect")
grid := s.getGrid()
if grid == nil {
return
}
// this lock avoids recovering the active orders while the openGrid is executing
s.mu.Lock()
defer s.mu.Unlock()
// TODO: move this logics into the active maker orders component, like activeOrders.Sync(ctx)
activeOrderBook := s.orderExecutor.ActiveMakerOrders()
activeOrders := activeOrderBook.Orders()
if len(activeOrders) == 0 {
return
}
s.logger.Infof("found %d active orders to update...", len(activeOrders))
for _, o := range activeOrders {
s.logger.Infof("updating %d order...", o.OrderID)
var updatedOrder *types.Order
err := retry.GeneralBackoff(ctx, func() error {
var err error