From 9f937f529e0f9f334719bdf49e8bc7e77ec6cdc5 Mon Sep 17 00:00:00 2001 From: c9s Date: Thu, 21 Jul 2022 01:04:49 +0800 Subject: [PATCH] bbgo: refactor standard indicator --- pkg/bbgo/marketdatastore.go | 14 +-- pkg/bbgo/marketdatastore_callbacks.go | 10 ++ pkg/bbgo/session.go | 137 +------------------------- pkg/bbgo/standard_indicator_set.go | 135 +++++++++++++++++++++++++ pkg/indicator/boll.go | 36 ++++--- pkg/indicator/ewma.go | 31 ++++-- pkg/indicator/hull.go | 2 +- pkg/indicator/sma.go | 30 +++--- pkg/indicator/stoch.go | 17 +++- pkg/indicator/till.go | 2 +- 10 files changed, 235 insertions(+), 179 deletions(-) create mode 100644 pkg/bbgo/standard_indicator_set.go diff --git a/pkg/bbgo/marketdatastore.go b/pkg/bbgo/marketdatastore.go index 03f17d1f2..11f06902e 100644 --- a/pkg/bbgo/marketdatastore.go +++ b/pkg/bbgo/marketdatastore.go @@ -5,7 +5,7 @@ import "github.com/c9s/bbgo/pkg/types" const MaxNumOfKLines = 5_000 const MaxNumOfKLinesTruncate = 100 -// MarketDataStore receives and maintain the public market data +// MarketDataStore receives and maintain the public market data of a single symbol //go:generate callbackgen -type MarketDataStore type MarketDataStore struct { Symbol string @@ -14,6 +14,7 @@ type MarketDataStore struct { KLineWindows map[types.Interval]*types.KLineWindow `json:"-"` kLineWindowUpdateCallbacks []func(interval types.Interval, klines types.KLineWindow) + kLineClosedCallbacks []func(k types.KLine) } func NewMarketDataStore(symbol string) *MarketDataStore { @@ -47,18 +48,19 @@ func (store *MarketDataStore) handleKLineClosed(kline types.KLine) { store.AddKLine(kline) } -func (store *MarketDataStore) AddKLine(kline types.KLine) { - window, ok := store.KLineWindows[kline.Interval] +func (store *MarketDataStore) AddKLine(k types.KLine) { + window, ok := store.KLineWindows[k.Interval] if !ok { var tmp = make(types.KLineWindow, 0, 1000) - store.KLineWindows[kline.Interval] = &tmp + store.KLineWindows[k.Interval] = &tmp window = &tmp } - window.Add(kline) + window.Add(k) if len(*window) > MaxNumOfKLines { *window = (*window)[MaxNumOfKLinesTruncate-1:] } - store.EmitKLineWindowUpdate(kline.Interval, *window) + store.EmitKLineClosed(k) + store.EmitKLineWindowUpdate(k.Interval, *window) } diff --git a/pkg/bbgo/marketdatastore_callbacks.go b/pkg/bbgo/marketdatastore_callbacks.go index 4acaccb10..0cc2fd8a7 100644 --- a/pkg/bbgo/marketdatastore_callbacks.go +++ b/pkg/bbgo/marketdatastore_callbacks.go @@ -15,3 +15,13 @@ func (store *MarketDataStore) EmitKLineWindowUpdate(interval types.Interval, kli cb(interval, klines) } } + +func (store *MarketDataStore) OnKLineClosed(cb func(k types.KLine)) { + store.kLineClosedCallbacks = append(store.kLineClosedCallbacks, cb) +} + +func (store *MarketDataStore) EmitKLineClosed(k types.KLine) { + for _, cb := range store.kLineClosedCallbacks { + cb(k) + } +} diff --git a/pkg/bbgo/session.go b/pkg/bbgo/session.go index 4d7ab00e4..3ab47188b 100644 --- a/pkg/bbgo/session.go +++ b/pkg/bbgo/session.go @@ -17,146 +17,11 @@ import ( exchange2 "github.com/c9s/bbgo/pkg/exchange" "github.com/c9s/bbgo/pkg/fixedpoint" - "github.com/c9s/bbgo/pkg/indicator" "github.com/c9s/bbgo/pkg/service" "github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/util" ) -var ( - debugEWMA = false - debugSMA = false -) - -func init() { - // when using --dotenv option, the dotenv is loaded from command.PersistentPreRunE, not init. - // hence here the env var won't enable the debug flag - util.SetEnvVarBool("DEBUG_EWMA", &debugEWMA) - util.SetEnvVarBool("DEBUG_SMA", &debugSMA) -} - -type StandardIndicatorSet struct { - Symbol string - // Standard indicators - // interval -> window - sma map[types.IntervalWindow]*indicator.SMA - ewma map[types.IntervalWindow]*indicator.EWMA - boll map[types.IntervalWindowBandWidth]*indicator.BOLL - stoch map[types.IntervalWindow]*indicator.STOCH - volatility map[types.IntervalWindow]*indicator.Volatility - - store *MarketDataStore -} - -func NewStandardIndicatorSet(symbol string, store *MarketDataStore) *StandardIndicatorSet { - set := &StandardIndicatorSet{ - Symbol: symbol, - sma: make(map[types.IntervalWindow]*indicator.SMA), - ewma: make(map[types.IntervalWindow]*indicator.EWMA), - boll: make(map[types.IntervalWindowBandWidth]*indicator.BOLL), - stoch: make(map[types.IntervalWindow]*indicator.STOCH), - volatility: make(map[types.IntervalWindow]*indicator.Volatility), - store: store, - } - - // let us pre-defined commonly used intervals - for interval := range types.SupportedIntervals { - for _, window := range []int{7, 25, 99} { - iw := types.IntervalWindow{Interval: interval, Window: window} - set.sma[iw] = &indicator.SMA{IntervalWindow: iw} - set.sma[iw].Bind(store) - if debugSMA { - set.sma[iw].OnUpdate(func(value float64) { - log.Infof("%s SMA %s: %f", symbol, iw.String(), value) - }) - } - - set.ewma[iw] = &indicator.EWMA{IntervalWindow: iw} - set.ewma[iw].Bind(store) - - // if debug EWMA is enabled, we add the debug handler - if debugEWMA { - set.ewma[iw].OnUpdate(func(value float64) { - log.Infof("%s EWMA %s: %f", symbol, iw.String(), value) - }) - } - - } - - // setup boll indicator, we may refactor boll indicator by subscribing SMA indicator, - // however, since general used BOLLINGER band use window 21, which is not in the existing SMA indicator sets. - // Pull out the bandwidth configuration as the boll Key - iw := types.IntervalWindow{Interval: interval, Window: 21} - - // set efault band width to 2.0 - iwb := types.IntervalWindowBandWidth{IntervalWindow: iw, BandWidth: 2.0} - set.boll[iwb] = &indicator.BOLL{IntervalWindow: iw, K: iwb.BandWidth} - set.boll[iwb].Bind(store) - } - - return set -} - -// BOLL returns the bollinger band indicator of the given interval, the window and bandwidth -func (set *StandardIndicatorSet) BOLL(iw types.IntervalWindow, bandWidth float64) *indicator.BOLL { - iwb := types.IntervalWindowBandWidth{IntervalWindow: iw, BandWidth: bandWidth} - inc, ok := set.boll[iwb] - if !ok { - inc = &indicator.BOLL{IntervalWindow: iw, K: bandWidth} - inc.Bind(set.store) - set.boll[iwb] = inc - } - - return inc -} - -// SMA returns the simple moving average indicator of the given interval and the window size. -func (set *StandardIndicatorSet) SMA(iw types.IntervalWindow) *indicator.SMA { - inc, ok := set.sma[iw] - if !ok { - inc = &indicator.SMA{IntervalWindow: iw} - inc.Bind(set.store) - set.sma[iw] = inc - } - - return inc -} - -// EWMA returns the exponential weighed moving average indicator of the given interval and the window size. -func (set *StandardIndicatorSet) EWMA(iw types.IntervalWindow) *indicator.EWMA { - inc, ok := set.ewma[iw] - if !ok { - inc = &indicator.EWMA{IntervalWindow: iw} - inc.Bind(set.store) - set.ewma[iw] = inc - } - - return inc -} - -func (set *StandardIndicatorSet) STOCH(iw types.IntervalWindow) *indicator.STOCH { - inc, ok := set.stoch[iw] - if !ok { - inc = &indicator.STOCH{IntervalWindow: iw} - inc.Bind(set.store) - set.stoch[iw] = inc - } - - return inc -} - -// VOLATILITY returns the volatility(stddev) indicator of the given interval and the window size. -func (set *StandardIndicatorSet) VOLATILITY(iw types.IntervalWindow) *indicator.Volatility { - inc, ok := set.volatility[iw] - if !ok { - inc = &indicator.Volatility{IntervalWindow: iw} - inc.Bind(set.store) - set.volatility[iw] = inc - } - - return inc -} - // ExchangeSession presents the exchange connection Session // It also maintains and collects the data returned from the stream. type ExchangeSession struct { @@ -504,7 +369,7 @@ func (session *ExchangeSession) initSymbol(ctx context.Context, environ *Environ marketDataStore.BindStream(session.MarketDataStream) session.marketDataStores[symbol] = marketDataStore - standardIndicatorSet := NewStandardIndicatorSet(symbol, marketDataStore) + standardIndicatorSet := NewStandardIndicatorSet(symbol, session.MarketDataStream, marketDataStore) session.standardIndicatorSets[symbol] = standardIndicatorSet // used kline intervals by the given symbol diff --git a/pkg/bbgo/standard_indicator_set.go b/pkg/bbgo/standard_indicator_set.go new file mode 100644 index 000000000..2076a9f6b --- /dev/null +++ b/pkg/bbgo/standard_indicator_set.go @@ -0,0 +1,135 @@ +package bbgo + +import ( + "github.com/sirupsen/logrus" + + "github.com/c9s/bbgo/pkg/indicator" + "github.com/c9s/bbgo/pkg/types" + "github.com/c9s/bbgo/pkg/util" +) + +var ( + debugEWMA = false + debugSMA = false + debugBOLL = false +) + +func init() { + // when using --dotenv option, the dotenv is loaded from command.PersistentPreRunE, not init. + // hence here the env var won't enable the debug flag + util.SetEnvVarBool("DEBUG_EWMA", &debugEWMA) + util.SetEnvVarBool("DEBUG_SMA", &debugSMA) + util.SetEnvVarBool("DEBUG_BOLL", &debugBOLL) +} + +type StandardIndicatorSet struct { + Symbol string + // Standard indicators + // interval -> window + sma map[types.IntervalWindow]*indicator.SMA + ewma map[types.IntervalWindow]*indicator.EWMA + boll map[types.IntervalWindowBandWidth]*indicator.BOLL + stoch map[types.IntervalWindow]*indicator.STOCH + volatility map[types.IntervalWindow]*indicator.Volatility + + stream types.Stream + store *MarketDataStore +} + +func NewStandardIndicatorSet(symbol string, stream types.Stream, store *MarketDataStore) *StandardIndicatorSet { + return &StandardIndicatorSet{ + Symbol: symbol, + sma: make(map[types.IntervalWindow]*indicator.SMA), + ewma: make(map[types.IntervalWindow]*indicator.EWMA), + boll: make(map[types.IntervalWindowBandWidth]*indicator.BOLL), + stoch: make(map[types.IntervalWindow]*indicator.STOCH), + store: store, + stream: stream, + } +} + +// BOLL returns the bollinger band indicator of the given interval, the window and bandwidth +func (set *StandardIndicatorSet) BOLL(iw types.IntervalWindow, bandWidth float64) *indicator.BOLL { + iwb := types.IntervalWindowBandWidth{IntervalWindow: iw, BandWidth: bandWidth} + inc, ok := set.boll[iwb] + if !ok { + inc = &indicator.BOLL{IntervalWindow: iw, K: bandWidth} + + if klines, ok := set.store.KLinesOfInterval(iw.Interval); ok { + inc.LoadK(*klines) + } + + if debugBOLL { + inc.OnUpdate(func(sma float64, upBand float64, downBand float64) { + logrus.Infof("%s BOLL %s: sma=%f up=%f down=%f", set.Symbol, iw.String(), sma, upBand, downBand) + }) + } + + inc.BindK(set.stream, set.Symbol, iw.Interval) + set.boll[iwb] = inc + } + + return inc +} + +// SMA returns the simple moving average indicator of the given interval and the window size. +func (set *StandardIndicatorSet) SMA(iw types.IntervalWindow) *indicator.SMA { + inc, ok := set.sma[iw] + if !ok { + inc = &indicator.SMA{IntervalWindow: iw} + + if klines, ok := set.store.KLinesOfInterval(iw.Interval); ok { + inc.LoadK(*klines) + } + + if debugSMA { + inc.OnUpdate(func(value float64) { + logrus.Infof("%s SMA %s: %f", set.Symbol, iw.String(), value) + }) + } + + inc.BindK(set.stream, set.Symbol, iw.Interval) + set.sma[iw] = inc + } + + return inc +} + +// EWMA returns the exponential weighed moving average indicator of the given interval and the window size. +func (set *StandardIndicatorSet) EWMA(iw types.IntervalWindow) *indicator.EWMA { + inc, ok := set.ewma[iw] + if !ok { + inc = &indicator.EWMA{IntervalWindow: iw} + + if klines, ok := set.store.KLinesOfInterval(iw.Interval); ok { + inc.LoadK(*klines) + } + + if debugEWMA { + inc.OnUpdate(func(value float64) { + logrus.Infof("%s EWMA %s: value=%f", set.Symbol, iw.String(), value) + }) + } + + inc.BindK(set.stream, set.Symbol, iw.Interval) + set.ewma[iw] = inc + } + + return inc +} + +func (set *StandardIndicatorSet) STOCH(iw types.IntervalWindow) *indicator.STOCH { + inc, ok := set.stoch[iw] + if !ok { + inc = &indicator.STOCH{IntervalWindow: iw} + + if klines, ok := set.store.KLinesOfInterval(iw.Interval); ok { + inc.LoadK(*klines) + } + + inc.BindK(set.stream, set.Symbol, iw.Interval) + set.stoch[iw] = inc + } + + return inc +} diff --git a/pkg/indicator/boll.go b/pkg/indicator/boll.go index 3fa49030c..9b0a54704 100644 --- a/pkg/indicator/boll.go +++ b/pkg/indicator/boll.go @@ -96,27 +96,37 @@ func (inc *BOLL) Update(value float64) { inc.DownBand.Push(downBand) } -func (inc *BOLL) PushK(k types.KLine) { - inc.Update(k.Close.Float64()) +func (inc *BOLL) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { + target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) } -func (inc *BOLL) CalculateAndUpdate(allKLines []types.KLine) { - var last = allKLines[len(allKLines)-1] +func (inc *BOLL) PushK(k types.KLine) { + if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { + return + } + inc.Update(k.Close.Float64()) + inc.EndTime = k.EndTime.Time() + inc.EmitUpdate(inc.SMA.Last(), inc.UpBand.Last(), inc.DownBand.Last()) +} - if inc.SMA == nil { - for _, k := range allKLines { - if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { - continue - } - inc.PushK(k) - } - } else { - inc.PushK(last) +func (inc *BOLL) LoadK(allKLines []types.KLine) { + for _, k := range allKLines { + inc.PushK(k) } inc.EmitUpdate(inc.SMA.Last(), inc.UpBand.Last(), inc.DownBand.Last()) } +func (inc *BOLL) CalculateAndUpdate(allKLines []types.KLine) { + if inc.SMA == nil { + inc.LoadK(allKLines) + return + } + + var last = allKLines[len(allKLines)-1] + inc.PushK(last) +} + func (inc *BOLL) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) { if inc.Interval != interval { return diff --git a/pkg/indicator/ewma.go b/pkg/indicator/ewma.go index c09941534..4830682ba 100644 --- a/pkg/indicator/ewma.go +++ b/pkg/indicator/ewma.go @@ -15,8 +15,8 @@ type EWMA struct { types.IntervalWindow types.SeriesBase - Values types.Float64Slice - LastOpenTime time.Time + Values types.Float64Slice + EndTime time.Time updateCallbacks []func(value float64) } @@ -58,17 +58,11 @@ func (inc *EWMA) Length() int { return len(inc.Values) } -func (inc *EWMA) PushK(k types.KLine) { - inc.Update(k.Close.Float64()) - inc.LastOpenTime = k.StartTime.Time() -} - func (inc *EWMA) CalculateAndUpdate(allKLines []types.KLine) { if len(inc.Values) == 0 { for _, k := range allKLines { inc.PushK(k) } - inc.EmitUpdate(inc.Last()) } else { k := allKLines[len(allKLines)-1] @@ -89,6 +83,27 @@ func (inc *EWMA) Bind(updater KLineWindowUpdater) { updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate) } +func (inc *EWMA) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { + target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) +} + +func (inc *EWMA) PushK(k types.KLine) { + if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { + return + } + + inc.Update(k.Close.Float64()) + inc.EndTime = k.EndTime.Time() + inc.EmitUpdate(inc.Last()) +} + +func (inc *EWMA) LoadK(allKLines []types.KLine) { + for _, k := range allKLines { + inc.PushK(k) + } + inc.EmitUpdate(inc.Last()) +} + func CalculateKLinesEMA(allKLines []types.KLine, priceF KLinePriceMapper, window int) float64 { var multiplier = 2.0 / (float64(window) + 1) return ewma(MapKLinePrice(allKLines, priceF), multiplier) diff --git a/pkg/indicator/hull.go b/pkg/indicator/hull.go index de907d6b0..8e1505539 100644 --- a/pkg/indicator/hull.go +++ b/pkg/indicator/hull.go @@ -61,7 +61,7 @@ func (inc *HULL) CalculateAndUpdate(allKLines []types.KLine) { doable = true } for _, k := range allKLines { - if !doable && k.StartTime.After(inc.ma1.LastOpenTime) { + if !doable && k.EndTime.After(inc.ma1.EndTime) { doable = true } if doable { diff --git a/pkg/indicator/sma.go b/pkg/indicator/sma.go index ce83bf8d8..fe50dd9c4 100644 --- a/pkg/indicator/sma.go +++ b/pkg/indicator/sma.go @@ -56,26 +56,34 @@ func (inc *SMA) Update(value float64) { inc.Values.Push(types.Mean(inc.rawValues)) } +func (inc *SMA) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { + target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) +} + func (inc *SMA) PushK(k types.KLine) { + if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { + return + } + inc.Update(k.Close.Float64()) inc.EndTime = k.EndTime.Time() + inc.EmitUpdate(inc.Values.Last()) +} + +func (inc *SMA) LoadK(allKLines []types.KLine) { + for _, k := range allKLines { + inc.PushK(k) + } } func (inc *SMA) CalculateAndUpdate(allKLines []types.KLine) { - var last = allKLines[len(allKLines)-1] - if inc.rawValues == nil { - for _, k := range allKLines { - if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { - continue - } - inc.PushK(k) - } + inc.LoadK(allKLines) } else { + var last = allKLines[len(allKLines)-1] inc.PushK(last) } - inc.EmitUpdate(inc.Values.Last()) } func (inc *SMA) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) { @@ -86,10 +94,6 @@ func (inc *SMA) handleKLineWindowUpdate(interval types.Interval, window types.KL inc.CalculateAndUpdate(window) } -func (inc *SMA) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { - target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) -} - func (inc *SMA) Bind(updater KLineWindowUpdater) { updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate) } diff --git a/pkg/indicator/stoch.go b/pkg/indicator/stoch.go index 602df765f..177a72f0e 100644 --- a/pkg/indicator/stoch.go +++ b/pkg/indicator/stoch.go @@ -61,6 +61,22 @@ func (inc *STOCH) LastD() float64 { func (inc *STOCH) PushK(k types.KLine) { inc.Update(k.High.Float64(), k.Low.Float64(), k.Close.Float64()) + inc.EndTime = k.EndTime.Time() +} + +func (inc *STOCH) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { + target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) +} + +func (inc *STOCH) LoadK(allKLines []types.KLine) { + for _, k := range allKLines { + if inc.EndTime != zeroTime && !k.EndTime.After(inc.EndTime) { + continue + } + + inc.PushK(k) + } + inc.EmitUpdate(inc.LastK(), inc.LastD()) } func (inc *STOCH) CalculateAndUpdate(kLines []types.KLine) { @@ -77,7 +93,6 @@ func (inc *STOCH) CalculateAndUpdate(kLines []types.KLine) { } inc.EmitUpdate(inc.LastK(), inc.LastD()) - inc.EndTime = kLines[len(kLines)-1].EndTime.Time() } func (inc *STOCH) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) { diff --git a/pkg/indicator/till.go b/pkg/indicator/till.go index 3fdc2a706..8fb570373 100644 --- a/pkg/indicator/till.go +++ b/pkg/indicator/till.go @@ -96,7 +96,7 @@ func (inc *TILL) CalculateAndUpdate(allKLines []types.KLine) { doable = true } for _, k := range allKLines { - if !doable && k.StartTime.After(inc.e1.LastOpenTime) { + if !doable && k.EndTime.After(inc.e1.EndTime) { doable = true }