refactor exchange factory and solve the incorrect pkg import dependency from ftx

This commit is contained in:
c9s 2022-06-04 02:23:23 +08:00
parent 9b8239abba
commit 9083881442
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
11 changed files with 287 additions and 101 deletions

View File

@ -14,6 +14,7 @@ import (
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/types"
)
@ -52,7 +53,7 @@ var rootCmd = &cobra.Command{
return err
}
exchange, err := cmdutil.NewExchange(exchangeName)
exchange, err := exchange2.New(exchangeName)
if err != nil {
return err
}

View File

@ -21,7 +21,7 @@ import (
"github.com/spf13/viper"
"gopkg.in/tucnak/telebot.v2"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/interact"
"github.com/c9s/bbgo/pkg/notifier/slacknotifier"
@ -221,7 +221,7 @@ func (environ *Environment) ConfigureExchangeSessions(userConfig *Config) error
func (environ *Environment) AddExchangesByViperKeys() error {
for _, n := range types.SupportedExchanges {
if viper.IsSet(string(n) + "-api-key") {
exchange, err := cmdutil.NewExchangeWithEnvVarPrefix(n, "")
exchange, err := exchange2.NewWithEnvVarPrefix(n, "")
if err != nil {
return err
}

View File

@ -9,13 +9,13 @@ import (
"github.com/slack-go/slack"
"github.com/c9s/bbgo/pkg/cache"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/spf13/viper"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/cache"
exchange2 "github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/service"
@ -740,17 +740,17 @@ func (session *ExchangeSession) FindPossibleSymbols() (symbols []string, err err
// InitExchange initialize the exchange instance and allocate memory for fields
// In this stage, the session var could be loaded from the JSON config, so the pointer fields are still nil
// The Init method will be called after this stage, environment.Init will call the session.Init method later.
func (session *ExchangeSession) InitExchange(name string, exchange types.Exchange) error {
func (session *ExchangeSession) InitExchange(name string, ex types.Exchange) error {
var err error
var exchangeName = session.ExchangeName
if exchange == nil {
if ex == nil {
if session.PublicOnly {
exchange, err = cmdutil.NewExchangePublic(exchangeName)
ex, err = exchange2.NewPublic(exchangeName)
} else {
if session.Key != "" && session.Secret != "" {
exchange, err = cmdutil.NewExchangeStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount)
ex, err = exchange2.NewStandard(exchangeName, session.Key, session.Secret, session.Passphrase, session.SubAccount)
} else {
exchange, err = cmdutil.NewExchangeWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
ex, err = exchange2.NewWithEnvVarPrefix(exchangeName, session.EnvVarPrefix)
}
}
}
@ -761,7 +761,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
// configure exchange
if session.Margin {
marginExchange, ok := exchange.(types.MarginExchange)
marginExchange, ok := ex.(types.MarginExchange)
if !ok {
return fmt.Errorf("exchange %s does not support margin", exchangeName)
}
@ -774,7 +774,7 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
}
if session.Futures {
futuresExchange, ok := exchange.(types.FuturesExchange)
futuresExchange, ok := ex.(types.FuturesExchange)
if !ok {
return fmt.Errorf("exchange %s does not support futures", exchangeName)
}
@ -792,9 +792,9 @@ func (session *ExchangeSession) InitExchange(name string, exchange types.Exchang
SessionChannelRouter: NewPatternChannelRouter(nil),
ObjectChannelRouter: NewObjectChannelRouter(),
}
session.Exchange = exchange
session.UserDataStream = exchange.NewStream()
session.MarketDataStream = exchange.NewStream()
session.Exchange = ex
session.UserDataStream = ex.NewStream()
session.MarketDataStream = ex.NewStream()
session.MarketDataStream.SetPublicOnly()
// pointer fields

View File

@ -22,6 +22,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/cmd/cmdutil"
"github.com/c9s/bbgo/pkg/data/tsv"
"github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
@ -186,7 +187,7 @@ var BacktestCmd = &cobra.Command{
return err
}
publicExchange, err := cmdutil.NewExchangePublic(exName)
publicExchange, err := exchange.NewPublic(exName)
if err != nil {
return err
}

View File

@ -1,65 +1,2 @@
package cmdutil
import (
"fmt"
"os"
"strings"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/exchange/kucoin"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/exchange/okex"
"github.com/c9s/bbgo/pkg/types"
)
func NewExchangePublic(exchangeName types.ExchangeName) (types.Exchange, error) {
return NewExchangeStandard(exchangeName, "", "", "", "")
}
func NewExchangeStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) {
switch n {
case types.ExchangeFTX:
return ftx.NewExchange(key, secret, subAccount), nil
case types.ExchangeBinance:
return binance.New(key, secret), nil
case types.ExchangeMax:
return max.New(key, secret), nil
case types.ExchangeOKEx:
return okex.New(key, secret, passphrase), nil
case types.ExchangeKucoin:
return kucoin.New(key, secret, passphrase), nil
default:
return nil, fmt.Errorf("unsupported exchange: %v", n)
}
}
func NewExchangeWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) {
if len(varPrefix) == 0 {
varPrefix = n.String()
}
varPrefix = strings.ToUpper(varPrefix)
key := os.Getenv(varPrefix + "_API_KEY")
secret := os.Getenv(varPrefix + "_API_SECRET")
if len(key) == 0 || len(secret) == 0 {
return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix)
}
passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE")
subAccount := os.Getenv(varPrefix + "_SUBACCOUNT")
return NewExchangeStandard(n, key, secret, passphrase, subAccount)
}
// NewExchange constructor exchange object from viper config.
func NewExchange(n types.ExchangeName) (types.Exchange, error) {
return NewExchangeWithEnvVarPrefix(n, "")
}

65
pkg/exchange/factory.go Normal file
View File

@ -0,0 +1,65 @@
package exchange
import (
"fmt"
"os"
"strings"
"github.com/c9s/bbgo/pkg/exchange/binance"
"github.com/c9s/bbgo/pkg/exchange/ftx"
"github.com/c9s/bbgo/pkg/exchange/kucoin"
"github.com/c9s/bbgo/pkg/exchange/max"
"github.com/c9s/bbgo/pkg/exchange/okex"
"github.com/c9s/bbgo/pkg/types"
)
func NewPublic(exchangeName types.ExchangeName) (types.Exchange, error) {
return NewStandard(exchangeName, "", "", "", "")
}
func NewStandard(n types.ExchangeName, key, secret, passphrase, subAccount string) (types.Exchange, error) {
switch n {
case types.ExchangeFTX:
return ftx.NewExchange(key, secret, subAccount), nil
case types.ExchangeBinance:
return binance.New(key, secret), nil
case types.ExchangeMax:
return max.New(key, secret), nil
case types.ExchangeOKEx:
return okex.New(key, secret, passphrase), nil
case types.ExchangeKucoin:
return kucoin.New(key, secret, passphrase), nil
default:
return nil, fmt.Errorf("unsupported exchange: %v", n)
}
}
func NewWithEnvVarPrefix(n types.ExchangeName, varPrefix string) (types.Exchange, error) {
if len(varPrefix) == 0 {
varPrefix = n.String()
}
varPrefix = strings.ToUpper(varPrefix)
key := os.Getenv(varPrefix + "_API_KEY")
secret := os.Getenv(varPrefix + "_API_SECRET")
if len(key) == 0 || len(secret) == 0 {
return nil, fmt.Errorf("can not initialize exchange %s: empty key or secret, env var prefix: %s", n, varPrefix)
}
passphrase := os.Getenv(varPrefix + "_API_PASSPHRASE")
subAccount := os.Getenv(varPrefix + "_SUBACCOUNT")
return NewStandard(n, key, secret, passphrase, subAccount)
}
// New constructor exchange object from viper config.
func New(n types.ExchangeName) (types.Exchange, error) {
return NewWithEnvVarPrefix(n, "")
}

View File

@ -9,7 +9,7 @@ import (
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/service"
"github.com/c9s/bbgo/pkg/net/websocketbase"
"github.com/c9s/bbgo/pkg/types"
)
@ -18,7 +18,7 @@ const endpoint = "wss://ftx.com/ws/"
type Stream struct {
*types.StandardStream
ws *service.WebsocketClientBase
ws *websocketbase.WebsocketClientBase
exchange *Exchange
key string
@ -42,7 +42,7 @@ func NewStream(key, secret string, subAccount string, e *Exchange) *Stream {
secret: secret,
subAccount: subAccount,
StandardStream: &types.StandardStream{},
ws: service.NewWebsocketClientBase(endpoint, 3*time.Second),
ws: websocketbase.NewWebsocketClientBase(endpoint, 3*time.Second),
}
s.ws.OnMessage((&messageHandler{StandardStream: s.StandardStream}).handleMessage)

View File

@ -1,4 +1,4 @@
package service
package websocketbase
import (
"context"
@ -8,6 +8,8 @@ import (
"github.com/gorilla/websocket"
)
// WebsocketClientBase is a legacy base client
// Deprecated: please use standard stream instead.
//go:generate callbackgen -type WebsocketClientBase
type WebsocketClientBase struct {
baseURL string

View File

@ -1,6 +1,6 @@
// Code generated by "callbackgen -type WebsocketClientBase"; DO NOT EDIT.
package service
package websocketbase
import (
"github.com/gorilla/websocket"

View File

@ -33,7 +33,8 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
tasks := []SyncTask{
{
Type: types.KLine{},
Type: types.KLine{},
Select: SelectLastKLines(exchange.Name(), symbol, interval, startTime, 100),
Time: func(obj interface{}) time.Time {
return obj.(types.KLine).StartTime.Time().UTC()
},
@ -41,7 +42,6 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
kline := obj.(types.KLine)
return kline.Symbol + kline.Interval.String() + strconv.FormatInt(kline.StartTime.UnixMilli(), 10)
},
Select: SelectLastKLines(exchange.Name(), symbol, interval, 100),
BatchQuery: func(ctx context.Context, startTime, endTime time.Time) (interface{}, chan error) {
q := &batch.KLineBatchQuery{Exchange: exchange}
return q.Query(ctx, symbol, interval, startTime, endTime)
@ -54,13 +54,12 @@ func (s *BacktestService) SyncKLineByInterval(ctx context.Context, exchange type
}
for _, sel := range tasks {
if err := sel.execute(ctx, s.DB, startTime); err != nil {
if err := sel.execute(ctx, s.DB, startTime, endTime); err != nil {
return err
}
}
return nil
}
func (s *BacktestService) Verify(symbols []string, startTime time.Time, endTime time.Time, sourceExchange types.Exchange, verboseCnt int) error {
@ -129,12 +128,11 @@ func (s *BacktestService) QueryFirstKLine(ex types.ExchangeName, symbol string,
func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, interval types.Interval, orderBy string, limit int) (*types.KLine, error) {
log.Infof("querying last kline exchange = %s AND symbol = %s AND interval = %s", ex, symbol, interval)
tableName := s._targetKlineTable(ex)
tableName := targetKlineTable(ex)
// make the SQL syntax IDE friendly, so that it can analyze it.
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
sql := fmt.Sprintf("SELECT * FROM `%s` WHERE `symbol` = :symbol AND `interval` = :interval ORDER BY end_time "+orderBy+" LIMIT "+strconv.Itoa(limit), tableName)
rows, err := s.DB.NamedQuery(sql, map[string]interface{}{
"exchange": ex.String(),
"interval": interval,
"symbol": symbol,
})
@ -160,7 +158,7 @@ func (s *BacktestService) QueryKLine(ex types.ExchangeName, symbol string, inter
// QueryKLinesForward is used for querying klines to back-testing
func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit int) ([]types.KLine, error) {
tableName := s._targetKlineTable(exchange)
tableName := targetKlineTable(exchange)
sql := "SELECT * FROM `binance_klines` WHERE `end_time` >= :start_time AND `symbol` = :symbol AND `interval` = :interval and exchange = :exchange ORDER BY end_time ASC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
@ -179,7 +177,7 @@ func (s *BacktestService) QueryKLinesForward(exchange types.ExchangeName, symbol
}
func (s *BacktestService) QueryKLinesBackward(exchange types.ExchangeName, symbol string, interval types.Interval, endTime time.Time, limit int) ([]types.KLine, error) {
tableName := s._targetKlineTable(exchange)
tableName := targetKlineTable(exchange)
sql := "SELECT * FROM `binance_klines` WHERE `end_time` <= :end_time and exchange = :exchange AND `symbol` = :symbol AND `interval` = :interval ORDER BY end_time DESC LIMIT :limit"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
@ -205,7 +203,7 @@ func (s *BacktestService) QueryKLinesCh(since, until time.Time, exchange types.E
return returnError(errors.Errorf("symbols is empty when querying kline, plesae check your strategy setting. "))
}
tableName := s._targetKlineTable(exchange.Name())
tableName := targetKlineTable(exchange.Name())
sql := "SELECT * FROM `binance_klines` WHERE `end_time` BETWEEN :since AND :until AND `symbol` IN (:symbols) AND `interval` IN (:intervals) and exchange = :exchange ORDER BY end_time ASC"
sql = strings.ReplaceAll(sql, "binance_klines", tableName)
@ -288,7 +286,7 @@ func (s *BacktestService) scanRows(rows *sqlx.Rows) (klines []types.KLine, err e
return klines, rows.Err()
}
func (s *BacktestService) _targetKlineTable(exchangeName types.ExchangeName) string {
func targetKlineTable(exchangeName types.ExchangeName) string {
return strings.ToLower(exchangeName.String()) + "_klines"
}
@ -299,7 +297,7 @@ func (s *BacktestService) Insert(kline types.KLine) error {
return errExchangeFieldIsUnset
}
tableName := s._targetKlineTable(kline.Exchange)
tableName := targetKlineTable(kline.Exchange)
sql := fmt.Sprintf("INSERT INTO `%s` (`exchange`, `start_time`, `end_time`, `symbol`, `interval`, `open`, `high`, `low`, `close`, `closed`, `volume`, `quote_volume`, `taker_buy_base_volume`, `taker_buy_quote_volume`)"+
"VALUES (:exchange, :start_time, :end_time, :symbol, :interval, :open, :high, :low, :close, :closed, :volume, :quote_volume, :taker_buy_base_volume, :taker_buy_quote_volume)", tableName)
@ -313,7 +311,7 @@ func (s *BacktestService) _deleteDuplicatedKLine(k types.KLine) error {
return errors.New("kline.Exchange field should not be empty")
}
tableName := s._targetKlineTable(k.Exchange)
tableName := targetKlineTable(k.Exchange)
sql := fmt.Sprintf("DELETE FROM `%s` WHERE gid = :gid ", tableName)
_, err := s.DB.NamedExec(sql, k)
return err
@ -346,14 +344,138 @@ func (s *BacktestService) SyncExist(ctx context.Context, exchange types.Exchange
return nil
}
type TimeRange struct {
Start time.Time
End time.Time
}
// SyncPartial
// find the existing data time range (t1, t2)
// scan if there is a missing part
// create a time range slice []TimeRange
// iterate the []TimeRange slice to sync data.
func (s *BacktestService) SyncPartial(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) error {
t1, t2, err := s.QueryExistingDataRange(ctx, ex, symbol, interval, since, until)
if err != nil {
return err
}
timeRanges, err := s.FindMissingTimeRanges(ctx, ex, symbol, interval, t1.Time(), t2.Time())
if err != nil {
return err
}
_ = timeRanges
return nil
}
// FindMissingTimeRanges returns the missing time ranges, the start/end time represents the existing data time points.
// So when sending kline query to the exchange API, we need to add one second to the start time and minus one second to the end time.
func (s *BacktestService) FindMissingTimeRanges(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, since, until time.Time) ([]TimeRange, error) {
query := SelectKLineTimePoints(ex.Name(), symbol, interval, since, until)
sql, args, err := query.ToSql()
if err != nil {
return nil, err
}
rows, err := s.DB.QueryContext(ctx, sql, args...)
if err != nil {
return nil, err
}
var timeRanges []TimeRange
var timePoints = make(map[int64]struct{}, 1000) // we can use this to find duplicates
var lastTime time.Time
for rows.Next() {
var tt types.Time
if err := rows.Scan(&tt); err != nil {
return nil, err
}
var t = time.Time(tt)
if lastTime != (time.Time{}) && t.Sub(lastTime) > interval.Duration() {
timeRanges = append(timeRanges, TimeRange{
Start: lastTime.Add(interval.Duration()),
End: t,
})
}
lastTime = t
timePoints[t.Unix()] = struct{}{}
}
return timeRanges, nil
}
func (s *BacktestService) QueryExistingDataRange(ctx context.Context, ex types.Exchange, symbol string, interval types.Interval, tArgs ...time.Time) (start, end *types.Time, err error) {
sel := SelectKLineTimeRange(ex.Name(), symbol, interval, tArgs...)
sql, args, err := sel.ToSql()
if err != nil {
return nil, nil, err
}
var t1, t2 types.Time
row := s.DB.QueryRowContext(ctx, sql, args...)
if err := row.Scan(&t1, &t2); err != nil {
return nil, nil, err
}
if err := row.Err(); err != nil {
return nil, nil, err
}
return &t1, &t2, nil
}
func SelectKLineTimePoints(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
conditions := sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()},
}
if len(args) == 2 {
since := args[0]
until := args[1]
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
}
tableName := targetKlineTable(ex)
return sq.Select("start_time").
From(tableName).
Where(conditions).
OrderBy("start_time ASC")
}
// SelectKLineTimeRange returns the existing klines time range (since < kline.start_time < until)
func SelectKLineTimeRange(ex types.ExchangeName, symbol string, interval types.Interval, args ...time.Time) sq.SelectBuilder {
conditions := sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"`interval`": interval.String()},
}
if len(args) == 2 {
since := args[0]
until := args[1]
conditions = append(conditions, sq.Expr("`start_time` BETWEEN ? AND ?", since, until))
}
tableName := targetKlineTable(ex)
return sq.Select("MIN(start_time) AS t1, MAX(start_time) AS t2").
From(tableName).
Where(conditions)
}
// TODO: add is_futures column since the klines data is different
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, limit uint64) sq.SelectBuilder {
func SelectLastKLines(ex types.ExchangeName, symbol string, interval types.Interval, startTime time.Time, limit uint64) sq.SelectBuilder {
tableName := targetKlineTable(ex)
return sq.Select("*").
From(strings.ToLower(ex.String()) + "_klines").
From(tableName).
Where(sq.And{
sq.Eq{"symbol": symbol},
sq.Eq{"exchange": ex},
sq.Eq{"`interval`": interval.String()},
sq.GtOrEq{"`start_time`": startTime},
}).
OrderBy("start_time DESC").
Limit(limit)

View File

@ -0,0 +1,58 @@
package service
import (
"context"
"testing"
"time"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/c9s/bbgo/pkg/exchange"
"github.com/c9s/bbgo/pkg/types"
)
func TestBacktestService(t *testing.T) {
db, err := prepareDB(t)
if err != nil {
t.Fatal(err)
}
defer db.Close()
ctx := context.Background()
dbx := sqlx.NewDb(db.DB, "sqlite3")
ex, err := exchange.NewPublic(types.ExchangeBinance)
assert.NoError(t, err)
service := &BacktestService{DB: dbx}
symbol := "BTCUSDT"
now := time.Now()
startTime1 := now.AddDate(0, 0, -6).Truncate(time.Hour)
endTime1 := now.AddDate(0, 0, -5).Truncate(time.Hour)
startTime2 := now.AddDate(0, 0, -4).Truncate(time.Hour)
endTime2 := now.AddDate(0, 0, -3).Truncate(time.Hour)
// kline query is exclusive
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime1.Add(-time.Second), endTime1.Add(time.Second))
assert.NoError(t, err)
err = service.SyncKLineByInterval(ctx, ex, symbol, types.Interval1h, startTime2.Add(-time.Second), endTime2.Add(time.Second))
assert.NoError(t, err)
t1, t2, err := service.QueryExistingDataRange(ctx, ex, symbol, types.Interval1h)
if assert.NoError(t, err) {
assert.Equal(t, startTime1, t1.Time(), "start time point should match")
assert.Equal(t, endTime2, t2.Time(), "end time point should match")
}
timeRanges, err := service.FindMissingTimeRanges(ctx, ex, symbol, types.Interval1h, startTime1, endTime2)
if assert.NoError(t, err) {
assert.NotEmpty(t, timeRanges)
assert.Len(t, timeRanges, 1, "should find one missing time range")
t.Logf("found timeRanges: %+v", timeRanges)
}
}