diff --git a/config/pivotshort_optimizer.yaml b/config/pivotshort_optimizer.yaml index bc226dfe4..1ab25cb8c 100644 --- a/config/pivotshort_optimizer.yaml +++ b/config/pivotshort_optimizer.yaml @@ -10,12 +10,12 @@ matrix: path: '/exchangeStrategies/0/pivotshort/interval' values: [ "1m", "5m", "30m" ] -#- type: range -# path: '/exchangeStrategies/0/pivotshort/pivotLength' -# label: pivotLength -# min: 100.0 -# max: 200.0 -# step: 20.0 +- type: range + path: '/exchangeStrategies/0/pivotshort/window' + label: window + min: 100.0 + max: 200.0 + step: 20.0 # - type: range # path: '/exchangeStrategies/0/pivotshort/breakLow/stopEMARange' diff --git a/pkg/optimizer/grid.go b/pkg/optimizer/grid.go index a9b11efdb..ebf961b81 100644 --- a/pkg/optimizer/grid.go +++ b/pkg/optimizer/grid.go @@ -1,6 +1,7 @@ package optimizer import ( + "context" "encoding/json" "fmt" "sort" @@ -170,26 +171,17 @@ func (o *GridOptimizer) Run(executor Executor, configJson []byte) (map[string][] var metrics = map[string][]Metric{} var ops = o.buildOps() + + var taskC = make(chan BacktestTask, 100) + var app = func(configJson []byte, next func(configJson []byte) error) error { - summaryReport, err := executor.Execute(configJson) - if err != nil { - return err - } - var labels = copyLabels(o.ParamLabels) - var currentParams = copyParams(o.CurrentParams) - for metricName, metricFunc := range valueFunctions { - var metricValue = metricFunc(summaryReport) - - metrics[metricName] = append(metrics[metricName], Metric{ - Params: currentParams, - Labels: labels, - Value: metricValue, - }) - - log.Infof("params: %+v => %s %+v", currentParams, metricName, metricValue) + var params = copyParams(o.CurrentParams) + taskC <- BacktestTask{ + ConfigJson: configJson, + Params: params, + Labels: labels, } - return nil } @@ -207,7 +199,27 @@ func (o *GridOptimizer) Run(executor Executor, configJson []byte) (map[string][] } } - err := wrapper(configJson) + resultsC, err := executor.Run(context.Background(), taskC) + if err != nil { + return nil, err + } + + if err := wrapper(configJson); err != nil { + return nil, err + } + close(taskC) // this will shut down the executor + + for result := range resultsC { + for metricName, metricFunc := range valueFunctions { + var metricValue = metricFunc(result.Report) + log.Infof("params: %+v => %s %+v", result.Params, metricName, metricValue) + metrics[metricName] = append(metrics[metricName], Metric{ + Params: result.Params, + Labels: result.Labels, + Value: metricValue, + }) + } + } for n := range metrics { sort.Slice(metrics[n], func(i, j int) bool { diff --git a/pkg/optimizer/local.go b/pkg/optimizer/local.go index eff8632c2..3740cc925 100644 --- a/pkg/optimizer/local.go +++ b/pkg/optimizer/local.go @@ -1,10 +1,12 @@ package optimizer import ( + "context" "encoding/json" "os" "os/exec" "strings" + "sync" "github.com/sirupsen/logrus" "gopkg.in/yaml.v3" @@ -14,8 +16,17 @@ import ( var log = logrus.WithField("component", "optimizer") +type BacktestTask struct { + ConfigJson []byte + Params []interface{} + Labels []string + Report *backtest.SummaryReport + Error error +} + type Executor interface { - Execute(configJson []byte) (*backtest.SummaryReport, error) + // Execute(configJson []byte) (*backtest.SummaryReport, error) + Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error) } type AsyncHandle struct { @@ -38,7 +49,7 @@ func (e *LocalProcessExecutor) ExecuteAsync(configJson []byte) *AsyncHandle { go func() { defer close(handle.Done) - report, err := e.Execute(configJson) + report, err := e.execute(configJson) handle.Error = err handle.Report = report }() @@ -61,7 +72,57 @@ func (e *LocalProcessExecutor) readReport(output []byte) (*backtest.SummaryRepor return summaryReport, nil } -func (e *LocalProcessExecutor) Execute(configJson []byte) (*backtest.SummaryReport, error) { +func (e *LocalProcessExecutor) Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error) { + var maxNumOfProcess = 5 + var resultsC = make(chan BacktestTask, 10) + + wg := sync.WaitGroup{} + wg.Add(maxNumOfProcess) + + go func() { + wg.Wait() + close(resultsC) + }() + + for i := 0; i < maxNumOfProcess; i++ { + // fork workers + go func(id int, taskC chan BacktestTask) { + taskCnt := 0 + log.Infof("starting local worker #%d", id) + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + + case task, ok := <-taskC: + if !ok { + return + } + + taskCnt++ + log.Infof("local worker #%d received param task: %v", id, task.Params) + + report, err := e.execute(task.ConfigJson) + if err != nil { + log.WithError(err).Errorf("execute error") + } + + task.Error = err + task.Report = report + + resultsC <- task + } + } + }(i+1, taskC) + } + + return resultsC, nil +} + +// execute runs the config json and returns the summary report +// this is a blocking operation +func (e *LocalProcessExecutor) execute(configJson []byte) (*backtest.SummaryReport, error) { tf, err := jsonToYamlConfig(e.ConfigDir, configJson) if err != nil { return nil, err diff --git a/utils/embed/main.go b/utils/embed/main.go index 2d1720953..1d45db115 100644 --- a/utils/embed/main.go +++ b/utils/embed/main.go @@ -90,7 +90,7 @@ func (f *file) Sys() interface{} { return nil } func Embed(file string, dirs ...string) error { var buf bytes.Buffer - // Execute template + // execute template if err := tmpl.Execute(&buf, struct { Package string Tag string