diff --git a/pkg/bbgo/standard_indicator_set.go b/pkg/bbgo/standard_indicator_set.go index 3f74591d0..71befe890 100644 --- a/pkg/bbgo/standard_indicator_set.go +++ b/pkg/bbgo/standard_indicator_set.go @@ -29,7 +29,7 @@ type StandardIndicatorSet struct { // interval -> window iwbIndicators map[types.IntervalWindowBandWidth]*indicator.BOLL iwIndicators map[indicatorKey]indicator.KLinePusher - macdIndicators map[indicator.MACDConfig]*indicator.MACD + macdIndicators map[indicator.MACDConfig]*indicator.MACDLegacy stream types.Stream store *MarketDataStore @@ -47,7 +47,7 @@ func NewStandardIndicatorSet(symbol string, stream types.Stream, store *MarketDa stream: stream, iwIndicators: make(map[indicatorKey]indicator.KLinePusher), iwbIndicators: make(map[types.IntervalWindowBandWidth]*indicator.BOLL), - macdIndicators: make(map[indicator.MACDConfig]*indicator.MACD), + macdIndicators: make(map[indicator.MACDConfig]*indicator.MACDLegacy), } } @@ -154,14 +154,14 @@ func (s *StandardIndicatorSet) BOLL(iw types.IntervalWindow, bandWidth float64) return inc } -func (s *StandardIndicatorSet) MACD(iw types.IntervalWindow, shortPeriod, longPeriod int) *indicator.MACD { +func (s *StandardIndicatorSet) MACD(iw types.IntervalWindow, shortPeriod, longPeriod int) *indicator.MACDLegacy { config := indicator.MACDConfig{IntervalWindow: iw, ShortPeriod: shortPeriod, LongPeriod: longPeriod} inc, ok := s.macdIndicators[config] if ok { return inc } - inc = &indicator.MACD{MACDConfig: config} + inc = &indicator.MACDLegacy{MACDConfig: config} s.macdIndicators[config] = inc s.initAndBind(inc, config.IntervalWindow.Interval) return inc diff --git a/pkg/indicator/float64updater.go b/pkg/indicator/float64updater.go new file mode 100644 index 000000000..a9743538e --- /dev/null +++ b/pkg/indicator/float64updater.go @@ -0,0 +1,6 @@ +package indicator + +//go:generate callbackgen -type Float64Updater +type Float64Updater struct { + updateCallbacks []func(v float64) +} diff --git a/pkg/indicator/klinestream.go b/pkg/indicator/klinestream.go new file mode 100644 index 000000000..4fafafc08 --- /dev/null +++ b/pkg/indicator/klinestream.go @@ -0,0 +1,37 @@ +package indicator + +import "github.com/c9s/bbgo/pkg/types" + +const MaxNumOfKLines = 4_000 + +//go:generate callbackgen -type KLineStream +type KLineStream struct { + updateCallbacks []func(k types.KLine) + + kLines []types.KLine +} + +// AddSubscriber adds the subscriber function and push histrical data to the subscriber +func (s *KLineStream) AddSubscriber(f func(k types.KLine)) { + if len(s.kLines) > 0 { + // push historical klines to the subscriber + } + + s.OnUpdate(f) +} + +// KLines creates a KLine stream that pushes the klines to the subscribers +func KLines(source types.Stream) *KLineStream { + s := &KLineStream{} + + source.OnKLineClosed(func(k types.KLine) { + s.kLines = append(s.kLines, k) + + if len(s.kLines) > MaxNumOfKLines { + s.kLines = s.kLines[len(s.kLines)-1-MaxNumOfKLines:] + } + s.EmitUpdate(k) + }) + + return s +} diff --git a/pkg/indicator/klinestream_callbacks.go b/pkg/indicator/klinestream_callbacks.go index a3fa68d97..eb6e59dbb 100644 --- a/pkg/indicator/klinestream_callbacks.go +++ b/pkg/indicator/klinestream_callbacks.go @@ -6,12 +6,12 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -func (K *KLineStream) OnUpdate(cb func(k types.KLine)) { - K.updateCallbacks = append(K.updateCallbacks, cb) +func (s *KLineStream) OnUpdate(cb func(k types.KLine)) { + s.updateCallbacks = append(s.updateCallbacks, cb) } -func (K *KLineStream) EmitUpdate(k types.KLine) { - for _, cb := range K.updateCallbacks { +func (s *KLineStream) EmitUpdate(k types.KLine) { + for _, cb := range s.updateCallbacks { cb(k) } } diff --git a/pkg/indicator/macd2.go b/pkg/indicator/macd2.go index 1997d90ac..407c7d5e1 100644 --- a/pkg/indicator/macd2.go +++ b/pkg/indicator/macd2.go @@ -23,50 +23,6 @@ signal := EMA(macd, 16) histogram := Subtract(macd, signal) */ -//go:generate callbackgen -type KLineStream -type KLineStream struct { - updateCallbacks []func(k types.KLine) -} - -func KLines(source types.Stream) *KLineStream { - stream := &KLineStream{} - source.OnKLineClosed(stream.EmitUpdate) - return stream -} - -type KLineSource interface { - OnUpdate(f func(k types.KLine)) -} - -type PriceStream struct { - types.SeriesBase - Float64Updater - - slice floats.Slice - mapper KLineValueMapper -} - -func Price(source KLineSource, mapper KLineValueMapper) *PriceStream { - s := &PriceStream{ - mapper: mapper, - } - s.SeriesBase.Series = s.slice - source.OnUpdate(func(k types.KLine) { - v := s.mapper(k) - s.slice.Push(v) - s.EmitUpdate(v) - }) - return s -} - -func ClosePrices(source KLineSource) *PriceStream { - return Price(source, KLineClosePriceMapper) -} - -func OpenPrices(source KLineSource) *PriceStream { - return Price(source, KLineOpenPriceMapper) -} - type Float64Source interface { types.Series OnUpdate(f func(v float64)) @@ -104,11 +60,6 @@ func (s *EWMAStream) calculate(v float64) float64 { return (1.0-m)*last + m*v } -//go:generate callbackgen -type Float64Updater -type Float64Updater struct { - updateCallbacks []func(v float64) -} - type SubtractStream struct { Float64Updater types.SeriesBase diff --git a/pkg/indicator/macd2_test.go b/pkg/indicator/macd2_test.go index dc4791dba..5fe59773d 100644 --- a/pkg/indicator/macd2_test.go +++ b/pkg/indicator/macd2_test.go @@ -26,9 +26,5 @@ func TestSubtract(t *testing.T) { assert.Equal(t, len(subtract.a), len(subtract.b)) assert.Equal(t, len(subtract.a), len(subtract.c)) - assert.Equal(t, subtract.c[0], subtract.a[0]-subtract.b[0]) - - t.Logf("subtract.a: %+v", subtract.a) - t.Logf("subtract.b: %+v", subtract.b) - t.Logf("subtract.c: %+v", subtract.c) + assert.InDelta(t, subtract.c[0], subtract.a[0]-subtract.b[0], 0.0001) } diff --git a/pkg/indicator/macd_callbacks.go b/pkg/indicator/macd_callbacks.go deleted file mode 100644 index 93a1bc8c9..000000000 --- a/pkg/indicator/macd_callbacks.go +++ /dev/null @@ -1,15 +0,0 @@ -// Code generated by "callbackgen -type MACD"; DO NOT EDIT. - -package indicator - -import () - -func (inc *MACD) OnUpdate(cb func(macd float64, signal float64, histogram float64)) { - inc.updateCallbacks = append(inc.updateCallbacks, cb) -} - -func (inc *MACD) EmitUpdate(macd float64, signal float64, histogram float64) { - for _, cb := range inc.updateCallbacks { - cb(macd, signal, histogram) - } -} diff --git a/pkg/indicator/macd.go b/pkg/indicator/macdlegacy.go similarity index 87% rename from pkg/indicator/macd.go rename to pkg/indicator/macdlegacy.go index 93ed7c0cc..19818c70e 100644 --- a/pkg/indicator/macd.go +++ b/pkg/indicator/macdlegacy.go @@ -27,20 +27,19 @@ type MACDConfig struct { LongPeriod int `json:"long"` } -//go:generate callbackgen -type MACD -type MACD struct { +//go:generate callbackgen -type MACDLegacy +type MACDLegacy struct { MACDConfig Values floats.Slice `json:"-"` fastEWMA, slowEWMA, signalLine *EWMA Histogram floats.Slice `json:"-"` - EndTime time.Time - updateCallbacks []func(macd, signal, histogram float64) + EndTime time.Time } -func (inc *MACD) Update(x float64) { +func (inc *MACDLegacy) Update(x float64) { if len(inc.Values) == 0 { // apply default values inc.fastEWMA = &EWMA{IntervalWindow: types.IntervalWindow{Window: inc.ShortPeriod}} @@ -76,7 +75,7 @@ func (inc *MACD) Update(x float64) { inc.EmitUpdate(macd, signal, histogram) } -func (inc *MACD) Last() float64 { +func (inc *MACDLegacy) Last() float64 { if len(inc.Values) == 0 { return 0.0 } @@ -84,27 +83,27 @@ func (inc *MACD) Last() float64 { return inc.Values[len(inc.Values)-1] } -func (inc *MACD) Length() int { +func (inc *MACDLegacy) Length() int { return len(inc.Values) } -func (inc *MACD) PushK(k types.KLine) { +func (inc *MACDLegacy) PushK(k types.KLine) { inc.Update(k.Close.Float64()) } -func (inc *MACD) MACD() types.SeriesExtend { - out := &MACDValues{MACD: inc} +func (inc *MACDLegacy) MACD() types.SeriesExtend { + out := &MACDValues{MACDLegacy: inc} out.SeriesBase.Series = out return out } -func (inc *MACD) Singals() types.SeriesExtend { +func (inc *MACDLegacy) Singals() types.SeriesExtend { return inc.signalLine } type MACDValues struct { types.SeriesBase - *MACD + *MACDLegacy } func (inc *MACDValues) Last() float64 { diff --git a/pkg/indicator/macdlegacy_callbacks.go b/pkg/indicator/macdlegacy_callbacks.go new file mode 100644 index 000000000..ed4d10816 --- /dev/null +++ b/pkg/indicator/macdlegacy_callbacks.go @@ -0,0 +1,15 @@ +// Code generated by "callbackgen -type MACDLegacy"; DO NOT EDIT. + +package indicator + +import () + +func (inc *MACDLegacy) OnUpdate(cb func(macd float64, signal float64, histogram float64)) { + inc.updateCallbacks = append(inc.updateCallbacks, cb) +} + +func (inc *MACDLegacy) EmitUpdate(macd float64, signal float64, histogram float64) { + for _, cb := range inc.updateCallbacks { + cb(macd, signal, histogram) + } +} diff --git a/pkg/indicator/macd_test.go b/pkg/indicator/macdlegacy_test.go similarity index 92% rename from pkg/indicator/macd_test.go rename to pkg/indicator/macdlegacy_test.go index e3f5075bb..c9bede48a 100644 --- a/pkg/indicator/macd_test.go +++ b/pkg/indicator/macdlegacy_test.go @@ -40,7 +40,7 @@ func Test_calculateMACD(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { iw := types.IntervalWindow{Window: 9} - macd := MACD{MACDConfig: MACDConfig{IntervalWindow: iw, ShortPeriod: 12, LongPeriod: 26}} + macd := MACDLegacy{MACDConfig: MACDConfig{IntervalWindow: iw, ShortPeriod: 12, LongPeriod: 26}} for _, k := range tt.kLines { macd.PushK(k) } diff --git a/pkg/indicator/price.go b/pkg/indicator/price.go new file mode 100644 index 000000000..dbd5853fe --- /dev/null +++ b/pkg/indicator/price.go @@ -0,0 +1,47 @@ +package indicator + +import ( + "github.com/c9s/bbgo/pkg/datatype/floats" + "github.com/c9s/bbgo/pkg/types" +) + +type KLineSource interface { + OnUpdate(f func(k types.KLine)) +} + +type PriceStream struct { + types.SeriesBase + Float64Updater + + slice floats.Slice + mapper KLineValueMapper +} + +func Price(source KLineSource, mapper KLineValueMapper) *PriceStream { + s := &PriceStream{ + mapper: mapper, + } + s.SeriesBase.Series = s.slice + source.OnUpdate(func(k types.KLine) { + v := s.mapper(k) + s.slice.Push(v) + s.EmitUpdate(v) + }) + return s +} + +func ClosePrices(source KLineSource) *PriceStream { + return Price(source, KLineClosePriceMapper) +} + +func LowPrices(source KLineSource) *PriceStream { + return Price(source, KLineLowPriceMapper) +} + +func HighPrices(source KLineSource) *PriceStream { + return Price(source, KLineHighPriceMapper) +} + +func OpenPrices(source KLineSource) *PriceStream { + return Price(source, KLineOpenPriceMapper) +} diff --git a/pkg/strategy/pivotshort/failedbreakhigh.go b/pkg/strategy/pivotshort/failedbreakhigh.go index eb193e994..eb30dc91b 100644 --- a/pkg/strategy/pivotshort/failedbreakhigh.go +++ b/pkg/strategy/pivotshort/failedbreakhigh.go @@ -47,7 +47,7 @@ type FailedBreakHigh struct { MACDDivergence *MACDDivergence `json:"macdDivergence"` - macd *indicator.MACD + macd *indicator.MACDLegacy macdTopDivergence bool