Mirror of https://github.com/c9s/bbgo.git (synced 2024-11-22 06:53:52 +00:00)

batch: refactor batch query

This commit is contained in:
parent 7add014a2b
commit e66eb08db4
@@ -30,3 +30,7 @@ sync:
   - BTCUSDT
   - ETHUSDT
   - LINKUSDT
+  depositHistory: true
+  rewardHistory: true
+  withdrawHistory: true
@@ -1,169 +0,0 @@
(entire file removed; its previous contents follow)

package batch

import (
	"context"
	"sort"
	"time"

	"github.com/pkg/errors"

	"github.com/sirupsen/logrus"
	"golang.org/x/time/rate"

	"github.com/c9s/bbgo/pkg/types"
)

var log = logrus.WithField("component", "batch")

type KLineBatchQuery struct {
	types.Exchange
}

func (e KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan []types.KLine, errC chan error) {
	c = make(chan []types.KLine, 1000)
	errC = make(chan error, 1)

	go func() {
		defer close(c)
		defer close(errC)

		var tryQueryKlineTimes = 0
		for startTime.Before(endTime) {
			log.Debugf("batch query klines %s %s %s <=> %s", symbol, interval, startTime, endTime)

			kLines, err := e.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
				StartTime: &startTime,
				EndTime:   &endTime,
			})

			if err != nil {
				errC <- err
				return
			}

			// ensure the kline is in the right order
			sort.Slice(kLines, func(i, j int) bool {
				return kLines[i].StartTime.Unix() < kLines[j].StartTime.Unix()
			})

			if len(kLines) == 0 {
				return
			}

			tryQueryKlineTimes++
			const BatchSize = 200

			var batchKLines = make([]types.KLine, 0, BatchSize)
			for _, kline := range kLines {
				// ignore any kline before the given start time of the batch query
				if kline.StartTime.Before(startTime) {
					continue
				}

				// if there is a kline after the endTime of the batch query, it means the data is out of scope, we should exit
				if kline.StartTime.After(endTime) || kline.EndTime.After(endTime) {
					if len(batchKLines) != 0 {
						c <- batchKLines
						batchKLines = nil
					}
					return
				}

				batchKLines = append(batchKLines, kline)

				if len(batchKLines) == BatchSize {
					c <- batchKLines
					batchKLines = nil
				}

				// The issue is in FTX, prev endtime = next start time , so if add 1 ms , it would query forever.
				// (above comment was written by @tony1223)
				startTime = kline.EndTime.Time()
				tryQueryKlineTimes = 0
			}

			// push the rest klines in the buffer
			if len(batchKLines) > 0 {
				c <- batchKLines
				batchKLines = nil
			}

			if tryQueryKlineTimes > 10 { // it means loop 10 times
				errC <- errors.Errorf("there's a dead loop in batch.go#Query , symbol: %s , interval: %s, startTime:%s ", symbol, interval, startTime.String())
				return
			}
		}
	}()

	return c, errC
}

type RewardBatchQuery struct {
	Service types.ExchangeRewardService
}

func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) {
	c = make(chan types.Reward, 500)
	errC = make(chan error, 1)

	go func() {
		limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)

		defer close(c)
		defer close(errC)

		lastID := ""
		rewardKeys := make(map[string]struct{}, 500)

		for startTime.Before(endTime) {
			if err := limiter.Wait(ctx); err != nil {
				log.WithError(err).Error("rate limit error")
			}

			log.Infof("batch querying rewards %s <=> %s", startTime, endTime)

			rewards, err := q.Service.QueryRewards(ctx, startTime)
			if err != nil {
				errC <- err
				return
			}

			// empty data
			if len(rewards) == 0 {
				return
			}

			// there is no new data
			if len(rewards) == 1 && rewards[0].UUID == lastID {
				return
			}

			newCnt := 0
			for _, o := range rewards {
				if _, ok := rewardKeys[o.UUID]; ok {
					continue
				}

				if o.CreatedAt.Time().After(endTime) {
					// stop batch query
					return
				}

				newCnt++
				c <- o
				rewardKeys[o.UUID] = struct{}{}
			}

			if newCnt == 0 {
				return
			}

			end := len(rewards) - 1
			startTime = rewards[end].CreatedAt.Time()
			lastID = rewards[end].UUID
		}
	}()

	return c, errC
}
pkg/exchange/batch/batch_test.go (new file, 2 lines)

@@ -0,0 +1,2 @@
package batch
@@ -2,93 +2,39 @@ package batch

 import (
 	"context"
-	"sort"
+	"strconv"
 	"time"

-	"github.com/sirupsen/logrus"
 	"golang.org/x/time/rate"

 	"github.com/c9s/bbgo/pkg/types"
 )

 type ClosedOrderBatchQuery struct {
-	types.Exchange
+	types.ExchangeTradeHistoryService
 }

-func (e ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) {
-	c = make(chan types.Order, 500)
-	errC = make(chan error, 1)
-
-	tradeHistoryService, ok := e.Exchange.(types.ExchangeTradeHistoryService)
-	if !ok {
-		defer close(c)
-		defer close(errC)
-		// skip exchanges that does not support trading history services
-		logrus.Warnf("exchange %s does not implement ExchangeTradeHistoryService, skip syncing closed orders (ClosedOrderBatchQuery.Query) ", e.Exchange.Name())
-		return c, errC
-	}
-
-	go func() {
-		limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
-
-		defer close(c)
-		defer close(errC)
-
-		orderIDs := make(map[uint64]struct{}, 500)
-		if lastOrderID > 0 {
-			orderIDs[lastOrderID] = struct{}{}
-		}
-
-		for startTime.Before(endTime) {
-			if err := limiter.Wait(ctx); err != nil {
-				logrus.WithError(err).Error("rate limit error")
-			}
-
-			logrus.Infof("batch querying %s closed orders %s <=> %s", symbol, startTime, endTime)
-
-			orders, err := tradeHistoryService.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
-			if err != nil {
-				errC <- err
-				return
-			}
-			for _, o := range orders {
-				logrus.Infof("%+v", o)
-			}
-
-			if len(orders) == 0 {
-				return
-			} else if len(orders) > 0 {
-				allExists := true
-				for _, o := range orders {
-					if _, exists := orderIDs[o.OrderID]; !exists {
-						allExists = false
-						break
-					}
-				}
-				if allExists {
-					return
-				}
-			}
-
-			// sort orders by time in ascending order
-			sort.Slice(orders, func(i, j int) bool {
-				return orders[i].CreationTime.Before(time.Time(orders[j].CreationTime))
-			})
-
-			for _, o := range orders {
-				if _, ok := orderIDs[o.OrderID]; ok {
-					continue
-				}
-
-				c <- o
-				startTime = o.CreationTime.Time()
-				lastOrderID = o.OrderID
-				orderIDs[o.OrderID] = struct{}{}
-			}
-		}
-	}()
+func (q *ClosedOrderBatchQuery) Query(ctx context.Context, symbol string, startTime, endTime time.Time, lastOrderID uint64) (c chan types.Order, errC chan error) {
+	query := &AsyncTimeRangedBatchQuery{
+		Type:    types.Order{},
+		Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
+		Q: func(startTime, endTime time.Time) (interface{}, error) {
+			orders, err := q.ExchangeTradeHistoryService.QueryClosedOrders(ctx, symbol, startTime, endTime, lastOrderID)
+			return orders, err
+		},
+		T: func(obj interface{}) time.Time {
+			return time.Time(obj.(types.Order).CreationTime)
+		},
+		ID: func(obj interface{}) string {
+			order := obj.(types.Order)
+			if order.OrderID > lastOrderID {
+				lastOrderID = order.OrderID
+			}
+			return strconv.FormatUint(order.OrderID, 10)
+		},
+	}
+
+	c = make(chan types.Order, 100)
+	errC = query.Query(ctx, c, startTime, endTime)

 	return c, errC
 }
pkg/exchange/batch/kline.go (new file, 38 lines)

@@ -0,0 +1,38 @@
package batch

import (
	"context"
	"time"

	"golang.org/x/time/rate"

	"github.com/c9s/bbgo/pkg/types"
)

type KLineBatchQuery struct {
	types.Exchange
}

func (e *KLineBatchQuery) Query(ctx context.Context, symbol string, interval types.Interval, startTime, endTime time.Time) (c chan types.KLine, errC chan error) {
	query := &AsyncTimeRangedBatchQuery{
		Type:    types.KLine{},
		Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
		Q: func(startTime, endTime time.Time) (interface{}, error) {
			return e.Exchange.QueryKLines(ctx, symbol, interval, types.KLineQueryOptions{
				StartTime: &startTime,
				EndTime:   &endTime,
			})
		},
		T: func(obj interface{}) time.Time {
			return time.Time(obj.(types.KLine).StartTime)
		},
		ID: func(obj interface{}) string {
			kline := obj.(types.KLine)
			return kline.StartTime.String()
		},
	}

	c = make(chan types.KLine, 100)
	errC = query.Query(ctx, c, startTime, endTime)
	return c, errC
}
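One practical consequence of this refactor is that KLineBatchQuery now emits one kline per channel receive (chan types.KLine) instead of slices (chan []types.KLine) as the removed implementation did. A minimal caller sketch follows, assuming it sits in the same batch package; collectKLines is a hypothetical helper, not part of this commit.

package batch

import (
	"context"
	"time"

	"github.com/c9s/bbgo/pkg/types"
)

// collectKLines is a hypothetical helper that drains both channels returned
// by KLineBatchQuery.Query: the data channel is closed by the producer
// goroutine, and the buffered error channel yields nil on success.
func collectKLines(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) ([]types.KLine, error) {
	q := &KLineBatchQuery{Exchange: ex}

	klineC, errC := q.Query(ctx, symbol, interval, startTime, endTime)

	var klines []types.KLine
	for k := range klineC {
		// one kline per receive; the old API delivered []types.KLine batches here
		klines = append(klines, k)
	}

	return klines, <-errC
}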
pkg/exchange/batch/reward.go (new file, 34 lines)

@@ -0,0 +1,34 @@
package batch

import (
	"context"
	"time"

	"golang.org/x/time/rate"

	"github.com/c9s/bbgo/pkg/types"
)

type RewardBatchQuery struct {
	Service types.ExchangeRewardService
}

func (q *RewardBatchQuery) Query(ctx context.Context, startTime, endTime time.Time) (c chan types.Reward, errC chan error) {
	query := &AsyncTimeRangedBatchQuery{
		Type:    types.Reward{},
		Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
		Q: func(startTime, endTime time.Time) (interface{}, error) {
			return q.Service.QueryRewards(ctx, startTime)
		},
		T: func(obj interface{}) time.Time {
			return time.Time(obj.(types.Reward).CreatedAt)
		},
		ID: func(obj interface{}) string {
			return obj.(types.Reward).UUID
		},
	}

	c = make(chan types.Reward, 500)
	errC = query.Query(ctx, c, startTime, endTime)
	return c, errC
}
pkg/exchange/batch/time_range_query.go (new file, 115 lines)

@@ -0,0 +1,115 @@
package batch

import (
	"context"
	"reflect"
	"sort"
	"time"

	"github.com/sirupsen/logrus"
	"golang.org/x/time/rate"
)

var log = logrus.WithField("component", "batch")

type AsyncTimeRangedBatchQuery struct {
	// Type is the object type of the result
	Type interface{}

	// Limiter is the rate limiter for each query
	Limiter *rate.Limiter

	// Q is the remote query function
	Q func(startTime, endTime time.Time) (interface{}, error)

	// T function returns time of an object
	T func(obj interface{}) time.Time

	// ID returns the ID of the object
	ID func(obj interface{}) string

	// JumpIfEmpty jump the startTime + duration when the result is empty
	JumpIfEmpty time.Duration
}

func (q *AsyncTimeRangedBatchQuery) Query(ctx context.Context, ch interface{}, startTime, endTime time.Time) chan error {
	errC := make(chan error, 1)
	cRef := reflect.ValueOf(ch)
	// cRef := reflect.MakeChan(reflect.TypeOf(q.Type), 100)

	go func() {
		defer cRef.Close()
		defer close(errC)

		idMap := make(map[string]struct{}, 100)
		for startTime.Before(endTime) {
			if q.Limiter != nil {
				if err := q.Limiter.Wait(ctx); err != nil {
					errC <- err
					return
				}
			}

			log.Debugf("batch querying %T: %v <=> %v", q.Type, startTime, endTime)

			sliceInf, err := q.Q(startTime, endTime)
			if err != nil {
				errC <- err
				return
			}

			listRef := reflect.ValueOf(sliceInf)
			listLen := listRef.Len()

			if listLen == 0 {
				if q.JumpIfEmpty > 0 {
					startTime = startTime.Add(q.JumpIfEmpty)
					continue
				}

				return
			}

			// sort by time
			sort.Slice(listRef.Interface(), func(i, j int) bool {
				a := listRef.Index(i)
				b := listRef.Index(j)
				tA := q.T(a.Interface())
				tB := q.T(b.Interface())
				return tA.Before(tB)
			})

			sentAny := false
			for i := 0; i < listLen; i++ {
				item := listRef.Index(i)
				entryTime := q.T(item.Interface())

				if entryTime.Before(startTime) {
					continue
				}
				if entryTime.After(endTime) {
					continue
				}

				obj := item.Interface()
				id := q.ID(obj)
				if _, exists := idMap[id]; exists {
					log.Debugf("batch querying %T: duplicated id %s", q.Type, id)
					continue
				}

				idMap[id] = struct{}{}

				cRef.Send(item)
				sentAny = true
				startTime = entryTime
			}

			if !sentAny {
				return
			}
		}
	}()

	return errC
}
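To make the contract of the new generic helper explicit, here is a minimal wiring sketch, assuming it lives in the same batch package; Entry, sketchQuery, and the source callback are illustrative names only, not part of this commit. It shows what the concrete queries above rely on: Q returns a slice of Type, T extracts the timestamp used for ordering and for advancing startTime, and ID is used for de-duplication.

package batch

import (
	"context"
	"strconv"
	"time"
)

// Entry is a hypothetical record type used only for this sketch.
type Entry struct {
	ID   int64
	Time time.Time
}

// sketchQuery wires a typed channel into the reflection-based
// AsyncTimeRangedBatchQuery and returns both ends to the caller.
func sketchQuery(ctx context.Context, source func(start, end time.Time) ([]Entry, error), startTime, endTime time.Time) (chan Entry, chan error) {
	q := &AsyncTimeRangedBatchQuery{
		Type: Entry{}, // element type of the slice returned by Q
		// Limiter is optional; Query only waits on it when it is non-nil.
		Q: func(start, end time.Time) (interface{}, error) {
			entries, err := source(start, end)
			return entries, err
		},
		T: func(obj interface{}) time.Time {
			return obj.(Entry).Time // used to sort results and advance startTime
		},
		ID: func(obj interface{}) string {
			return strconv.FormatInt(obj.(Entry).ID, 10) // used to skip duplicates
		},
		JumpIfEmpty: time.Hour, // skip forward when a window returns nothing
	}

	c := make(chan Entry, 100)
	errC := q.Query(ctx, c, startTime, endTime)
	return c, errC
}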
pkg/exchange/batch/time_range_query_test.go (new file, 45 lines)

@@ -0,0 +1,45 @@
package batch

import (
	"context"
	"strconv"
	"testing"
	"time"
)

func Test_TimeRangedQuery(t *testing.T) {
	startTime := time.Date(2021, time.January, 1, 0, 0, 0, 0, time.UTC)
	endTime := time.Date(2021, time.January, 2, 0, 0, 0, 0, time.UTC)
	q := &AsyncTimeRangedBatchQuery{
		Type: time.Time{},
		T: func(obj interface{}) time.Time {
			return obj.(time.Time)
		},
		ID: func(obj interface{}) string {
			return strconv.FormatInt(obj.(time.Time).UnixMilli(), 10)
		},
		Q: func(startTime, endTime time.Time) (interface{}, error) {
			var cnt = 0
			var data []time.Time
			for startTime.Before(endTime) && cnt < 5 {
				d := startTime
				data = append(data, d)
				cnt++
				startTime = startTime.Add(time.Minute)
			}
			t.Logf("data: %v", data)
			return data, nil
		},
	}

	ch := make(chan time.Time, 100)

	// consumer
	go func() {
		for d := range ch {
			_ = d
		}
	}()
	errC := q.Query(context.Background(), ch, startTime, endTime)
	<-errC
}
@@ -2,113 +2,45 @@ package batch

 import (
 	"context"
-	"errors"
 	"time"

-	"github.com/sirupsen/logrus"
 	"golang.org/x/time/rate"

 	"github.com/c9s/bbgo/pkg/types"
 )

+var closedErrChan = make(chan error)
+
+func init() {
+	close(closedErrChan)
+}
+
 type TradeBatchQuery struct {
-	types.Exchange
+	types.ExchangeTradeHistoryService
 }

 func (e TradeBatchQuery) Query(ctx context.Context, symbol string, options *types.TradeQueryOptions) (c chan types.Trade, errC chan error) {
-	c = make(chan types.Trade, 500)
-	errC = make(chan error, 1)
-
-	tradeHistoryService, ok := e.Exchange.(types.ExchangeTradeHistoryService)
-	if !ok {
-		close(errC)
-		close(c)
-		// skip exchanges that does not support trading history services
-		logrus.Warnf("exchange %s does not implement ExchangeTradeHistoryService, skip syncing closed orders (TradeBatchQuery.Query)", e.Exchange.Name())
-		return c, errC
-	}
-
-	if options.StartTime == nil {
-		errC <- errors.New("start time is required for syncing trades")
-		close(errC)
-		close(c)
-		return c, errC
-	}
-
-	var lastTradeID = options.LastTradeID
-	var startTime = *options.StartTime
-	var endTime = *options.EndTime
-
-	go func() {
-		limiter := rate.NewLimiter(rate.Every(5*time.Second), 2) // from binance (original 1200, use 1000 for safety)
-
-		defer close(c)
-		defer close(errC)
-
-		var tradeKeys = map[types.TradeKey]struct{}{}
-
-		for startTime.Before(endTime) {
-			if err := limiter.Wait(ctx); err != nil {
-				logrus.WithError(err).Error("rate limit error")
-			}
-
-			logrus.Infof("querying %s trades from id=%d limit=%d between %s <=> %s", symbol, lastTradeID, options.Limit, startTime, endTime)
-
-			var err error
-			var trades []types.Trade
-
-			trades, err = tradeHistoryService.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
-				StartTime:   options.StartTime,
-				LastTradeID: lastTradeID,
-			})
-
-			// sort trades by time in ascending order
-			types.SortTradesAscending(trades)
-
-			if err != nil {
-				errC <- err
-				return
-			}
-
-			// if all trades are duplicated or empty, we end the batch query
-			if len(trades) == 0 {
-				return
-			}
-
-			if len(trades) > 0 {
-				allExists := true
-				for _, td := range trades {
-					k := td.Key()
-					if _, exists := tradeKeys[k]; !exists {
-						allExists = false
-						break
-					}
-				}
-				if allExists {
-					return
-				}
-			}
-
-			for _, td := range trades {
-				key := td.Key()
-
-				logrus.Debugf("checking trade key: %v trade: %+v", key, td)
-
-				if _, ok := tradeKeys[key]; ok {
-					logrus.Debugf("ignore duplicated trade: %+v", key)
-					continue
-				}
-
-				lastTradeID = td.ID
-				startTime = time.Time(td.Time)
-				tradeKeys[key] = struct{}{}
-
-				// ignore the first trade if last TradeID is given
-				c <- td
-			}
-		}
-	}()
+	startTime := *options.StartTime
+	endTime := *options.EndTime
+	query := &AsyncTimeRangedBatchQuery{
+		Type:    types.Trade{},
+		Limiter: rate.NewLimiter(rate.Every(5*time.Second), 2),
+		Q: func(startTime, endTime time.Time) (interface{}, error) {
+			return e.ExchangeTradeHistoryService.QueryTrades(ctx, symbol, options)
+		},
+		T: func(obj interface{}) time.Time {
+			return time.Time(obj.(types.Trade).Time)
+		},
+		ID: func(obj interface{}) string {
+			trade := obj.(types.Trade)
+			if trade.ID > options.LastTradeID {
+				options.LastTradeID = trade.ID
+			}
+			return trade.Key().String()
+		},
+	}
+
+	c = make(chan types.Trade, 100)
+	errC = query.Query(ctx, c, startTime, endTime)

 	return c, errC
 }
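Worth noting from the hunk above: the refactored TradeBatchQuery dereferences options.StartTime and options.EndTime unconditionally, where the removed code returned an error for a nil StartTime, so callers must now populate both fields. A caller sketch under that assumption; syncTrades is a hypothetical helper, and the types follow the bbgo signatures shown in this diff.

package batch

import (
	"context"
	"time"

	"github.com/c9s/bbgo/pkg/types"
)

// syncTrades is a hypothetical caller: it streams trades for a time range and
// returns the first error reported by the producer goroutine (nil on success).
func syncTrades(ctx context.Context, service types.ExchangeTradeHistoryService, symbol string, startTime, endTime time.Time) error {
	q := &TradeBatchQuery{ExchangeTradeHistoryService: service}

	// StartTime and EndTime must both be set; the query dereferences them immediately.
	tradeC, errC := q.Query(ctx, symbol, &types.TradeQueryOptions{
		StartTime: &startTime,
		EndTime:   &endTime,
	})

	for trade := range tradeC {
		_ = trade // persist or process each trade here
	}

	return <-errC
}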
@@ -1,58 +0,0 @@
(entire file removed; its previous contents follow)

package binance

import (
	"context"
	batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
	"github.com/c9s/bbgo/pkg/types"
	"github.com/stretchr/testify/assert"
	"os"
	"testing"
	"time"
)

func Test_Batch(t *testing.T) {
	key := os.Getenv("BINANCE_API_KEY")
	secret := os.Getenv("BINANCE_API_SECRET")
	if len(key) == 0 && len(secret) == 0 {
		t.Skip("api key/secret are not configured")
	}

	e := New(key, secret)
	//stream := NewStream(key, secret, subAccount, e)

	batch := &batch2.KLineBatchQuery{Exchange: e}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// should use channel here

	starttime, _ := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
	endtime, _ := time.Parse("2006-1-2 15:04", "2021-12-14 00:19")
	klineC, _ := batch.Query(ctx, "XRPUSDT", types.Interval1m, starttime, endtime)

	var lastmintime time.Time
	var lastmaxtime time.Time
	for klines := range klineC {
		assert.NotEmpty(t, klines)

		var nowMinTime = klines[0].StartTime
		var nowMaxTime = klines[0].StartTime
		for _, item := range klines {
			if nowMaxTime.Unix() < item.StartTime.Unix() {
				nowMaxTime = item.StartTime
			}
			if nowMinTime.Unix() > item.StartTime.Unix() {
				nowMinTime = item.StartTime
			}
		}
		assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix())
		assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix())
		assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix())

		lastmintime = nowMinTime.Time()
		lastmaxtime = nowMaxTime.Time()
		assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix())
	}
}
@@ -1,89 +0,0 @@
(entire file removed; its previous contents follow)

package ftx

import (
	"context"
	batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
	"github.com/c9s/bbgo/pkg/types"
	"github.com/stretchr/testify/assert"
	"os"
	"testing"
	"time"
)

func TestLastKline(t *testing.T) {
	key := os.Getenv("FTX_API_KEY")
	secret := os.Getenv("FTX_API_SECRET")
	subAccount := os.Getenv("FTX_SUBACCOUNT")
	if len(key) == 0 && len(secret) == 0 {
		t.Skip("api key/secret are not configured")
	}

	e := NewExchange(key, secret, subAccount)
	//stream := NewStream(key, secret, subAccount, e)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	klines := getLastClosedKLine(e, ctx, "XRPUSD", types.Interval1m)
	assert.Equal(t, 1, len(klines))
}

func Test_Batch(t *testing.T) {
	key := os.Getenv("FTX_API_KEY")
	secret := os.Getenv("FTX_API_SECRET")
	subAccount := os.Getenv("FTX_SUBACCOUNT")
	if len(key) == 0 && len(secret) == 0 {
		t.Skip("api key/secret are not configured")
	}

	e := NewExchange(key, secret, subAccount)
	//stream := NewStream(key, secret, subAccount, e)

	batch := &batch2.KLineBatchQuery{Exchange: e}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	// should use channel here

	starttime, err := time.Parse("2006-1-2 15:04", "2021-08-01 00:00")
	assert.NoError(t, err)
	endtime, err := time.Parse("2006-1-2 15:04", "2021-08-04 00:19")
	assert.NoError(t, err)

	klineC, errC := batch.Query(ctx, "XRPUSDT", types.Interval1d, starttime, endtime)

	if err := <-errC; err != nil {
		assert.NoError(t, err)
	}

	var lastmintime time.Time
	var lastmaxtime time.Time

	for klines := range klineC {
		assert.NotEmpty(t, klines)

		var nowMinTime = klines[0].StartTime
		var nowMaxTime = klines[0].StartTime
		for _, item := range klines {
			if nowMaxTime.Unix() < item.StartTime.Unix() {
				nowMaxTime = item.StartTime
			}
			if nowMinTime.Unix() > item.StartTime.Unix() {
				nowMinTime = item.StartTime
			}
		}

		if !lastmintime.IsZero() {
			assert.True(t, nowMinTime.Unix() <= nowMaxTime.Unix())
			assert.True(t, nowMinTime.Unix() > lastmaxtime.Unix())
			assert.True(t, nowMaxTime.Unix() > lastmaxtime.Unix())
		}
		lastmintime = nowMinTime.Time()
		lastmaxtime = nowMaxTime.Time()
		assert.True(t, lastmintime.Unix() <= lastmaxtime.Unix())
	}
}
@@ -245,7 +245,7 @@ func (e *Exchange) queryClosedOrdersByLastOrderID(ctx context.Context, symbol st
 		orders = append(orders, *order)
 	}

-	orders = types.SortOrderAscending(orders)
+	orders = types.SortOrdersAscending(orders)
 	return orders, nil
 }
@@ -12,7 +12,7 @@ import (
 	"github.com/pkg/errors"
 	log "github.com/sirupsen/logrus"

-	batch2 "github.com/c9s/bbgo/pkg/exchange/batch"
+	"github.com/c9s/bbgo/pkg/exchange/batch"
 	"github.com/c9s/bbgo/pkg/types"
 )

@@ -23,26 +23,16 @@ type BacktestService struct {
 func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
 	log.Infof("synchronizing lastKLine for interval %s from exchange %s", interval, exchange.Name())

-	batch := &batch2.KLineBatchQuery{Exchange: exchange}
+	q := &batch.KLineBatchQuery{Exchange: exchange}

-	// should use channel here
-	klineC, errC := batch.Query(ctx, symbol, interval, startTime, endTime)
-
-	// var previousKLine types.KLine
-	count := 0
-	for klines := range klineC {
-		if err := s.BatchInsert(klines); err != nil {
+	klineC, errC := q.Query(ctx, symbol, interval, startTime, endTime)
+	for kline := range klineC {
+		if err := s.Insert(kline); err != nil {
 			return err
 		}
-		count += len(klines)
 	}
-	log.Debugf("inserted klines %s %s data: %d", symbol, interval.String(), count)
-
-	if err := <-errC; err != nil {
-		return err
-	}

-	return nil
+	return <-errC
 }

 func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {

@@ -306,27 +296,6 @@ func (s *BacktestService) Insert(kline types.KLine) error {
 	return err
 }

-// BatchInsert Note: all kline should be same exchange, or it will cause issue.
-func (s *BacktestService) BatchInsert(kline []types.KLine) error {
-	if len(kline) == 0 {
-		return nil
-	}
-	if len(kline[0].Exchange) == 0 {
-		return errors.New("kline.Exchange field should not be empty")
-	}
-
-	tableName := s._targetKlineTable(kline[0].Exchange)
-
-	sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
-		" VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume); ", tableName)
-
-	tx := s.DB.MustBegin()
-	if _, err := tx.NamedExec(sql, kline); err != nil {
-		return err
-	}
-	return tx.Commit()
-}
-
 func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
 	if len(k.Exchange) == 0 {
@@ -58,7 +58,14 @@ func (s *OrderService) Sync(ctx context.Context, exchange types.Exchange, symbol
 		startTime = records[0].CreationTime.Time()
 	}

-	b := &batch.ClosedOrderBatchQuery{Exchange: exchange}
+	exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService)
+	if !ok {
+		return nil
+	}
+
+	b := &batch.ClosedOrderBatchQuery{
+		ExchangeTradeHistoryService: exchangeTradeHistoryService,
+	}
 	ordersC, errC := b.Query(ctx, symbol, startTime, time.Now(), lastID)
 	for order := range ordersC {
 		select {
@@ -90,13 +90,21 @@ func (s *TradeService) Sync(ctx context.Context, exchange types.Exchange, symbol
 			tradeKeys[record.Key()] = struct{}{}
 		}

 		end := len(records) - 1
 		last := records[end]
 		lastTradeID = last.ID
 		startTime = last.Time.Time()
 	}

-	b := &batch.TradeBatchQuery{Exchange: exchange}
+	exchangeTradeHistoryService, ok := exchange.(types.ExchangeTradeHistoryService)
+	if !ok {
+		return nil
+	}
+
+	b := &batch.TradeBatchQuery{
+		ExchangeTradeHistoryService: exchangeTradeHistoryService,
+	}
+
 	tradeC, errC := b.Query(ctx, symbol, &types.TradeQueryOptions{
 		LastTradeID: lastTradeID,
 		StartTime:   &startTime,
@@ -12,9 +12,17 @@ func SortTradesAscending(trades []Trade) []Trade {
 	return trades
 }

-func SortOrderAscending(orders []Order) []Order {
+func SortOrdersAscending(orders []Order) []Order {
 	sort.Slice(orders, func(i, j int) bool {
 		return orders[i].CreationTime.Time().Before(orders[j].CreationTime.Time())
 	})
 	return orders
 }
+
+func SortKLinesAscending(klines []KLine) []KLine {
+	sort.Slice(klines, func(i, j int) bool {
+		return klines[i].StartTime.Unix() < klines[j].StartTime.Unix()
+	})
+
+	return klines
+}
@@ -229,3 +229,7 @@ type TradeKey struct {
 	ID   uint64
 	Side SideType
 }
+
+func (k TradeKey) String() string {
+	return k.Exchange.String() + strconv.FormatUint(k.ID, 10) + k.Side.String()
+}