From e66eb08db44e59b7b8d8a4f884d8d95f3a471123 Mon Sep 17 00:00:00 2001 From: c9s Date: Tue, 31 May 2022 00:59:33 +0800 Subject: [PATCH] batch: refactor batch query --- config/sync.yaml | 4 + pkg/exchange/batch/batch.go | 169 -------------------- pkg/exchange/batch/batch_test.go | 2 + pkg/exchange/batch/closedorders.go | 98 +++--------- pkg/exchange/batch/kline.go | 38 +++++ pkg/exchange/batch/reward.go | 34 ++++ pkg/exchange/batch/time_range_query.go | 115 +++++++++++++ pkg/exchange/batch/time_range_query_test.go | 45 ++++++ pkg/exchange/batch/trade.go | 122 ++++---------- pkg/exchange/binance/stream_test.go | 58 ------- pkg/exchange/ftx/stream_test.go | 89 ----------- pkg/exchange/max/exchange.go | 2 +- pkg/service/backtest.go | 43 +---- pkg/service/order.go | 9 +- pkg/service/trade.go | 12 +- pkg/types/sort.go | 10 +- pkg/types/trade.go | 4 + 17 files changed, 325 insertions(+), 529 deletions(-) delete mode 100644 pkg/exchange/batch/batch.go create mode 100644 pkg/exchange/batch/batch_test.go create mode 100644 pkg/exchange/batch/kline.go create mode 100644 pkg/exchange/batch/reward.go create mode 100644 pkg/exchange/batch/time_range_query.go create mode 100644 pkg/exchange/batch/time_range_query_test.go delete mode 100644 pkg/exchange/binance/stream_test.go delete mode 100644 pkg/exchange/ftx/stream_test.go diff --git a/config/sync.yaml b/config/sync.yaml index b1cba855c..ec31dd986 100644 --- a/config/sync.yaml +++ b/config/sync.yaml @@ -30,3 +30,7 @@ sync: - BTCUSDT - ETHUSDT - LINKUSDT + + depositHistory: true + rewardHistory: true + withdrawHistory: true diff --git a/pkg/exchange/batch/batch.go b/pkg/exchange/batch/batch.go deleted file mode 100644 index dafa90500..000000000 --- a/pkg/exchange/batch/batch.go +++ /dev/null @@ -1,169 +0,0 @@ -package batch - -import ( - "context" - "sort" - "time" - - "github.com/pkg/errors" - - "github.com/sirupsen/logrus" - "golang.org/x/time/rate" - - "github.com/c9s/bbgo/pkg/types" -) - -var log = logrus.WithField("component", "batch") - -type KLineBatchQuery struct { - types.Exchange -} - -func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan []types.KLine, errC chan error) { - c = make(chan []types.KLine, 1000) - errC = make(chan error, 1) - - go func() { - defer close(c) - defer close(errC) - - var tryQueryKlineTimes = 0 - for startTime.Before(endTime) { - log.Debugf("batch query klines %s %s %s <=> %s", symbol, interval, startTime, endTime) - - kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ - StartTime: &startTime, - EndTime: &endTime, - }) - - if err != nil { - errC <- err - return - } - - // ensure the kline is in the right order - sort.Slice(kLines, func(i, j int) bool { - return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix() - }) - - if len(kLines) == 0 { - return - } - - tryQueryKlineTimes++ - const BatchSize = 200 - - var batchKLines = make([]types.KLine, 0, BatchSize) - for _, kline := range kLines { - // ignore any kline before the given start time of the batch query - if kline.StartTime.Before(startTime) { - continue - } - - // if there is a kline after the endTime of the batch query, it means the data is out of scope, we should exit - if kline.StartTime.After(endTime) || kline.EndTime.After(endTime) { - if len(batchKLines) != 0 { - c <- batchKLines - batchKLines = nil - } - return - } - - batchKLines = append(batchKLines, kline) - - if len(batchKLines) == BatchSize { - c <- batchKLines - batchKLines = nil - } - - // The 
issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever. - // (above comment was written by @tony1223) - startTime = kline.EndTime.Time() - tryQueryKlineTimes = 0 - } - - // push the rest klines in the buffer - if len(batchKLines) > 0 { - c <- batchKLines - batchKLines = nil - } - - if tryQueryKlineTimes > 10 { // it means loop 10 times - errC <- errors.Errorf("there's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime:%s ", symbol, interval, startTime.String()) - return - } - } - }() - - return c, errC -} - -type RewardBatchQuery struct { - Service types.ExchangeRewardService -} - -func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) { - c = make(chan types.Reward, 500) - errC = make(chan error, 1) - - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - lastID := "" - rewardKeys := make(map[string]struct{}, 500) - - for startTime.Before(endTime) { - if err := limiter.Wait(ctx); err != nil { - log.WithError(err).Error("rate limit error") - } - - log.Infof("batch querying rewards %s <=> %s", startTime, endTime) - - rewards, err := q.Service.QueryRewards(ctx, startTime) - if err != nil { - errC <- err - return - } - - // empty data - if len(rewards) == 0 { - return - } - - // there is no new data - if len(rewards) == 1 && rewards[0].UUID == lastID { - return - } - - newCnt := 0 - for _, o := range rewards { - if _, ok := rewardKeys[o.UUID]; ok { - continue - } - - if o.CreatedAt.Time().After(endTime) { - // stop batch query - return - } - - newCnt++ - c <- o - rewardKeys[o.UUID] = struct{}{} - } - - if newCnt == 0 { - return - } - - end := len(rewards) - 1 - startTime = rewards[end].CreatedAt.Time() - lastID = rewards[end].UUID - } - - }() - - return c, errC -} diff --git a/pkg/exchange/batch/batch_test.go b/pkg/exchange/batch/batch_test.go new file mode 100644 index 000000000..720b9c4f1 --- /dev/null +++ b/pkg/exchange/batch/batch_test.go @@ -0,0 +1,2 @@ +package batch + diff --git a/pkg/exchange/batch/closedorders.go b/pkg/exchange/batch/closedorders.go index e0f78867c..b648acc7d 100644 --- a/pkg/exchange/batch/closedorders.go +++ b/pkg/exchange/batch/closedorders.go @@ -2,93 +2,39 @@ package batch import ( "context" - "sort" + "strconv" "time" - "github.com/sirupsen/logrus" "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/types" ) type ClosedOrderBatchQuery struct { - types.Exchange + types.ExchangeTradeHistoryService } -func (e ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) { - c = make(chan types.Order, 500) - errC = make(chan error, 1) - - tradeHistoryService, ok := e.Exchange.(types.ExchangeTradeHistoryService) - if !ok { - defer close(c) - defer close(errC) - // skip exchanges that does not support trading history services - logrus.Warnf("exchange %s does not implement ExchangeTradeHistoryService, skip syncing closed orders (ClosedOrderBatchQuery.Query) ", e.Exchange.Name()) - return c, errC +func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) { + query := &AsyncTimeRangedBatchQuery{ + Type: types.Order{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Q: func(startTime, endTime time.Time) (interface{}, error) { + orders, err 
:= q.ExchangeTradeHistoryService.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID) + return orders, err + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.Order).CreationTime) + }, + ID: func(obj interface{}) string { + order := obj.(types.Order) + if order.OrderID > lastOrderID { + lastOrderID = order.OrderID + } + return strconv.FormatUint(order.OrderID, 10) + }, } - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - orderIDs := make(map[uint64]struct{}, 500) - if lastOrderID > 0 { - orderIDs[lastOrderID] = struct{}{} - } - - for startTime.Before(endTime) { - if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime) - - orders, err := tradeHistoryService.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID) - if err != nil { - errC <- err - return - } - for _, o := range orders { - logrus.Infof("%+v", o) - } - - if len(orders) == 0 { - return - } else if len(orders) > 0 { - allExists := true - for _, o := range orders { - if _, exists := orderIDs[o.OrderID]; !exists { - allExists = false - break - } - } - if allExists { - return - } - } - - // sort orders by time in ascending order - sort.Slice(orders, func(i, j int) bool { - return orders[i].CreationTime.Before(time.Time(orders[j].CreationTime)) - }) - - for _, o := range orders { - if _, ok := orderIDs[o.OrderID]; ok { - continue - } - - c <- o - startTime = o.CreationTime.Time() - lastOrderID = o.OrderID - orderIDs[o.OrderID] = struct{}{} - } - } - - }() - + c = make(chan types.Order, 100) + errC = query.Query(ctx, c, startTime, endTime) return c, errC } - diff --git a/pkg/exchange/batch/kline.go b/pkg/exchange/batch/kline.go new file mode 100644 index 000000000..30ef7b767 --- /dev/null +++ b/pkg/exchange/batch/kline.go @@ -0,0 +1,38 @@ +package batch + +import ( + "context" + "time" + + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/types" +) + +type KLineBatchQuery struct { + types.Exchange +} + +func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) { + query := &AsyncTimeRangedBatchQuery{ + Type: types.KLine{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Q: func(startTime, endTime time.Time) (interface{}, error) { + return e.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{ + StartTime: &startTime, + EndTime: &endTime, + }) + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.KLine).StartTime) + }, + ID: func(obj interface{}) string { + kline := obj.(types.KLine) + return kline.StartTime.String() + }, + } + + c = make(chan types.KLine, 100) + errC = query.Query(ctx, c, startTime, endTime) + return c, errC +} diff --git a/pkg/exchange/batch/reward.go b/pkg/exchange/batch/reward.go new file mode 100644 index 000000000..07d39a11d --- /dev/null +++ b/pkg/exchange/batch/reward.go @@ -0,0 +1,34 @@ +package batch + +import ( + "context" + "time" + + "golang.org/x/time/rate" + + "github.com/c9s/bbgo/pkg/types" +) + +type RewardBatchQuery struct { + Service types.ExchangeRewardService +} + +func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) { + query := &AsyncTimeRangedBatchQuery{ + Type: types.Reward{}, + Limiter: 
rate.NewLimiter(rate.Every(5*time.Second), 2), + Q: func(startTime, endTime time.Time) (interface{}, error) { + return q.Service.QueryRewards(ctx, startTime) + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.Reward).CreatedAt) + }, + ID: func(obj interface{}) string { + return obj.(types.Reward).UUID + }, + } + + c = make(chan types.Reward, 500) + errC = query.Query(ctx, c, startTime, endTime) + return c, errC +} diff --git a/pkg/exchange/batch/time_range_query.go b/pkg/exchange/batch/time_range_query.go new file mode 100644 index 000000000..954ace647 --- /dev/null +++ b/pkg/exchange/batch/time_range_query.go @@ -0,0 +1,115 @@ +package batch + +import ( + "context" + "reflect" + "sort" + "time" + + "github.com/sirupsen/logrus" + "golang.org/x/time/rate" +) + +var log = logrus.WithField("component", "batch") + +type AsyncTimeRangedBatchQuery struct { + // Type is the object type of the result + Type interface{} + + // Limiter is the rate limiter for each query + Limiter *rate.Limiter + + // Q is the remote query function + Q func(startTime, endTime time.Time) (interface{}, error) + + // T function returns time of an object + T func(obj interface{}) time.Time + + // ID returns the ID of the object + ID func(obj interface{}) string + + // JumpIfEmpty jump the startTime + duration when the result is empty + JumpIfEmpty time.Duration +} + +func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, startTime, endTime time.Time) chan error { + errC := make(chan error, 1) + cRef := reflect.ValueOf(ch) + // cRef := reflect.MakeChan(reflect.TypeOf(q.Type), 100) + + go func() { + defer cRef.Close() + defer close(errC) + + idMap := make(map[string]struct{}, 100) + for startTime.Before(endTime) { + if q.Limiter != nil { + if err := q.Limiter.Wait(ctx); err != nil { + errC <- err + return + } + } + + log.Debugf("batch querying %T: %v <=> %v", q.Type, startTime, endTime) + + sliceInf, err := q.Q(startTime, endTime) + if err != nil { + errC <- err + return + } + + listRef := reflect.ValueOf(sliceInf) + listLen := listRef.Len() + + if listLen == 0 { + if q.JumpIfEmpty > 0 { + startTime = startTime.Add(q.JumpIfEmpty) + continue + } + + return + } + + // sort by time + sort.Slice(listRef.Interface(), func(i, j int) bool { + a := listRef.Index(i) + b := listRef.Index(j) + tA := q.T(a.Interface()) + tB := q.T(b.Interface()) + return tA.Before(tB) + }) + + sentAny := false + for i := 0; i < listLen; i++ { + item := listRef.Index(i) + entryTime := q.T(item.Interface()) + + if entryTime.Before(startTime) { + continue + } + if entryTime.After(endTime) { + continue + } + + obj := item.Interface() + id := q.ID(obj) + if _, exists := idMap[id]; exists { + log.Debugf("batch querying %T: duplicated id %s", q.Type, id) + continue + } + + idMap[id] = struct{}{} + + cRef.Send(item) + sentAny = true + startTime = entryTime + } + + if !sentAny { + return + } + } + }() + + return errC +} diff --git a/pkg/exchange/batch/time_range_query_test.go b/pkg/exchange/batch/time_range_query_test.go new file mode 100644 index 000000000..e3d6634e0 --- /dev/null +++ b/pkg/exchange/batch/time_range_query_test.go @@ -0,0 +1,45 @@ +package batch + +import ( + "context" + "strconv" + "testing" + "time" +) + +func Test_TimeRangedQuery(t *testing.T) { + startTime := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC) + endTime := time.Date(2021, time.January, 2, 0, 0, 0, 0, time.UTC) + q := &AsyncTimeRangedBatchQuery{ + Type: time.Time{}, + T: func(obj interface{}) time.Time { + return 
obj.(time.Time) + }, + ID: func(obj interface{}) string { + return strconv.FormatInt(obj.(time.Time).UnixMilli(), 10) + }, + Q: func(startTime, endTime time.Time) (interface{}, error) { + var cnt = 0 + var data []time.Time + for startTime.Before(endTime) && cnt < 5 { + d := startTime + data = append(data, d) + cnt++ + startTime = startTime.Add(time.Minute) + } + t.Logf("data: %v", data) + return data, nil + }, + } + + ch := make(chan time.Time, 100) + + // consumer + go func() { + for d := range ch { + _ = d + } + }() + errC := q.Query(context.Background(), ch, startTime, endTime) + <-errC +} diff --git a/pkg/exchange/batch/trade.go b/pkg/exchange/batch/trade.go index 3757424e7..bfec9c1a4 100644 --- a/pkg/exchange/batch/trade.go +++ b/pkg/exchange/batch/trade.go @@ -2,113 +2,45 @@ package batch import ( "context" - "errors" "time" - "github.com/sirupsen/logrus" "golang.org/x/time/rate" "github.com/c9s/bbgo/pkg/types" ) +var closedErrChan = make(chan error) + +func init() { + close(closedErrChan) +} + type TradeBatchQuery struct { - types.Exchange + types.ExchangeTradeHistoryService } func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions) (c chan types.Trade, errC chan error) { - c = make(chan types.Trade, 500) - errC = make(chan error, 1) - - tradeHistoryService, ok := e.Exchange.(types.ExchangeTradeHistoryService) - if !ok { - close(errC) - close(c) - // skip exchanges that does not support trading history services - logrus.Warnf("exchange %s does not implement ExchangeTradeHistoryService, skip syncing closed orders (TradeBatchQuery.Query)", e.Exchange.Name()) - return c, errC + startTime := *options.StartTime + endTime := *options.EndTime + query := &AsyncTimeRangedBatchQuery{ + Type: types.Trade{}, + Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2), + Q: func(startTime, endTime time.Time) (interface{}, error) { + return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, options) + }, + T: func(obj interface{}) time.Time { + return time.Time(obj.(types.Trade).Time) + }, + ID: func(obj interface{}) string { + trade := obj.(types.Trade) + if trade.ID > options.LastTradeID { + options.LastTradeID = trade.ID + } + return trade.Key().String() + }, } - if options.StartTime == nil { - - errC <- errors.New("start time is required for syncing trades") - close(errC) - close(c) - return c, errC - } - - var lastTradeID = options.LastTradeID - var startTime = *options.StartTime - var endTime = *options.EndTime - - go func() { - limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety) - - defer close(c) - defer close(errC) - - var tradeKeys = map[types.TradeKey]struct{}{} - - for startTime.Before(endTime) { - if err := limiter.Wait(ctx); err != nil { - logrus.WithError(err).Error("rate limit error") - } - - logrus.Infof("querying %s trades from id=%d limit=%d between %s <=> %s", symbol, lastTradeID, options.Limit, startTime, endTime) - - var err error - var trades []types.Trade - - trades, err = tradeHistoryService.QueryTrades(ctx, symbol, &types.TradeQueryOptions{ - StartTime: options.StartTime, - LastTradeID: lastTradeID, - }) - - // sort trades by time in ascending order - types.SortTradesAscending(trades) - - if err != nil { - errC <- err - return - } - - // if all trades are duplicated or empty, we end the batch query - if len(trades) == 0 { - return - } - - if len(trades) > 0 { - allExists := true - for _, td := range trades { - k := td.Key() - if _, exists := tradeKeys[k]; !exists { - 
allExists = false - break - } - } - if allExists { - return - } - } - - for _, td := range trades { - key := td.Key() - - logrus.Debugf("checking trade key: %v trade: %+v", key, td) - - if _, ok := tradeKeys[key]; ok { - logrus.Debugf("ignore duplicated trade: %+v", key) - continue - } - - lastTradeID = td.ID - startTime = time.Time(td.Time) - tradeKeys[key] = struct{}{} - - // ignore the first trade if last TradeID is given - c <- td - } - } - }() - + c = make(chan types.Trade, 100) + errC = query.Query(ctx, c, startTime, endTime) return c, errC } diff --git a/pkg/exchange/binance/stream_test.go b/pkg/exchange/binance/stream_test.go deleted file mode 100644 index eedef1a9c..000000000 --- a/pkg/exchange/binance/stream_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package binance - -import ( - "context" - batch2 "github.com/c9s/bbgo/pkg/exchange/batch" - "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" - "os" - "testing" - "time" -) - -func Test_Batch(t *testing.T) { - key := os.Getenv("BINANCE_API_KEY") - secret := os.Getenv("BINANCE_API_SECRET") - if len(key) == 0 && len(secret) == 0 { - t.Skip("api key/secret are not configured") - } - - e := New(key, secret) - //stream := NewStream(key, secret, subAccount, e) - - batch := &batch2.KLineBatchQuery{Exchange: e} - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // should use channel here - - starttime, _ := time.Parse("2006-1-2 15:04", "2021-08-01 00:00") - endtime, _ := time.Parse("2006-1-2 15:04", "2021-12-14 00:19") - klineC, _ := batch.Query(ctx, "XRPUSDT", types.Interval1m, starttime, endtime) - - var lastmintime time.Time - var lastmaxtime time.Time - for klines := range klineC { - assert.NotEmpty(t, klines) - - var nowMinTime = klines[0].StartTime - var nowMaxTime = klines[0].StartTime - for _, item := range klines { - if nowMaxTime.Unix() < item.StartTime.Unix() { - nowMaxTime = item.StartTime - } - if nowMinTime.Unix() > item.StartTime.Unix() { - nowMinTime = item.StartTime - } - } - assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix()) - assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix()) - assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix()) - - lastmintime = nowMinTime.Time() - lastmaxtime = nowMaxTime.Time() - assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix()) - - } - -} diff --git a/pkg/exchange/ftx/stream_test.go b/pkg/exchange/ftx/stream_test.go deleted file mode 100644 index 843cb4ce2..000000000 --- a/pkg/exchange/ftx/stream_test.go +++ /dev/null @@ -1,89 +0,0 @@ -package ftx - -import ( - "context" - batch2 "github.com/c9s/bbgo/pkg/exchange/batch" - "github.com/c9s/bbgo/pkg/types" - "github.com/stretchr/testify/assert" - "os" - "testing" - "time" -) - -func TestLastKline(t *testing.T) { - key := os.Getenv("FTX_API_KEY") - secret := os.Getenv("FTX_API_SECRET") - subAccount := os.Getenv("FTX_SUBACCOUNT") - if len(key) == 0 && len(secret) == 0 { - t.Skip("api key/secret are not configured") - } - - e := NewExchange(key, secret, subAccount) - //stream := NewStream(key, secret, subAccount, e) - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - klines := getLastClosedKLine(e, ctx, "XRPUSD", types.Interval1m) - assert.Equal(t, 1, len(klines)) - -} - -func Test_Batch(t *testing.T) { - key := os.Getenv("FTX_API_KEY") - secret := os.Getenv("FTX_API_SECRET") - subAccount := os.Getenv("FTX_SUBACCOUNT") - if len(key) == 0 && len(secret) == 0 { - t.Skip("api key/secret are not configured") - } - - e := NewExchange(key, secret, subAccount) - //stream := 
NewStream(key, secret, subAccount, e) - - batch := &batch2.KLineBatchQuery{Exchange: e} - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - // should use channel here - - starttime, err := time.Parse("2006-1-2 15:04", "2021-08-01 00:00") - assert.NoError(t, err) - endtime, err := time.Parse("2006-1-2 15:04", "2021-08-04 00:19") - assert.NoError(t, err) - - klineC, errC := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime) - - if err := <-errC; err != nil { - assert.NoError(t, err) - } - - var lastmintime time.Time - var lastmaxtime time.Time - - for klines := range klineC { - assert.NotEmpty(t, klines) - - var nowMinTime = klines[0].StartTime - var nowMaxTime = klines[0].StartTime - for _, item := range klines { - - if nowMaxTime.Unix() < item.StartTime.Unix() { - nowMaxTime = item.StartTime - } - if nowMinTime.Unix() > item.StartTime.Unix() { - nowMinTime = item.StartTime - } - - } - - if !lastmintime.IsZero() { - assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix()) - assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix()) - assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix()) - } - lastmintime = nowMinTime.Time() - lastmaxtime = nowMaxTime.Time() - assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix()) - - } - -} diff --git a/pkg/exchange/max/exchange.go b/pkg/exchange/max/exchange.go index a476db984..a8230284e 100644 --- a/pkg/exchange/max/exchange.go +++ b/pkg/exchange/max/exchange.go @@ -245,7 +245,7 @@ func (e *Exchange) queryClosedOrdersByLastOrderID(ctx context.Context, symbol st orders = append(orders, *order) } - orders = types.SortOrderAscending(orders) + orders = types.SortOrdersAscending(orders) return orders, nil } diff --git a/pkg/service/backtest.go b/pkg/service/backtest.go index 7db542f14..af8248cb8 100644 --- a/pkg/service/backtest.go +++ b/pkg/service/backtest.go @@ -12,7 +12,7 @@ import ( "github.com/pkg/errors" log "github.com/sirupsen/logrus" - batch2 "github.com/c9s/bbgo/pkg/exchange/batch" + "github.com/c9s/bbgo/pkg/exchange/batch" "github.com/c9s/bbgo/pkg/types" ) @@ -23,26 +23,16 @@ type BacktestService struct { func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error { log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name()) - batch := &batch2.KLineBatchQuery{Exchange: exchange} + q := &batch.KLineBatchQuery{Exchange: exchange} - // should use channel here - klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime) - - // var previousKLine types.KLine - count := 0 - for klines := range klineC { - if err := s.BatchInsert(klines); err != nil { + klineC, errC := q.Query(ctx, symbol, interval, startTime, endTime) + for kline := range klineC { + if err := s.Insert(kline); err != nil { return err } - count += len(klines) - } - log.Debugf("inserted klines %s %s data: %d", symbol, interval.String(), count) - - if err := <-errC; err != nil { - return err } - return nil + return <-errC } func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error { @@ -306,27 +296,6 @@ func (s *BacktestService) Insert(kline types.KLine) error { return err } -// BatchInsert Note: all kline should be same exchange, or it will cause issue. 
-func (s *BacktestService) BatchInsert(kline []types.KLine) error { - if len(kline) == 0 { - return nil - } - if len(kline[0].Exchange) == 0 { - return errors.New("kline.Exchange field should not be empty") - } - - tableName := s._targetKlineTable(kline[0].Exchange) - - sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+ - " VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName) - - tx := s.DB.MustBegin() - if _, err := tx.NamedExec(sql, kline); err != nil { - return err - } - return tx.Commit() -} - func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error { if len(k.Exchange) == 0 { diff --git a/pkg/service/order.go b/pkg/service/order.go index 7d6835022..471af49d0 100644 --- a/pkg/service/order.go +++ b/pkg/service/order.go @@ -58,7 +58,14 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol startTime = records[0].CreationTime.Time() } - b := &batch.ClosedOrderBatchQuery{Exchange: exchange} + exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService) + if !ok { + return nil + } + + b := &batch.ClosedOrderBatchQuery{ + ExchangeTradeHistoryService: exchangeTradeHistoryService, + } ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID) for order := range ordersC { select { diff --git a/pkg/service/trade.go b/pkg/service/trade.go index 60640fe3c..d6cc51fcb 100644 --- a/pkg/service/trade.go +++ b/pkg/service/trade.go @@ -90,13 +90,21 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol tradeKeys[record.Key()] = struct{}{} } - end := len(records) - 1 + end := len(records) - 1 last := records[end] lastTradeID = last.ID startTime = last.Time.Time() } - b := &batch.TradeBatchQuery{Exchange: exchange} + exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService) + if !ok { + return nil + } + + b := &batch.TradeBatchQuery{ + ExchangeTradeHistoryService: exchangeTradeHistoryService, + } + tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{ LastTradeID: lastTradeID, StartTime: &startTime, diff --git a/pkg/types/sort.go b/pkg/types/sort.go index d72b76d0e..6893d52e3 100644 --- a/pkg/types/sort.go +++ b/pkg/types/sort.go @@ -12,9 +12,17 @@ func SortTradesAscending(trades []Trade) []Trade { return trades } -func SortOrderAscending(orders []Order) []Order { +func SortOrdersAscending(orders []Order) []Order { sort.Slice(orders, func(i, j int) bool { return orders[i].CreationTime.Time().Before(orders[j].CreationTime.Time()) }) return orders } + +func SortKLinesAscending(klines []KLine) []KLine { + sort.Slice(klines, func(i, j int) bool { + return klines[i].StartTime.Unix() < klines[j].StartTime.Unix() + }) + + return klines +} diff --git a/pkg/types/trade.go b/pkg/types/trade.go index fc9ce7486..6cf04b728 100644 --- a/pkg/types/trade.go +++ b/pkg/types/trade.go @@ -229,3 +229,7 @@ type TradeKey struct { ID uint64 Side SideType } + +func (k TradeKey) String() string { + return k.Exchange.String() + strconv.FormatUint(k.ID, 10) + k.Side.String() +}
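
Below is a minimal, self-contained sketch of how a caller might wire up the new AsyncTimeRangedBatchQuery introduced in pkg/exchange/batch/time_range_query.go, modeled on the Test_TimeRangedQuery test in this patch. Only the field names (Type, Limiter, Q, T, ID) and the Query signature come from the patch; the Record type and the fabricated Q results are hypothetical stand-ins for a real exchange API.

package main

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"golang.org/x/time/rate"

	"github.com/c9s/bbgo/pkg/exchange/batch"
)

// Record is a hypothetical result type; any struct with a timestamp and an ID works.
type Record struct {
	ID   uint64
	Time time.Time
}

func main() {
	ctx := context.Background()
	startTime := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)
	endTime := time.Date(2021, time.January, 2, 0, 0, 0, 0, time.UTC)

	q := &batch.AsyncTimeRangedBatchQuery{
		Type:    Record{},
		Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
		// Q pages through the remote API for the given time window; here it fabricates one record.
		Q: func(startTime, endTime time.Time) (interface{}, error) {
			return []Record{{ID: 1, Time: startTime}}, nil
		},
		// T extracts the timestamp used to advance the window between pages.
		T: func(obj interface{}) time.Time {
			return obj.(Record).Time
		},
		// ID deduplicates entries that appear in overlapping pages.
		ID: func(obj interface{}) string {
			return strconv.FormatUint(obj.(Record).ID, 10)
		},
	}

	ch := make(chan Record, 100)
	errC := q.Query(ctx, ch, startTime, endTime)

	// The query goroutine closes ch when it finishes, so a plain range works here.
	for r := range ch {
		fmt.Println(r.ID, r.Time)
	}
	if err := <-errC; err != nil {
		fmt.Println("query error:", err)
	}
}

Because the generic loop dedupes by ID and stops as soon as a whole page yields nothing new, the fabricated Q above terminates after two iterations: the first page sends the record, the second page is entirely duplicated.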
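
A related caller-side change: the batch query structs now embed types.ExchangeTradeHistoryService directly instead of types.Exchange, so the interface assertion moves to the caller, as pkg/service/order.go and pkg/service/trade.go do in this patch. A hypothetical helper illustrating the same pattern:

package example

import (
	"context"
	"time"

	"github.com/c9s/bbgo/pkg/exchange/batch"
	"github.com/c9s/bbgo/pkg/types"
)

// syncClosedOrders shows the assertion-then-query pattern this patch adopts in pkg/service/order.go.
func syncClosedOrders(ctx context.Context, exchange types.Exchange, symbol string, startTime time.Time, lastOrderID uint64) error {
	historyService, ok := exchange.(types.ExchangeTradeHistoryService)
	if !ok {
		// Exchanges without trade history support are skipped, matching the patch's behavior.
		return nil
	}

	q := &batch.ClosedOrderBatchQuery{ExchangeTradeHistoryService: historyService}
	ordersC, errC := q.Query(ctx, symbol, startTime, time.Now(), lastOrderID)
	for order := range ordersC {
		_ = order // persist or process each closed order here
	}

	// errC is closed by the query goroutine when it finishes, so this returns nil on success
	// or the first error encountered.
	return <-errC
}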