indicator: support histrical price push

This commit is contained in:
c9s 2023-05-26 15:18:43 +08:00
parent 9fac61351d
commit 5bf204b890
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
3 changed files with 32 additions and 15 deletions

View File

@ -11,12 +11,14 @@ type KLineStream struct {
kLines []types.KLine
}
// AddSubscriber adds the subscriber function and push histrical data to the subscriber
// AddSubscriber adds the subscriber function and push historical data to the subscriber
func (s *KLineStream) AddSubscriber(f func(k types.KLine)) {
if len(s.kLines) > 0 {
// push historical klines to the subscriber
for _, k := range s.kLines {
f(k)
}
}
s.OnUpdate(f)
}

View File

@ -28,6 +28,11 @@ type Float64Source interface {
OnUpdate(f func(v float64))
}
type Float64Subscription interface {
types.Series
AddSubscriber(f func(v float64))
}
//go:generate callbackgen -type EWMAStream
type EWMAStream struct {
Float64Updater
@ -46,14 +51,22 @@ func EWMA2(source Float64Source, window int) *EWMAStream {
}
s.SeriesBase.Series = s.slice
source.OnUpdate(func(v float64) {
v2 := s.calculate(v)
s.slice.Push(v2)
s.EmitUpdate(v2)
})
if sub, ok := source.(Float64Subscription); ok {
sub.AddSubscriber(s.calculateAndPush)
} else {
source.OnUpdate(s.calculateAndPush)
}
return s
}
func (s *EWMAStream) calculateAndPush(v float64) {
v2 := s.calculate(v)
s.slice.Push(v2)
s.EmitUpdate(v2)
}
func (s *EWMAStream) calculate(v float64) float64 {
last := s.slice.Last()
m := s.multiplier

View File

@ -5,8 +5,8 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
type KLineSource interface {
OnUpdate(f func(k types.KLine))
type KLineSubscription interface {
AddSubscriber(f func(k types.KLine))
}
type PriceStream struct {
@ -17,12 +17,14 @@ type PriceStream struct {
mapper KLineValueMapper
}
func Price(source KLineSource, mapper KLineValueMapper) *PriceStream {
func Price(source KLineSubscription, mapper KLineValueMapper) *PriceStream {
s := &PriceStream{
mapper: mapper,
}
s.SeriesBase.Series = s.slice
source.OnUpdate(func(k types.KLine) {
source.AddSubscriber(func(k types.KLine) {
v := s.mapper(k)
s.slice.Push(v)
s.EmitUpdate(v)
@ -30,18 +32,18 @@ func Price(source KLineSource, mapper KLineValueMapper) *PriceStream {
return s
}
func ClosePrices(source KLineSource) *PriceStream {
func ClosePrices(source KLineSubscription) *PriceStream {
return Price(source, KLineClosePriceMapper)
}
func LowPrices(source KLineSource) *PriceStream {
func LowPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineLowPriceMapper)
}
func HighPrices(source KLineSource) *PriceStream {
func HighPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineHighPriceMapper)
}
func OpenPrices(source KLineSource) *PriceStream {
func OpenPrices(source KLineSubscription) *PriceStream {
return Price(source, KLineOpenPriceMapper)
}