From cb0285544ebd5bfa4902a9818ceb0f8e230ca2d5 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 31 Aug 2023 13:48:56 +0800 Subject: [PATCH 1/4] add lock to recoverActiveOrders --- pkg/strategy/grid2/strategy.go | 37 ++++++++++++++++++++++++++-------- 1 file changed, 29 insertions(+), 8 deletions(-) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index e36c942d2..7f17cdcb9 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -583,7 +583,9 @@ func (s *Strategy) handleOrderFilled(o types.Order) { s.processFilledOrder(o) } -func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) { +func (s *Strategy) checkRequiredInvestmentByQuantity( + baseBalance, quoteBalance, quantity, lastPrice fixedpoint.Value, pins []Pin, +) (requiredBase, requiredQuote fixedpoint.Value, err error) { // check more investment budget details requiredBase = fixedpoint.Zero requiredQuote = fixedpoint.Zero @@ -641,7 +643,9 @@ func (s *Strategy) checkRequiredInvestmentByQuantity(baseBalance, quoteBalance, return requiredBase, requiredQuote, nil } -func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin) (requiredBase, requiredQuote fixedpoint.Value, err error) { +func (s *Strategy) checkRequiredInvestmentByAmount( + baseBalance, quoteBalance, amount, lastPrice fixedpoint.Value, pins []Pin, +) (requiredBase, requiredQuote fixedpoint.Value, err error) { // check more investment budget details requiredBase = fixedpoint.Zero @@ -702,7 +706,9 @@ func (s *Strategy) checkRequiredInvestmentByAmount(baseBalance, quoteBalance, am return requiredBase, requiredQuote, nil } -func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) { +func (s *Strategy) calculateQuoteInvestmentQuantity( + quoteInvestment, lastPrice fixedpoint.Value, pins []Pin, +) (fixedpoint.Value, error) { // quoteInvestment = (p1 * q) + (p2 * q) + (p3 * q) + .... // => // quoteInvestment = (p1 + p2 + p3) * q @@ -758,7 +764,9 @@ func (s *Strategy) calculateQuoteInvestmentQuantity(quoteInvestment, lastPrice f return q, nil } -func (s *Strategy) calculateBaseQuoteInvestmentQuantity(quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin) (fixedpoint.Value, error) { +func (s *Strategy) calculateBaseQuoteInvestmentQuantity( + quoteInvestment, baseInvestment, lastPrice fixedpoint.Value, pins []Pin, +) (fixedpoint.Value, error) { s.logger.Infof("calculating quantity by base/quote investment: %f / %f", baseInvestment.Float64(), quoteInvestment.Float64()) // q_p1 = q_p2 = q_p3 = q_p4 // baseInvestment = q_p1 + q_p2 + q_p3 + q_p4 + .... @@ -1463,7 +1471,9 @@ func (s *Strategy) checkMinimalQuoteInvestment(grid *Grid) error { return nil } -func (s *Strategy) recoverGridWithOpenOrders(ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order) error { +func (s *Strategy) recoverGridWithOpenOrders( + ctx context.Context, historyService types.ExchangeTradeHistoryService, openOrders []types.Order, +) error { grid := s.newGrid() s.logger.Infof("GRID RECOVER: %s", grid.String()) @@ -1622,7 +1632,10 @@ func (s *Strategy) getGrid() *Grid { // replayOrderHistory queries the closed order history from the API and rebuild the orderbook from the order history. // startTime, endTime is the time range of the order history. -func (s *Strategy) replayOrderHistory(ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService, startTime, endTime time.Time, lastOrderID uint64) error { +func (s *Strategy) replayOrderHistory( + ctx context.Context, grid *Grid, orderBook *bbgo.ActiveOrderBook, historyService types.ExchangeTradeHistoryService, + startTime, endTime time.Time, lastOrderID uint64, +) error { // a simple guard, in reality, this startTime is not possible to exceed the endTime // because the queries closed orders might still in the range. orderIdChanged := true @@ -1952,7 +1965,11 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. }) session.UserDataStream.OnConnect(func() { - s.handleConnect(ctx, session) + if !bbgo.IsBackTesting { + go time.AfterFunc(time.Minute, func() { + s.recoverActiveOrders(ctx, session) + }) + } }) // if TriggerPrice is zero, that means we need to open the grid when start up @@ -2117,12 +2134,16 @@ func (s *Strategy) newClientOrderID() string { return "" } -func (s *Strategy) handleConnect(ctx context.Context, session *bbgo.ExchangeSession) { +func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.ExchangeSession) { grid := s.getGrid() if grid == nil { return } + // this lock avoids recovering the active orders while the openGrid is executing + s.mu.Lock() + defer s.mu.Unlock() + // TODO: move this logics into the active maker orders component, like activeOrders.Sync(ctx) activeOrderBook := s.orderExecutor.ActiveMakerOrders() activeOrders := activeOrderBook.Orders() From 7de6c3d8e4bfe4eff80d857dc63cdf2fb9776a53 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 31 Aug 2023 13:59:44 +0800 Subject: [PATCH 2/4] grid2: add more update logs --- pkg/strategy/grid2/strategy.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 7f17cdcb9..b5e62e131 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -2135,6 +2135,8 @@ func (s *Strategy) newClientOrderID() string { } func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.ExchangeSession) { + s.logger.Infof("recovering active orders after websocket connect") + grid := s.getGrid() if grid == nil { return @@ -2147,7 +2149,14 @@ func (s *Strategy) recoverActiveOrders(ctx context.Context, session *bbgo.Exchan // TODO: move this logics into the active maker orders component, like activeOrders.Sync(ctx) activeOrderBook := s.orderExecutor.ActiveMakerOrders() activeOrders := activeOrderBook.Orders() + if len(activeOrders) == 0 { + return + } + + s.logger.Infof("found %d active orders to update...", len(activeOrders)) for _, o := range activeOrders { + s.logger.Infof("updating %d order...", o.OrderID) + var updatedOrder *types.Order err := retry.GeneralBackoff(ctx, func() error { var err error From f24bd3532cdfa413d859d5ee9eca09c0ed19e607 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 31 Aug 2023 14:08:19 +0800 Subject: [PATCH 3/4] grid2: add 5s delay and <10seconds jitter --- pkg/strategy/grid2/strategy.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index b5e62e131..5b34de3b7 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1966,7 +1966,9 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. session.UserDataStream.OnConnect(func() { if !bbgo.IsBackTesting { - go time.AfterFunc(time.Minute, func() { + // callback may block the stream execution, so we spawn the recover function to the background + // add (5 seconds + random <10 seconds jitter) delay + go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { s.recoverActiveOrders(ctx, session) }) } From e74da87e5111e642a913efae32648777c9c6f19a Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 31 Aug 2023 17:08:00 +0800 Subject: [PATCH 4/4] grid2: delay start process by 5s --- pkg/strategy/grid2/strategy.go | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/pkg/strategy/grid2/strategy.go b/pkg/strategy/grid2/strategy.go index 5b34de3b7..6a51513ba 100644 --- a/pkg/strategy/grid2/strategy.go +++ b/pkg/strategy/grid2/strategy.go @@ -1964,16 +1964,6 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. } }) - session.UserDataStream.OnConnect(func() { - if !bbgo.IsBackTesting { - // callback may block the stream execution, so we spawn the recover function to the background - // add (5 seconds + random <10 seconds jitter) delay - go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { - s.recoverActiveOrders(ctx, session) - }) - } - }) - // if TriggerPrice is zero, that means we need to open the grid when start up if s.TriggerPrice.IsZero() { // must call the openGrid method inside the OnStart callback because @@ -1985,13 +1975,25 @@ func (s *Strategy) Run(ctx context.Context, _ bbgo.OrderExecutor, session *bbgo. s.logger.Infof("user data stream started, initializing grid...") if !bbgo.IsBackTesting { - go s.startProcess(ctx, session) + go time.AfterFunc(3*time.Second, func() { + s.startProcess(ctx, session) + }) } else { s.startProcess(ctx, session) } }) } + session.UserDataStream.OnConnect(func() { + if !bbgo.IsBackTesting { + // callback may block the stream execution, so we spawn the recover function to the background + // add (5 seconds + random <10 seconds jitter) delay + go time.AfterFunc(util.MillisecondsJitter(5*time.Second, 1000*10), func() { + s.recoverActiveOrders(ctx, session) + }) + } + }) + return nil }