Merge pull request #565 from c9s/fix/trade-sync

fix: service: correct QueryLast query
This commit is contained in:
Yo-An Lin 2022-04-27 12:40:05 +08:00 committed by GitHub
commit 14a29df975
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 18 additions and 7 deletions

View File

@ -73,13 +73,16 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
} }
} }
// records descending ordered, buffer 50 trades and use the trades ID to scan if the new trades are duplicated // buffer 50 trades and use the trades ID to scan if the new trades are duplicated
records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 50) records, err := s.QueryLast(exchange.Name(), symbol, isMargin, isFutures, isIsolated, 100)
if err != nil { if err != nil {
return err return err
} }
var tradeKeys = map[types.TradeKey]struct{}{} var tradeKeys = map[types.TradeKey]struct{}{}
// for exchange supports trade id query, we should always try to query from the first trade.
// 0 means disable.
var lastTradeID uint64 = 1 var lastTradeID uint64 = 1
var now = time.Now() var now = time.Now()
if len(records) > 0 { if len(records) > 0 {
@ -87,8 +90,10 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
tradeKeys[record.Key()] = struct{}{} tradeKeys[record.Key()] = struct{}{}
} }
lastTradeID = records[0].ID end := len(records) - 1
startTime = time.Time(records[0].Time) last := records[end]
lastTradeID = last.ID
startTime = last.Time.Time()
} }
b := &batch.TradeBatchQuery{Exchange: exchange} b := &batch.TradeBatchQuery{Exchange: exchange}
@ -283,7 +288,7 @@ func generateMysqlTradingVolumeQuerySQL(options TradingVolumeQueryOptions) strin
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Trade, error) { func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin, isFutures, isIsolated bool, limit int) ([]types.Trade, error) {
log.Debugf("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated) log.Debugf("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_futures = %v AND is_isolated = %v", ex, symbol, isMargin, isFutures, isIsolated)
sql := "SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_futures = :is_futures AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT :limit" sql := "SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_futures = :is_futures AND is_isolated = :is_isolated ORDER BY traded_at DESC LIMIT :limit"
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"symbol": symbol, "symbol": symbol,
"exchange": ex, "exchange": ex,
@ -298,7 +303,13 @@ func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin,
defer rows.Close() defer rows.Close()
return s.scanRows(rows) trades, err := s.scanRows(rows)
if err != nil {
return nil, err
}
trades = types.SortTradesAscending(trades)
return trades, nil
} }
func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol string, feeCurrency string) ([]types.Trade, error) { func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol string, feeCurrency string) ([]types.Trade, error) {

View File

@ -237,7 +237,7 @@ func (o Order) String() string {
orderID = strconv.FormatUint(o.OrderID, 10) orderID = strconv.FormatUint(o.OrderID, 10)
} }
return fmt.Sprintf("ORDER %s | %s | %s | %s %s | %s/%s @ %s | %s", return fmt.Sprintf("ORDER %s | %s | %s | %s %-4s | %s/%s @ %s | %s",
o.Exchange.String(), o.Exchange.String(),
o.CreationTime.Time().Local().Format(time.RFC1123), o.CreationTime.Time().Local().Format(time.RFC1123),
orderID, orderID,