feature: add drift indicator, split heikinashi's Queue

This commit is contained in:
zenix 2022-06-07 20:46:31 +09:00 committed by Austin Liu
parent 792e67e982
commit 9dd8dbbede
5 changed files with 178 additions and 51 deletions

82
pkg/indicator/drift.go Normal file
View File

@ -0,0 +1,82 @@
package indicator
import (
"math"
"github.com/c9s/bbgo/pkg/types"
)
// Refer: https://tradingview.com/script/aDymGrFx-Drift-Study-Inspired-by-Monte-Carlo-Simulations-with-BM-KL/
// Brownian Motion's drift factor
// could be used in Monte Carlo Simulations
//go:generate callbackgen -type Drift
type Drift struct {
types.IntervalWindow
chng *types.Queue
Values types.Float64Slice
SMA *SMA
LastValue float64
UpdateCallbacks []func(value float64)
}
func (inc *Drift) Update(value float64) {
if inc.chng == nil {
inc.SMA = &SMA{IntervalWindow: types.IntervalWindow{inc.Interval, inc.Window}}
inc.chng = types.NewQueue(inc.Window)
}
chng := math.Log(value / inc.LastValue)
inc.LastValue = value
inc.SMA.Update(chng)
inc.chng.Update(chng)
stdev := types.Stdev(inc.chng, inc.Window)
drift := inc.SMA.Last() - stdev * stdev * 0.5
inc.Values.Push(drift)
}
func (inc *Drift) Index(i int) float64 {
if inc.Values == nil {
return 0
}
return inc.Values.Index(i)
}
func (inc *Drift) Last() float64 {
if inc.Values.Length() == 0 {
return 0
}
return inc.Values.Last()
}
func (inc *Drift) Length() int {
if inc.Values == nil {
return 0
}
return inc.Values.Length()
}
var _ types.Series = &Drift{}
func (inc *Drift) calculateAndUpdate(allKLines []types.KLine) {
if inc.chng == nil {
for _, k := range allKLines {
inc.Update(k.Close.Float64())
inc.EmitUpdate(inc.Last())
}
} else {
inc.Update(allKLines[len(allKLines)-1].Close.Float64())
inc.EmitUpdate(inc.Last())
}
}
func (inc *Drift) handleKLineWindowUpdate(interval types.Interval, window types.KLineWindow) {
if inc.Interval != interval {
return
}
inc.calculateAndUpdate(window)
}
func (inc *Drift) Bind(updater KLineWindowUpdater) {
updater.OnKLineWindowUpdate(inc.handleKLineWindowUpdate)
}

View File

@ -0,0 +1,15 @@
// Code generated by "callbackgen -type Drift"; DO NOT EDIT.
package indicator
import ()
func (inc *Drift) OnUpdate(cb func(value float64)) {
inc.UpdateCallbacks = append(inc.UpdateCallbacks, cb)
}
func (inc *Drift) EmitUpdate(value float64) {
for _, cb := range inc.UpdateCallbacks {
cb(value)
}
}

View File

@ -7,58 +7,21 @@ import (
"github.com/c9s/bbgo/pkg/types"
)
type Queue struct {
arr []float64
size int
}
func NewQueue(size int) *Queue {
return &Queue{
arr: make([]float64, 0, size),
size: size,
}
}
func (inc *Queue) Last() float64 {
if len(inc.arr) == 0 {
return 0
}
return inc.arr[len(inc.arr)-1]
}
func (inc *Queue) Index(i int) float64 {
if len(inc.arr)-i-1 < 0 {
return 0
}
return inc.arr[len(inc.arr)-i-1]
}
func (inc *Queue) Length() int {
return len(inc.arr)
}
func (inc *Queue) Update(v float64) {
inc.arr = append(inc.arr, v)
if len(inc.arr) > inc.size {
inc.arr = inc.arr[len(inc.arr)-inc.size:]
}
}
type HeikinAshi struct {
Close *Queue
Open *Queue
High *Queue
Low *Queue
Volume *Queue
Close *types.Queue
Open *types.Queue
High *types.Queue
Low *types.Queue
Volume *types.Queue
}
func NewHeikinAshi(size int) *HeikinAshi {
return &HeikinAshi{
Close: NewQueue(size),
Open: NewQueue(size),
High: NewQueue(size),
Low: NewQueue(size),
Volume: NewQueue(size),
Close: types.NewQueue(size),
Open: types.NewQueue(size),
High: types.NewQueue(size),
Low: types.NewQueue(size),
Volume: types.NewQueue(size),
}
}

View File

