improve trade sync

This commit is contained in:
c9s 2021-02-18 18:20:18 +08:00
parent 654ad62f36
commit 3a89b0a714
8 changed files with 64 additions and 50 deletions

View File

@ -130,6 +130,8 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
defer close(c)
defer close(errC)
var tradeKeys = map[types.TradeKey]struct{}{}
for {
if err := limiter.Wait(ctx); err != nil {
logrus.WithError(err).Error("rate limit error")
@ -155,12 +157,24 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str
break
}
logrus.Infof("returned %d trades", len(trades))
end := len(trades) - 1
if _, exists := tradeKeys[trades[end].Key()]; exists {
break
}
logrus.Debugf("returned %d trades", len(trades))
// increase the window to the next time frame by adding 1 millisecond
startTime = time.Time(trades[len(trades)-1].Time)
for _, t := range trades {
key := t.Key()
if _, ok := tradeKeys[key]; ok {
logrus.Debugf("ignore duplicated trade: %+v", key)
continue
}
lastTradeID = t.ID
tradeKeys[key] = struct{}{}
// ignore the first trade if last TradeID is given
c <- t

View File

@ -712,9 +712,9 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond))
}
// BINANCE uses inclusive last trade ID, so we need to add by 1
// BINANCE uses inclusive last trade ID
if options.LastTradeID > 0 {
req.FromID(options.LastTradeID + 1)
req.FromID(options.LastTradeID)
}
remoteTrades, err = req.Do(ctx)
@ -738,9 +738,9 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond))
}
// BINANCE uses inclusive last trade ID, so we need to add by 1
// BINANCE uses inclusive last trade ID
if options.LastTradeID > 0 {
req.FromID(options.LastTradeID + 1)
req.FromID(options.LastTradeID)
}
remoteTrades, err = req.Do(ctx)
@ -756,7 +756,6 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
continue
}
log.Infof("trade: %d %s % 4s price: % 13s volume: % 11s %6s % 5s %s", t.ID, t.Symbol, localTrade.Side, t.Price, t.Quantity, BuyerOrSellerLabel(t), MakerOrTakerLabel(t), localTrade.Time)
trades = append(trades, *localTrade)
}

View File

@ -1,22 +0,0 @@
package binance
import "github.com/adshao/go-binance/v2"
func BuyerOrSellerLabel(trade *binance.TradeV3) (o string) {
if trade.IsBuyer {
o = "BUYER"
} else {
o = "SELLER"
}
return o
}
func MakerOrTakerLabel(trade *binance.TradeV3) (o string) {
if trade.IsMaker {
o += "MAKER"
} else {
o += "TAKER"
}
return o
}

View File

@ -551,8 +551,6 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type
continue
}
logger.Infof("T: %d %7s %4s P=%f Q=%f %s", localTrade.ID, localTrade.Symbol, localTrade.Side, localTrade.Price, localTrade.Quantity, localTrade.Time)
trades = append(trades, *localTrade)
}

View File

@ -77,16 +77,23 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
}
}
lastTrade, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated)
lastTrades, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 10)
if err != nil {
return err
}
var tradeKeys = map[types.TradeKey]struct{}{}
var lastTradeID int64 = 0
if lastTrade != nil {
startTime = time.Time(lastTrade.Time).Add(time.Millisecond)
if len(lastTrades) > 0 {
for _, t := range lastTrades {
tradeKeys[t.Key()] = struct{}{}
}
lastTrade := lastTrades[len(lastTrades)-1]
lastTradeID = lastTrade.ID
logrus.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
startTime = time.Time(lastTrade.Time)
logrus.Debugf("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime)
}
batch := &batch2.ExchangeBatchProcessor{Exchange: exchange}
@ -108,6 +115,15 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s
default:
}
key := trade.Key()
if _, ok := tradeKeys[key]; ok {
continue
}
tradeKeys[key] = struct{}{}
logrus.Infof("inserting trade: %d %s %-4s price: %-13f volume: %-11f %5s %s", trade.ID, trade.Symbol, trade.Side, trade.Price, trade.Quantity, trade.MakerOrTakerLabel(), trade.Time.String())
if err := s.TradeService.Insert(trade); err != nil {
return err
}

View File

@ -188,32 +188,24 @@ func generateMysqlTradingVolumeQuerySQL(options TradingVolumeQueryOptions) strin
}
// QueryLast queries the last trade from the database
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool) (*types.Trade, error) {
log.Infof("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated)
func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool, limit int) ([]types.Trade, error) {
log.Debugf("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated)
rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT 1`, map[string]interface{}{
sql := `SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT :limit`
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"symbol": symbol,
"exchange": ex,
"is_margin": isMargin,
"is_isolated": isIsolated,
"limit": limit,
})
if err != nil {
return nil, errors.Wrap(err, "query last trade error")
}
if rows.Err() != nil {
return nil, rows.Err()
}
defer rows.Close()
if rows.Next() {
var trade types.Trade
err = rows.StructScan(&trade)
return &trade, err
}
return nil, rows.Err()
return s.scanRows(rows)
}
func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol string, feeCurrency string) ([]types.Trade, error) {

View File

@ -1,2 +0,0 @@
package types

View File

@ -98,3 +98,22 @@ func (trade Trade) SlackAttachment() slack.Attachment {
// FooterIcon: "",
}
}
func (trade Trade) MakerOrTakerLabel() (o string) {
if trade.IsMaker {
o += "MAKER"
} else {
o += "TAKER"
}
return o
}
func (trade Trade) Key() TradeKey {
return TradeKey{ID: trade.ID, Side: trade.Side}
}
type TradeKey struct {
ID int64
Side SideType
}