diff --git a/pkg/core/tradecollector.go b/pkg/core/tradecollector.go index e844d35d5..a782a8126 100644 --- a/pkg/core/tradecollector.go +++ b/pkg/core/tradecollector.go @@ -89,6 +89,8 @@ func (c *TradeCollector) Emit() { } func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error { + logrus.Debugf("recovering %s trades...", symbol) + trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ StartTime: &from, }) @@ -98,8 +100,8 @@ func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHist } for _, td := range trades { - logrus.Debugf("processing trade: %s", td.String()) - if c.ProcessTrade(td) { + logrus.Debugf("checking trade: %s", td.String()) + if c.processTrade(td) { logrus.Infof("recovered trade: %s", td.String()) c.EmitRecover(td) } @@ -178,45 +180,33 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool { return false } - if c.orderStore.Exists(trade.OrderID) { - if c.position != nil { - profit, netProfit, madeProfit := c.position.AddTrade(trade) - if madeProfit { - p := c.position.NewProfit(trade, profit, netProfit) - c.EmitTrade(trade, profit, netProfit) - c.EmitProfit(trade, &p) - } else { - c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) - c.EmitProfit(trade, nil) - } - c.EmitPositionUpdate(c.position) + if !c.orderStore.Exists(trade.OrderID) { + return false + } + + if c.position != nil { + profit, netProfit, madeProfit := c.position.AddTrade(trade) + if madeProfit { + p := c.position.NewProfit(trade, profit, netProfit) + c.EmitTrade(trade, profit, netProfit) + c.EmitProfit(trade, &p) } else { c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) + c.EmitProfit(trade, nil) } - - c.doneTrades[key] = struct{}{} - return true + c.EmitPositionUpdate(c.position) + } else { + c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero) } - return false + + c.doneTrades[key] = struct{}{} + return true } // return true when the given trade is added // return false when the given trade is not added func (c *TradeCollector) ProcessTrade(trade types.Trade) bool { - key := trade.Key() - // if it's already done, remove the trade from the trade store - c.mu.Lock() - if _, done := c.doneTrades[key]; done { - return false - } - c.mu.Unlock() - - if c.processTrade(trade) { - return true - } - - c.tradeStore.Add(trade) - return false + return c.processTrade(trade) } // Run is a goroutine executed in the background diff --git a/pkg/strategy/xmaker/strategy.go b/pkg/strategy/xmaker/strategy.go index 2a2db7230..01953e6af 100644 --- a/pkg/strategy/xmaker/strategy.go +++ b/pkg/strategy/xmaker/strategy.go @@ -76,6 +76,11 @@ type Strategy struct { NotifyTrade bool `json:"notifyTrade"` + // RecoverTrade tries to find the missing trades via the REStful API + RecoverTrade bool `json:"recoverTrade"` + + RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"` + NumLayers int `json:"numLayers"` // Pips is the pips of the layer prices @@ -584,6 +589,38 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) { s.orderStore.Add(returnOrders...) } +func (s *Strategy) tradeRecover(ctx context.Context) { + tradeScanInterval := s.RecoverTradeScanPeriod.Duration() + if tradeScanInterval == 0 { + tradeScanInterval = 30 * time.Minute + } + + tradeScanTicker := time.NewTicker(tradeScanInterval) + defer tradeScanTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + + case <-tradeScanTicker.C: + log.Infof("scanning trades from %s ago...", tradeScanInterval) + + if s.RecoverTrade { + startTime := time.Now().Add(-tradeScanInterval) + + if err := s.tradeCollector.Recover(ctx, s.sourceSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") + } + + if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { + log.WithError(err).Errorf("query trades error") + } + } + } + } +} + func (s *Strategy) Validate() error { if s.Quantity.IsZero() || s.QuantityScale == nil { return errors.New("quantity or quantityScale can not be empty") @@ -779,6 +816,10 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order s.stopC = make(chan struct{}) + if s.RecoverTrade { + go s.tradeRecover(ctx) + } + go func() { posTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200)) defer posTicker.Stop() @@ -789,10 +830,6 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order reportTicker := time.NewTicker(time.Hour) defer reportTicker.Stop() - tradeScanInterval := 20 * time.Minute - tradeScanTicker := time.NewTicker(tradeScanInterval) - defer tradeScanTicker.Stop() - defer func() { if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil { log.WithError(err).Errorf("can not cancel %s orders", s.Symbol) @@ -816,13 +853,6 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order case <-reportTicker.C: bbgo.Notify(s.ProfitStats) - case <-tradeScanTicker.C: - log.Infof("scanning trades from %s ago...", tradeScanInterval) - startTime := time.Now().Add(-tradeScanInterval) - if err := s.tradeCollector.Recover(ctx, s.sourceSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil { - log.WithError(err).Errorf("query trades error") - } - case <-posTicker.C: // For positive position and positive covered position: // uncover position = +5 - +3 (covered position) = 2