indicator: implement Subscribe method on PriceStream

This commit is contained in:
c9s 2023-06-30 10:58:25 +08:00
parent 775ad7d906
commit ea1025d790
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
2 changed files with 41 additions and 8 deletions

View File

@ -8,7 +8,7 @@ import (
"github.com/c9s/bbgo/pkg/types" "github.com/c9s/bbgo/pkg/types"
) )
func TestIndicatorSet_closeCache(t *testing.T) { func newTestIndicatorSet() *IndicatorSet {
symbol := "BTCUSDT" symbol := "BTCUSDT"
store := NewMarketDataStore(symbol) store := NewMarketDataStore(symbol)
store.KLineWindows[types.Interval1m] = &types.KLineWindow{ store.KLineWindows[types.Interval1m] = &types.KLineWindow{
@ -17,15 +17,32 @@ func TestIndicatorSet_closeCache(t *testing.T) {
{Open: number(19200.0), Close: number(19300.0)}, {Open: number(19200.0), Close: number(19300.0)},
{Open: number(19300.0), Close: number(19200.0)}, {Open: number(19300.0), Close: number(19200.0)},
{Open: number(19200.0), Close: number(19100.0)}, {Open: number(19200.0), Close: number(19100.0)},
{Open: number(19100.0), Close: number(19500.0)},
{Open: number(19500.0), Close: number(19600.0)},
{Open: number(19600.0), Close: number(19700.0)},
} }
stream := types.NewStandardStream() stream := types.NewStandardStream()
indicatorSet := NewIndicatorSet(symbol, &stream, store) indicatorSet := NewIndicatorSet(symbol, &stream, store)
return indicatorSet
}
func TestIndicatorSet_closeCache(t *testing.T) {
indicatorSet := newTestIndicatorSet()
close1m := indicatorSet.CLOSE(types.Interval1m) close1m := indicatorSet.CLOSE(types.Interval1m)
assert.NotNil(t, close1m) assert.NotNil(t, close1m)
close1m2 := indicatorSet.CLOSE(types.Interval1m) close1m2 := indicatorSet.CLOSE(types.Interval1m)
assert.Equal(t, close1m, close1m2) assert.Equal(t, close1m, close1m2)
} }
func TestIndicatorSet_rsi(t *testing.T) {
indicatorSet := newTestIndicatorSet()
rsi1m := indicatorSet.RSI(types.IntervalWindow{Interval: types.Interval1m, Window: 7})
assert.NotNil(t, rsi1m)
rsiLast := rsi1m.Last(0)
assert.InDelta(t, 80, rsiLast, 0.0000001)
}

View File

@ -22,15 +22,31 @@ func Price(source KLineSubscription, mapper KLineValueMapper) *PriceStream {
mapper: mapper, mapper: mapper,
} }
if source != nil { if source == nil {
source.AddSubscriber(func(k types.KLine) { return s
v := s.mapper(k)
s.PushAndEmit(v)
})
} }
source.AddSubscriber(func(k types.KLine) {
v := s.mapper(k)
s.PushAndEmit(v)
})
return s return s
} }
// AddSubscriber adds the subscriber function and push historical data to the subscriber
func (s *PriceStream) AddSubscriber(f func(v float64)) {
s.OnUpdate(f)
if len(s.slice) == 0 {
return
}
// push historical value to the subscriber
for _, v := range s.slice {
f(v)
}
}
func (s *PriceStream) PushAndEmit(v float64) { func (s *PriceStream) PushAndEmit(v float64) {
s.slice.Push(v) s.slice.Push(v)
s.EmitUpdate(v) s.EmitUpdate(v)