fix private stream interface for backtesting

This commit is contained in:
c9s 2020-09-19 10:59:43 +08:00
parent 42a32924a7
commit 33963f52e0
8 changed files with 113 additions and 51 deletions

View File

@ -24,7 +24,7 @@ func LoadAccount(ctx context.Context, exchange *binance.Exchange) (*Account, err
}, err }, err
} }
func (a *Account) BindPrivateStream(stream *binance.PrivateStream) { func (a *Account) BindPrivateStream(stream types.PrivateStream) {
stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) { stream.OnBalanceSnapshot(func(snapshot map[string]types.Balance) {
a.mu.Lock() a.mu.Lock()
defer a.mu.Unlock() defer a.mu.Unlock()
@ -34,20 +34,6 @@ func (a *Account) BindPrivateStream(stream *binance.PrivateStream) {
} }
}) })
stream.OnOutboundAccountInfoEvent(func(e *binance.OutboundAccountInfoEvent) {
})
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
a.mu.Lock()
defer a.mu.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := a.Balances[e.Asset]; ok {
balance.Available += delta
a.Balances[e.Asset] = balance
}
})
} }
func (a *Account) Print() { func (a *Account) Print() {

View File

@ -18,11 +18,15 @@ var log = logrus.WithFields(logrus.Fields{
"exchange": "binance", "exchange": "binance",
}) })
func init() {
_ = types.Exchange(&Exchange{})
}
type Exchange struct { type Exchange struct {
Client *binance.Client Client *binance.Client
} }
func NewExchange(key, secret string) *Exchange { func New(key, secret string) *Exchange {
var client = binance.NewClient(key, secret) var client = binance.NewClient(key, secret)
return &Exchange{ return &Exchange{
Client: client, Client: client,
@ -38,9 +42,27 @@ func (e *Exchange) QueryAveragePrice(ctx context.Context, symbol string) (float6
return util.MustParseFloat(resp.Price), nil return util.MustParseFloat(resp.Price), nil
} }
func (e *Exchange) NewPrivateStream() (*PrivateStream, error) { func (e *Exchange) NewPrivateStream() (types.PrivateStream, error) {
return NewPrivateStream(e.Client)
}
func NewPrivateStream(client *binance.Client) (*PrivateStream, error) {
// binance BalanceUpdate = withdrawal or deposit changes
/*
stream.OnBalanceUpdateEvent(func(e *binance.BalanceUpdateEvent) {
a.mu.Lock()
defer a.mu.Unlock()
delta := util.MustParseFloat(e.Delta)
if balance, ok := a.Balances[e.Asset]; ok {
balance.Available += delta
a.Balances[e.Asset] = balance
}
})
*/
return &PrivateStream{ return &PrivateStream{
Client: e.Client, Client: client,
}, nil }, nil
} }
@ -200,8 +222,8 @@ func (e *Exchange) QueryAccountBalances(ctx context.Context) (map[string]types.B
return account.Balances, nil return account.Balances, nil
} }
// TradingFeeCurrency // PlatformFeeCurrency
func (e *Exchange) TradingFeeCurrency() string { func (e *Exchange) PlatformFeeCurrency() string {
return "BNB" return "BNB"
} }

View File

@ -82,18 +82,17 @@ func (s *PrivateStream) connect(ctx context.Context) error {
}) })
} }
func (s *PrivateStream) Connect(ctx context.Context, eventC chan interface{}) error { func (s *PrivateStream) Connect(ctx context.Context) error {
err := s.connect(ctx) err := s.connect(ctx)
if err != nil { if err != nil {
return err return err
} }
go s.read(ctx, eventC) go s.read(ctx)
return nil return nil
} }
func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) { func (s *PrivateStream) read(ctx context.Context) {
defer close(eventC)
pingTicker := time.NewTicker(1 * time.Minute) pingTicker := time.NewTicker(1 * time.Minute)
defer pingTicker.Stop() defer pingTicker.Stop()
@ -207,8 +206,6 @@ func (s *PrivateStream) read(ctx context.Context, eventC chan interface{}) {
s.EmitTrade(trade) s.EmitTrade(trade)
} }
} }
eventC <- e
} }
} }
} }

