diff --git a/pkg/indicator/inf.go b/pkg/indicator/inf.go index c93de779f..11c3cc43e 100644 --- a/pkg/indicator/inf.go +++ b/pkg/indicator/inf.go @@ -6,6 +6,12 @@ type KLineWindowUpdater interface { OnKLineWindowUpdate(func(interval types.Interval, window types.KLineWindow)) } +type KLineClosedBinder interface { + BindK(target KLineClosedEmitter, symbol string, interval types.Interval) +} + +// KLineClosedEmitter is currently applied to the market data stream +// the market data stream emits the KLine closed event to the listeners. type KLineClosedEmitter interface { OnKLineClosed(func(k types.KLine)) } diff --git a/pkg/indicator/sma.go b/pkg/indicator/sma.go index 7f5fa65b9..ce83bf8d8 100644 --- a/pkg/indicator/sma.go +++ b/pkg/indicator/sma.go @@ -86,6 +86,10 @@ func (inc *SMA) handleKLineWindowUpdate(interval types.Interval, window types.KL inc.CalculateAndUpdate(window) } +func (inc *SMA) BindK(target KLineClosedEmitter, symbol string, interval types.Interval) { + target.OnKLineClosed(types.KLineWith(symbol, interval, inc.PushK)) +} + func (inc *SMA) Bind(updater KLineWindowUpdater) { updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate) } diff --git a/pkg/indicator/vwma.go b/pkg/indicator/vwma.go index 742a37aca..08e45f4b2 100644 --- a/pkg/indicator/vwma.go +++ b/pkg/indicator/vwma.go @@ -3,8 +3,6 @@ package indicator import ( "time" - log "github.com/sirupsen/logrus" - "github.com/c9s/bbgo/pkg/types" ) @@ -22,10 +20,14 @@ Volume Weighted Moving Average type VWMA struct { types.SeriesBase types.IntervalWindow - Values types.Float64Slice + + Values types.Float64Slice + PriceVolumeSMA *SMA + VolumeSMA *SMA + EndTime time.Time - UpdateCallbacks []func(value float64) + updateCallbacks []func(value float64) } func (inc *VWMA) Last() float64 { @@ -49,46 +51,46 @@ func (inc *VWMA) Length() int { var _ types.SeriesExtend = &VWMA{} +func (inc *VWMA) Update(price, volume float64) { + if inc.PriceVolumeSMA == nil { + inc.PriceVolumeSMA = &SMA{IntervalWindow: inc.IntervalWindow} + inc.SeriesBase.Series = inc + } + + if inc.VolumeSMA == nil { + inc.VolumeSMA = &SMA{IntervalWindow: inc.IntervalWindow} + } + + inc.PriceVolumeSMA.Update(price * volume) + inc.VolumeSMA.Update(volume) + + pv := inc.PriceVolumeSMA.Last() + v := inc.VolumeSMA.Last() + vwma := pv / v + inc.Values.Push(vwma) +} func (inc *VWMA) CalculateAndUpdate(allKLines []types.KLine) { if len(allKLines) < inc.Window { return } - var index = len(allKLines) - 1 - var kline = allKLines[index] + var last = allKLines[len(allKLines)-1] - if inc.EndTime != zeroTime && kline.EndTime.Before(inc.EndTime) { - return + if inc.VolumeSMA == nil { + for _, k := range allKLines { + if inc.EndTime != zeroTime && k.EndTime.Before(inc.EndTime) { + return + } + + inc.Update(k.Close.Float64(), k.Volume.Float64()) + } + } else { + inc.Update(last.Close.Float64(), last.Volume.Float64()) } - var recentK = allKLines[index-(inc.Window-1) : index+1] - - pv, err := calculateSMA(recentK, inc.Window, KLinePriceVolumeMapper) - if err != nil { - log.WithError(err).Error("price x volume SMA error") - return - } - v, err := calculateSMA(recentK, inc.Window, KLineVolumeMapper) - if err != nil { - log.WithError(err).Error("volume SMA error") - return - } - - if len(inc.Values) == 0 { - inc.SeriesBase.Series = inc - } - - vwma := pv / v - inc.Values.Push(vwma) - - if len(inc.Values) > MaxNumOfSMA { - inc.Values = inc.Values[MaxNumOfSMATruncateSize-1:] - } - - inc.EndTime = allKLines[index].EndTime.Time() - - inc.EmitUpdate(vwma) + inc.EndTime = last.EndTime.Time() + inc.EmitUpdate(inc.Values.Last()) } func (inc *VWMA) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) { diff --git a/pkg/indicator/vwma_callbacks.go b/pkg/indicator/vwma_callbacks.go index 5be9f70f0..375aee111 100644 --- a/pkg/indicator/vwma_callbacks.go +++ b/pkg/indicator/vwma_callbacks.go @@ -5,11 +5,11 @@ package indicator import () func (inc *VWMA) OnUpdate(cb func(value float64)) { - inc.UpdateCallbacks = append(inc.UpdateCallbacks, cb) + inc.updateCallbacks = append(inc.updateCallbacks, cb) } func (inc *VWMA) EmitUpdate(value float64) { - for _, cb := range inc.UpdateCallbacks { + for _, cb := range inc.updateCallbacks { cb(value) } }