@ -9,6 +9,7 @@ import (
"github.com/c9s/bbgo/pkg/bbgo"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/indicator"
"github.com/c9s/bbgo/pkg/types"
)
@ -57,6 +58,10 @@ func (s *Strategy) ID() string {
return ID
}
func (s *Strategy) InstanceID() string {
return fmt.Sprintf("%s:%s", ID, s.Symbol)
}
func (s *Strategy) Subscribe(session *bbgo.ExchangeSession) {
log.Infof("subscribe %s", s.Symbol)
session.Subscribe(types.KLineChannel, s.Symbol, types.SubscribeOptions{Interval: s.Interval})
@ -149,8 +154,10 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// s.pvDivergence.OnUpdate(func(corr float64) {
// //fmt.Printf("now we've got corr: %f\n", corr)
// })
drift := &indicator.Drift{IntervalWindow: types.IntervalWindow{Window: 14, Interval: s.Interval}}
drift.Bind(st)
s.Alpha = [][]float64{{}, {}, {}, {}, {}}
s.Alpha = [][]float64{{}, {}, {}, {}, {}, {}}
s.Ret = []float64{}
// thetas := []float64{0, 0, 0, 0}
preCompute := 0
@ -172,6 +179,8 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
log.Infof("connected")
})
s.T = 20
session.MarketDataStream.OnKLineClosed(func(kline types.KLine) {
if kline.Symbol != s.Symbol || kline.Interval != s.Interval {
@ -193,12 +202,15 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
// opening gap
ogap := kline.Open.Div(s.prevClose)
driftVal := drift.Last()
log.Infof("corr: %f, rev: %f, a150: %f, mom: %f, ogap: %f", corr.Float64(), rev.Float64(), a150.Float64(), mom.Float64(), ogap.Float64())
s.Alpha[0] = append(s.Alpha[0], corr.Float64())
s.Alpha[1] = append(s.Alpha[1], rev.Float64())
s.Alpha[2] = append(s.Alpha[2], a150.Float64())
s.Alpha[3] = append(s.Alpha[3], mom.Float64())
s.Alpha[4] = append(s.Alpha[4], ogap.Float64())
s.Alpha[5] = append(s.Alpha[5], driftVal)
// s.Alpha[5] = append(s.Alpha[4], 1.0) // constant
@ -207,7 +219,6 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
log.Infof("Current Return: %f", s.Ret[len(s.Ret)-1])
// accumulate enough data for cross-sectional regression, not time-series regression
s.T = 20
if preCompute < int(s.T)+1 {
preCompute++
} else {
@ -221,10 +232,18 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
r.SetVar(2, "A150")
r.SetVar(3, "Mom")
r.SetVar(4, "OGap")
r.SetVar(5, "Drift")
var rdp regression.DataPoints
for i := 1; i <= int(s.T); i++ {
// alphas[t-1], previous alphas, dot not take current alpha into account, will cause look-ahead bias
as := []float64{s.Alpha[0][len(s.Alpha[0])-(i+2)], s.Alpha[1][len(s.Alpha[1])-(i+2)], s.Alpha[2][len(s.Alpha[2])-(i+2)], s.Alpha[3][len(s.Alpha[3])-(i+2)], s.Alpha[4][len(s.Alpha[4])-(i+2)]}
as := []float64{
s.Alpha[0][len(s.Alpha[0])-(i+2)],
s.Alpha[1][len(s.Alpha[1])-(i+2)],
s.Alpha[2][len(s.Alpha[2])-(i+2)],
s.Alpha[3][len(s.Alpha[3])-(i+2)],
s.Alpha[4][len(s.Alpha[4])-(i+2)],
s.Alpha[5][len(s.Alpha[5])-(i+2)],
}
// alphas[t], current return rate
rt := s.Ret[len(s.Ret)-(i+1)]
rdp = append(rdp, regression.DataPoint(rt, as))
@ -234,7 +253,14 @@ func (s *Strategy) Run(ctx context.Context, orderExecutor bbgo.OrderExecutor, se
r.Run()
fmt.Printf("Regression formula:\n%v\n", r.Formula)
// prediction := r.Coeff(0)*corr.Float64() + r.Coeff(1)*rev.Float64() + r.Coeff(2)*factorzoo.Float64() + r.Coeff(3)*mom.Float64() + r.Coeff(4)
prediction, _ := r.Predict([]float64{corr.Float64(), rev.Float64(), a150.Float64(), mom.Float64(), ogap.Float64()})
prediction, _ := r.Predict([]float64{
corr.Float64(),
rev.Float64(),
a150.Float64(),
mom.Float64(),
ogap.Float64(),
driftVal,
})
log.Infof("Predicted Return: %f", prediction)
s.placeOrders(ctx, orderExecutor, fixedpoint.NewFromFloat(prediction))

View File

@ -8,6 +8,47 @@ import (
"gonum.org/v1/gonum/stat"
)
// Super basic Series type that simply holds the float64 data
// with size limit (the only difference compare to float64slice)
type Queue struct {
arr []float64
size int
}
func NewQueue(size int) *Queue {
return &Queue{
arr: make([]float64, 0, size),
size: size,
}
}
func (inc *Queue) Last() float64 {
if len(inc.arr) == 0 {
return 0
}
return inc.arr[len(inc.arr)-1]
}
func (inc *Queue) Index(i int) float64 {
if len(inc.arr)-i-1 < 0 {
return 0
}
return inc.arr[len(inc.arr)-i-1]
}
func (inc *Queue) Length() int {
return len(inc.arr)
}
func (inc *Queue) Update(v float64) {
inc.arr = append(inc.arr, v)
if len(inc.arr) > inc.size {
inc.arr = inc.arr[len(inc.arr)-inc.size:]
}
}
var _ Series = &Queue{}
// Float64Indicator is the indicators (SMA and EWMA) that we want to use are returning float64 data.
type Float64Indicator interface {
Last() float64