From 9dd8dbbede41eed13b9c2b2f287d700f5374805d Mon Sep 17 00:00:00 2001 From: zenix Date: Tue, 7 Jun 2022 20:46:31 +0900 Subject: [PATCH] feature: add drift indicator, split heikinashi's Queue --- pkg/indicator/drift.go | 82 +++++++++++++++++++++++++++++ pkg/indicator/drift_callbacks.go | 15 ++++++ pkg/strategy/ewoDgtrd/heikinashi.go | 57 ++++---------------- pkg/strategy/factorzoo/strategy.go | 34 ++++++++++-- pkg/types/indicator.go | 41 +++++++++++++++ 5 files changed, 178 insertions(+), 51 deletions(-) create mode 100644 pkg/indicator/drift.go create mode 100644 pkg/indicator/drift_callbacks.go diff --git a/pkg/indicator/drift.go b/pkg/indicator/drift.go new file mode 100644 index 000000000..d760014a1 --- /dev/null +++ b/pkg/indicator/drift.go @@ -0,0 +1,82 @@ +package indicator + +import ( + "math" + + "github.com/c9s/bbgo/pkg/types" +) + +// Refer: https://tradingview.com/script/aDymGrFx-Drift-Study-Inspired-by-Monte-Carlo-Simulations-with-BM-KL/ +// Brownian Motion's drift factor +// could be used in Monte Carlo Simulations +//go:generate callbackgen -type Drift +type Drift struct { + types.IntervalWindow + chng *types.Queue + Values types.Float64Slice + SMA *SMA + LastValue float64 + + UpdateCallbacks []func(value float64) +} + +func (inc *Drift) Update(value float64) { + if inc.chng == nil { + inc.SMA = &SMA{IntervalWindow: types.IntervalWindow{inc.Interval, inc.Window}} + inc.chng = types.NewQueue(inc.Window) + } + chng := math.Log(value / inc.LastValue) + inc.LastValue = value + inc.SMA.Update(chng) + inc.chng.Update(chng) + stdev := types.Stdev(inc.chng, inc.Window) + drift := inc.SMA.Last() - stdev * stdev * 0.5 + inc.Values.Push(drift) +} + +func (inc *Drift) Index(i int) float64 { + if inc.Values == nil { + return 0 + } + return inc.Values.Index(i) +} + +func (inc *Drift) Last() float64 { + if inc.Values.Length() == 0 { + return 0 + } + return inc.Values.Last() +} + +func (inc *Drift) Length() int { + if inc.Values == nil { + return 0 + } + return inc.Values.Length() +} + +var _ types.Series = &Drift{} + +func (inc *Drift) calculateAndUpdate(allKLines []types.KLine) { + if inc.chng == nil { + for _, k := range allKLines { + inc.Update(k.Close.Float64()) + inc.EmitUpdate(inc.Last()) + } + } else { + inc.Update(allKLines[len(allKLines)-1].Close.Float64()) + inc.EmitUpdate(inc.Last()) + } +} + +func (inc *Drift) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) { + if inc.Interval != interval { + return + } + + inc.calculateAndUpdate(window) +} + +func (inc *Drift) Bind(updater KLineWindowUpdater) { + updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate) +} diff --git a/pkg/indicator/drift_callbacks.go b/pkg/indicator/drift_callbacks.go new file mode 100644 index 000000000..224ef74a4 --- /dev/null +++ b/pkg/indicator/drift_callbacks.go @@ -0,0 +1,15 @@ +// Code generated by "callbackgen -type Drift"; DO NOT EDIT. + +package indicator + +import () + +func (inc *Drift) OnUpdate(cb func(value float64)) { + inc.UpdateCallbacks = append(inc.UpdateCallbacks, cb) +} + +func (inc *Drift) EmitUpdate(value float64) { + for _, cb := range inc.UpdateCallbacks { + cb(value) + } +} diff --git a/pkg/strategy/ewoDgtrd/heikinashi.go b/pkg/strategy/ewoDgtrd/heikinashi.go index 1b81442af..fca1934c0 100644 --- a/pkg/strategy/ewoDgtrd/heikinashi.go +++ b/pkg/strategy/ewoDgtrd/heikinashi.go @@ -7,58 +7,21 @@ import ( "github.com/c9s/bbgo/pkg/types" ) -type Queue struct { - arr []float64 - size int -} - -func NewQueue(size int) *Queue { - return &Queue{ - arr: make([]float64, 0, size), - size: size, - } -} - -func (inc *Queue) Last() float64 { - if len(inc.arr) == 0 { - return 0 - } - return inc.arr[len(inc.arr)-1] -} - -func (inc *Queue) Index(i int) float64 { - if len(inc.arr)-i-1 < 0 { - return 0 - } - return inc.arr[len(inc.arr)-i-1] -} - -func (inc *Queue) Length() int { - return len(inc.arr) -} - -func (inc *Queue) Update(v float64) { - inc.arr = append(inc.arr, v) - if len(inc.arr) > inc.size { - inc.arr = inc.arr[len(inc.arr)-inc.size:] - } -} - type HeikinAshi struct { - Close *Queue - Open *Queue - High *Queue - Low *Queue - Volume *Queue + Close *types.Queue + Open *types.Queue + High *types.Queue + Low *types.Queue + Volume *types.Queue } func NewHeikinAshi(size int) *HeikinAshi { return &HeikinAshi{ - Close: NewQueue(size), - Open: NewQueue(size), - High: NewQueue(size), - Low: NewQueue(size), - Volume: NewQueue(size), + Close: types.NewQueue(size), + Open: types.NewQueue(size), + High: types.NewQueue(size), + Low: types.NewQueue(size), + Volume: types.NewQueue(size), } } diff --git a/pkg/strategy/factorzoo/strategy.go b/pkg/strategy/factorzoo/strategy.go index 70926f758..0e86843ef 100644 --- a/pkg/strategy/factorzoo/strategy.go +++ b/pkg/strategy/factorzoo/strategy.go @@ -9,6 +9,7 @@ import ( "github.com/c9s/bbgo/pkg/bbgo" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/indicator" "github.com/c9s/bbgo/pkg/types" ) @@ -57,6 +58,10 @@ func (s *Strategy) ID() string { return ID } +func (s *Strategy) InstanceID() string { + return fmt.Sprintf("%s:%s", ID, s.Symbol) +} + func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) { log.Infof("subscribe %s", s.Symbol) session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval}) @@ -149,8 +154,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // s.pvDivergence.OnUpdate(func(corr float64) { // //fmt.Printf("now we've got corr: %f\n", corr) // }) + drift := &indicator.Drift{IntervalWindow: types.IntervalWindow{Window: 14, Interval: s.Interval}} + drift.Bind(st) - s.Alpha = [][]float64{{}, {}, {}, {}, {}} + s.Alpha = [][]float64{{}, {}, {}, {}, {}, {}} s.Ret = []float64{} // thetas := []float64{0, 0, 0, 0} preCompute := 0 @@ -172,6 +179,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se log.Infof("connected") }) + s.T = 20 + session.MarketDataStream.OnKLineClosed(func(kline types.KLine) { if kline.Symbol != s.Symbol || kline.Interval != s.Interval { @@ -193,12 +202,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se // opening gap ogap := kline.Open.Div(s.prevClose) + driftVal := drift.Last() + log.Infof("corr: %f, rev: %f, a150: %f, mom: %f, ogap: %f", corr.Float64(), rev.Float64(), a150.Float64(), mom.Float64(), ogap.Float64()) s.Alpha[0] = append(s.Alpha[0], corr.Float64()) s.Alpha[1] = append(s.Alpha[1], rev.Float64()) s.Alpha[2] = append(s.Alpha[2], a150.Float64()) s.Alpha[3] = append(s.Alpha[3], mom.Float64()) s.Alpha[4] = append(s.Alpha[4], ogap.Float64()) + s.Alpha[5] = append(s.Alpha[5], driftVal) // s.Alpha[5] = append(s.Alpha[4], 1.0) // constant @@ -207,7 +219,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se log.Infof("Current Return: %f", s.Ret[len(s.Ret)-1]) // accumulate enough data for cross-sectional regression, not time-series regression - s.T = 20 if preCompute < int(s.T)+1 { preCompute++ } else { @@ -221,10 +232,18 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se r.SetVar(2, "A150") r.SetVar(3, "Mom") r.SetVar(4, "OGap") + r.SetVar(5, "Drift") var rdp regression.DataPoints for i := 1; i <= int(s.T); i++ { // alphas[t-1], previous alphas, dot not take current alpha into account, will cause look-ahead bias - as := []float64{s.Alpha[0][len(s.Alpha[0])-(i+2)], s.Alpha[1][len(s.Alpha[1])-(i+2)], s.Alpha[2][len(s.Alpha[2])-(i+2)], s.Alpha[3][len(s.Alpha[3])-(i+2)], s.Alpha[4][len(s.Alpha[4])-(i+2)]} + as := []float64{ + s.Alpha[0][len(s.Alpha[0])-(i+2)], + s.Alpha[1][len(s.Alpha[1])-(i+2)], + s.Alpha[2][len(s.Alpha[2])-(i+2)], + s.Alpha[3][len(s.Alpha[3])-(i+2)], + s.Alpha[4][len(s.Alpha[4])-(i+2)], + s.Alpha[5][len(s.Alpha[5])-(i+2)], + } // alphas[t], current return rate rt := s.Ret[len(s.Ret)-(i+1)] rdp = append(rdp, regression.DataPoint(rt, as)) @@ -234,7 +253,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se r.Run() fmt.Printf("Regression formula:\n%v\n", r.Formula) // prediction := r.Coeff(0)*corr.Float64() + r.Coeff(1)*rev.Float64() + r.Coeff(2)*factorzoo.Float64() + r.Coeff(3)*mom.Float64() + r.Coeff(4) - prediction, _ := r.Predict([]float64{corr.Float64(), rev.Float64(), a150.Float64(), mom.Float64(), ogap.Float64()}) + prediction, _ := r.Predict([]float64{ + corr.Float64(), + rev.Float64(), + a150.Float64(), + mom.Float64(), + ogap.Float64(), + driftVal, + }) log.Infof("Predicted Return: %f", prediction) s.placeOrders(ctx, orderExecutor, fixedpoint.NewFromFloat(prediction)) diff --git a/pkg/types/indicator.go b/pkg/types/indicator.go index 393fe28c1..d987a46f0 100644 --- a/pkg/types/indicator.go +++ b/pkg/types/indicator.go @@ -8,6 +8,47 @@ import ( "gonum.org/v1/gonum/stat" ) +// Super basic Series type that simply holds the float64 data +// with size limit (the only difference compare to float64slice) +type Queue struct { + arr []float64 + size int +} + +func NewQueue(size int) *Queue { + return &Queue{ + arr: make([]float64, 0, size), + size: size, + } +} + +func (inc *Queue) Last() float64 { + if len(inc.arr) == 0 { + return 0 + } + return inc.arr[len(inc.arr)-1] +} + +func (inc *Queue) Index(i int) float64 { + if len(inc.arr)-i-1 < 0 { + return 0 + } + return inc.arr[len(inc.arr)-i-1] +} + +func (inc *Queue) Length() int { + return len(inc.arr) +} + +func (inc *Queue) Update(v float64) { + inc.arr = append(inc.arr, v) + if len(inc.arr) > inc.size { + inc.arr = inc.arr[len(inc.arr)-inc.size:] + } +} + +var _ Series = &Queue{} + // Float64Indicator is the indicators (SMA and EWMA) that we want to use are returning float64 data. type Float64Indicator interface { Last() float64