Merge pull request #721 from zenixls2/feature/heikinashi_session

feature: add heikinashi support
This commit is contained in:
Zenix 2022-06-17 12:24:02 +09:00 committed by GitHub
commit d33b12ae81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 177 additions and 65 deletions

View File

@ -4,6 +4,7 @@ sessions:
exchange: binance
futures: true
envVarPrefix: binance
heikinAshi: false
exchangeStrategies:

View File

@ -2,6 +2,7 @@
sessions:
binance:
exchange: binance
heikinAshi: true
envVarPrefix: binance
exchangeStrategies:
@ -11,10 +12,11 @@ exchangeStrategies:
symbol: BNBBUSD
backtest:
startTime: "2022-01-02"
endTime: "2022-01-19"
startTime: "2022-06-14"
endTime: "2022-06-15"
symbols:
- BNBBUSD
sessions: [binance]
account:
binance:
balances:

View File

@ -33,6 +33,8 @@ import (
"sync"
"time"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/cache"
"github.com/pkg/errors"
@ -42,6 +44,8 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
var log = logrus.WithField("cmd", "backtest")
var ErrUnimplemented = errors.New("unimplemented method")
type Exchange struct {
@ -53,7 +57,7 @@ type Exchange struct {
account *types.Account
config *bbgo.Backtest
userDataStream, marketDataStream *Stream
UserDataStream, MarketDataStream types.StandardStreamEmitter
trades map[string][]types.Trade
tradesMutex sync.Mutex
@ -147,12 +151,14 @@ func (e *Exchange) _addMatchingBook(symbol string, market types.Market) {
}
func (e *Exchange) NewStream() types.Stream {
return &Stream{exchange: e}
return &types.BacktestStream{
StandardStreamEmitter: &types.StandardStream{},
}
}
func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) {
if e.userDataStream == nil {
return createdOrders, fmt.Errorf("SubmitOrders should be called after userDataStream been initialized")
if e.UserDataStream == nil {
return createdOrders, fmt.Errorf("SubmitOrders should be called after UserDataStream been initialized")
}
for _, order := range orders {
symbol := order.Symbol
@ -175,7 +181,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder
e.addClosedOrder(*createdOrder)
}
e.userDataStream.EmitOrderUpdate(*createdOrder)
e.UserDataStream.EmitOrderUpdate(*createdOrder)
}
}
@ -201,8 +207,8 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since,
}
func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error {
if e.userDataStream == nil {
return fmt.Errorf("CancelOrders should be called after userDataStream been initialized")
if e.UserDataStream == nil {
return fmt.Errorf("CancelOrders should be called after UserDataStream been initialized")
}
for _, order := range orders {
matching, ok := e.matchingBook(order.Symbol)
@ -214,7 +220,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro
return err
}
e.userDataStream.EmitOrderUpdate(canceledOrder)
e.UserDataStream.EmitOrderUpdate(canceledOrder)
}
return nil
@ -297,15 +303,15 @@ func (e *Exchange) matchingBook(symbol string) (*SimplePriceMatching, bool) {
}
func (e *Exchange) InitMarketData() {
e.userDataStream.OnTradeUpdate(func(trade types.Trade) {
e.UserDataStream.OnTradeUpdate(func(trade types.Trade) {
e.addTrade(trade)
})
e.matchingBooksMutex.Lock()
for _, matching := range e.matchingBooks {
matching.OnTradeUpdate(e.userDataStream.EmitTradeUpdate)
matching.OnOrderUpdate(e.userDataStream.EmitOrderUpdate)
matching.OnBalanceUpdate(e.userDataStream.EmitBalanceUpdate)
matching.OnTradeUpdate(e.UserDataStream.EmitTradeUpdate)
matching.OnOrderUpdate(e.UserDataStream.EmitOrderUpdate)
matching.OnBalanceUpdate(e.UserDataStream.EmitBalanceUpdate)
}
e.matchingBooksMutex.Unlock()
}
@ -324,7 +330,7 @@ func (e *Exchange) SubscribeMarketData(extraIntervals ...types.Interval) (chan t
}
// collect subscriptions
for _, sub := range e.marketDataStream.Subscriptions {
for _, sub := range e.MarketDataStream.GetSubscriptions() {
loadedSymbols[sub.Symbol] = struct{}{}
switch sub.Channel {
@ -370,11 +376,11 @@ func (e *Exchange) ConsumeKLine(k types.KLine) {
matching.processKLine(k)
}
e.marketDataStream.EmitKLineClosed(k)
e.MarketDataStream.EmitKLineClosed(k)
}
func (e *Exchange) CloseMarketData() error {
if err := e.marketDataStream.Close(); err != nil {
if err := e.MarketDataStream.Close(); err != nil {
log.WithError(err).Error("stream close error")
return err
}

View File

@ -1,41 +0,0 @@
package backtest
import (
"context"
"github.com/sirupsen/logrus"
"github.com/c9s/bbgo/pkg/types"
)
var log = logrus.WithField("cmd", "backtest")
type Stream struct {
types.StandardStream
exchange *Exchange
}
func (s *Stream) Connect(ctx context.Context) error {
if s.PublicOnly {
if s.exchange.marketDataStream != nil {
panic("you should not set up more than 1 market data stream in back-test")
}
s.exchange.marketDataStream = s
} else {
// assign user data stream back
if s.exchange.userDataStream != nil {
panic("you should not set up more than 1 user data stream in back-test")
}
s.exchange.userDataStream = s
}
s.EmitConnect()
s.EmitStart()
return nil
}
func (s *Stream) Close() error {
return nil
}

View File

@ -213,6 +213,8 @@ type ExchangeSession struct {
Exchange types.Exchange `json:"-" yaml:"-"`
UseHeikinAshi bool `json:"heikinAshi,omitempty" yaml:"heikinAshi,omitempty"`
// Trades collects the executed trades from the exchange
// map: symbol -> []trade
Trades map[string]*types.TradeSlice `json:"-" yaml:"-"`
@ -346,6 +348,12 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
}
}
if session.UseHeikinAshi {
session.MarketDataStream = &types.HeikinAshiStream{
StandardStreamEmitter: session.MarketDataStream.(types.StandardStreamEmitter),
}
}
// query and initialize the balances
if !session.PublicOnly {
account, err := session.Exchange.QueryAccount(ctx)
@ -400,13 +408,23 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment)
}
// update last prices
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if _, ok := session.startPrices[kline.Symbol]; !ok {
session.startPrices[kline.Symbol] = kline.Open
}
if session.UseHeikinAshi {
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if _, ok := session.startPrices[kline.Symbol]; !ok {
session.startPrices[kline.Symbol] = kline.Open
}
session.lastPrices[kline.Symbol] = kline.Close
})
session.lastPrices[kline.Symbol] = session.MarketDataStream.(*types.HeikinAshiStream).LastOrigin[kline.Symbol][kline.Interval].Close
})
} else {
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if _, ok := session.startPrices[kline.Symbol]; !ok {
session.startPrices[kline.Symbol] = kline.Open
}
session.lastPrices[kline.Symbol] = kline.Close
})
}
session.MarketDataStream.OnMarketTrade(func(trade types.Trade) {
session.lastPrices[trade.Symbol] = trade.Price

View File

@ -255,7 +255,11 @@ var BacktestCmd = &cobra.Command{
if err != nil {
return errors.Wrap(err, "failed to create backtest exchange")
}
environ.AddExchange(name.String(), backtestExchange)
session := environ.AddExchange(name.String(), backtestExchange)
exchangeFromConfig := userConfig.Sessions[name.String()]
if exchangeFromConfig != nil {
session.UseHeikinAshi = exchangeFromConfig.UseHeikinAshi
}
}
if err := environ.Init(ctx); err != nil {
@ -640,6 +644,8 @@ func confirmation(s string) bool {
func toExchangeSources(sessions map[string]*bbgo.ExchangeSession, extraIntervals ...types.Interval) (exchangeSources []backtest.ExchangeDataSource, err error) {
for _, session := range sessions {
exchange := session.Exchange.(*backtest.Exchange)
exchange.UserDataStream = session.UserDataStream.(types.StandardStreamEmitter)
exchange.MarketDataStream = session.MarketDataStream.(types.StandardStreamEmitter)
exchange.InitMarketData()
c, err := exchange.SubscribeMarketData(extraIntervals...)

View File

@ -0,0 +1,19 @@
package types
import (
"context"
)
type BacktestStream struct {
StandardStreamEmitter
}
func (s *BacktestStream) Connect(ctx context.Context) error {
s.EmitConnect()
s.EmitStart()
return nil
}
func (s *BacktestStream) Close() error {
return nil
}

View File

@ -0,0 +1,72 @@
package types
import (
"github.com/c9s/bbgo/pkg/fixedpoint"
)
var Four fixedpoint.Value = fixedpoint.NewFromInt(4)
type HeikinAshiStream struct {
StandardStreamEmitter
lastAshi map[string]map[Interval]*KLine
LastOrigin map[string]map[Interval]*KLine
}
func (s *HeikinAshiStream) EmitKLineClosed(kline KLine) {
ashi := kline
if s.lastAshi == nil {
s.lastAshi = make(map[string]map[Interval]*KLine)
s.LastOrigin = make(map[string]map[Interval]*KLine)
}
if s.lastAshi[kline.Symbol] == nil {
s.lastAshi[kline.Symbol] = make(map[Interval]*KLine)
s.LastOrigin[kline.Symbol] = make(map[Interval]*KLine)
}
lastAshi := s.lastAshi[kline.Symbol][kline.Interval]
if lastAshi == nil {
ashi.Close = kline.Close.Add(kline.High).
Add(kline.Low).
Add(kline.Open).
Div(Four)
// High and Low are the same
s.lastAshi[kline.Symbol][kline.Interval] = &ashi
s.LastOrigin[kline.Symbol][kline.Interval] = &kline
} else {
ashi.Close = kline.Close.Add(kline.High).
Add(kline.Low).
Add(kline.Open).
Div(Four)
ashi.Open = lastAshi.Open.Add(lastAshi.Close).Div(Two)
// High and Low are the same
s.lastAshi[kline.Symbol][kline.Interval] = &ashi
s.LastOrigin[kline.Symbol][kline.Interval] = &kline
}
s.StandardStreamEmitter.EmitKLineClosed(ashi)
}
// No writeback to lastAshi
func (s *HeikinAshiStream) EmitKLine(kline KLine) {
ashi := kline
if s.lastAshi == nil {
s.lastAshi = make(map[string]map[Interval]*KLine)
}
if s.lastAshi[kline.Symbol] == nil {
s.lastAshi[kline.Symbol] = make(map[Interval]*KLine)
}
lastAshi := s.lastAshi[kline.Symbol][kline.Interval]
if lastAshi == nil {
ashi.Close = kline.Close.Add(kline.High).
Add(kline.Low).
Add(kline.Open).
Div(Four)
} else {
ashi.Close = kline.Close.Add(kline.High).
Add(kline.Low).
Add(kline.Open).
Div(Four)
ashi.Open = lastAshi.Open.Add(lastAshi.Close).Div(Two)
}
s.StandardStreamEmitter.EmitKLine(ashi)
}
var _ StandardStreamEmitter = &HeikinAshiStream{}

View File

@ -28,7 +28,9 @@ type Stream interface {
StandardStreamEventHub
Subscribe(channel Channel, symbol string, options SubscribeOptions)
GetSubscriptions() []Subscription
SetPublicOnly()
GetPublicOnly() bool
Connect(ctx context.Context) error
Close() error
}
@ -104,6 +106,25 @@ type StandardStream struct {
FuturesPositionSnapshotCallbacks []func(futuresPositions FuturesPositionMap)
}
type StandardStreamEmitter interface {
Stream
EmitStart()
EmitConnect()
EmitDisconnect()
EmitTradeUpdate(Trade)
EmitOrderUpdate(Order)
EmitBalanceSnapshot(BalanceMap)
EmitBalanceUpdate(BalanceMap)
EmitKLineClosed(KLine)
EmitKLine(KLine)
EmitBookUpdate(SliceOrderBook)
EmitBookTickerUpdate(BookTicker)
EmitBookSnapshot(SliceOrderBook)
EmitMarketTrade(Trade)
EmitFuturesPositionUpdate(FuturesPositionMap)
EmitFuturesPositionSnapshot(FuturesPositionMap)
}
func NewStandardStream() StandardStream {
return StandardStream{
ReconnectC: make(chan struct{}, 1),
@ -115,6 +136,10 @@ func (s *StandardStream) SetPublicOnly() {
s.PublicOnly = true
}
func (s *StandardStream) GetPublicOnly() bool {
return s.PublicOnly
}
func (s *StandardStream) SetEndpointCreator(creator EndpointCreator) {
s.endpointCreator = creator
}
@ -254,6 +279,10 @@ func (s *StandardStream) ping(ctx context.Context, conn *websocket.Conn, cancel
}
}
func (s *StandardStream) GetSubscriptions() []Subscription {
return s.Subscriptions
}
func (s *StandardStream) Subscribe(channel Channel, symbol string, options SubscribeOptions) {
s.Subscriptions = append(s.Subscriptions, Subscription{
Channel: channel,