From f5007752b2ff07774380aab299a409b382181710 Mon Sep 17 00:00:00 2001 From: zenix Date: Wed, 15 Jun 2022 16:32:04 +0900 Subject: [PATCH] feature: add heikinashi support --- config/ewo_dgtrd.yaml | 1 + config/skeleton.yaml | 6 ++- pkg/backtest/exchange.go | 36 ++++++++++------- pkg/backtest/stream.go | 41 ------------------- pkg/bbgo/session.go | 30 +++++++++++--- pkg/cmd/backtest.go | 8 +++- pkg/types/backtest_stream.go | 19 +++++++++ pkg/types/heikinashi_stream.go | 72 ++++++++++++++++++++++++++++++++++ pkg/types/stream.go | 29 ++++++++++++++ 9 files changed, 177 insertions(+), 65 deletions(-) delete mode 100644 pkg/backtest/stream.go create mode 100644 pkg/types/backtest_stream.go create mode 100644 pkg/types/heikinashi_stream.go diff --git a/config/ewo_dgtrd.yaml b/config/ewo_dgtrd.yaml index 9a447e802..5dcef4272 100644 --- a/config/ewo_dgtrd.yaml +++ b/config/ewo_dgtrd.yaml @@ -4,6 +4,7 @@ sessions: exchange: binance futures: true envVarPrefix: binance + useHeikinAshi: false exchangeStrategies: diff --git a/config/skeleton.yaml b/config/skeleton.yaml index f0db31245..676c2f326 100644 --- a/config/skeleton.yaml +++ b/config/skeleton.yaml @@ -2,6 +2,7 @@ sessions: binance: exchange: binance + useHeikinAshi: true envVarPrefix: binance exchangeStrategies: @@ -11,10 +12,11 @@ exchangeStrategies: symbol: BNBBUSD backtest: - startTime: "2022-01-02" - endTime: "2022-01-19" + startTime: "2022-06-14" + endTime: "2022-06-15" symbols: - BNBBUSD + sessions: [binance] account: binance: balances: diff --git a/pkg/backtest/exchange.go b/pkg/backtest/exchange.go index c034b44bc..3badc55e2 100644 --- a/pkg/backtest/exchange.go +++ b/pkg/backtest/exchange.go @@ -33,6 +33,8 @@ import ( "sync" "time" + "github.com/sirupsen/logrus" + "github.com/c9s/bbgo/pkg/cache" "github.com/pkg/errors" @@ -42,6 +44,8 @@ import ( "github.com/c9s/bbgo/pkg/types" ) +var log = logrus.WithField("cmd", "backtest") + var ErrUnimplemented = errors.New("unimplemented method") type Exchange struct { @@ -53,7 +57,7 @@ type Exchange struct { account *types.Account config *bbgo.Backtest - userDataStream, marketDataStream *Stream + UserDataStream, MarketDataStream types.StandardStreamEmitter trades map[string][]types.Trade tradesMutex sync.Mutex @@ -147,12 +151,14 @@ func (e *Exchange) _addMatchingBook(symbol string, market types.Market) { } func (e *Exchange) NewStream() types.Stream { - return &Stream{exchange: e} + return &types.BacktestStream{ + StandardStreamEmitter: &types.StandardStream{}, + } } func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error) { - if e.userDataStream == nil { - return createdOrders, fmt.Errorf("SubmitOrders should be called after userDataStream been initialized") + if e.UserDataStream == nil { + return createdOrders, fmt.Errorf("SubmitOrders should be called after UserDataStream been initialized") } for _, order := range orders { symbol := order.Symbol @@ -175,7 +181,7 @@ func (e *Exchange) SubmitOrders(ctx context.Context, orders ...types.SubmitOrder e.addClosedOrder(*createdOrder) } - e.userDataStream.EmitOrderUpdate(*createdOrder) + e.UserDataStream.EmitOrderUpdate(*createdOrder) } } @@ -201,8 +207,8 @@ func (e *Exchange) QueryClosedOrders(ctx context.Context, symbol string, since, } func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) error { - if e.userDataStream == nil { - return fmt.Errorf("CancelOrders should be called after userDataStream been initialized") + if e.UserDataStream == nil { + return fmt.Errorf("CancelOrders should be called after UserDataStream been initialized") } for _, order := range orders { matching, ok := e.matchingBook(order.Symbol) @@ -214,7 +220,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro return err } - e.userDataStream.EmitOrderUpdate(canceledOrder) + e.UserDataStream.EmitOrderUpdate(canceledOrder) } return nil @@ -297,15 +303,15 @@ func (e *Exchange) matchingBook(symbol string) (*SimplePriceMatching, bool) { } func (e *Exchange) InitMarketData() { - e.userDataStream.OnTradeUpdate(func(trade types.Trade) { + e.UserDataStream.OnTradeUpdate(func(trade types.Trade) { e.addTrade(trade) }) e.matchingBooksMutex.Lock() for _, matching := range e.matchingBooks { - matching.OnTradeUpdate(e.userDataStream.EmitTradeUpdate) - matching.OnOrderUpdate(e.userDataStream.EmitOrderUpdate) - matching.OnBalanceUpdate(e.userDataStream.EmitBalanceUpdate) + matching.OnTradeUpdate(e.UserDataStream.EmitTradeUpdate) + matching.OnOrderUpdate(e.UserDataStream.EmitOrderUpdate) + matching.OnBalanceUpdate(e.UserDataStream.EmitBalanceUpdate) } e.matchingBooksMutex.Unlock() } @@ -324,7 +330,7 @@ func (e *Exchange) SubscribeMarketData(extraIntervals ...types.Interval) (chan t } // collect subscriptions - for _, sub := range e.marketDataStream.Subscriptions { + for _, sub := range e.MarketDataStream.GetSubscriptions() { loadedSymbols[sub.Symbol] = struct{}{} switch sub.Channel { @@ -370,11 +376,11 @@ func (e *Exchange) ConsumeKLine(k types.KLine) { matching.processKLine(k) } - e.marketDataStream.EmitKLineClosed(k) + e.MarketDataStream.EmitKLineClosed(k) } func (e *Exchange) CloseMarketData() error { - if err := e.marketDataStream.Close(); err != nil { + if err := e.MarketDataStream.Close(); err != nil { log.WithError(err).Error("stream close error") return err } diff --git a/pkg/backtest/stream.go b/pkg/backtest/stream.go deleted file mode 100644 index 91e93a7b9..000000000 --- a/pkg/backtest/stream.go +++ /dev/null @@ -1,41 +0,0 @@ -package backtest - -import ( - "context" - - "github.com/sirupsen/logrus" - - "github.com/c9s/bbgo/pkg/types" -) - -var log = logrus.WithField("cmd", "backtest") - -type Stream struct { - types.StandardStream - - exchange *Exchange -} - -func (s *Stream) Connect(ctx context.Context) error { - if s.PublicOnly { - if s.exchange.marketDataStream != nil { - panic("you should not set up more than 1 market data stream in back-test") - } - s.exchange.marketDataStream = s - } else { - - // assign user data stream back - if s.exchange.userDataStream != nil { - panic("you should not set up more than 1 user data stream in back-test") - } - s.exchange.userDataStream = s - } - - s.EmitConnect() - s.EmitStart() - return nil -} - -func (s *Stream) Close() error { - return nil -} diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 1c7ae02d8..a0f842b2e 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -213,6 +213,8 @@ type ExchangeSession struct { Exchange types.Exchange `json:"-" yaml:"-"` + UseHeikinAshi bool `json:"useHeikinAshi,omitempty" yaml:"useHeikinAshi,omitempty"` + // Trades collects the executed trades from the exchange // map: symbol -> []trade Trades map[string]*types.TradeSlice `json:"-" yaml:"-"` @@ -346,6 +348,12 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) } } + if session.UseHeikinAshi { + session.MarketDataStream = &types.HeikinAshiStream{ + StandardStreamEmitter: session.MarketDataStream.(types.StandardStreamEmitter), + } + } + // query and initialize the balances if !session.PublicOnly { account, err := session.Exchange.QueryAccount(ctx) @@ -400,13 +408,23 @@ func (session *ExchangeSession) Init(ctx context.Context, environ *Environment) } // update last prices - session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { - if _, ok := session.startPrices[kline.Symbol]; !ok { - session.startPrices[kline.Symbol] = kline.Open - } + if session.UseHeikinAshi { + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { + if _, ok := session.startPrices[kline.Symbol]; !ok { + session.startPrices[kline.Symbol] = kline.Open + } - session.lastPrices[kline.Symbol] = kline.Close - }) + session.lastPrices[kline.Symbol] = session.MarketDataStream.(*types.HeikinAshiStream).LastOrigin[kline.Symbol][kline.Interval].Close + }) + } else { + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { + if _, ok := session.startPrices[kline.Symbol]; !ok { + session.startPrices[kline.Symbol] = kline.Open + } + + session.lastPrices[kline.Symbol] = kline.Close + }) + } session.MarketDataStream.OnMarketTrade(func(trade types.Trade) { session.lastPrices[trade.Symbol] = trade.Price diff --git a/pkg/cmd/backtest.go b/pkg/cmd/backtest.go index 5d1a706c5..5334727c0 100644 --- a/pkg/cmd/backtest.go +++ b/pkg/cmd/backtest.go @@ -255,7 +255,11 @@ var BacktestCmd = &cobra.Command{ if err != nil { return errors.Wrap(err, "failed to create backtest exchange") } - environ.AddExchange(name.String(), backtestExchange) + session := environ.AddExchange(name.String(), backtestExchange) + exchangeFromConfig := userConfig.Sessions[name.String()] + if exchangeFromConfig != nil { + session.UseHeikinAshi = exchangeFromConfig.UseHeikinAshi + } } if err := environ.Init(ctx); err != nil { @@ -640,6 +644,8 @@ func confirmation(s string) bool { func toExchangeSources(sessions map[string]*bbgo.ExchangeSession, extraIntervals ...types.Interval) (exchangeSources []backtest.ExchangeDataSource, err error) { for _, session := range sessions { exchange := session.Exchange.(*backtest.Exchange) + exchange.UserDataStream = session.UserDataStream.(types.StandardStreamEmitter) + exchange.MarketDataStream = session.MarketDataStream.(types.StandardStreamEmitter) exchange.InitMarketData() c, err := exchange.SubscribeMarketData(extraIntervals...) diff --git a/pkg/types/backtest_stream.go b/pkg/types/backtest_stream.go new file mode 100644 index 000000000..ee46d31fc --- /dev/null +++ b/pkg/types/backtest_stream.go @@ -0,0 +1,19 @@ +package types + +import ( + "context" +) + +type BacktestStream struct { + StandardStreamEmitter +} + +func (s *BacktestStream) Connect(ctx context.Context) error { + s.EmitConnect() + s.EmitStart() + return nil +} + +func (s *BacktestStream) Close() error { + return nil +} diff --git a/pkg/types/heikinashi_stream.go b/pkg/types/heikinashi_stream.go new file mode 100644 index 000000000..f3cc351a0 --- /dev/null +++ b/pkg/types/heikinashi_stream.go @@ -0,0 +1,72 @@ +package types + +import ( + "github.com/c9s/bbgo/pkg/fixedpoint" +) + +var Four fixedpoint.Value = fixedpoint.NewFromInt(4) + +type HeikinAshiStream struct { + StandardStreamEmitter + lastAshi map[string]map[Interval]*KLine + LastOrigin map[string]map[Interval]*KLine +} + +func (s *HeikinAshiStream) EmitKLineClosed(kline KLine) { + ashi := kline + if s.lastAshi == nil { + s.lastAshi = make(map[string]map[Interval]*KLine) + s.LastOrigin = make(map[string]map[Interval]*KLine) + } + if s.lastAshi[kline.Symbol] == nil { + s.lastAshi[kline.Symbol] = make(map[Interval]*KLine) + s.LastOrigin[kline.Symbol] = make(map[Interval]*KLine) + } + lastAshi := s.lastAshi[kline.Symbol][kline.Interval] + if lastAshi == nil { + ashi.Close = kline.Close.Add(kline.High). + Add(kline.Low). + Add(kline.Open). + Div(Four) + // High and Low are the same + s.lastAshi[kline.Symbol][kline.Interval] = &ashi + s.LastOrigin[kline.Symbol][kline.Interval] = &kline + } else { + ashi.Close = kline.Close.Add(kline.High). + Add(kline.Low). + Add(kline.Open). + Div(Four) + ashi.Open = lastAshi.Open.Add(lastAshi.Close).Div(Two) + // High and Low are the same + s.lastAshi[kline.Symbol][kline.Interval] = &ashi + s.LastOrigin[kline.Symbol][kline.Interval] = &kline + } + s.StandardStreamEmitter.EmitKLineClosed(ashi) +} + +// No writeback to lastAshi +func (s *HeikinAshiStream) EmitKLine(kline KLine) { + ashi := kline + if s.lastAshi == nil { + s.lastAshi = make(map[string]map[Interval]*KLine) + } + if s.lastAshi[kline.Symbol] == nil { + s.lastAshi[kline.Symbol] = make(map[Interval]*KLine) + } + lastAshi := s.lastAshi[kline.Symbol][kline.Interval] + if lastAshi == nil { + ashi.Close = kline.Close.Add(kline.High). + Add(kline.Low). + Add(kline.Open). + Div(Four) + } else { + ashi.Close = kline.Close.Add(kline.High). + Add(kline.Low). + Add(kline.Open). + Div(Four) + ashi.Open = lastAshi.Open.Add(lastAshi.Close).Div(Two) + } + s.StandardStreamEmitter.EmitKLine(ashi) +} + +var _ StandardStreamEmitter = &HeikinAshiStream{} diff --git a/pkg/types/stream.go b/pkg/types/stream.go index 1f7392df6..05a6b7dc6 100644 --- a/pkg/types/stream.go +++ b/pkg/types/stream.go @@ -28,7 +28,9 @@ type Stream interface { StandardStreamEventHub Subscribe(channel Channel, symbol string, options SubscribeOptions) + GetSubscriptions() []Subscription SetPublicOnly() + GetPublicOnly() bool Connect(ctx context.Context) error Close() error } @@ -104,6 +106,25 @@ type StandardStream struct { FuturesPositionSnapshotCallbacks []func(futuresPositions FuturesPositionMap) } +type StandardStreamEmitter interface { + Stream + EmitStart() + EmitConnect() + EmitDisconnect() + EmitTradeUpdate(Trade) + EmitOrderUpdate(Order) + EmitBalanceSnapshot(BalanceMap) + EmitBalanceUpdate(BalanceMap) + EmitKLineClosed(KLine) + EmitKLine(KLine) + EmitBookUpdate(SliceOrderBook) + EmitBookTickerUpdate(BookTicker) + EmitBookSnapshot(SliceOrderBook) + EmitMarketTrade(Trade) + EmitFuturesPositionUpdate(FuturesPositionMap) + EmitFuturesPositionSnapshot(FuturesPositionMap) +} + func NewStandardStream() StandardStream { return StandardStream{ ReconnectC: make(chan struct{}, 1), @@ -115,6 +136,10 @@ func (s *StandardStream) SetPublicOnly() { s.PublicOnly = true } +func (s *StandardStream) GetPublicOnly() bool { + return s.PublicOnly +} + func (s *StandardStream) SetEndpointCreator(creator EndpointCreator) { s.endpointCreator = creator } @@ -254,6 +279,10 @@ func (s *StandardStream) ping(ctx context.Context, conn *websocket.Conn, cancel } } +func (s *StandardStream) GetSubscriptions() []Subscription { + return s.Subscriptions +} + func (s *StandardStream) Subscribe(channel Channel, symbol string, options SubscribeOptions) { s.Subscriptions = append(s.Subscriptions, Subscription{ Channel: channel,