batch: improve trade batch query

This commit is contained in:
c9s 2024-06-19 17:35:38 +08:00
parent 6be38558e4
commit df125c0efb
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54

View File

@ -17,20 +17,22 @@ type TradeBatchQuery struct {
types.ExchangeTradeHistoryService types.ExchangeTradeHistoryService
} }
func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions, opts ...Option) (c chan types.Trade, errC chan error) { func (e TradeBatchQuery) Query(
ctx context.Context, symbol string, options *types.TradeQueryOptions, opts ...Option,
) (c chan types.Trade, errC chan error) {
if options.EndTime == nil { if options.EndTime == nil {
now := time.Now() now := time.Now()
options.EndTime = &now options.EndTime = &now
} }
startTime := *options.StartTime
endTime := *options.EndTime
query := &AsyncTimeRangedBatchQuery{ query := &AsyncTimeRangedBatchQuery{
Type: types.Trade{}, Type: types.Trade{},
Q: func(startTime, endTime time.Time) (interface{}, error) { Q: func(startTime, endTime time.Time) (interface{}, error) {
options.StartTime = &startTime return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
options.EndTime = &endTime StartTime: &startTime,
return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, options) EndTime: &endTime,
LastTradeID: options.LastTradeID,
})
}, },
T: func(obj interface{}) time.Time { T: func(obj interface{}) time.Time {
return time.Time(obj.(types.Trade).Time) return time.Time(obj.(types.Trade).Time)
@ -40,9 +42,10 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type
if trade.ID > options.LastTradeID { if trade.ID > options.LastTradeID {
options.LastTradeID = trade.ID options.LastTradeID = trade.ID
} }
return trade.Key().String() return trade.Key().String()
}, },
JumpIfEmpty: 24 * time.Hour, JumpIfEmpty: 23 * time.Hour, // exchange may not have trades in the last 24 hours
} }
for _, opt := range opts { for _, opt := range opts {
@ -50,6 +53,6 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type
} }
c = make(chan types.Trade, 100) c = make(chan types.Trade, 100)
errC = query.Query(ctx, c, startTime, endTime) errC = query.Query(ctx, c, *options.StartTime, *options.EndTime)
return c, errC return c, errC
} }