View File

@ -11,7 +11,20 @@ import (
"github.com/c9s/bbgo/pkg/util" "github.com/c9s/bbgo/pkg/util"
) )
type KLineRegressionTrader struct { type BackTestStream struct {
types.StandardPrivateStream
}
func (s *BackTestStream) Connect(ctx context.Context) error {
return nil
}
func (s *BackTestStream) Close() error {
return nil
}
type BackTestTrader struct {
// Context is trading Context // Context is trading Context
Context *Context Context *Context
SourceKLines []types.KLine SourceKLines []types.KLine
@ -21,11 +34,11 @@ type KLineRegressionTrader struct {
pendingOrders []*types.SubmitOrder pendingOrders []*types.SubmitOrder
} }
func (trader *KLineRegressionTrader) SubmitOrder(cxt context.Context, order *types.SubmitOrder) { func (trader *BackTestTrader) SubmitOrder(cxt context.Context, order *types.SubmitOrder) {
trader.pendingOrders = append(trader.pendingOrders, order) trader.pendingOrders = append(trader.pendingOrders, order)
} }
func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) { func (trader *BackTestTrader) RunStrategy(ctx context.Context, strategy Strategy) (chan struct{}, error) {
logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines)) logrus.Infof("[regression] number of kline data: %d", len(trader.SourceKLines))
maxExposure := 0.4 maxExposure := 0.4
@ -43,8 +56,8 @@ func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy S
return nil, err return nil, err
} }
standardStream := types.StandardPrivateStream{} stream := &BackTestStream{}
if err := strategy.OnNewStream(&standardStream); err != nil { if err := strategy.OnNewStream(stream); err != nil {
return nil, err return nil, err
} }
@ -54,7 +67,7 @@ func (trader *KLineRegressionTrader) RunStrategy(ctx context.Context, strategy S
fmt.Print(".") fmt.Print(".")
standardStream.EmitKLineClosed(kline) stream.EmitKLineClosed(kline)
for _, order := range trader.pendingOrders { for _, order := range trader.pendingOrders {
switch order.Side { switch order.Side {

View File

@ -28,7 +28,7 @@ func NewMarketDataStore() *MarketDataStore {
} }
} }
func (store *MarketDataStore) BindPrivateStream(stream *types.StandardPrivateStream) { func (store *MarketDataStore) BindPrivateStream(stream types.PrivateStream) {
stream.OnKLineClosed(store.handleKLineClosed) stream.OnKLineClosed(store.handleKLineClosed)
} }

View File

@ -21,7 +21,7 @@ import (
type Strategy interface { type Strategy interface {
Load(tradingContext *Context, trader types.Trader) error Load(tradingContext *Context, trader types.Trader) error
OnNewStream(stream *types.StandardPrivateStream) error OnNewStream(stream types.PrivateStream) error
} }
type Trader struct { type Trader struct {
@ -41,9 +41,15 @@ type Trader struct {
ProfitAndLossCalculator *accounting.ProfitAndLossCalculator ProfitAndLossCalculator *accounting.ProfitAndLossCalculator
Account *Account Account *Account
Exchanges map[string]*binance.Exchange
ExchangeAccounts map[string]*Account
ExchangeStreams map[string]types.PrivateStream
} }
func NewTrader(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader { func New(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader {
tradeService := &service.TradeService{DB: db} tradeService := &service.TradeService{DB: db}
return &Trader{ return &Trader{
Symbol: symbol, Symbol: symbol,
@ -56,8 +62,28 @@ func NewTrader(db *sqlx.DB, exchange *binance.Exchange, symbol string) *Trader {
} }
} }
func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error { func (trader *Trader) AddExchange(name string, exchange *binance.Exchange) {
trader.Exchanges[name] = exchange
}
func (trader *Trader) Connect(ctx context.Context) error {
for n, ex := range trader.Exchanges {
stream, err := ex.NewPrivateStream()
if err != nil {
return err
}
trader.ExchangeStreams[n] = stream
if err := stream.Connect(ctx) ; err != nil {
return err
}
}
return nil
}
func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error {
log.Info("syncing trades from exchange...") log.Info("syncing trades from exchange...")
if err := trader.TradeSync.Sync(ctx, trader.Symbol, startTime); err != nil { if err := trader.TradeSync.Sync(ctx, trader.Symbol, startTime); err != nil {
return err return err
@ -65,7 +91,7 @@ func (trader *Trader) Initialize(ctx context.Context, startTime time.Time) error
var err error var err error
var trades []types.Trade var trades []types.Trade
tradingFeeCurrency := trader.Exchange.TradingFeeCurrency() tradingFeeCurrency := trader.Exchange.PlatformFeeCurrency()
if strings.HasPrefix(trader.Symbol, tradingFeeCurrency) { if strings.HasPrefix(trader.Symbol, tradingFeeCurrency) {
trades, err = trader.TradeService.QueryForTradingFeeCurrency(trader.Symbol, tradingFeeCurrency) trades, err = trader.TradeService.QueryForTradingFeeCurrency(trader.Symbol, tradingFeeCurrency)
} else { } else {
@ -228,11 +254,11 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
// bind kline store to the stream // bind kline store to the stream
klineStore := NewMarketDataStore() klineStore := NewMarketDataStore()
klineStore.BindPrivateStream(&stream.StandardPrivateStream) klineStore.BindPrivateStream(stream)
trader.Account.BindPrivateStream(stream) trader.Account.BindPrivateStream(stream)
if err := strategy.OnNewStream(&stream.StandardPrivateStream); err != nil { if err := strategy.OnNewStream(stream); err != nil {
return nil, err return nil, err
} }
@ -270,8 +296,7 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
trader.Context.SetCurrentPrice(kline.Close) trader.Context.SetCurrentPrice(kline.Close)
}) })
var eventC = make(chan interface{}, 20) if err := stream.Connect(ctx); err != nil {
if err := stream.Connect(ctx, eventC); err != nil {
return nil, err return nil, err
} }
@ -281,16 +306,11 @@ func (trader *Trader) RunStrategy(ctx context.Context, strategy Strategy) (chan
defer close(done) defer close(done)
defer stream.Close() defer stream.Close()
for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
// drain the event channel
case <-eventC:
}
} }
}() }()

View File

@ -6,6 +6,12 @@ import (
) )
type Exchange interface { type Exchange interface {
PlatformFeeCurrency() string
NewPrivateStream() (PrivateStream, error)
QueryAccountBalances(ctx context.Context) (map[string]Balance, error)
QueryKLines(ctx context.Context, symbol string, interval string, options KLineQueryOptions) ([]KLine, error) QueryKLines(ctx context.Context, symbol string, interval string, options KLineQueryOptions) ([]KLine, error)
QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error) QueryTrades(ctx context.Context, symbol string, options *TradeQueryOptions) ([]Trade, error)

View File

@ -1,10 +1,19 @@
package types package types
import ( import (
"context"
"fmt" "fmt"
"strings" "strings"
) )
type PrivateStream interface {
StandardPrivateStreamEventHub
Subscribe(channel string, symbol string, options SubscribeOptions)
Connect(ctx context.Context) error
Close() error
}
//go:generate callbackgen -type StandardPrivateStream -interface //go:generate callbackgen -type StandardPrivateStream -interface
type StandardPrivateStream struct { type StandardPrivateStream struct {
Subscriptions []Subscription Subscriptions []Subscription
@ -43,7 +52,16 @@ type Subscription struct {
} }
func (s *Subscription) String() string { func (s *Subscription) String() string {
// binance uses lower case symbol name // binance uses lower case symbol name,
// for kline, it's "<symbol>@kline_<interval>"
// for depth, it's "<symbol>@depth OR <symbol>@depth@100ms"
switch s.Channel {
case "kline":
return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String()) return fmt.Sprintf("%s@%s_%s", strings.ToLower(s.Symbol), s.Channel, s.Options.String())
case "depth", "book":
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
default:
return fmt.Sprintf("%s@%s", strings.ToLower(s.Symbol), s.Channel)
}
} }