xmaker: pull out trade recover go routine

This commit is contained in:
c9s 2023-07-22 17:29:16 +08:00
parent 8c02383df4
commit 941067670e
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 63 additions and 43 deletions

View File

@ -89,6 +89,8 @@ func (c *TradeCollector) Emit() {
}
func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error {
logrus.Debugf("recovering %s trades...", symbol)
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &from,
})
@ -98,8 +100,8 @@ func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHist
}
for _, td := range trades {
logrus.Debugf("processing trade: %s", td.String())
if c.ProcessTrade(td) {
logrus.Debugf("checking trade: %s", td.String())
if c.processTrade(td) {
logrus.Infof("recovered trade: %s", td.String())
c.EmitRecover(td)
}
@ -178,45 +180,33 @@ func (c *TradeCollector) processTrade(trade types.Trade) bool {
return false
}
if c.orderStore.Exists(trade.OrderID) {
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, &p)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
}
c.EmitPositionUpdate(c.position)
if !c.orderStore.Exists(trade.OrderID) {
return false
}
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, &p)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
}
c.doneTrades[key] = struct{}{}
return true
c.EmitPositionUpdate(c.position)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
}
return false
c.doneTrades[key] = struct{}{}
return true
}
// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
key := trade.Key()
// if it's already done, remove the trade from the trade store
c.mu.Lock()
if _, done := c.doneTrades[key]; done {
return false
}
c.mu.Unlock()
if c.processTrade(trade) {
return true
}
c.tradeStore.Add(trade)
return false
return c.processTrade(trade)
}
// Run is a goroutine executed in the background

View File

@ -76,6 +76,11 @@ type Strategy struct {
NotifyTrade bool `json:"notifyTrade"`
// RecoverTrade tries to find the missing trades via the REStful API
RecoverTrade bool `json:"recoverTrade"`
RecoverTradeScanPeriod types.Duration `json:"recoverTradeScanPeriod"`
NumLayers int `json:"numLayers"`
// Pips is the pips of the layer prices
@ -584,6 +589,38 @@ func (s *Strategy) Hedge(ctx context.Context, pos fixedpoint.Value) {
s.orderStore.Add(returnOrders...)
}
func (s *Strategy) tradeRecover(ctx context.Context) {
tradeScanInterval := s.RecoverTradeScanPeriod.Duration()
if tradeScanInterval == 0 {
tradeScanInterval = 30 * time.Minute
}
tradeScanTicker := time.NewTicker(tradeScanInterval)
defer tradeScanTicker.Stop()
for {
select {
case <-ctx.Done():
return
case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval)
if s.RecoverTrade {
startTime := time.Now().Add(-tradeScanInterval)
if err := s.tradeCollector.Recover(ctx, s.sourceSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
if err := s.tradeCollector.Recover(ctx, s.makerSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
}
}
}
}
func (s *Strategy) Validate() error {
if s.Quantity.IsZero() || s.QuantityScale == nil {
return errors.New("quantity or quantityScale can not be empty")
@ -779,6 +816,10 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
s.stopC = make(chan struct{})
if s.RecoverTrade {
go s.tradeRecover(ctx)
}
go func() {
posTicker := time.NewTicker(util.MillisecondsJitter(s.HedgeInterval.Duration(), 200))
defer posTicker.Stop()
@ -789,10 +830,6 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
reportTicker := time.NewTicker(time.Hour)
defer reportTicker.Stop()
tradeScanInterval := 20 * time.Minute
tradeScanTicker := time.NewTicker(tradeScanInterval)
defer tradeScanTicker.Stop()
defer func() {
if err := s.activeMakerOrders.GracefulCancel(context.Background(), s.makerSession.Exchange); err != nil {
log.WithError(err).Errorf("can not cancel %s orders", s.Symbol)
@ -816,13 +853,6 @@ func (s *Strategy) CrossRun(ctx context.Context, orderExecutionRouter bbgo.Order
case <-reportTicker.C:
bbgo.Notify(s.ProfitStats)
case <-tradeScanTicker.C:
log.Infof("scanning trades from %s ago...", tradeScanInterval)
startTime := time.Now().Add(-tradeScanInterval)
if err := s.tradeCollector.Recover(ctx, s.sourceSession.Exchange.(types.ExchangeTradeHistoryService), s.Symbol, startTime); err != nil {
log.WithError(err).Errorf("query trades error")
}
case <-posTicker.C:
// For positive position and positive covered position:
// uncover position = +5 - +3 (covered position) = 2