From ea1025d79026b49ee5fb72b1c6e7031299f7903e Mon Sep 17 00:00:00 2001 From: c9s Date: Fri, 30 Jun 2023 10:58:25 +0800 Subject: [PATCH] indicator: implement Subscribe method on PriceStream --- pkg/bbgo/indicator_set_test.go | 23 ++++++++++++++++++++--- pkg/indicator/v2_price.go | 26 +++++++++++++++++++++----- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/pkg/bbgo/indicator_set_test.go b/pkg/bbgo/indicator_set_test.go index e1bbb3f58..0cf7050b6 100644 --- a/pkg/bbgo/indicator_set_test.go +++ b/pkg/bbgo/indicator_set_test.go @@ -8,7 +8,7 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -func TestIndicatorSet_closeCache(t *testing.T) { +func newTestIndicatorSet() *IndicatorSet { symbol := "BTCUSDT" store := NewMarketDataStore(symbol) store.KLineWindows[types.Interval1m] = &types.KLineWindow{ @@ -17,15 +17,32 @@ func TestIndicatorSet_closeCache(t *testing.T) { {Open: number(19200.0), Close: number(19300.0)}, {Open: number(19300.0), Close: number(19200.0)}, {Open: number(19200.0), Close: number(19100.0)}, + {Open: number(19100.0), Close: number(19500.0)}, + {Open: number(19500.0), Close: number(19600.0)}, + {Open: number(19600.0), Close: number(19700.0)}, } stream := types.NewStandardStream() - indicatorSet := NewIndicatorSet(symbol, &stream, store) - + return indicatorSet +} + +func TestIndicatorSet_closeCache(t *testing.T) { + indicatorSet := newTestIndicatorSet() + close1m := indicatorSet.CLOSE(types.Interval1m) assert.NotNil(t, close1m) close1m2 := indicatorSet.CLOSE(types.Interval1m) assert.Equal(t, close1m, close1m2) } + +func TestIndicatorSet_rsi(t *testing.T) { + indicatorSet := newTestIndicatorSet() + + rsi1m := indicatorSet.RSI(types.IntervalWindow{Interval: types.Interval1m, Window: 7}) + assert.NotNil(t, rsi1m) + + rsiLast := rsi1m.Last(0) + assert.InDelta(t, 80, rsiLast, 0.0000001) +} diff --git a/pkg/indicator/v2_price.go b/pkg/indicator/v2_price.go index 871bee01c..d4c546e57 100644 --- a/pkg/indicator/v2_price.go +++ b/pkg/indicator/v2_price.go @@ -22,15 +22,31 @@ func Price(source KLineSubscription, mapper KLineValueMapper) *PriceStream { mapper: mapper, } - if source != nil { - source.AddSubscriber(func(k types.KLine) { - v := s.mapper(k) - s.PushAndEmit(v) - }) + if source == nil { + return s } + + source.AddSubscriber(func(k types.KLine) { + v := s.mapper(k) + s.PushAndEmit(v) + }) return s } +// AddSubscriber adds the subscriber function and push historical data to the subscriber +func (s *PriceStream) AddSubscriber(f func(v float64)) { + s.OnUpdate(f) + + if len(s.slice) == 0 { + return + } + + // push historical value to the subscriber + for _, v := range s.slice { + f(v) + } +} + func (s *PriceStream) PushAndEmit(v float64) { s.slice.Push(v) s.EmitUpdate(v)