Merge pull request #1661 from c9s/c9s/improve-trade-batch-query

IMPROVE: [batch] improve trade batch query
This commit is contained in:
c9s 2024-06-20 17:05:58 +08:00 committed by GitHub
commit ee09922865
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 75 additions and 11 deletions

View File

@ -17,20 +17,23 @@ type TradeBatchQuery struct {
types.ExchangeTradeHistoryService types.ExchangeTradeHistoryService
} }
func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions, opts ...Option) (c chan types.Trade, errC chan error) { func (e TradeBatchQuery) Query(
ctx context.Context, symbol string, options *types.TradeQueryOptions, opts ...Option,
) (c chan types.Trade, errC chan error) {
if options.EndTime == nil { if options.EndTime == nil {
now := time.Now() now := time.Now()
options.EndTime = &now options.EndTime = &now
} }
startTime := *options.StartTime
endTime := *options.EndTime
query := &AsyncTimeRangedBatchQuery{ query := &AsyncTimeRangedBatchQuery{
Type: types.Trade{}, Type: types.Trade{},
Q: func(startTime, endTime time.Time) (interface{}, error) { Q: func(startTime, endTime time.Time) (interface{}, error) {
options.StartTime = &startTime return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
options.EndTime = &endTime StartTime: &startTime,
return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, options) EndTime: &endTime,
Limit: options.Limit,
LastTradeID: options.LastTradeID,
})
}, },
T: func(obj interface{}) time.Time { T: func(obj interface{}) time.Time {
return time.Time(obj.(types.Trade).Time) return time.Time(obj.(types.Trade).Time)
@ -40,9 +43,10 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type
if trade.ID > options.LastTradeID { if trade.ID > options.LastTradeID {
options.LastTradeID = trade.ID options.LastTradeID = trade.ID
} }
return trade.Key().String() return trade.Key().String()
}, },
JumpIfEmpty: 24 * time.Hour, JumpIfEmpty: 24*time.Hour - 5*time.Minute, // exchange may not have trades in the last 24 hours
} }
for _, opt := range opts { for _, opt := range opts {
@ -50,6 +54,6 @@ func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *type
} }
c = make(chan types.Trade, 100) c = make(chan types.Trade, 100)
errC = query.Query(ctx, c, startTime, endTime) errC = query.Query(ctx, c, *options.StartTime, *options.EndTime)
return c, errC return c, errC
} }

View File

@ -2,6 +2,7 @@ package batch
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"testing" "testing"
"time" "time"
@ -14,6 +15,49 @@ import (
"github.com/c9s/bbgo/pkg/types/mocks" "github.com/c9s/bbgo/pkg/types/mocks"
) )
func matchTradeQueryOptions(expected *types.TradeQueryOptions) *TradeQueryOptionsMatcher {
return &TradeQueryOptionsMatcher{
expected: expected,
}
}
type TradeQueryOptionsMatcher struct {
expected *types.TradeQueryOptions
}
func (m TradeQueryOptionsMatcher) Matches(arg interface{}) bool {
given, ok := arg.(*types.TradeQueryOptions)
if !ok {
return false
}
if given.StartTime != nil && m.expected.StartTime != nil {
if !given.StartTime.Equal(*m.expected.StartTime) {
return false
}
}
if given.EndTime != nil && m.expected.EndTime != nil {
if !given.EndTime.Equal(*m.expected.EndTime) {
return false
}
}
if given.Limit != m.expected.Limit {
return false
}
if given.LastTradeID != m.expected.LastTradeID {
return false
}
return true
}
func (m TradeQueryOptionsMatcher) String() string {
return fmt.Sprintf("%+v", m.expected)
}
func Test_TradeBatchQuery(t *testing.T) { func Test_TradeBatchQuery(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
@ -55,7 +99,7 @@ func Test_TradeBatchQuery(t *testing.T) {
mockExchange = mocks.NewMockExchangeTradeHistoryService(ctrl) mockExchange = mocks.NewMockExchangeTradeHistoryService(ctrl)
) )
mockExchange.EXPECT().QueryTrades(ctx, expSymbol, expOptions).DoAndReturn( mockExchange.EXPECT().QueryTrades(ctx, expSymbol, matchTradeQueryOptions(expOptions)).DoAndReturn(
func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) { func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) {
assert.Equal(t, startTime, *options.StartTime) assert.Equal(t, startTime, *options.StartTime)
assert.Equal(t, endTime, *options.EndTime) assert.Equal(t, endTime, *options.EndTime)
@ -63,7 +107,13 @@ func Test_TradeBatchQuery(t *testing.T) {
assert.Equal(t, expOptions.Limit, options.Limit) assert.Equal(t, expOptions.Limit, options.Limit)
return queryTrades1, nil return queryTrades1, nil
}).Times(1) }).Times(1)
mockExchange.EXPECT().QueryTrades(ctx, expSymbol, expOptions).DoAndReturn(
mockExchange.EXPECT().QueryTrades(ctx, expSymbol, matchTradeQueryOptions(&types.TradeQueryOptions{
StartTime: timePtr(queryTrades1[0].Time.Time()),
EndTime: expOptions.EndTime,
LastTradeID: 1,
Limit: 50,
})).DoAndReturn(
func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) { func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) {
assert.Equal(t, queryTrades1[0].Time.Time(), *options.StartTime) assert.Equal(t, queryTrades1[0].Time.Time(), *options.StartTime)
assert.Equal(t, endTime, *options.EndTime) assert.Equal(t, endTime, *options.EndTime)
@ -71,7 +121,13 @@ func Test_TradeBatchQuery(t *testing.T) {
assert.Equal(t, expOptions.Limit, options.Limit) assert.Equal(t, expOptions.Limit, options.Limit)
return queryTrades2, nil return queryTrades2, nil
}).Times(1) }).Times(1)
mockExchange.EXPECT().QueryTrades(ctx, expSymbol, expOptions).DoAndReturn(
mockExchange.EXPECT().QueryTrades(ctx, expSymbol, matchTradeQueryOptions(&types.TradeQueryOptions{
StartTime: timePtr(queryTrades2[1].Time.Time()),
EndTime: expOptions.EndTime,
LastTradeID: queryTrades2[1].ID,
Limit: 50,
})).DoAndReturn(
func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) { func(ctx context.Context, symbol string, options *types.TradeQueryOptions) ([]types.Trade, error) {
assert.Equal(t, queryTrades2[1].Time.Time(), *options.StartTime) assert.Equal(t, queryTrades2[1].Time.Time(), *options.StartTime)
assert.Equal(t, endTime, *options.EndTime) assert.Equal(t, endTime, *options.EndTime)
@ -138,3 +194,7 @@ func Test_TradeBatchQuery(t *testing.T) {
assert.Equal(t, rcvCount, 0) assert.Equal(t, rcvCount, 0)
}) })
} }
func timePtr(t time.Time) *time.Time {
return &t
}