This commit is contained in:
c9s 2024-09-19 09:41:08 +08:00 committed by GitHub
commit ac31fd0da2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
42 changed files with 2332 additions and 90 deletions

2
.gitignore vendored
View File

@ -14,6 +14,7 @@
*.out
.idea
.vscode
# Dependency directories (remove the comment below to include it)
# vendor/
@ -48,6 +49,7 @@ testoutput
*.swp
/pkg/backtest/assets.go
/data/backtest
coverage.txt
coverage_dum.txt

View File

@ -1,9 +1,13 @@
## Back-testing
*Before you start back-testing, you need to setup [MySQL](../../README.md#configure-mysql-database) or [SQLite3
Currently bbgo supports two ways to run backtests:
1: Through csv data source (supported right now are binance, bybit and OkEx)
2: Alternatively run backtests through [MySQL](../../README.md#configure-mysql-database) or [SQLite3
](../../README.md#configure-sqlite3-database). Using MySQL is highly recommended.*
First, you need to add the back-testing config to your `bbgo.yaml`:
Let's start by adding the back-testing section to your config eg: `bbgo.yaml`:
```yaml
backtest:
@ -41,8 +45,11 @@ Note on date formats, the following date formats are supported:
* RFC822, which looks like `02 Jan 06 15:04 MST`
* You can also use `2021-11-26T15:04:56`
And then, you can sync remote exchange k-lines (candle bars) data for back-testing:
And then, you can sync remote exchange k-lines (candle bars) data for back-testing through csv data source:
```sh
bbgo backtest -v --csv --verify --config config/grid.yaml
```
or use the sql data source like so:
```sh
bbgo backtest -v --sync --config config/grid.yaml
```
@ -67,6 +74,11 @@ Run back-test:
```sh
bbgo backtest --base-asset-baseline --config config/grid.yaml
```
or through csv data source
```sh
bbgo backtest -v --csv --base-asset-baseline --config config/grid.yaml --output data/backtest
```
If you're developing a strategy, you might want to start with a command like this:

View File

@ -55,7 +55,7 @@ var ErrEmptyOrderType = errors.New("order type can not be empty string")
type Exchange struct {
sourceName types.ExchangeName
publicExchange types.Exchange
srv *service.BacktestService
srv service.BackTestable
currentTime time.Time
account *types.Account
@ -78,7 +78,7 @@ type Exchange struct {
}
func NewExchange(
sourceName types.ExchangeName, sourceExchange types.Exchange, srv *service.BacktestService, config *bbgo.Backtest,
sourceName types.ExchangeName, sourceExchange types.Exchange, srv service.BackTestable, config *bbgo.Backtest,
) (*Exchange, error) {
ex := sourceExchange
@ -366,6 +366,7 @@ func (e *Exchange) SubscribeMarketData(
loadedIntervals[sub.Options.Interval] = struct{}{}
default:
// todo support stream back test with csv tick source
// Since Environment is not yet been injected at this point, no hard error
log.Errorf("stream channel %s is not supported in backtest", sub.Channel)
}
@ -394,6 +395,7 @@ func (e *Exchange) SubscribeMarketData(
log.Infof("querying klines from database with exchange: %v symbols: %v and intervals: %v for back-testing", e.Name(), symbols, intervals)
}
log.Infof("querying klines from database with exchange: %v symbols: %v and intervals: %v for back-testing", e.Name(), symbols, intervals)
if len(symbols) == 0 {
log.Warnf("empty symbols, will not query kline data from the database")

View File

@ -150,7 +150,8 @@ type Backtest struct {
Sessions []string `json:"sessions" yaml:"sessions"`
// sync 1 second interval KLines
SyncSecKLines bool `json:"syncSecKLines,omitempty" yaml:"syncSecKLines,omitempty"`
SyncSecKLines bool `json:"syncSecKLines,omitempty" yaml:"syncSecKLines,omitempty"`
CsvSource *CsvSourceConfig `json:"csvConfig,omitempty" yaml:"csvConfig,omitempty"`
}
func (b *Backtest) GetAccount(n string) BacktestAccount {
@ -706,3 +707,8 @@ func reUnmarshal(conf interface{}, tpe interface{}) (interface{}, error) {
return val.Elem().Interface(), nil
}
type CsvSourceConfig struct {
Market types.MarketType `json:"market"`
Granularity types.MarketDataType `json:"granularity"`
}

View File

@ -42,9 +42,9 @@ var defaultSyncBufferPeriod = 30 * time.Minute
// IsBackTesting is a global variable that indicates the current environment is back-test or not.
var IsBackTesting = false
var BackTestService *service.BacktestService
var BackTestService service.BackTestable
func SetBackTesting(s *service.BacktestService) {
func SetBackTesting(s service.BackTestable) {
BackTestService = s
IsBackTesting = s != nil
}
@ -87,7 +87,7 @@ type Environment struct {
TradeService *service.TradeService
ProfitService *service.ProfitService
PositionService *service.PositionService
BacktestService *service.BacktestService
BacktestService service.BackTestable
RewardService *service.RewardService
MarginService *service.MarginService
SyncService *service.SyncService

View File

@ -12,12 +12,6 @@ import (
"github.com/fatih/color"
"github.com/google/uuid"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/data/tsv"
"github.com/c9s/bbgo/pkg/util"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -26,13 +20,18 @@ import (
"github.com/c9s/bbgo/pkg/accounting/pnl"
"github.com/c9s/bbgo/pkg/backtest"
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/data/tsv"
"github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
)
func init() {
BacktestCmd.Flags().Bool("csv", false, "use csv data source for exchange (if supported)")
BacktestCmd.Flags().Bool("sync", false, "sync backtest data")
BacktestCmd.Flags().Bool("sync-only", false, "sync backtest data only, do not run backtest")
BacktestCmd.Flags().String("sync-from", "", "sync backtest data from the given time, which will override the time range in the backtest config")
@ -77,6 +76,11 @@ var BacktestCmd = &cobra.Command{
return err
}
modeCsv, err := cmd.Flags().GetBool("csv")
if err != nil {
return err
}
wantSync, err := cmd.Flags().GetBool("sync")
if err != nil {
return err
@ -156,15 +160,29 @@ var BacktestCmd = &cobra.Command{
log.Infof("starting backtest with startTime %s", startTime.Format(time.RFC3339))
environ := bbgo.NewEnvironment()
if err := bbgo.BootstrapBacktestEnvironment(ctx, environ); err != nil {
return err
}
if environ.DatabaseService == nil {
return errors.New("database service is not enabled, please check your environment variables DB_DRIVER and DB_DSN")
if userConfig.Backtest.CsvSource == nil {
return fmt.Errorf("user config backtest section needs csvsource config")
}
backtestService := service.NewBacktestServiceCSV(
outputDirectory,
userConfig.Backtest.CsvSource.Market,
userConfig.Backtest.CsvSource.Granularity,
)
if modeCsv {
if err := bbgo.BootstrapEnvironmentLightweight(ctx, environ, userConfig); err != nil {
return err
}
} else {
backtestService = service.NewBacktestService(environ.DatabaseService.DB)
if err := bbgo.BootstrapBacktestEnvironment(ctx, environ); err != nil {
return err
}
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if environ.DatabaseService == nil {
return errors.New("database service is not enabled, please check your environment variables DB_DRIVER and DB_DSN")
}
}
environ.BacktestService = backtestService
bbgo.SetBackTesting(backtestService)
@ -692,7 +710,7 @@ func createSymbolReport(
}
func verify(
userConfig *bbgo.Config, backtestService *service.BacktestService,
userConfig *bbgo.Config, backtestService service.BackTestable,
sourceExchanges map[types.ExchangeName]types.Exchange, startTime, endTime time.Time,
) error {
for _, sourceExchange := range sourceExchanges {
@ -735,7 +753,7 @@ func getExchangeIntervals(ex types.Exchange) types.IntervalMap {
}
func sync(
ctx context.Context, userConfig *bbgo.Config, backtestService *service.BacktestService,
ctx context.Context, userConfig *bbgo.Config, backtestService service.BackTestable,
sourceExchanges map[types.ExchangeName]types.Exchange, syncFrom, syncTo time.Time,
) error {
for _, symbol := range userConfig.Backtest.Symbols {
@ -750,10 +768,8 @@ func sync(
var intervals = supportIntervals.Slice()
intervals.Sort()
for _, interval := range intervals {
if err := backtestService.Sync(ctx, sourceExchange, symbol, interval, syncFrom, syncTo); err != nil {
return err
}
if err := backtestService.Sync(ctx, sourceExchange, symbol, intervals, syncFrom, syncTo); err != nil {
return err
}
}
}

View File

@ -135,7 +135,8 @@ var PnLCmd = &cobra.Command{
// we need the backtest klines for the daily prices
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
if err := backtestService.Sync(ctx, exchange, symbol, types.Interval1d, since, until); err != nil {
intervals := []types.Interval{types.Interval1d}
if err := backtestService.Sync(ctx, exchange, symbol, intervals, since, until); err != nil {
return err
}
}

View File

@ -0,0 +1,161 @@
package csvsource
import (
"encoding/csv"
"errors"
"fmt"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
// MetaTraderTimeFormat is the time format expected by the MetaTrader decoder when cols [0] and [1] are used.
const MetaTraderTimeFormat = "02/01/2006 15:04"
var (
// ErrNotEnoughColumns is returned when the CSV price record does not have enough columns.
ErrNotEnoughColumns = errors.New("not enough columns")
// ErrInvalidTimeFormat is returned when the CSV price record does not have a valid time unix milli format.
ErrInvalidIDFormat = errors.New("cannot parse trade id string")
// ErrInvalidBoolFormat is returned when the CSV isBuyerMaker record does not have a valid bool representation.
ErrInvalidBoolFormat = errors.New("cannot parse bool to string")
// ErrInvalidTimeFormat is returned when the CSV price record does not have a valid time unix milli format.
ErrInvalidTimeFormat = errors.New("cannot parse time string")
// ErrInvalidOrderSideFormat is returned when the CSV side record does not have a valid buy or sell string.
ErrInvalidOrderSideFormat = errors.New("cannot parse order side string")
// ErrInvalidPriceFormat is returned when the CSV price record does not prices in expected format.
ErrInvalidPriceFormat = errors.New("OHLC prices must be valid number format")
// ErrInvalidVolumeFormat is returned when the CSV price record does not have a valid volume format.
ErrInvalidVolumeFormat = errors.New("volume must be valid number format")
)
// CSVKLineDecoder is an extension point for CSVKLineReader to support custom file formats.
type CSVKLineDecoder func(record []string, interval time.Duration) (types.KLine, error)
// NewBinanceCSVKLineReader creates a new CSVKLineReader for Binance CSV files.
func NewBinanceCSVKLineReader(csv *csv.Reader) *CSVKLineReader {
return &CSVKLineReader{
csv: csv,
decoder: BinanceCSVKLineDecoder,
}
}
// BinanceCSVKLineDecoder decodes a CSV record from Binance or Bybit into a KLine.
func BinanceCSVKLineDecoder(record []string, interval time.Duration) (types.KLine, error) {
var (
k, empty types.KLine
err error
)
if len(record) < 5 {
return k, ErrNotEnoughColumns
}
ts, err := strconv.ParseFloat(record[0], 64) // check for e numbers "1.70027E+12"
if err != nil {
return empty, ErrInvalidTimeFormat
}
open, err := fixedpoint.NewFromString(record[1])
if err != nil {
return empty, ErrInvalidPriceFormat
}
high, err := fixedpoint.NewFromString(record[2])
if err != nil {
return empty, ErrInvalidPriceFormat
}
low, err := fixedpoint.NewFromString(record[3])
if err != nil {
return empty, ErrInvalidPriceFormat
}
closing, err := fixedpoint.NewFromString(record[4])
if err != nil {
return empty, ErrInvalidPriceFormat
}
volume := fixedpoint.Zero
if len(record) == 6 {
volume, err = fixedpoint.NewFromString(record[5])
if err != nil {
return empty, ErrInvalidVolumeFormat
}
}
k.StartTime = types.Time(time.UnixMilli(int64(ts)))
k.EndTime = types.Time(k.StartTime.Time().Add(interval))
k.Open = open
k.High = high
k.Low = low
k.Close = closing
k.Volume = volume
return k, nil
}
// NewMetaTraderCSVKLineReader creates a new CSVKLineReader for MetaTrader CSV files.
func NewMetaTraderCSVKLineReader(csv *csv.Reader) *CSVKLineReader {
csv.Comma = ';'
return &CSVKLineReader{
csv: csv,
decoder: MetaTraderCSVKLineDecoder,
}
}
// MetaTraderCSVKLineDecoder decodes a CSV record from MetaTrader into a KLine.
func MetaTraderCSVKLineDecoder(record []string, interval time.Duration) (types.KLine, error) {
var (
k, empty types.KLine
err error
)
if len(record) < 6 {
return k, ErrNotEnoughColumns
}
tStr := fmt.Sprintf("%s %s", record[0], record[1])
t, err := time.Parse(MetaTraderTimeFormat, tStr)
if err != nil {
return empty, ErrInvalidTimeFormat
}
open, err := fixedpoint.NewFromString(record[2])
if err != nil {
return empty, ErrInvalidPriceFormat
}
high, err := fixedpoint.NewFromString(record[3])
if err != nil {
return empty, ErrInvalidPriceFormat
}
low, err := fixedpoint.NewFromString(record[4])
if err != nil {
return empty, ErrInvalidPriceFormat
}
closing, err := fixedpoint.NewFromString(record[5])
if err != nil {
return empty, ErrInvalidPriceFormat
}
volume, err := fixedpoint.NewFromString(record[6])
if err != nil {
return empty, ErrInvalidVolumeFormat
}
k.StartTime = types.NewTimeFromUnix(t.Unix(), 0)
k.EndTime = types.NewTimeFromUnix(t.Add(interval).Unix(), 0)
k.Open = open
k.High = high
k.Low = low
k.Close = closing
k.Volume = volume
return k, nil
}

View File

@ -0,0 +1,65 @@
package csvsource
import (
"encoding/csv"
"io"
"time"
"github.com/c9s/bbgo/pkg/types"
)
var _ KLineReader = (*CSVKLineReader)(nil)
// CSVKLineReader is a KLineReader that reads from a CSV file.
type CSVKLineReader struct {
csv *csv.Reader
decoder CSVKLineDecoder
}
// MakeCSVKLineReader is a factory method type that creates a new CSVKLineReader.
type MakeCSVKLineReader func(csv *csv.Reader) *CSVKLineReader
// NewCSVKLineReader creates a new CSVKLineReader with the default Binance decoder.
func NewCSVKLineReader(csv *csv.Reader) *CSVKLineReader {
return &CSVKLineReader{
csv: csv,
decoder: BinanceCSVKLineDecoder,
}
}
// NewCSVKLineReaderWithDecoder creates a new CSVKLineReader with the given decoder.
func NewCSVKLineReaderWithDecoder(csv *csv.Reader, decoder CSVKLineDecoder) *CSVKLineReader {
return &CSVKLineReader{
csv: csv,
decoder: decoder,
}
}
// Read reads the next KLine from the underlying CSV data.
func (r *CSVKLineReader) Read(interval time.Duration) (types.KLine, error) {
var k types.KLine
rec, err := r.csv.Read()
if err != nil {
return k, err
}
return r.decoder(rec, interval)
}
// ReadAll reads all the KLines from the underlying CSV data.
func (r *CSVKLineReader) ReadAll(interval time.Duration) ([]types.KLine, error) {
var ks []types.KLine
for {
k, err := r.Read(interval)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
ks = append(ks, k)
}
return ks, nil
}

View File

@ -0,0 +1,163 @@
package csvsource
import (
"encoding/csv"
"strings"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/fixedpoint"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types"
)
func assertKLineEq(t *testing.T, exp, act types.KLine, name string) {
assert.True(t, exp.StartTime.Equal(act.StartTime.Time()), name)
assert.Equal(t, 0, exp.Open.Compare(act.Open), name)
assert.Equal(t, 0, exp.High.Compare(act.High), name)
assert.Equal(t, 0, exp.Low.Compare(act.Low), name)
assert.Equal(t, 0, exp.Close.Compare(act.Close), name)
assert.Equal(t, 0, exp.Volume.Compare(act.Volume), name)
}
func TestCSVKLineReader_ReadWithBinanceDecoder(t *testing.T) {
tests := []struct {
name string
give string
want types.KLine
err error
}{
{
name: "Read DOHLCV",
give: "1609459200000,28923.63000000,29031.34000000,28690.17000000,28995.13000000,2311.81144500",
want: types.KLine{
StartTime: types.NewTimeFromUnix(1609459200, 0),
Open: Number(28923.63),
High: Number(29031.34),
Low: Number(28690.17),
Close: Number(28995.13),
// todo this should never happen >>
// mustNewFromString and NewFromFloat have different values after parse
Volume: fixedpoint.MustNewFromString("2311.81144500")},
err: nil,
},
{
name: "Read DOHLC",
give: "1609459200000,28923.63000000,29031.34000000,28690.17000000,28995.13000000",
want: types.KLine{
StartTime: types.NewTimeFromUnix(1609459200, 0),
Open: Number(28923.63),
High: Number(29031.34),
Low: Number(28690.17),
Close: Number(28995.13),
Volume: Number(0)},
err: nil,
},
{
name: "Not enough columns",
give: "1609459200000,28923.63000000,29031.34000000",
want: types.KLine{},
err: ErrNotEnoughColumns,
},
{
name: "Invalid time format",
give: "23/12/2021,28923.63000000,29031.34000000,28690.17000000,28995.13000000",
want: types.KLine{},
err: ErrInvalidTimeFormat,
},
{
name: "Invalid price format",
give: "1609459200000,sixty,29031.34000000,28690.17000000,28995.13000000",
want: types.KLine{},
err: ErrInvalidPriceFormat,
},
{
name: "Invalid volume format",
give: "1609459200000,28923.63000000,29031.34000000,28690.17000000,28995.13000000,vol",
want: types.KLine{},
err: ErrInvalidVolumeFormat,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reader := NewBinanceCSVKLineReader(csv.NewReader(strings.NewReader(tt.give)))
kline, err := reader.Read(time.Hour)
assert.Equal(t, tt.err, err)
if err == nil {
spew.Dump(tt.want)
spew.Dump(kline)
assertKLineEq(t, tt.want, kline, tt.name)
}
})
}
}
func TestCSVKLineReader_ReadAllWithDefaultDecoder(t *testing.T) {
records := []string{
"1609459200000,28923.63000000,29031.34000000,28690.17000000,28995.13000000,2311.81144500",
"1609459300000,28928.63000000,30031.34000000,22690.17000000,28495.13000000,3000.00",
}
reader := NewCSVKLineReader(csv.NewReader(strings.NewReader(strings.Join(records, "\n"))))
klines, err := reader.ReadAll(time.Hour)
assert.NoError(t, err)
assert.Len(t, klines, 2)
}
func TestCSVKLineReader_ReadWithMetaTraderDecoder(t *testing.T) {
tests := []struct {
name string
give string
want types.KLine
err error
}{
{
name: "Read DOHLCV",
give: "11/12/2008;16:00;779.527679;780.964756;777.527679;779.964756;5",
want: types.KLine{
StartTime: types.NewTimeFromUnix(time.Date(2008, 12, 11, 16, 0, 0, 0, time.UTC).Unix(), 0),
Open: Number(779.527679),
High: Number(780.964756),
Low: Number(777.527679),
Close: Number(779.964756),
Volume: Number(5)},
err: nil,
},
{
name: "Not enough columns",
give: "1609459200000;28923.63000000;29031.34000000",
want: types.KLine{},
err: ErrNotEnoughColumns,
},
{
name: "Invalid time format",
give: "23/12/2021;t;28923.63000000;29031.34000000;28690.17000000;28995.13000000",
want: types.KLine{},
err: ErrInvalidTimeFormat,
},
{
name: "Invalid price format",
give: "11/12/2008;00:00;sixty;29031.34000000;28690.17000000;28995.13000000",
want: types.KLine{},
err: ErrInvalidPriceFormat,
},
{
name: "Invalid volume format",
give: "11/12/2008;00:00;779.527679;780.964756;777.527679;779.964756;vol",
want: types.KLine{},
err: ErrInvalidVolumeFormat,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reader := NewMetaTraderCSVKLineReader(csv.NewReader(strings.NewReader(tt.give)))
kline, err := reader.Read(time.Hour)
assert.Equal(t, tt.err, err)
assertKLineEq(t, tt.want, kline, tt.name)
})
}
}

View File

@ -0,0 +1,167 @@
package csvsource
import (
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type ICSVTickConverter interface {
CsvTickToKLine(tick *CsvTick) (closesKLine bool)
GetTicks() []*CsvTick
LatestKLine(interval types.Interval) (k *types.KLine)
GetKLineResults() map[types.Interval][]types.KLine
}
// CSVTickConverter takes a tick and internally converts it to a KLine slice
type CSVTickConverter struct {
ticks []*CsvTick
intervals []types.Interval
klines map[types.Interval][]types.KLine
}
func NewCSVTickConverter(intervals []types.Interval) ICSVTickConverter {
return &CSVTickConverter{
ticks: []*CsvTick{},
intervals: intervals,
klines: make(map[types.Interval][]types.KLine),
}
}
func (c *CSVTickConverter) GetTicks() []*CsvTick {
return c.ticks
}
func (c *CSVTickConverter) AddKLine(interval types.Interval, k types.KLine) {
c.klines[interval] = append(c.klines[interval], k)
}
// GetKLineResult returns the converted ticks as kLine of interval
func (c *CSVTickConverter) LatestKLine(interval types.Interval) (k *types.KLine) {
if _, ok := c.klines[interval]; !ok || len(c.klines[interval]) == 0 {
return nil
}
return &c.klines[interval][len(c.klines[interval])-1]
}
// GetKLineResults returns the converted ticks as kLine of all constructed intervals
func (c *CSVTickConverter) GetKLineResults() map[types.Interval][]types.KLine {
if len(c.klines) == 0 {
return nil
}
return c.klines
}
// Convert ticks to KLine with interval
func (c *CSVTickConverter) CsvTickToKLine(tick *CsvTick) (closesKLine bool) {
for _, interval := range c.intervals {
var (
currentCandle = types.KLine{}
high = fixedpoint.Zero
low = fixedpoint.Zero
)
isOpen, t := c.detCandleStart(tick.Timestamp.Time(), interval)
if isOpen {
latestKline := c.LatestKLine(interval)
if latestKline != nil {
latestKline.Closed = true // k is pointer
closesKLine = true
c.addMissingKLines(interval, t)
}
c.AddKLine(interval, types.KLine{
Exchange: tick.Exchange,
Symbol: tick.Symbol,
Interval: interval,
StartTime: types.NewTimeFromUnix(t.Unix(), 0),
EndTime: types.NewTimeFromUnix(t.Add(interval.Duration()).Unix(), 0),
Open: tick.Price,
High: tick.Price,
Low: tick.Price,
Close: tick.Price,
Volume: tick.HomeNotional,
QuoteVolume: tick.ForeignNotional,
Closed: false,
})
return
}
currentCandle = c.klines[interval][len(c.klines[interval])-1]
if tick.Price.Compare(currentCandle.High) > 0 {
high = tick.Price
} else {
high = currentCandle.High
}
if tick.Price.Compare(currentCandle.Low) < 0 {
low = tick.Price
} else {
low = currentCandle.Low
}
c.klines[interval][len(c.klines[interval])-1] = types.KLine{
StartTime: currentCandle.StartTime,
EndTime: currentCandle.EndTime,
Exchange: tick.Exchange,
Symbol: tick.Symbol,
Interval: interval,
Open: currentCandle.Open,
High: high,
Low: low,
Close: tick.Price,
Volume: currentCandle.Volume.Add(tick.HomeNotional),
QuoteVolume: currentCandle.QuoteVolume.Add(tick.ForeignNotional),
Closed: false,
}
}
return
}
func (c *CSVTickConverter) detCandleStart(ts time.Time, interval types.Interval) (isOpen bool, t time.Time) {
if len(c.klines) == 0 {
return true, interval.Truncate(ts)
}
var end = c.LatestKLine(interval).EndTime.Time()
if ts.After(end) {
return true, end
}
return false, t
}
// appendMissingKLines appends an empty kline till startNext falls within a kline interval
func (c *CSVTickConverter) addMissingKLines(
interval types.Interval,
startNext time.Time,
) {
for {
last := c.LatestKLine(interval)
newEndTime := types.NewTimeFromUnix(
// one second is the smallest interval
last.EndTime.Time().Add(time.Duration(last.Interval.Seconds())*time.Second).Unix(),
0,
)
if last.EndTime.Time().Before(startNext) {
c.AddKLine(interval, types.KLine{
StartTime: last.EndTime,
EndTime: newEndTime,
Exchange: last.Exchange,
Symbol: last.Symbol,
Interval: last.Interval,
Open: last.Close,
High: last.Close,
Low: last.Close,
Close: last.Close,
Volume: 0,
QuoteVolume: 0,
Closed: true,
})
} else {
break
}
}
}

View File

@ -0,0 +1,193 @@
package csvsource
import (
"encoding/csv"
"strconv"
"time"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
// CSVTickDecoder is an extension point for CSVTickReader to support custom file formats.
type CSVTickDecoder func(record []string, index int) (*CsvTick, error)
// NewBinanceCSVTickReader creates a new CSVTickReader for Binance CSV files.
func NewBinanceCSVTickReader(csv *csv.Reader) *CSVTickReader {
return &CSVTickReader{
csv: csv,
decoder: BinanceCSVTickDecoder,
}
}
// BinanceCSVKLineDecoder decodes a CSV record from Binance into a CsvTick.
func BinanceCSVTickDecoder(row []string, _ int) (*CsvTick, error) {
if len(row) < 5 {
return nil, ErrNotEnoughColumns
}
// example csv row for some reason some properties are duplicated in their csv
// id, price, qty, base_qty, base_qty, time, is_buyer_maker, is_buyer_maker,
// 11782578,6.00000000,1.00000000,14974844,14974844,1698623884463,True
id, err := strconv.ParseUint(row[0], 10, 64)
if err != nil {
return nil, ErrInvalidIDFormat
}
price, err := fixedpoint.NewFromString(row[1])
if err != nil {
return nil, ErrInvalidPriceFormat
}
qty, err := fixedpoint.NewFromString(row[2])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
baseQty, err := fixedpoint.NewFromString(row[3])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
isBuyerMaker, err := strconv.ParseBool(row[6])
if err != nil {
return nil, err
}
// isBuyerMaker=false trade will qualify as BUY.
side := types.SideTypeBuy
if isBuyerMaker {
side = types.SideTypeSell
}
n, err := strconv.ParseFloat(row[5], 64)
if err != nil {
return nil, ErrInvalidTimeFormat
}
ts := time.Unix(int64(n), 0)
return &CsvTick{
TradeID: id,
Exchange: types.ExchangeBinance,
Side: side,
Size: qty,
Price: price,
IsBuyerMaker: isBuyerMaker,
HomeNotional: price.Mul(qty),
ForeignNotional: price.Mul(baseQty),
Timestamp: types.NewMillisecondTimestampFromInt(ts.UnixMilli()),
// Symbol: must be overwritten - info not in csv,
// TickDirection: would need to keep last tick in memory to compare tick direction,
}, nil
}
// NewBinanceCSVTickReader creates a new CSVTickReader for Bybit CSV files.
func NewBybitCSVTickReader(csv *csv.Reader) *CSVTickReader {
return &CSVTickReader{
csv: csv,
decoder: BybitCSVTickDecoder,
}
}
// BybitCSVTickDecoder decodes a CSV record from Bybit into a CsvTick.
func BybitCSVTickDecoder(row []string, index int) (*CsvTick, error) {
// example csv row
// timestamp,symbol,side,size,price,tickDirection,trdMatchID,grossValue,homeNotional,foreignNotional
// 1649054912,FXSUSDT,Buy,0.01,38.32,PlusTick,9c30abaf-80ae-5ebf-9850-58fe7ed4bac8,3.832e+07,0.01,0.3832
if len(row) < 9 {
return nil, ErrNotEnoughColumns
}
if index == 0 {
return nil, nil
}
side, err := types.StrToSideType(row[2])
if err != nil {
return nil, ErrInvalidOrderSideFormat
}
size, err := fixedpoint.NewFromString(row[3])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
price, err := fixedpoint.NewFromString(row[4])
if err != nil {
return nil, ErrInvalidPriceFormat
}
hn, err := fixedpoint.NewFromString(row[8])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
fn, err := fixedpoint.NewFromString(row[9])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
n, err := strconv.ParseFloat(row[0], 64) // startTime eg 1696982287.4922
if err != nil {
return nil, ErrInvalidTimeFormat
}
ts := time.Unix(int64(n), 0)
return &CsvTick{
TradeID: uint64(index),
Symbol: row[1],
Exchange: types.ExchangeBybit,
Side: side,
Size: size,
Price: price,
HomeNotional: hn,
ForeignNotional: fn,
TickDirection: row[5], // todo does this seem promising to define for other exchanges too?
Timestamp: types.NewMillisecondTimestampFromInt(ts.UnixMilli()),
}, nil
}
// NewOKExCSVTickReader creates a new CSVTickReader for OKEx CSV files.
func NewOKExCSVTickReader(csv *csv.Reader) *CSVTickReader {
return &CSVTickReader{
csv: csv,
decoder: OKExCSVTickDecoder,
}
}
// OKExCSVKLineDecoder decodes a CSV record from OKEx into a CsvTick.
func OKExCSVTickDecoder(row []string, index int) (*CsvTick, error) {
if len(row) < 5 {
return nil, ErrNotEnoughColumns
}
if index == 0 {
return nil, nil
}
// example csv row for OKeX
// trade_id, side, size, price, created_time
// 134642, sell, 6.2638 6.507 1.69975E+12
id, err := strconv.ParseInt(row[0], 10, 64)
if err != nil {
return nil, ErrInvalidIDFormat
}
price, err := fixedpoint.NewFromString(row[3])
if err != nil {
return nil, ErrInvalidPriceFormat
}
qty, err := fixedpoint.NewFromString(row[2])
if err != nil {
return nil, ErrInvalidVolumeFormat
}
side := types.SideTypeBuy
isBuyerMaker := false
if row[1] == "sell" {
side = types.SideTypeSell
isBuyerMaker = true
}
n, err := strconv.ParseFloat(row[4], 64) // startTime
if err != nil {
return nil, ErrInvalidTimeFormat
}
ts := time.UnixMilli(int64(n))
return &CsvTick{
TradeID: uint64(id),
Exchange: types.ExchangeOKEx,
Side: side,
Size: qty,
Price: price,
IsBuyerMaker: isBuyerMaker,
HomeNotional: price.Mul(qty),
Timestamp: types.NewMillisecondTimestampFromInt(ts.UnixMilli()),
// ForeignNotional: // info not in csv
// Symbol: must be overwritten - info not in csv
// TickDirection: would need to keep last tick in memory to compare tick direction,
}, nil
}

View File

@ -0,0 +1,66 @@
package csvsource
import (
"encoding/csv"
"io"
)
var _ TickReader = (*CSVTickReader)(nil)
// CSVTickReader is a CSVTickReader that reads from a CSV file.
type CSVTickReader struct {
csv *csv.Reader
decoder CSVTickDecoder
ticks []*CsvTick
}
// MakeCSVTickReader is a factory method type that creates a new CSVTickReader.
type MakeCSVTickReader func(csv *csv.Reader) *CSVTickReader
// NewCSVKLineReader creates a new CSVKLineReader with the default Binance decoder.
func NewCSVTickReader(csv *csv.Reader) *CSVTickReader {
return &CSVTickReader{
csv: csv,
decoder: BinanceCSVTickDecoder,
}
}
// NewCSVTickReaderWithDecoder creates a new CSVKLineReader with the given decoder.
func NewCSVTickReaderWithDecoder(csv *csv.Reader, decoder CSVTickDecoder) *CSVTickReader {
return &CSVTickReader{
csv: csv,
decoder: decoder,
}
}
// ReadAll reads all the KLines from the underlying CSV data.
func (r *CSVTickReader) ReadAll() (ticks []*CsvTick, err error) {
var i int
for {
tick, err := r.Read(i)
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
i++ // used as jump logic inside decoder to skip csv headers in case
if tick == nil {
continue
}
ticks = append(ticks, tick)
}
return ticks, nil
}
// Read reads the next KLine from the underlying CSV data.
func (r *CSVTickReader) Read(i int) (*CsvTick, error) {
rec, err := r.csv.Read()
if err != nil {
return nil, err
}
return r.decoder(rec, i)
}

View File

@ -0,0 +1,75 @@
package csvsource
import (
"encoding/csv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
. "github.com/c9s/bbgo/pkg/testing/testhelper"
"github.com/c9s/bbgo/pkg/types"
)
func TestCSVTickReader_ReadWithBinanceDecoder(t *testing.T) {
tests := []struct {
name string
give string
want *CsvTick
err error
}{
{
name: "Read Tick",
give: "11782578,6.00000000,1.00000000,14974844,14974844,1698623884463,True,True",
want: &CsvTick{
Timestamp: types.NewMillisecondTimestampFromInt(1698623884463),
Size: Number(1),
Price: Number(6),
HomeNotional: Number(6),
},
err: nil,
},
{
name: "Not enough columns",
give: "1609459200000,28923.63000000,29031.34000000",
want: nil,
err: ErrNotEnoughColumns,
},
{
name: "Invalid time format",
give: "11782578,6.00000000,1.00000000,14974844,14974844,23/12/2021,True,True",
want: nil,
err: ErrInvalidTimeFormat,
},
{
name: "Invalid price format",
give: "11782578,sixty,1.00000000,14974844,14974844,1698623884463,True,True",
want: nil,
err: ErrInvalidPriceFormat,
},
{
name: "Invalid size format",
give: "11782578,1.00000000,one,14974844,14974844,1698623884463,True,True",
want: nil,
err: ErrInvalidVolumeFormat,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
reader := NewBinanceCSVTickReader(csv.NewReader(strings.NewReader(tt.give)))
tick, err := reader.Read(0)
if err == nil {
assertTickEqual(t, tt.want, tick)
}
assert.Equal(t, tt.err, err)
})
}
}
func assertTickEqual(t *testing.T, exp, act *CsvTick) {
assert.Equal(t, exp.Timestamp.Time(), act.Timestamp.Time())
assert.Equal(t, 0, exp.Price.Compare(act.Price))
assert.Equal(t, 0, exp.Size.Compare(act.Size))
assert.Equal(t, 0, exp.HomeNotional.Compare(act.HomeNotional))
}

View File

@ -0,0 +1,59 @@
package csvsource
import (
"encoding/csv"
"io/fs"
"os"
"path/filepath"
"time"
"github.com/c9s/bbgo/pkg/types"
)
// KLineReader is an interface for reading candlesticks.
type KLineReader interface {
Read(interval time.Duration) (types.KLine, error)
ReadAll(interval time.Duration) ([]types.KLine, error)
}
// ReadKLinesFromCSV reads all the .csv files in a given directory or a single file into a slice of KLines.
// Wraps a default CSVKLineReader with Binance decoder for convenience.
// For finer grained memory management use the base kline reader.
func ReadKLinesFromCSV(path string, interval time.Duration) ([]types.KLine, error) {
return ReadKLinesFromCSVWithDecoder(path, interval, MakeCSVKLineReader(NewBinanceCSVKLineReader))
}
// ReadKLinesFromCSVWithDecoder permits using a custom CSVKLineReader.
func ReadKLinesFromCSVWithDecoder(path string, interval time.Duration, maker MakeCSVKLineReader) ([]types.KLine, error) {
var klines []types.KLine
err := filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if filepath.Ext(path) != ".csv" {
return nil
}
file, err := os.Open(path)
if err != nil {
return err
}
//nolint:errcheck // Read ops only so safe to ignore err return
defer file.Close()
reader := maker(csv.NewReader(file))
newKlines, err := reader.ReadAll(interval)
if err != nil {
return err
}
klines = append(klines, newKlines...)
return nil
})
if err != nil {
return nil, err
}
return klines, nil
}

View File

@ -0,0 +1,21 @@
package csvsource
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestReadKLinesFromCSV(t *testing.T) {
klines, err := ReadKLinesFromCSV("./testdata/binance/BTCUSDT-1h-2023-11-18.csv", time.Hour)
assert.NoError(t, err)
assert.Len(t, klines, 24)
assert.Equal(t, int64(1700265600), klines[0].StartTime.Unix(), "StartTime")
assert.Equal(t, int64(1700269200), klines[0].EndTime.Unix(), "EndTime")
assert.Equal(t, 36613.91, klines[0].Open.Float64(), "Open")
assert.Equal(t, 36613.92, klines[0].High.Float64(), "High")
assert.Equal(t, 36388.12, klines[0].Low.Float64(), "Low")
assert.Equal(t, 36400.01, klines[0].Close.Float64(), "Close")
assert.Equal(t, 1005.75727, klines[0].Volume.Float64(), "Volume")
}

View File

@ -0,0 +1,89 @@
package csvsource
import (
"encoding/csv"
"io/fs"
"os"
"path/filepath"
"sort"
"github.com/c9s/bbgo/pkg/types"
)
// TickReader is an interface for reading candlesticks.
type TickReader interface {
Read(i int) (*CsvTick, error)
ReadAll() (ticks []*CsvTick, err error)
}
// ReadTicksFromCSV reads all the .csv files in a given directory or a single file into a slice of Ticks.
// Wraps a default CSVTickReader with Binance decoder for convenience.
// For finer grained memory management use the base kline reader.
func ReadTicksFromCSV(
path, symbol string,
intervals []types.Interval,
) (
klineMap map[types.Interval][]types.KLine,
err error,
) {
return ReadTicksFromCSVWithDecoder(
path,
symbol,
intervals,
MakeCSVTickReader(NewBinanceCSVTickReader),
)
}
// ReadTicksFromCSVWithDecoder permits using a custom CSVTickReader.
func ReadTicksFromCSVWithDecoder(
path, symbol string,
intervals []types.Interval,
maker MakeCSVTickReader,
) (
klineMap map[types.Interval][]types.KLine,
err error,
) {
converter := NewCSVTickConverter(intervals)
ticks := []*CsvTick{}
// read all ticks into memory
err = filepath.WalkDir(path, func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}
if d.IsDir() {
return nil
}
if filepath.Ext(path) != ".csv" {
return nil
}
file, err := os.Open(path)
if err != nil {
return err
}
//nolint:errcheck // Read ops only so safe to ignore err return
defer file.Close()
reader := maker(csv.NewReader(file))
newTicks, err := reader.ReadAll()
if err != nil {
return err
}
ticks = append(ticks, newTicks...)
return nil
})
if err != nil {
return nil, err
}
// sort ticks by timestamp (okex sorts csv by price ascending ;(
sort.Slice(ticks, func(i, j int) bool {
return ticks[i].Timestamp.Time().Before(ticks[j].Timestamp.Time())
})
for _, tick := range ticks {
tick.Symbol = symbol
converter.CsvTickToKLine(tick)
}
return converter.GetKLineResults(), nil
}

View File

@ -0,0 +1,67 @@
package csvsource
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/types"
)
func TestReadTicksFromBinanceCSV(t *testing.T) {
path := "./testdata/binance/FXSUSDT-ticks-2023-10-29.csv"
symbol := "FXSUSDT"
intervals := []types.Interval{types.Interval1h}
klineMap, err := ReadTicksFromCSVWithDecoder(
path, symbol, intervals, MakeCSVTickReader(NewBinanceCSVTickReader),
)
klines := klineMap[types.Interval1h]
assert.NoError(t, err)
assert.Len(t, klines, 1)
assert.Equal(t, int64(1698620400), klines[0].StartTime.Unix(), "StartTime")
assert.Equal(t, int64(1698624000), klines[0].EndTime.Unix(), "EndTime")
assert.Equal(t, 6.0, klines[0].Open.Float64(), "Open")
assert.Equal(t, 6.0, klines[0].High.Float64(), "High")
assert.Equal(t, 6.0, klines[0].Low.Float64(), "Low")
assert.Equal(t, 6.0, klines[0].Close.Float64(), "Close")
assert.Equal(t, 111.0, klines[0].Volume.Float64(), "Volume")
}
func TestReadTicksFromBybitCSV(t *testing.T) {
path := "./testdata/bybit/FXSUSDT2023-10-10.csv"
symbol := "FXSUSDT"
intervals := []types.Interval{types.Interval1h}
klineMap, err := ReadTicksFromCSVWithDecoder(
path, symbol, intervals, MakeCSVTickReader(NewBybitCSVTickReader),
)
klines := klineMap[types.Interval1h]
assert.NoError(t, err)
assert.Len(t, klines, 1)
assert.Equal(t, int64(1696978800), klines[0].StartTime.Unix(), "StartTime")
assert.Equal(t, int64(1696982400), klines[0].EndTime.Unix(), "EndTime")
assert.Equal(t, 5.239, klines[0].Open.Float64(), "Open")
assert.Equal(t, 5.2495, klines[0].High.Float64(), "High")
assert.Equal(t, 5.239, klines[0].Low.Float64(), "Low")
assert.Equal(t, 5.2495, klines[0].Close.Float64(), "Close")
assert.Equal(t, 147.05, klines[0].Volume.Float64(), "Volume")
}
func TestReadTicksFromOkexCSV(t *testing.T) {
path := "./testdata/okex/BTC-USDT-aggtrades-2023-11-18.csv"
symbol := "BTCUSDT"
intervals := []types.Interval{types.Interval1h}
klineMap, err := ReadTicksFromCSVWithDecoder(
path, symbol, intervals, MakeCSVTickReader(NewOKExCSVTickReader),
)
klines := klineMap[types.Interval1h]
assert.NoError(t, err)
assert.Len(t, klines, 1)
assert.Equal(t, int64(1700236800), klines[0].StartTime.Unix(), "StartTime")
assert.Equal(t, int64(1700240400), klines[0].EndTime.Unix(), "EndTime")
assert.Equal(t, 35910.6, klines[0].Open.Float64(), "Open")
assert.Equal(t, 35914.4, klines[0].High.Float64(), "High")
assert.Equal(t, 35910.6, klines[0].Low.Float64(), "Low")
assert.Equal(t, 35914.4, klines[0].Close.Float64(), "Close")
assert.Equal(t, 51525.38700081, klines[0].Volume.Float64(), "Volume")
}

View File

@ -0,0 +1,24 @@
1.70027E+12,36613.91,36613.92,36388.12,36400.01,1005.75727,1.70027E+12,36712312.49831390,35985,440.81212,16088534.76584510,0
1.70027E+12,36400.01,36456.53,36377.88,36405.46,507.51514,1.70027E+12,18486014.80771630,25492,236.98883,8631509.902,0
1.70027E+12,36405.47,36447.75,36390.44,36408.09,341.53256,1.70028E+12,12438100.14362490,19727,163.41072,5951076.256,0
1.70028E+12,36408.1,36424.01,36360.22,36371.81,444.73045,1.70028E+12,16180477.67130620,25089,207.88416,7562956.27,0
1.70028E+12,36371.8,36426.51,36369.45,36369.45,378.50007,1.70028E+12,13775839.75999540,20728,197.99667,7205858.497,0
1.70028E+12,36369.45,36378.65,36303.98,36334,629.09574,1.70029E+12,22862757.37912180,33883,269.3097,9787132.913,0
1.70029E+12,36334.01,36361.1,36250.01,36252,615.52755,1.70029E+12,22350527.58640450,30392,238.90543,8675479.987,0
1.70029E+12,36251.99,36428,36178.58,36417.16,1191.24433,1.70029E+12,43265058.27238300,41466,628.67499,22834722.23764540,0
1.70029E+12,36417.15,36479.22,36375.28,36448.01,600.66262,1.7003E+12,21883116.44525150,29227,301.84047,10996141.34025750,0
1.7003E+12,36448,36453.09,36392,36397.45,398.07607,1.7003E+12,14499345.43090060,22193,159.60456,5813290.376,0
1.7003E+12,36397.44,36486.48,36397.44,36472.46,601.46574,1.70031E+12,21917527.53081410,24881,354.42545,12916946.80705190,0
1.70031E+12,36472.46,36538.61,36400,36402.8,549.76216,1.70031E+12,20053594.27145890,29706,248.4342,9062453.31,0
1.70031E+12,36402.79,36484.31,36393.44,36449.3,513.24545,1.70031E+12,18705069.10380380,26631,244.2024,8898609.715,0
1.70031E+12,36449.3,36483.13,36347.69,36430.21,887.7206,1.70032E+12,32327899.78688460,41973,391.19851,14246544.95513180,0
1.70032E+12,36430.21,36568.76,36421.1,36507.03,803.12819,1.70032E+12,29307346.75876810,36941,447.83113,16341815.04367800,0
1.70032E+12,36507.04,36682.2,36505.14,36664.16,1440.91018,1.70032E+12,52738306.52534310,50174,755.06676,27635771.88129150,0
1.70032E+12,36664.17,36845.49,36639.96,36674,1669.58835,1.70033E+12,61326939.61543430,61313,823.2455,30239331.84409360,0
1.70033E+12,36674,36701.76,36600.1,36620,933.50168,1.70033E+12,34203500.13911480,39514,402.07999,14731130.96536590,0
1.70033E+12,36620.01,36745.5,36611.47,36707.19,583.0753,1.70033E+12,21373512.11709470,29144,289.6688,10617175.15516390,0
1.70033E+12,36707.18,36768,36653.51,36679.1,598.67548,1.70034E+12,21974467.38315690,31067,277.3739,10180364.08398050,0
1.70034E+12,36679.09,36707.16,36536.08,36598,779.88183,1.70034E+12,28546655.20047820,43054,314.94592,11526766.07883920,0
1.70034E+12,36597.99,36611.78,36524.31,36542.01,581.49791,1.70034E+12,21265094.72761260,36319,230.4279,8425965.352,0
1.70034E+12,36542.01,36556.56,36443.1,36506,543.12589,1.70035E+12,19821336.79152930,27472,245.46068,8956420.024,0
1.70035E+12,36506,36571.33,36498.34,36568.1,504.0213,1.70035E+12,18419269.57915190,22127,231.75877,8469107.669,0
1 1.70027E+12 36613.91 36613.92 36388.12 36400.01 1005.75727 1.70027E+12 36712312.49831390 35985 440.81212 16088534.76584510 0
2 1.70027E+12 36400.01 36456.53 36377.88 36405.46 507.51514 1.70027E+12 18486014.80771630 25492 236.98883 8631509.902 0
3 1.70027E+12 36405.47 36447.75 36390.44 36408.09 341.53256 1.70028E+12 12438100.14362490 19727 163.41072 5951076.256 0
4 1.70028E+12 36408.1 36424.01 36360.22 36371.81 444.73045 1.70028E+12 16180477.67130620 25089 207.88416 7562956.27 0
5 1.70028E+12 36371.8 36426.51 36369.45 36369.45 378.50007 1.70028E+12 13775839.75999540 20728 197.99667 7205858.497 0
6 1.70028E+12 36369.45 36378.65 36303.98 36334 629.09574 1.70029E+12 22862757.37912180 33883 269.3097 9787132.913 0
7 1.70029E+12 36334.01 36361.1 36250.01 36252 615.52755 1.70029E+12 22350527.58640450 30392 238.90543 8675479.987 0
8 1.70029E+12 36251.99 36428 36178.58 36417.16 1191.24433 1.70029E+12 43265058.27238300 41466 628.67499 22834722.23764540 0
9 1.70029E+12 36417.15 36479.22 36375.28 36448.01 600.66262 1.7003E+12 21883116.44525150 29227 301.84047 10996141.34025750 0
10 1.7003E+12 36448 36453.09 36392 36397.45 398.07607 1.7003E+12 14499345.43090060 22193 159.60456 5813290.376 0
11 1.7003E+12 36397.44 36486.48 36397.44 36472.46 601.46574 1.70031E+12 21917527.53081410 24881 354.42545 12916946.80705190 0
12 1.70031E+12 36472.46 36538.61 36400 36402.8 549.76216 1.70031E+12 20053594.27145890 29706 248.4342 9062453.31 0
13 1.70031E+12 36402.79 36484.31 36393.44 36449.3 513.24545 1.70031E+12 18705069.10380380 26631 244.2024 8898609.715 0
14 1.70031E+12 36449.3 36483.13 36347.69 36430.21 887.7206 1.70032E+12 32327899.78688460 41973 391.19851 14246544.95513180 0
15 1.70032E+12 36430.21 36568.76 36421.1 36507.03 803.12819 1.70032E+12 29307346.75876810 36941 447.83113 16341815.04367800 0
16 1.70032E+12 36507.04 36682.2 36505.14 36664.16 1440.91018 1.70032E+12 52738306.52534310 50174 755.06676 27635771.88129150 0
17 1.70032E+12 36664.17 36845.49 36639.96 36674 1669.58835 1.70033E+12 61326939.61543430 61313 823.2455 30239331.84409360 0
18 1.70033E+12 36674 36701.76 36600.1 36620 933.50168 1.70033E+12 34203500.13911480 39514 402.07999 14731130.96536590 0
19 1.70033E+12 36620.01 36745.5 36611.47 36707.19 583.0753 1.70033E+12 21373512.11709470 29144 289.6688 10617175.15516390 0
20 1.70033E+12 36707.18 36768 36653.51 36679.1 598.67548 1.70034E+12 21974467.38315690 31067 277.3739 10180364.08398050 0
21 1.70034E+12 36679.09 36707.16 36536.08 36598 779.88183 1.70034E+12 28546655.20047820 43054 314.94592 11526766.07883920 0
22 1.70034E+12 36597.99 36611.78 36524.31 36542.01 581.49791 1.70034E+12 21265094.72761260 36319 230.4279 8425965.352 0
23 1.70034E+12 36542.01 36556.56 36443.1 36506 543.12589 1.70035E+12 19821336.79152930 27472 245.46068 8956420.024 0
24 1.70035E+12 36506 36571.33 36498.34 36568.1 504.0213 1.70035E+12 18419269.57915190 22127 231.75877 8469107.669 0

View File

@ -0,0 +1,5 @@
11782578,6.00000000,1.00000000,14974844,14974844,1698623884463,True,True
11782579,6.00000000,1.00000000,14974845,14974845,1698623884666,True,True
11782580,6.00000000,1.00000000,14974846,14974846,1698623893793,True,True
11782581,6.00000000,5.00000000,14974847,14974847,1698623920955,True,True
11782582,6.00000000,10.50000000,14974848,14974848,1698623939783,False,True
1 11782578 6.00000000 1.00000000 14974844 14974844 1698623884463 True True
2 11782579 6.00000000 1.00000000 14974845 14974845 1698623884666 True True
3 11782580 6.00000000 1.00000000 14974846 14974846 1698623893793 True True
4 11782581 6.00000000 5.00000000 14974847 14974847 1698623920955 True True
5 11782582 6.00000000 10.50000000 14974848 14974848 1698623939783 False True

View File

@ -0,0 +1,16 @@
timestamp,symbol,side,size,price,tickDirection,trdMatchID,grossValue,homeNotional,foreignNotional
1696982287.4922,FXSUSDT,Sell,0.86,5.2390,ZeroMinusTick,f7496ecb-b174-51b9-ba56-150186ba6c27,4.50554e+08,0.86,4.50554
1696982322.0561,FXSUSDT,Buy,0.13,5.2395,PlusTick,2089f1f4-d890-5762-a652-49a743fab436,6.81135e+07,0.13,0.6811349999999999
1696982333.0308,FXSUSDT,Buy,48.9,5.2420,PlusTick,8e7d405a-0003-5aa1-972d-46b08fe520c0,2.563338e+10,48.9,256.3338
1696982333.0377,FXSUSDT,Buy,0.77,5.2425,PlusTick,9f250e94-da5b-5a94-9126-084e46c9c692,4.0367249999999994e+08,0.77,4.036725
1696982359.7441,FXSUSDT,Buy,0.12,5.2450,PlusTick,08a0c666-da06-53f6-8eec-9d3462582b4f,6.293999999999999e+07,0.12,0.6294
1696982359.7441,FXSUSDT,Buy,0.19,5.2450,ZeroMinusTick,8a61753b-2a8e-5881-8e9b-9ad66806ee23,9.9655e+07,0.19,0.99655
1696982359.7443,FXSUSDT,Buy,12.12,5.2450,ZeroMinusTick,34b2f272-2f68-5d0a-a4ad-6c02f5342ca1,6.356939999999999e+09,12.12,63.569399999999995
1696982359.7443,FXSUSDT,Buy,2.19,5.2450,ZeroMinusTick,0cae9717-0fe1-51dd-bb10-1a61d5e83d98,1.148655e+09,2.19,11.48655
1696982359.7449,FXSUSDT,Buy,35.66,5.2450,ZeroMinusTick,0a5f0734-af3a-5439-9f17-d98ce7ea4f24,1.870367e+10,35.66,187.0367
1696982359.7512,FXSUSDT,Buy,10.97,5.2450,ZeroMinusTick,d8529e38-d3f5-5a7a-97f7-55cf3335de77,5.753765000000001e+09,10.97,57.537650000000006
1696982359.7512,FXSUSDT,Buy,22.97,5.2450,ZeroMinusTick,44361b86-78e1-533b-a3d0-7dd12d538992,1.2047765e+10,22.97,120.47765
1696982369.5962,FXSUSDT,Buy,0.05,5.2470,PlusTick,6800a047-b6e5-520a-9817-eb4d463a3cce,2.6235000000000004e+07,0.05,0.26235
1696982389.6288,FXSUSDT,Buy,0.02,5.2495,PlusTick,a4bc238f-3e6a-58a3-a012-1342563c2ced,1.0499000000000002e+07,0.02,0.10499000000000001
1696982389.6288,FXSUSDT,Buy,6.06,5.2495,ZeroMinusTick,eb27200e-c34e-537a-a0a0-4636dab66f07,3.181197e+09,6.06,31.81197
1696982389.6297,FXSUSDT,Buy,6.04,5.2495,ZeroMinusTick,c6badf81-05c5-5b35-b932-3c71941340fb,3.170698e+09,6.04,31.70698
1 timestamp symbol side size price tickDirection trdMatchID grossValue homeNotional foreignNotional
2 1696982287.4922 FXSUSDT Sell 0.86 5.2390 ZeroMinusTick f7496ecb-b174-51b9-ba56-150186ba6c27 4.50554e+08 0.86 4.50554
3 1696982322.0561 FXSUSDT Buy 0.13 5.2395 PlusTick 2089f1f4-d890-5762-a652-49a743fab436 6.81135e+07 0.13 0.6811349999999999
4 1696982333.0308 FXSUSDT Buy 48.9 5.2420 PlusTick 8e7d405a-0003-5aa1-972d-46b08fe520c0 2.563338e+10 48.9 256.3338
5 1696982333.0377 FXSUSDT Buy 0.77 5.2425 PlusTick 9f250e94-da5b-5a94-9126-084e46c9c692 4.0367249999999994e+08 0.77 4.036725
6 1696982359.7441 FXSUSDT Buy 0.12 5.2450 PlusTick 08a0c666-da06-53f6-8eec-9d3462582b4f 6.293999999999999e+07 0.12 0.6294
7 1696982359.7441 FXSUSDT Buy 0.19 5.2450 ZeroMinusTick 8a61753b-2a8e-5881-8e9b-9ad66806ee23 9.9655e+07 0.19 0.99655
8 1696982359.7443 FXSUSDT Buy 12.12 5.2450 ZeroMinusTick 34b2f272-2f68-5d0a-a4ad-6c02f5342ca1 6.356939999999999e+09 12.12 63.569399999999995
9 1696982359.7443 FXSUSDT Buy 2.19 5.2450 ZeroMinusTick 0cae9717-0fe1-51dd-bb10-1a61d5e83d98 1.148655e+09 2.19 11.48655
10 1696982359.7449 FXSUSDT Buy 35.66 5.2450 ZeroMinusTick 0a5f0734-af3a-5439-9f17-d98ce7ea4f24 1.870367e+10 35.66 187.0367
11 1696982359.7512 FXSUSDT Buy 10.97 5.2450 ZeroMinusTick d8529e38-d3f5-5a7a-97f7-55cf3335de77 5.753765000000001e+09 10.97 57.537650000000006
12 1696982359.7512 FXSUSDT Buy 22.97 5.2450 ZeroMinusTick 44361b86-78e1-533b-a3d0-7dd12d538992 1.2047765e+10 22.97 120.47765
13 1696982369.5962 FXSUSDT Buy 0.05 5.2470 PlusTick 6800a047-b6e5-520a-9817-eb4d463a3cce 2.6235000000000004e+07 0.05 0.26235
14 1696982389.6288 FXSUSDT Buy 0.02 5.2495 PlusTick a4bc238f-3e6a-58a3-a012-1342563c2ced 1.0499000000000002e+07 0.02 0.10499000000000001
15 1696982389.6288 FXSUSDT Buy 6.06 5.2495 ZeroMinusTick eb27200e-c34e-537a-a0a0-4636dab66f07 3.181197e+09 6.06 31.81197
16 1696982389.6297 FXSUSDT Buy 6.04 5.2495 ZeroMinusTick c6badf81-05c5-5b35-b932-3c71941340fb 3.170698e+09 6.04 31.70698

View File

@ -0,0 +1,16 @@
trade_id/<2F><><EFBFBD>id,side/<2F><><EFBFBD>׷<EFBFBD><D7B7><EFBFBD>,size/<2F><><EFBFBD><EFBFBD>,price/<2F>۸<EFBFBD>,created_time/<2F>ɽ<EFBFBD>ʱ<EFBFBD><CAB1>
450372093,buy,0.00418025,35910.6,1700239042832
450372094,buy,0.0104,35911.9,1700239043163
450372860,buy,0.17316796,35911.9,1700239133047
450372095,buy,0.2227,35912.3,1700239043283
450372874,buy,0.63393,35913.4,1700239135563
450372876,buy,0.01751154,35913.6,1700239135563
450372096,buy,0.0478082,35913.7,1700239043339
450372877,buy,0.00030629,35913.7,1700239135563
450372878,buy,0.00030629,35913.8,1700239135563
450372880,buy,0.32111425,35913.9,1700239135563
450372881,buy,0.00027844,35914.0,1700239135563
450372882,buy,0.00058473,35914.2,1700239135563
450372032,buy,0.00132007,35914.3,1700239040621
450372883,buy,0.00058473,35914.3,1700239135563
450372884,buy,0.00052904,35914.4,1700239135563
1 trade_id/���id side/���׷��� size/���� price/�۸� created_time/�ɽ�ʱ��
2 450372093 buy 0.00418025 35910.6 1700239042832
3 450372094 buy 0.0104 35911.9 1700239043163
4 450372860 buy 0.17316796 35911.9 1700239133047
5 450372095 buy 0.2227 35912.3 1700239043283
6 450372874 buy 0.63393 35913.4 1700239135563
7 450372876 buy 0.01751154 35913.6 1700239135563
8 450372096 buy 0.0478082 35913.7 1700239043339
9 450372877 buy 0.00030629 35913.7 1700239135563
10 450372878 buy 0.00030629 35913.8 1700239135563
11 450372880 buy 0.32111425 35913.9 1700239135563
12 450372881 buy 0.00027844 35914.0 1700239135563
13 450372882 buy 0.00058473 35914.2 1700239135563
14 450372032 buy 0.00132007 35914.3 1700239040621
15 450372883 buy 0.00058473 35914.3 1700239135563
16 450372884 buy 0.00052904 35914.4 1700239135563

View File

@ -0,0 +1,244 @@
package csvsource
import (
"archive/zip"
"bytes"
"compress/gzip"
"errors"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/exchange/kucoin"
"github.com/c9s/bbgo/pkg/types"
)
func Download(
path, symbol string,
exchange types.ExchangeName,
market types.MarketType,
granularity types.MarketDataType,
since, until time.Time,
) (err error) {
for {
var (
fileName = fmt.Sprintf("%s-%s.csv", symbol, since.Format("2006-01-02"))
)
if fileExists(filepath.Join(path, fileName)) {
since = since.AddDate(0, 0, 1)
continue
}
var url, err = buildURL(exchange, symbol, market, granularity, fileName, since)
if err != nil {
log.Error(err)
break
}
log.Info("fetching ", url)
csvContent, err := readCSVFromUrl(exchange, url)
if err != nil {
log.Error(err)
break
}
err = write(csvContent, fmt.Sprintf("%s/%s", path, granularity), fileName)
if err != nil {
log.Error(err)
break
}
since = since.AddDate(0, 0, 1)
if since.After(until) {
break
}
}
return err
}
func buildURL(
exchange types.ExchangeName,
symbol string,
market types.MarketType,
granularity types.MarketDataType,
fileName string,
start time.Time,
) (url string, err error) {
switch exchange {
case types.ExchangeBybit:
// bybit doesn't seem to differentiate between spot and futures market or trade type in their csv dumps ;(
url = fmt.Sprintf("https://public.bybit.com/trading/%s/%s%s.csv.gz",
symbol,
symbol,
start.Format("2006-01-02"),
)
case types.ExchangeBinance:
marketType := "spot"
if market == types.MarketTypeFutures {
marketType = "futures/um"
}
dataType := "aggTrades"
if granularity == types.MarketDataTypeTrades {
dataType = "trades"
}
url = fmt.Sprintf("https://data.binance.vision/data/%s/daily/%s/%s/%s-%s-%s.zip",
marketType,
dataType,
symbol,
symbol,
dataType,
start.Format("2006-01-02"))
case types.ExchangeOKEx:
// todo temporary find a better solution ?!
coins := strings.Split(kucoin.ToLocalSymbol(symbol), "-")
if len(coins) == 0 {
err = fmt.Errorf("%s not supported yet for OKEx.. care to fix it? PR's welcome ;)", symbol)
return
}
baseCoin := coins[0]
quoteCoin := coins[1]
marketType := "" // for spot market
if market == types.MarketTypeFutures {
marketType = "-SWAP"
}
dataType := "aggtrades"
if granularity == types.MarketDataTypeTrades {
dataType = "trades"
}
url = fmt.Sprintf("https://static.okx.com/cdn/okex/traderecords/%s/daily/%s/%s-%s%s-%s-%s.zip",
dataType,
start.Format("20060102"),
baseCoin,
quoteCoin,
marketType,
dataType,
start.Format("2006-01-02"))
default:
err = fmt.Errorf("%s not supported yet as csv data source.. care to fix it? PR's welcome ;)", exchange.String())
}
return url, err
}
func readCSVFromUrl(exchange types.ExchangeName, url string) (csvContent []byte, err error) {
resp, err := http.Get(url)
if err != nil {
return nil, fmt.Errorf("http get error, url %s: %w", url, err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("unable to read response: %w", err)
}
switch exchange {
case types.ExchangeBybit:
csvContent, err = gunzip(body)
if err != nil {
return nil, fmt.Errorf("gunzip data %s: %w", exchange, err)
}
case types.ExchangeBinance:
csvContent, err = unzip(body)
if err != nil {
return nil, fmt.Errorf("unzip data %s: %w", exchange, err)
}
case types.ExchangeOKEx:
csvContent, err = unzip(body)
if err != nil {
return nil, fmt.Errorf("unzip data %s: %w", exchange, err)
}
}
return csvContent, nil
}
func write(content []byte, path, fileName string) error {
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return fmt.Errorf("mkdir %s: %w", path, err)
}
}
dest := filepath.Join(path, fileName)
err := os.WriteFile(dest, content, 0666)
if err != nil {
return fmt.Errorf("write %s: %w", dest, err)
}
return nil
}
func unzip(data []byte) (resData []byte, err error) {
zipReader, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
if err != nil {
log.Error(err)
}
if zipReader == nil || len(zipReader.File) == 0 {
return nil, errors.New("no data to unzip")
}
// Read all the files from zip archive
for _, zipFile := range zipReader.File {
resData, err = readZipFile(zipFile)
if err != nil {
log.Error(err)
break
}
}
return
}
func readZipFile(zf *zip.File) ([]byte, error) {
f, err := zf.Open()
if err != nil {
return nil, err
}
defer f.Close()
return io.ReadAll(f)
}
func gunzip(data []byte) (resData []byte, err error) {
b := bytes.NewBuffer(data)
var r io.Reader
r, err = gzip.NewReader(b)
if err != nil {
return
}
var resB bytes.Buffer
_, err = resB.ReadFrom(r)
if err != nil {
return
}
resData = resB.Bytes()
return
}
func fileExists(fileName string) bool {
info, err := os.Stat(fileName)
if os.IsNotExist(err) {
return false
}
return !info.IsDir()
}

View File

@ -0,0 +1,103 @@
package csvsource
import (
"fmt"
"os"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/types"
)
type DownloadTester struct {
Exchange types.ExchangeName
Reader MakeCSVTickReader
Market types.MarketType
Granularity types.MarketDataType
Symbol string
Path string
}
var (
expectedCandles = []int{1440, 48, 24}
intervals = []types.Interval{types.Interval1m, types.Interval30m, types.Interval1h}
until = time.Now().Round(0)
since = until.Add(-24 * time.Hour)
)
func Test_CSV_Download(t *testing.T) {
if _, ok := os.LookupEnv("TEST_CSV_DOWNLOADER"); !ok {
t.Skip()
}
var tests = []DownloadTester{
{
Exchange: types.ExchangeBinance,
Reader: NewBinanceCSVTickReader,
Market: types.MarketTypeSpot,
Granularity: types.MarketDataTypeAggTrades,
Symbol: "FXSUSDT",
Path: "testdata/binance/FXSUSDT",
},
{
Exchange: types.ExchangeBybit,
Reader: NewBybitCSVTickReader,
Market: types.MarketTypeFutures,
Granularity: types.MarketDataTypeAggTrades,
Symbol: "FXSUSDT",
Path: "testdata/bybit/FXSUSDT",
},
{
Exchange: types.ExchangeOKEx,
Reader: NewOKExCSVTickReader,
Market: types.MarketTypeSpot,
Granularity: types.MarketDataTypeAggTrades,
Symbol: "BTCUSDT",
Path: "testdata/okex/BTCUSDT",
},
}
for _, tt := range tests {
err := Download(
tt.Path,
tt.Symbol,
tt.Exchange,
tt.Market,
tt.Granularity,
since,
until,
)
assert.NoError(t, err)
klineMap, err := ReadTicksFromCSVWithDecoder(
tt.Path,
tt.Symbol,
intervals,
MakeCSVTickReader(tt.Reader),
)
assert.NoError(t, err)
for i, interval := range intervals {
klines := klineMap[interval]
assert.Equal(
t,
expectedCandles[i],
len(klines),
fmt.Sprintf("%s: %s/%s should have %d kLines",
tt.Exchange.String(),
tt.Symbol,
interval.String(),
expectedCandles[i],
),
)
err = WriteKLines(tt.Path, tt.Symbol, klines)
assert.NoError(t, err)
}
err = os.RemoveAll(tt.Path)
assert.NoError(t, err)
}
}

View File

@ -0,0 +1,46 @@
package csvsource
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)
type CsvTick struct {
Exchange types.ExchangeName `json:"exchange"`
Market types.MarketType `json:"market"`
TradeID uint64 `json:"tradeID"`
Symbol string `json:"symbol"`
TickDirection string `json:"tickDirection"`
Side types.SideType `json:"side"`
IsBuyerMaker bool
Size fixedpoint.Value `json:"size"`
Price fixedpoint.Value `json:"price"`
HomeNotional fixedpoint.Value `json:"homeNotional"`
ForeignNotional fixedpoint.Value `json:"foreignNotional"`
Timestamp types.MillisecondTimestamp `json:"timestamp"`
}
func (c *CsvTick) ToGlobalTrade() (*types.Trade, error) {
var isFutures bool
if c.Market == types.MarketTypeFutures {
isFutures = true
}
return &types.Trade{
ID: c.TradeID,
// OrderID: // not implemented
Exchange: c.Exchange,
Price: c.Price,
Quantity: c.Size,
QuoteQuantity: c.Price.Mul(c.Size), // todo this does not seem right use of propert.. looses info on foreign notional
Symbol: c.Symbol,
Side: c.Side,
IsBuyer: c.Side == types.SideTypeBuy,
IsMaker: c.IsBuyerMaker,
Time: types.Time(c.Timestamp),
// Fee: trade.ExecFee, // info is overwritten by stream?
// FeeCurrency: trade.FeeTokenId,
IsFutures: isFutures,
IsMargin: false,
IsIsolated: false,
}, nil
}

View File

@ -0,0 +1,77 @@
package csvsource
import (
"encoding/csv"
"fmt"
"os"
"github.com/pkg/errors"
"github.com/c9s/bbgo/pkg/types"
)
// WriteKLines writes csv to path.
func WriteKLines(path, symbol string, klines []types.KLine) (err error) {
if len(klines) == 0 {
return fmt.Errorf("no klines to write")
}
from := klines[0].StartTime.Time()
end := klines[len(klines)-1].EndTime.Time()
to := ""
if from.AddDate(0, 0, 1).After(end) {
to = "-" + end.Format("2006-01-02")
}
path = fmt.Sprintf("%s/klines/%s",
path,
klines[0].Interval.String(),
)
fileName := fmt.Sprintf("%s/%s-%s%s.csv",
path,
symbol,
from.Format("2006-01-02"),
to,
)
if _, err := os.Stat(path); errors.Is(err, os.ErrNotExist) {
err := os.MkdirAll(path, os.ModePerm)
if err != nil {
return fmt.Errorf("mkdir %s: %w", path, err)
}
}
file, err := os.Create(fileName)
if err != nil {
return errors.Wrap(err, "failed to open file")
}
defer func() {
err = file.Close()
if err != nil {
panic("failed to close file")
}
}()
w := csv.NewWriter(file)
defer w.Flush()
// Using Write
for _, kline := range klines {
row := []string{
fmt.Sprintf("%d", kline.StartTime.Unix()),
kline.Open.String(),
kline.High.String(),
kline.Low.String(),
kline.Close.String(),
kline.Volume.String(),
}
if err := w.Write(row); err != nil {
return errors.Wrap(err, "writing record to file")
}
}
if err != nil {
return err
}
return nil
}

View File

@ -115,10 +115,10 @@ func convertSubscriptions(ss []types.Subscription) ([]WebSocketCommand, error) {
switch s.Channel {
case types.BookChannel:
// see https://docs.kucoin.com/#level-2-market-data
subscribeTopic = "/market/level2" + ":" + toLocalSymbol(s.Symbol)
subscribeTopic = "/market/level2" + ":" + ToLocalSymbol(s.Symbol)
case types.KLineChannel:
subscribeTopic = "/market/candles" + ":" + toLocalSymbol(s.Symbol) + "_" + toLocalInterval(types.Interval(s.Options.Interval))
subscribeTopic = "/market/candles" + ":" + ToLocalSymbol(s.Symbol) + "_" + toLocalInterval(types.Interval(s.Options.Interval))
default:
return nil, fmt.Errorf("websocket channel %s is not supported by kucoin", s.Channel)

View File

@ -165,7 +165,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
}
req := e.client.MarketDataService.NewGetKLinesRequest()
req.Symbol(toLocalSymbol(symbol))
req.Symbol(ToLocalSymbol(symbol))
req.Interval(toLocalInterval(interval))
if options.StartTime != nil {
req.StartAt(*options.StartTime)
@ -208,7 +208,7 @@ func (e *Exchange) QueryKLines(ctx context.Context, symbol string, interval type
func (e *Exchange) SubmitOrder(ctx context.Context, order types.SubmitOrder) (createdOrder *types.Order, err error) {
req := e.client.TradeService.NewPlaceOrderRequest()
req.Symbol(toLocalSymbol(order.Symbol))
req.Symbol(ToLocalSymbol(order.Symbol))
req.Side(toLocalSide(order.Side))
if order.ClientOrderID != "" {
@ -298,7 +298,7 @@ You will not be able to query for cancelled orders that have happened more than
*/
func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders []types.Order, err error) {
req := e.client.TradeService.NewListOrdersRequest()
req.Symbol(toLocalSymbol(symbol))
req.Symbol(ToLocalSymbol(symbol))
req.Status("active")
orderList, err := req.Do(ctx)
if err != nil {
@ -316,7 +316,7 @@ func (e *Exchange) QueryOpenOrders(ctx context.Context, symbol string) (orders [
func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, until time.Time, lastOrderID uint64) (orders []types.Order, err error) {
req := e.client.TradeService.NewListOrdersRequest()
req.Symbol(toLocalSymbol(symbol))
req.Symbol(ToLocalSymbol(symbol))
req.Status("done")
req.StartAt(since)
@ -350,7 +350,7 @@ var launchDate = time.Date(2017, 9, 0, 0, 0, 0, 0, time.UTC)
func (e *Exchange) QueryTrades(ctx context.Context, symbol string, options *types.TradeQueryOptions) (trades []types.Trade, err error) {
req := e.client.TradeService.NewGetFillsRequest()
req.Symbol(toLocalSymbol(symbol))
req.Symbol(ToLocalSymbol(symbol))
// we always sync trades in the ascending order, and kucoin does not support last trade ID query
// hence we need to set the start time here
@ -422,7 +422,7 @@ func (e *Exchange) NewStream() types.Stream {
}
func (e *Exchange) QueryDepth(ctx context.Context, symbol string) (types.SliceOrderBook, int64, error) {
orderBook, err := e.client.MarketDataService.GetOrderBook(toLocalSymbol(symbol), 100)
orderBook, err := e.client.MarketDataService.GetOrderBook(ToLocalSymbol(symbol), 100)
if err != nil {
return types.SliceOrderBook{}, 0, err
}

View File

@ -17,13 +17,13 @@ import (
var packageTemplate = template.Must(template.New("").Parse(`// Code generated by go generate; DO NOT EDIT.
package kucoin
var symbolMap = map[string]string{
var SymbolMap = map[string]string{
{{- range $k, $v := . }}
{{ printf "%q" $k }}: {{ printf "%q" $v }},
{{- end }}
}
func toLocalSymbol(symbol string) string {
func ToLocalSymbol(symbol string) string {
s, ok := symbolMap[symbol]
if ok {
return s

View File

@ -1,7 +1,7 @@
// Code generated by go generate; DO NOT EDIT.
package kucoin
var symbolMap = map[string]string{
var SymbolMap = map[string]string{
"1EARTHUSDT": "1EARTH-USDT",
"1INCHUSDT": "1INCH-USDT",
"2CRZBTC": "2CRZ-BTC",
@ -1107,8 +1107,8 @@ var symbolMap = map[string]string{
"ZRXETH": "ZRX-ETH",
}
func toLocalSymbol(symbol string) string {
s, ok := symbolMap[symbol]
func ToLocalSymbol(symbol string) string {
s, ok := SymbolMap[symbol]
if ok {
return s
}

View File

@ -0,0 +1,10 @@
1651795200000,36533.70,36540.00,36501.00,36505.20,264.779,1651795259999,9670700.33840,3057,71.011,2593768.86330,0
1651795260000,36506.30,36523.10,36492.30,36522.70,180.741,1651795319999,6598288.01340,2214,70.811,2585241.60220,0
1651795320000,36522.70,36559.10,36518.90,36549.60,280.910,1651795379999,10263878.29160,2898,155.711,5689249.26850,0
1651795380000,36549.90,36550.00,36490.00,36534.40,235.291,1651795439999,8591157.31110,2690,78.925,2881502.53680,0
1651795440000,36534.40,36577.50,36534.40,36574.80,218.490,1651795499999,7988553.23400,2184,133.092,4866125.50710,0
1651795500000,36574.90,36679.30,36561.40,36611.60,1180.452,1651795559999,43233700.14416,8720,852.525,31228536.48026,0
1651795560000,36611.60,36614.60,36588.20,36612.70,252.435,1651795619999,9240546.27360,2494,104.381,3821126.58030,0
1651795620000,36612.80,36647.10,36586.10,36594.50,361.987,1651795679999,13254573.37270,3565,220.110,8060195.17170,0
1651795680000,36594.60,36598.10,36543.00,36566.60,236.064,1651795739999,8631772.05423,2650,66.766,2441168.29810,0
1651795740000,36565.90,36565.90,36525.90,36530.80,129.389,1651795799999,4728306.04240,1697,45.836,1674990.33390,0
1 1651795200000 36533.70 36540.00 36501.00 36505.20 264.779 1651795259999 9670700.33840 3057 71.011 2593768.86330 0
2 1651795260000 36506.30 36523.10 36492.30 36522.70 180.741 1651795319999 6598288.01340 2214 70.811 2585241.60220 0
3 1651795320000 36522.70 36559.10 36518.90 36549.60 280.910 1651795379999 10263878.29160 2898 155.711 5689249.26850 0
4 1651795380000 36549.90 36550.00 36490.00 36534.40 235.291 1651795439999 8591157.31110 2690 78.925 2881502.53680 0
5 1651795440000 36534.40 36577.50 36534.40 36574.80 218.490 1651795499999 7988553.23400 2184 133.092 4866125.50710 0
6 1651795500000 36574.90 36679.30 36561.40 36611.60 1180.452 1651795559999 43233700.14416 8720 852.525 31228536.48026 0
7 1651795560000 36611.60 36614.60 36588.20 36612.70 252.435 1651795619999 9240546.27360 2494 104.381 3821126.58030 0
8 1651795620000 36612.80 36647.10 36586.10 36594.50 361.987 1651795679999 13254573.37270 3565 220.110 8060195.17170 0
9 1651795680000 36594.60 36598.10 36543.00 36566.60 236.064 1651795739999 8631772.05423 2650 66.766 2441168.29810 0
10 1651795740000 36565.90 36565.90 36525.90 36530.80 129.389 1651795799999 4728306.04240 1697 45.836 1674990.33390 0

View File

@ -0,0 +1,194 @@
package indicatorv2
import (
"math"
"golang.org/x/exp/slices"
"gonum.org/v1/gonum/floats"
"gonum.org/v1/gonum/stat"
bbgofloats "github.com/c9s/bbgo/pkg/datatype/floats"
"github.com/c9s/bbgo/pkg/types"
)
// DefaultValueAreaPercentage is the percentage of the total volume used to calculate the value area.
const DefaultValueAreaPercentage = 0.68
type VolumeProfileStream struct {
*types.Float64Series
VP VolumeProfileDetails
window int
}
// VolumeProfileDetails is a histogram of market price and volume.
// Intent is to show the price points with most volume during a period.
// The profile gives key features such as:
//
// Point of control (POC)
//
// Value area high (VAH)
//
// Value area low (VAL)
//
// Session High/Low
type VolumeProfileDetails struct {
// Bins is the histogram bins.
Bins []float64
// Hist is the histogram values.
Hist []float64
// POC is the point of control.
POC float64
// VAH is the value area high.
VAH float64
// VAL is the value area low.
VAL float64
// High is the highest price in the profile.
High float64
// Low is the lowest price in the profile.
Low float64
}
// VolumeLevel is a price and volume pair used to build a volume profile.
type VolumeLevel struct {
// Price is the market price, typically the high/low average of the kline.
Price float64
// Volume is the total buy and sell volume at the price.
Volume float64
}
func VolumeProfile(source KLineSubscription, window int) *VolumeProfileStream {
prices := HLC3(source)
volumes := Volumes(source)
s := &VolumeProfileStream{
Float64Series: types.NewFloat64Series(),
window: window,
}
source.AddSubscriber(func(v types.KLine) {
if source.Length() < window {
s.PushAndEmit(0)
return
}
var nBins = 10
// nBins = int(math.Floor((prices.Slice.Max()-prices.Slice.Min())/binWidth)) + 1
s.VP.High = prices.Slice.Max()
s.VP.Low = prices.Slice.Min()
sortedPrices, sortedVolumes := buildVolumeLevel(prices.Slice, volumes.Slice)
s.VP.Bins = make([]float64, nBins)
s.VP.Bins = floats.Span(s.VP.Bins, s.VP.Low, s.VP.High+1)
s.VP.Hist = stat.Histogram(nil, s.VP.Bins, sortedPrices, sortedVolumes)
pocIdx := floats.MaxIdx(s.VP.Hist)
s.VP.POC = midBin(s.VP.Bins, pocIdx)
// TODO the results are of by small difference whereas it is expected they work the same
// vaTotalVol := volumes.Sum() * DefaultValueAreaPercentage
// Calculate Value Area with POC as the centre point\
vaTotalVol := floats.Sum(volumes.Slice) * DefaultValueAreaPercentage
vaCumVol := s.VP.Hist[pocIdx]
var vahVol, valVol float64
vahIdx, valIdx := pocIdx+1, pocIdx-1
stepVAH, stepVAL := true, true
for (vaCumVol <= vaTotalVol) &&
(vahIdx <= len(s.VP.Hist)-1 && valIdx >= 0) {
if stepVAH {
vahVol = 0
for vahVol == 0 && vahIdx+1 < len(s.VP.Hist)-1 {
vahVol = s.VP.Hist[vahIdx] + s.VP.Hist[vahIdx+1]
vahIdx += 2
}
stepVAH = false
}
if stepVAL {
valVol = 0
for valVol == 0 && valIdx-1 >= 0 {
valVol = s.VP.Hist[valIdx] + s.VP.Hist[valIdx-1]
valIdx -= 2
}
stepVAL = false
}
switch {
case vahVol > valVol:
vaCumVol += vahVol
stepVAH, stepVAL = true, false
case vahVol < valVol:
vaCumVol += valVol
stepVAH, stepVAL = false, true
case vahVol == valVol:
vaCumVol += valVol + vahVol
stepVAH, stepVAL = true, true
}
if vahIdx >= len(s.VP.Hist)-1 {
stepVAH = false
}
if valIdx <= 0 {
stepVAL = false
}
}
s.VP.VAH = midBin(s.VP.Bins, vahIdx)
s.VP.VAL = midBin(s.VP.Bins, valIdx)
})
return s
}
func (s *VolumeProfileStream) Truncate() {
s.Slice = s.Slice.Truncate(5000)
}
func buildVolumeLevel(p, v bbgofloats.Slice) (sortedp, sortedv bbgofloats.Slice) {
var levels []VolumeLevel
for i := range p {
levels = append(levels, VolumeLevel{
Price: p[i],
Volume: v[i],
})
}
slices.SortStableFunc(levels, func(i, j VolumeLevel) bool {
return i.Price < j.Price
})
for _, v := range levels {
sortedp.Append(v.Price)
sortedv.Append(v.Volume)
}
return
}
func midBin(bins []float64, idx int) float64 {
if len(bins) == 0 {
return math.NaN()
}
if idx >= len(bins)-1 {
return bins[len(bins)-1]
}
if idx < 0 {
return bins[0]
}
return stat.Mean([]float64{bins[idx], bins[idx+1]}, nil)
}

View File

@ -0,0 +1,37 @@
package indicatorv2
import (
"encoding/csv"
"os"
"path"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/datasource/csvsource"
"github.com/c9s/bbgo/pkg/types"
)
func TestVolumeProfile(t *testing.T) {
file, _ := os.Open(path.Join("testdata", "BTCUSDT-1m-2022-05-06.csv"))
defer func() {
assert.NoError(t, file.Close())
}()
candles, err := csvsource.NewCSVKLineReader(csv.NewReader(file)).ReadAll(time.Minute)
assert.NoError(t, err)
stream := &types.StandardStream{}
kLines := KLines(stream, "", "")
ind := VolumeProfile(kLines, 10)
for _, candle := range candles {
stream.EmitKLineClosed(candle)
}
assert.InDelta(t, 36512.7, ind.VP.Low, 0.01, "VP.LOW")
assert.InDelta(t, 36512.7, ind.VP.VAL, 0.01, "VP.VAL")
assert.InDelta(t, 36518.574, ind.VP.POC, 0.01, "VP.POC")
assert.InDelta(t, 36530.32, ind.VP.VAH, 0.01, "VP.VAH")
assert.InDelta(t, 36617.433, ind.VP.High, 0.01, "VP.HIGH")
}

176
pkg/service/backtest_csv.go Normal file
View File

@ -0,0 +1,176 @@
package service
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/datasource/csvsource"
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/types"
)
type BacktestServiceCSV struct {
kLines map[types.Interval][]types.KLine
path string
market types.MarketType
granularity types.MarketDataType
}
func NewBacktestServiceCSV(
path string,
market types.MarketType,
granularity types.MarketDataType,
) BackTestable {
return &BacktestServiceCSV{
kLines: make(map[types.Interval][]types.KLine),
path: path,
market: market,
granularity: granularity,
}
}
func (s *BacktestServiceCSV) Verify(
sourceExchange types.Exchange, symbols []string, startTime time.Time, endTime time.Time,
) error {
// TODO: use isFutures here
_, _, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(sourceExchange)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
err := csvsource.Download(
s.path,
isolatedSymbol,
sourceExchange.Name(),
s.market,
s.granularity,
startTime,
endTime,
)
if err != nil {
return errors.Errorf("downloading csv data: %v", err)
}
}
return nil
}
func (s *BacktestServiceCSV) Sync(
ctx context.Context, exchange types.Exchange, symbol string, intervals []types.Interval,
startTime, endTime time.Time,
) error {
log.Infof("starting fresh csv sync %s %s: %s <=> %s", exchange.Name(), symbol, startTime, endTime)
path := fmt.Sprintf("%s/%s/%s", s.path, exchange.Name().String(), symbol)
var reader csvsource.MakeCSVTickReader
switch exchange.Name() {
case types.ExchangeBinance:
reader = csvsource.NewBinanceCSVTickReader
case types.ExchangeBybit:
reader = csvsource.NewBybitCSVTickReader
case types.ExchangeOKEx:
reader = csvsource.NewOKExCSVTickReader
default:
return fmt.Errorf("%s not supported yet.. care to fix it? PR's welcome ;)", exchange.Name().String())
}
kLineMap, err := csvsource.ReadTicksFromCSVWithDecoder(
path,
symbol,
intervals,
csvsource.MakeCSVTickReader(reader),
)
if err != nil {
return errors.Errorf("reading csv data: %v", err)
}
s.kLines = kLineMap
return nil
}
// QueryKLine queries the klines from the database
func (s *BacktestServiceCSV) QueryKLine(
ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int,
) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
if _, ok := s.kLines[interval]; !ok || len(s.kLines[interval]) == 0 {
return nil, errors.New("interval not initialized")
}
return &s.kLines[interval][len(s.kLines[interval])-1], nil
}
// QueryKLinesForward is used for querying klines to back-testing
func (s *BacktestServiceCSV) QueryKLinesForward(
exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int,
) ([]types.KLine, error) {
// Sample implementation (modify as needed):
var result []types.KLine
// Access klines data based on exchange, symbol, and interval
exchangeKLines, ok := s.kLines[interval]
if !ok {
return nil, fmt.Errorf("no kLines for specified interval %s", interval.String())
}
// Filter klines based on startTime and limit
for _, kline := range exchangeKLines {
if kline.StartTime.After(startTime) {
result = append(result, kline)
if len(result) >= limit {
break
}
}
}
return result, nil
}
func (s *BacktestServiceCSV) QueryKLinesBackward(
exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int,
) ([]types.KLine, error) {
var result []types.KLine
// Access klines data based on interval
exchangeKLines, ok := s.kLines[interval]
if !ok {
return nil, fmt.Errorf("no kLines for specified interval %s", interval.String())
}
// Reverse iteration through klines and filter based on endTime and limit
for i := len(exchangeKLines) - 1; i >= 0; i-- {
kline := exchangeKLines[i]
if kline.StartTime.Before(endTime) {
result = append(result, kline)
if len(result) >= limit {
break
}
}
}
return result, nil
}
func (s *BacktestServiceCSV) QueryKLinesCh(
since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval,
) (chan types.KLine, chan error) {
if len(symbols) == 0 {
return returnError(errors.Errorf("symbols is empty when querying kline, please check your strategy setting. "))
}
ch := make(chan types.KLine, len(s.kLines))
go func() {
defer close(ch)
for _, kline := range s.kLines[intervals[0]] {
ch <- kline
}
}()
return ch, nil
}

View File

@ -18,14 +18,26 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
type BackTestable interface {
Verify(sourceExchange types.Exchange, symbols []string, startTime time.Time, endTime time.Time) error
Sync(ctx context.Context, ex types.Exchange, symbol string, intervals []types.Interval, since, until time.Time) error
QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error)
QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error)
QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error)
QueryKLinesCh(since, until time.Time, exchange types.Exchange, symbols []string, intervals []types.Interval) (chan types.KLine, chan error)
}
type BacktestService struct {
DB *sqlx.DB
}
func (s *BacktestService) SyncKLineByInterval(
ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time,
) error {
func NewBacktestService(db *sqlx.DB) *BacktestService {
return &BacktestService{DB: db}
}
func (s *BacktestService) syncKLineByInterval(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
_, isFutures, isIsolated, isolatedSymbol := exchange2.GetSessionAttributes(exchange)
log.Infof("synchronizing %s klines with interval %s: %s <=> %s", exchange.Name(), interval, startTime, endTime)
// override symbol if isolatedSymbol is not empty
if isIsolated && len(isolatedSymbol) > 0 {
@ -101,7 +113,7 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string
for interval := range types.SupportedIntervals {
log.Infof("verifying %s %s backtesting data: %s to %s...", symbol, interval, startTime, endTime)
timeRanges, err := s.FindMissingTimeRanges(context.Background(), sourceExchange, symbol, interval,
timeRanges, err := s.findMissingTimeRanges(context.Background(), sourceExchange, symbol, interval,
startTime, endTime)
if err != nil {
return err
@ -129,13 +141,11 @@ func (s *BacktestService) Verify(sourceExchange types.Exchange, symbols []string
return nil
}
func (s *BacktestService) SyncFresh(
ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time,
) error {
func (s *BacktestService) SyncFresh(ctx context.Context, exchange types.Exchange, symbol string, interval types.Interval, startTime, endTime time.Time) error {
log.Infof("starting fresh sync %s %s %s: %s <=> %s", exchange.Name(), symbol, interval, startTime, endTime)
startTime = startTime.Truncate(time.Minute).Add(-2 * time.Second)
endTime = endTime.Truncate(time.Minute).Add(2 * time.Second)
return s.SyncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
return s.syncKLineByInterval(ctx, exchange, symbol, interval, startTime, endTime)
}
// QueryKLine queries the klines from the database
@ -373,20 +383,28 @@ func (t *TimeRange) String() string {
return t.Start.String() + " ~ " + t.End.String()
}
func (s *BacktestService) Sync(
ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time,
) error {
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows {
return err
func (s *BacktestService) Sync(ctx context.Context, ex types.Exchange, symbol string, intervals []types.Interval, since, until time.Time) error {
for _, interval := range intervals {
t1, t2, err := s.queryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == sql.ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
err := s.syncFresh(ctx, ex, symbol, interval, since, until)
if err != nil {
return err
}
} else {
err := s.syncPartial(ctx, ex, symbol, interval, since, until)
if err != nil {
return err
}
}
}
if err == sql.ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
return s.SyncFresh(ctx, ex, symbol, interval, since, until)
}
return s.SyncPartial(ctx, ex, symbol, interval, since, until)
return nil
}
// SyncPartial
@ -394,22 +412,20 @@ func (s *BacktestService) Sync(
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(
ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time,
) error {
func (s *BacktestService) syncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
log.Infof("starting partial sync %s %s %s: %s <=> %s", ex.Name(), symbol, interval, since, until)
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
t1, t2, err := s.queryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil && err != sql.ErrNoRows {
return err
}
if err == sql.ErrNoRows || t1 == nil || t2 == nil {
// fallback to fresh sync
return s.SyncFresh(ctx, ex, symbol, interval, since, until)
return s.syncFresh(ctx, ex, symbol, interval, since, until)
}
timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
timeRanges, err := s.findMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
if err != nil {
return err
}
@ -436,7 +452,7 @@ func (s *BacktestService) SyncPartial(
}
for _, timeRange := range timeRanges {
err = s.SyncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
err = s.syncKLineByInterval(ctx, ex, symbol, interval, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
if err != nil {
return err
}
@ -447,9 +463,7 @@ func (s *BacktestService) SyncPartial(
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
func (s *BacktestService) FindMissingTimeRanges(
ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time,
) ([]TimeRange, error) {
func (s *BacktestService) findMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
query := s.SelectKLineTimePoints(ex, symbol, interval, since, until)
sql, args, err := query.ToSql()
if err != nil {
@ -492,9 +506,7 @@ func (s *BacktestService) FindMissingTimeRanges(
return timeRanges, nil
}
func (s *BacktestService) QueryExistingDataRange(
ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time,
) (start, end *types.Time, err error) {
func (s *BacktestService) queryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
sel := s.SelectKLineTimeRange(ex, symbol, interval, tArgs...)
sql, args, err := sel.ToSql()
if err != nil {

View File

@ -40,7 +40,7 @@ func TestBacktestService_FindMissingTimeRanges_EmptyData(t *testing.T) {
now := time.Now()
startTime1 := now.AddDate(0, 0, -7).Truncate(time.Hour)
endTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime1)
timeRanges, err := service.findMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime1)
assert.NoError(t, err)
assert.NotEmpty(t, timeRanges)
}
@ -70,7 +70,7 @@ func TestBacktestService_QueryExistingDataRange(t *testing.T) {
startTime1 := now.AddDate(0, 0, -7).Truncate(time.Hour)
endTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
// empty range
t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h, startTime1, endTime1)
t1, t2, err := service.queryExistingDataRange(ctx, ex, symbol, types.Interval1h, startTime1, endTime1)
assert.Error(t, sql.ErrNoRows, err)
assert.Nil(t, t1)
assert.Nil(t, t2)
@ -105,22 +105,22 @@ func TestBacktestService_SyncPartial(t *testing.T) {
endTime2 := now.AddDate(0, 0, -4).Truncate(time.Hour)
// kline query is exclusive
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
err = service.syncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
assert.NoError(t, err)
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
err = service.syncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
assert.NoError(t, err)
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
timeRanges, err := service.findMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
assert.NoError(t, err)
assert.NotEmpty(t, timeRanges)
assert.Len(t, timeRanges, 1)
t.Run("fill missing time ranges", func(t *testing.T) {
err = service.SyncPartial(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
err = service.syncPartial(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
assert.NoError(t, err, "sync partial should not return error")
timeRanges2, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
timeRanges2, err := service.findMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
assert.NoError(t, err)
assert.Empty(t, timeRanges2)
})
@ -155,19 +155,19 @@ func TestBacktestService_FindMissingTimeRanges(t *testing.T) {
endTime2 := now.AddDate(0, 0, -3).Truncate(time.Hour)
// kline query is exclusive
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
err = service.syncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
assert.NoError(t, err)
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
err = service.syncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
assert.NoError(t, err)
t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h)
t1, t2, err := service.queryExistingDataRange(ctx, ex, symbol, types.Interval1h)
if assert.NoError(t, err) {
assert.Equal(t, startTime1, t1.Time(), "start time point should match")
assert.Equal(t, endTime2, t2.Time(), "end time point should match")
}
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
timeRanges, err := service.findMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
if assert.NoError(t, err) {
assert.NotEmpty(t, timeRanges)
assert.Len(t, timeRanges, 1, "should find one missing time range")
@ -176,11 +176,11 @@ func TestBacktestService_FindMissingTimeRanges(t *testing.T) {
log.SetLevel(log.DebugLevel)
for _, timeRange := range timeRanges {
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
err = service.syncKLineByInterval(ctx, ex, symbol, types.Interval1h, timeRange.Start.Add(time.Second), timeRange.End.Add(-time.Second))
assert.NoError(t, err)
}
timeRanges, err = service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
timeRanges, err = service.findMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
assert.NoError(t, err)
assert.Empty(t, timeRanges, "after partial sync, missing time ranges should be back-filled")
}

View File

@ -66,7 +66,7 @@ func RunBacktest(t *testing.T, strategy bbgo.SingleExchangeStrategy) {
return
}
backtestService := &service.BacktestService{DB: environ.DatabaseService.DB}
backtestService := service.NewBacktestService(environ.DatabaseService.DB)
defer func() {
err := environ.DatabaseService.DB.Close()
assert.NoError(t, err)

40
pkg/types/csvsource.go Normal file
View File

@ -0,0 +1,40 @@
package types
type MarketType string
const (
MarketTypeSpot MarketType = "spot"
MarketTypeFutures MarketType = "futures"
)
type MarketDataType string
const (
MarketDataTypeTrades MarketDataType = "trades"
MarketDataTypeAggTrades MarketDataType = "aggTrades"
// TODO: could be extended to the following:
// LEVEL2 = 2
// https://data.binance.vision/data/futures/um/daily/bookTicker/ADAUSDT/ADAUSDT-bookTicker-2023-11-19.zip
// update_id best_bid_price best_bid_qty best_ask_price best_ask_qty transaction_time event_time
// 3.52214E+12 0.3772 1632 0.3773 67521 1.70035E+12 1.70035E+12
// METRICS = 3
// https://data.binance.vision/data/futures/um/daily/metrics/ADAUSDT/ADAUSDT-metrics-2023-11-19.zip
// create_time symbol sum_open_interest sum_open_interest_value count_toptrader_long_short_ratio sum_toptrader_long_short_ratio count_long_short_ratio sum_taker_long_short_vol_ratio
// 19/11/2023 00:00 ADAUSDT 141979878.00000000 53563193.89339590 2.33412322 1.21401178 2.46604727 0.55265805
// KLINES MarketDataType = 4
// https://public.bybit.com/kline_for_metatrader4/BNBUSDT/2021/BNBUSDT_15_2021-07-01_2021-07-31.csv.gz
// only few symbols but supported interval options 1m/ 5m/ 15m/ 30m/ 60m/ and only monthly
// https://data.binance.vision/data/futures/um/daily/klines/1INCHBTC/30m/1INCHBTC-30m-2023-11-18.zip
// supported interval options 1s/ 1m/ 3m/ 5m/ 15m/ 30m/ 1h/ 2h/ 4h/ 6h/ 8h/ 12h/ 1d/ daily or monthly futures
// this might be useful for backtesting against mark or index price
// especially index price can be used across exchanges
// https://data.binance.vision/data/futures/um/daily/indexPriceKlines/ADAUSDT/1h/ADAUSDT-1h-2023-11-19.zip
// https://data.binance.vision/data/futures/um/daily/markPriceKlines/ADAUSDT/1h/ADAUSDT-1h-2023-11-19.zip
// OKex or Bybit do not support direct kLine, metrics or level2 csv download
)

View File

@ -64,6 +64,61 @@ func (i Interval) Duration() time.Duration {
return time.Duration(i.Milliseconds()) * time.Millisecond
}
// Truncate determines the candle open time from a given timestamp
// eg interval 1 hour and tick at timestamp 00:58:45 will return timestamp shifted to 00:00:00
func (i Interval) Truncate(ts time.Time) (start time.Time) {
switch i {
case Interval1s:
return ts.Truncate(time.Second)
case Interval1m:
return ts.Truncate(time.Minute)
case Interval3m:
return shiftMinute(ts, 3)
case Interval5m:
return shiftMinute(ts, 5)
case Interval15m:
return shiftMinute(ts, 15)
case Interval30m:
return shiftMinute(ts, 30)
case Interval1h:
return ts.Truncate(time.Hour)
case Interval2h:
return shiftHour(ts, 2)
case Interval4h:
return shiftHour(ts, 4)
case Interval6h:
return shiftHour(ts, 6)
case Interval12h:
return shiftHour(ts, 12)
case Interval1d:
return ts.Truncate(time.Hour * 24)
case Interval3d:
return shiftDay(ts, 3)
case Interval1w:
return shiftDay(ts, 7)
case Interval2w:
return shiftDay(ts, 14)
case Interval1mo:
return time.Date(ts.Year(), ts.Month(), 0, 0, 0, 0, 0, time.UTC)
}
return start
}
func shiftDay(ts time.Time, shift int) time.Time {
day := ts.Day() - (ts.Day() % shift)
return time.Date(ts.Year(), ts.Month(), day, 0, 0, 0, 0, ts.Location())
}
func shiftHour(ts time.Time, shift int) time.Time {
hour := ts.Hour() - (ts.Hour() % shift)
return time.Date(ts.Year(), ts.Month(), ts.Day(), hour, 0, 0, 0, ts.Location())
}
func shiftMinute(ts time.Time, shift int) time.Time {
minute := ts.Minute() - (ts.Minute() % shift)
return time.Date(ts.Year(), ts.Month(), ts.Day(), ts.Hour(), minute, 0, 0, ts.Location())
}
func (i *Interval) UnmarshalJSON(b []byte) (err error) {
var a string
err = json.Unmarshal(b, &a)

View File

@ -2,10 +2,24 @@ package types
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
)
func TestTruncate(t *testing.T) {
ts := time.Date(2023, 11, 5, 17, 36, 43, 716, time.UTC)
expectedDay := time.Date(ts.Year(), ts.Month(), ts.Day(), 0, 0, 0, 0, time.UTC)
assert.Equal(t, expectedDay, Interval1d.Truncate(ts))
expected2h := time.Date(ts.Year(), ts.Month(), ts.Day(), 16, 0, 0, 0, time.UTC)
assert.Equal(t, expected2h, Interval2h.Truncate(ts))
expectedHour := time.Date(ts.Year(), ts.Month(), ts.Day(), 17, 0, 0, 0, time.UTC)
assert.Equal(t, expectedHour, Interval1h.Truncate(ts))
expected30m := time.Date(ts.Year(), ts.Month(), ts.Day(), 17, 30, 0, 0, time.UTC)
assert.Equal(t, expected30m, Interval30m.Truncate(ts))
}
func TestParseInterval(t *testing.T) {
assert.Equal(t, ParseInterval("1s"), 1)
assert.Equal(t, ParseInterval("3m"), 3*60)

View File

@ -37,6 +37,15 @@ func NewMillisecondTimestampFromInt(i int64) MillisecondTimestamp {
return MillisecondTimestamp(time.Unix(0, i*int64(time.Millisecond)))
}
func ParseMillisecondTimestamp(a string) (ts MillisecondTimestamp, err error) {
m, err := strconv.ParseInt(a, 10, 64) // startTime
if err != nil {
return ts, err
}
return NewMillisecondTimestampFromInt(m), nil
}
func MustParseMillisecondTimestamp(a string) MillisecondTimestamp {
m, err := strconv.ParseInt(a, 10, 64) // startTime
if err != nil {

View File

@ -8,7 +8,6 @@ import (
"time"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
"github.com/c9s/bbgo/pkg/datatype/floats"