diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index e36c942d2..6a51513ba 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -583,7 +583,9 @@ func (s *Strategy) handleOrderFilled(o types.Order) { s.processFilledOrder(o) } -func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) { +func (s *Strategy) checkRequiredInvestmentByQuantity( + baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin, +) (requiredBase, requiredQuote fixedpoint.Value, err error) { // check more investment budget details requiredBase = fixedpoint.Zero requiredQuote = fixedpoint.Zero @@ -641,7 +643,9 @@ func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance, return requiredBase, requiredQuote, nil } -func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) { +func (s *Strategy) checkRequiredInvestmentByAmount( + baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin, +) (requiredBase, requiredQuote fixedpoint.Value, err error) { // check more investment budget details requiredBase = fixedpoint.Zero @@ -702,7 +706,9 @@ func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, am return requiredBase, requiredQuote, nil } -func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) { +func (s *Strategy) calculateQuoteInvestmentQuantity( + quoteInvestment, lastPrice fixedpoint.Value, pins []Pin, +) (fixedpoint.Value, error) { // quoteInvestment = (p1 * q) + (p2 * q) + (p3 * q) + .... // => // quoteInvestment = (p1 + p2 + p3) * q @@ -758,7 +764,9 @@ func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice f return q, nil } -func (s *Strategy) calculateBaseQuoteInvestmentQuantity(quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) { +func (s *Strategy) calculateBaseQuoteInvestmentQuantity( + quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin, +) (fixedpoint.Value, error) { s.logger.Infof("calculating quantity by base/quote investment: %f / %f", baseInvestment.Float64(), quoteInvestment.Float64()) // q_p1 = q_p2 = q_p3 = q_p4 // baseInvestment = q_p1 + q_p2 + q_p3 + q_p4 + .... @@ -1463,7 +1471,9 @@ func (s *Strategy) checkMinimalQuoteInvestment(grid *Grid) error { return nil } -func (s *Strategy) recoverGridWithOpenOrders(ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order) error { +func (s *Strategy) recoverGridWithOpenOrders( + ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order, +) error { grid := s.newGrid() s.logger.Infof("GRID RECOVER: %s", grid.String()) @@ -1622,7 +1632,10 @@ func (s *Strategy) getGrid() *Grid { // replayOrderHistory queries the closed order history from the API and rebuild the orderbook from the order history. // startTime, endTime is the time range of the order history. -func (s *Strategy) replayOrderHistory(ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService, startTime, endTime time.Time, lastOrderID uint64) error { +func (s *Strategy) replayOrderHistory( + ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService, + startTime, endTime time.Time, lastOrderID uint64, +) error { // a simple guard, in reality, this startTime is not possible to exceed the endTime // because the queries closed orders might still in the range. orderIdChanged := true @@ -1951,10 +1964,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. } }) - session.UserDataStream.OnConnect(func() { - s.handleConnect(ctx, session) - }) - // if TriggerPrice is zero, that means we need to open the grid when start up if s.TriggerPrice.IsZero() { // must call the openGrid method inside the OnStart callback because @@ -1966,13 +1975,25 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.logger.Infof("user data stream started, initializing grid...") if !bbgo.IsBackTesting { - go s.startProcess(ctx, session) + go time.AfterFunc(3*time.Second, func() { + s.startProcess(ctx, session) + }) } else { s.startProcess(ctx, session) } }) } + session.UserDataStream.OnConnect(func() { + if !bbgo.IsBackTesting { + // callback may block the stream execution, so we spawn the recover function to the background + // add (5 seconds + random <10 seconds jitter) delay + go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { + s.recoverActiveOrders(ctx, session) + }) + } + }) + return nil } @@ -2117,16 +2138,29 @@ func (s *Strategy) newClientOrderID() string { return "" } -func (s *Strategy) handleConnect(ctx context.Context, session *bbgo.ExchangeSession) { +func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.ExchangeSession) { + s.logger.Infof("recovering active orders after websocket connect") + grid := s.getGrid() if grid == nil { return } + // this lock avoids recovering the active orders while the openGrid is executing + s.mu.Lock() + defer s.mu.Unlock() + // TODO: move this logics into the active maker orders component, like activeOrders.Sync(ctx) activeOrderBook := s.orderExecutor.ActiveMakerOrders() activeOrders := activeOrderBook.Orders() + if len(activeOrders) == 0 { + return + } + + s.logger.Infof("found %d active orders to update...", len(activeOrders)) for _, o := range activeOrders { + s.logger.Infof("updating %d order...", o.OrderID) + var updatedOrder *types.Order err := retry.GeneralBackoff(ctx, func() error { var err error