optimizer: add parallel local process worker support for optimizer

This commit is contained in:
c9s 2022-06-20 17:18:05 +08:00
parent 626934a059
commit 6afe2de9f7
No known key found for this signature in database
GPG Key ID: 7385E7E464CB0A54
4 changed files with 101 additions and 28 deletions

View File

@ -10,12 +10,12 @@ matrix:
path: '/exchangeStrategies/0/pivotshort/interval' path: '/exchangeStrategies/0/pivotshort/interval'
values: [ "1m", "5m", "30m" ] values: [ "1m", "5m", "30m" ]
#- type: range - type: range
# path: '/exchangeStrategies/0/pivotshort/pivotLength' path: '/exchangeStrategies/0/pivotshort/window'
# label: pivotLength label: window
# min: 100.0 min: 100.0
# max: 200.0 max: 200.0
# step: 20.0 step: 20.0
# - type: range # - type: range
# path: '/exchangeStrategies/0/pivotshort/breakLow/stopEMARange' # path: '/exchangeStrategies/0/pivotshort/breakLow/stopEMARange'

View File

@ -1,6 +1,7 @@
package optimizer package optimizer
import ( import (
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"sort" "sort"
@ -170,26 +171,17 @@ func (o *GridOptimizer) Run(executor Executor, configJson []byte) (map[string][]
var metrics = map[string][]Metric{} var metrics = map[string][]Metric{}
var ops = o.buildOps() var ops = o.buildOps()
var taskC = make(chan BacktestTask, 100)
var app = func(configJson []byte, next func(configJson []byte) error) error { var app = func(configJson []byte, next func(configJson []byte) error) error {
summaryReport, err := executor.Execute(configJson)
if err != nil {
return err
}
var labels = copyLabels(o.ParamLabels) var labels = copyLabels(o.ParamLabels)
var currentParams = copyParams(o.CurrentParams) var params = copyParams(o.CurrentParams)
for metricName, metricFunc := range valueFunctions { taskC <- BacktestTask{
var metricValue = metricFunc(summaryReport) ConfigJson: configJson,
Params: params,
metrics[metricName] = append(metrics[metricName], Metric{
Params: currentParams,
Labels: labels, Labels: labels,
Value: metricValue,
})
log.Infof("params: %+v => %s %+v", currentParams, metricName, metricValue)
} }
return nil return nil
} }
@ -207,7 +199,27 @@ func (o *GridOptimizer) Run(executor Executor, configJson []byte) (map[string][]
} }
} }
err := wrapper(configJson) resultsC, err := executor.Run(context.Background(), taskC)
if err != nil {
return nil, err
}
if err := wrapper(configJson); err != nil {
return nil, err
}
close(taskC) // this will shut down the executor
for result := range resultsC {
for metricName, metricFunc := range valueFunctions {
var metricValue = metricFunc(result.Report)
log.Infof("params: %+v => %s %+v", result.Params, metricName, metricValue)
metrics[metricName] = append(metrics[metricName], Metric{
Params: result.Params,
Labels: result.Labels,
Value: metricValue,
})
}
}
for n := range metrics { for n := range metrics {
sort.Slice(metrics[n], func(i, j int) bool { sort.Slice(metrics[n], func(i, j int) bool {

View File

@ -1,10 +1,12 @@
package optimizer package optimizer
import ( import (
"context"
"encoding/json" "encoding/json"
"os" "os"
"os/exec" "os/exec"
"strings" "strings"
"sync"
"github.com/sirupsen/logrus" "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3" "gopkg.in/yaml.v3"
@ -14,8 +16,17 @@ import (
var log = logrus.WithField("component", "optimizer") var log = logrus.WithField("component", "optimizer")
type BacktestTask struct {
ConfigJson []byte
Params []interface{}
Labels []string
Report *backtest.SummaryReport
Error error
}
type Executor interface { type Executor interface {
Execute(configJson []byte) (*backtest.SummaryReport, error) // Execute(configJson []byte) (*backtest.SummaryReport, error)
Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error)
} }
type AsyncHandle struct { type AsyncHandle struct {
@ -38,7 +49,7 @@ func (e *LocalProcessExecutor) ExecuteAsync(configJson []byte) *AsyncHandle {
go func() { go func() {
defer close(handle.Done) defer close(handle.Done)
report, err := e.Execute(configJson) report, err := e.execute(configJson)
handle.Error = err handle.Error = err
handle.Report = report handle.Report = report
}() }()
@ -61,7 +72,57 @@ func (e *LocalProcessExecutor) readReport(output []byte) (*backtest.SummaryRepor
return summaryReport, nil return summaryReport, nil
} }
func (e *LocalProcessExecutor) Execute(configJson []byte) (*backtest.SummaryReport, error) { func (e *LocalProcessExecutor) Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error) {
var maxNumOfProcess = 5
var resultsC = make(chan BacktestTask, 10)
wg := sync.WaitGroup{}
wg.Add(maxNumOfProcess)
go func() {
wg.Wait()
close(resultsC)
}()
for i := 0; i < maxNumOfProcess; i++ {
// fork workers
go func(id int, taskC chan BacktestTask) {
taskCnt := 0
log.Infof("starting local worker #%d", id)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-taskC:
if !ok {
return
}
taskCnt++
log.Infof("local worker #%d received param task: %v", id, task.Params)
report, err := e.execute(task.ConfigJson)
if err != nil {
log.WithError(err).Errorf("execute error")
}
task.Error = err
task.Report = report
resultsC <- task
}
}
}(i+1, taskC)
}
return resultsC, nil
}
// execute runs the config json and returns the summary report
// this is a blocking operation
func (e *LocalProcessExecutor) execute(configJson []byte) (*backtest.SummaryReport, error) {
tf, err := jsonToYamlConfig(e.ConfigDir, configJson) tf, err := jsonToYamlConfig(e.ConfigDir, configJson)
if err != nil { if err != nil {
return nil, err return nil, err

View File

@ -90,7 +90,7 @@ func (f *file) Sys() interface{} { return nil }
func Embed(file string, dirs ...string) error { func Embed(file string, dirs ...string) error {
var buf bytes.Buffer var buf bytes.Buffer
// Execute template // execute template
if err := tmpl.Execute(&buf, struct { if err := tmpl.Execute(&buf, struct {
Package string Package string
Tag string Tag string