From 3a89b0a7141abc7c7a7a06e81efd3291eba7296e Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 18 Feb 2021 18:20:18 +0800 Subject: [PATCH] improve trade sync --- pkg/exchange/batch/batch.go | 16 +++++++++++++++- pkg/exchange/binance/exchange.go | 9 ++++----- pkg/exchange/binance/trade.go | 22 ---------------------- pkg/exchange/max/exchange.go | 2 -- pkg/service/sync.go | 24 ++++++++++++++++++++---- pkg/service/trade.go | 20 ++++++-------------- pkg/types/batch.go | 2 -- pkg/types/trade.go | 19 +++++++++++++++++++ 8 files changed, 64 insertions(+), 50 deletions(-) delete mode 100644 pkg/exchange/binance/trade.go delete mode 100644 pkg/types/batch.go diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go index 319fa6d04..4b175386f 100644 --- a/pkg/exchange/batch/batch.go +++ b/pkg/exchange/batch/batch.go @@ -130,6 +130,8 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str defer close(c) defer close(errC) + var tradeKeys = map[types.TradeKey]struct{}{} + for { if err := limiter.Wait(ctx); err != nil { logrus.WithError(err).Error("rate limit error") @@ -155,12 +157,24 @@ func (e ExchangeBatchProcessor) BatchQueryTrades(ctx context.Context, symbol str break } - logrus.Infof("returned %d trades", len(trades)) + end := len(trades) - 1 + if _, exists := tradeKeys[trades[end].Key()]; exists { + break + } + + logrus.Debugf("returned %d trades", len(trades)) // increase the window to the next time frame by adding 1 millisecond startTime = time.Time(trades[len(trades)-1].Time) for _, t := range trades { + key := t.Key() + if _, ok := tradeKeys[key]; ok { + logrus.Debugf("ignore duplicated trade: %+v", key) + continue + } + lastTradeID = t.ID + tradeKeys[key] = struct{}{} // ignore the first trade if last TradeID is given c <- t diff --git a/pkg/exchange/binance/exchange.go b/pkg/exchange/binance/exchange.go index b62b8eeae..531492bff 100644 --- a/pkg/exchange/binance/exchange.go +++ b/pkg/exchange/binance/exchange.go @@ -712,9 +712,9 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond)) } - // BINANCE uses inclusive last trade ID, so we need to add by 1 + // BINANCE uses inclusive last trade ID if options.LastTradeID > 0 { - req.FromID(options.LastTradeID + 1) + req.FromID(options.LastTradeID) } remoteTrades, err = req.Do(ctx) @@ -738,9 +738,9 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type req.EndTime(options.EndTime.UnixNano() / int64(time.Millisecond)) } - // BINANCE uses inclusive last trade ID, so we need to add by 1 + // BINANCE uses inclusive last trade ID if options.LastTradeID > 0 { - req.FromID(options.LastTradeID + 1) + req.FromID(options.LastTradeID) } remoteTrades, err = req.Do(ctx) @@ -756,7 +756,6 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type continue } - log.Infof("trade: %d %s % 4s price: % 13s volume: % 11s %6s % 5s %s", t.ID, t.Symbol, localTrade.Side, t.Price, t.Quantity, BuyerOrSellerLabel(t), MakerOrTakerLabel(t), localTrade.Time) trades = append(trades, *localTrade) } diff --git a/pkg/exchange/binance/trade.go b/pkg/exchange/binance/trade.go deleted file mode 100644 index ee8f0e66e..000000000 --- a/pkg/exchange/binance/trade.go +++ /dev/null @@ -1,22 +0,0 @@ -package binance - -import "github.com/adshao/go-binance/v2" - -func BuyerOrSellerLabel(trade *binance.TradeV3) (o string) { - if trade.IsBuyer { - o = "BUYER" - } else { - o = "SELLER" - } - return o -} - -func MakerOrTakerLabel(trade *binance.TradeV3) (o string) { - if trade.IsMaker { - o += "MAKER" - } else { - o += "TAKER" - } - - return o -} diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index da4de1e9c..d85882c80 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -551,8 +551,6 @@ func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *type continue } - logger.Infof("T: %d %7s %4s P=%f Q=%f %s", localTrade.ID, localTrade.Symbol, localTrade.Side, localTrade.Price, localTrade.Quantity, localTrade.Time) - trades = append(trades, *localTrade) } diff --git a/pkg/service/sync.go b/pkg/service/sync.go index fd8432216..f83ed2a5e 100644 --- a/pkg/service/sync.go +++ b/pkg/service/sync.go @@ -77,16 +77,23 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s } } - lastTrade, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated) + lastTrades, err := s.TradeService.QueryLast(exchange.Name(), symbol, isMargin, isIsolated, 10) if err != nil { return err } + var tradeKeys = map[types.TradeKey]struct{}{} var lastTradeID int64 = 0 - if lastTrade != nil { - startTime = time.Time(lastTrade.Time).Add(time.Millisecond) + if len(lastTrades) > 0 { + for _, t := range lastTrades { + tradeKeys[t.Key()] = struct{}{} + } + + lastTrade := lastTrades[len(lastTrades)-1] lastTradeID = lastTrade.ID - logrus.Infof("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime) + + startTime = time.Time(lastTrade.Time) + logrus.Debugf("found last trade, start from lastID = %d since %s", lastTrade.ID, startTime) } batch := &batch2.ExchangeBatchProcessor{Exchange: exchange} @@ -108,6 +115,15 @@ func (s *SyncService) SyncTrades(ctx context.Context, exchange types.Exchange, s default: } + key := trade.Key() + if _, ok := tradeKeys[key]; ok { + continue + } + + tradeKeys[key] = struct{}{} + + logrus.Infof("inserting trade: %d %s %-4s price: %-13f volume: %-11f %5s %s", trade.ID, trade.Symbol, trade.Side, trade.Price, trade.Quantity, trade.MakerOrTakerLabel(), trade.Time.String()) + if err := s.TradeService.Insert(trade); err != nil { return err } diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 4e43417f5..d436d7f41 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -188,32 +188,24 @@ func generateMysqlTradingVolumeQuerySQL(options TradingVolumeQueryOptions) strin } // QueryLast queries the last trade from the database -func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool) (*types.Trade, error) { - log.Infof("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated) +func (s *TradeService) QueryLast(ex types.ExchangeName, symbol string, isMargin bool, isIsolated bool, limit int) ([]types.Trade, error) { + log.Debugf("querying last trade exchange = %s AND symbol = %s AND is_margin = %v AND is_isolated = %v", ex, symbol, isMargin, isIsolated) - rows, err := s.DB.NamedQuery(`SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT 1`, map[string]interface{}{ + sql := `SELECT * FROM trades WHERE exchange = :exchange AND symbol = :symbol AND is_margin = :is_margin AND is_isolated = :is_isolated ORDER BY gid DESC LIMIT :limit` + rows, err := s.DB.NamedQuery(sql, map[string]interface{}{ "symbol": symbol, "exchange": ex, "is_margin": isMargin, "is_isolated": isIsolated, + "limit": limit, }) if err != nil { return nil, errors.Wrap(err, "query last trade error") } - if rows.Err() != nil { - return nil, rows.Err() - } - defer rows.Close() - if rows.Next() { - var trade types.Trade - err = rows.StructScan(&trade) - return &trade, err - } - - return nil, rows.Err() + return s.scanRows(rows) } func (s *TradeService) QueryForTradingFeeCurrency(ex types.ExchangeName, symbol string, feeCurrency string) ([]types.Trade, error) { diff --git a/pkg/types/batch.go b/pkg/types/batch.go deleted file mode 100644 index 5bb13ecf9..000000000 --- a/pkg/types/batch.go +++ /dev/null @@ -1,2 +0,0 @@ -package types - diff --git a/pkg/types/trade.go b/pkg/types/trade.go index 5505cc633..0d8ce7fde 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -98,3 +98,22 @@ func (trade Trade) SlackAttachment() slack.Attachment { // FooterIcon: "", } } + +func (trade Trade) MakerOrTakerLabel() (o string) { + if trade.IsMaker { + o += "MAKER" + } else { + o += "TAKER" + } + + return o +} + +func (trade Trade) Key() TradeKey { + return TradeKey{ID: trade.ID, Side: trade.Side} +} + +type TradeKey struct { + ID int64 + Side SideType +}