Merge pull request #749 from c9s/improve/optimizer-local-proc

improve: add parallel local process executor for optimizer
This commit is contained in:
Yo-An Lin 2022-06-20 21:47:06 +08:00 committed by GitHub
commit 74e8540550
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 247 additions and 91 deletions

View File

@ -5,43 +5,43 @@
---
matrix:
#- type: iterate
# label: interval
# path: '/exchangeStrategies/0/pivotshort/interval'
# values: [ "5m", "30m" ]
- type: iterate
label: interval
path: '/exchangeStrategies/0/pivotshort/interval'
values: [ "1m", "5m", "30m" ]
- type: range
path: '/exchangeStrategies/0/pivotshort/window'
label: window
min: 100.0
max: 200.0
step: 20.0
# - type: range
# path: '/exchangeStrategies/0/pivotshort/pivotLength'
# label: pivotLength
# min: 100.0
# max: 200.0
# step: 20.0
- type: range
path: '/exchangeStrategies/0/pivotshort/breakLow/stopEMARange'
label: stopEMARange
min: 0%
max: 10%
step: 1%
- type: range
path: '/exchangeStrategies/0/pivotshort/exit/roiStopLossPercentage'
label: roiStopLossPercentage
min: 0.5%
max: 2%
step: 0.5%
- type: range
path: '/exchangeStrategies/0/pivotshort/exit/roiTakeProfitPercentage'
label: roiTakeProfitPercentage
min: 10%
max: 50%
step: 5%
- type: range
path: '/exchangeStrategies/0/pivotshort/exit/roiMinTakeProfitPercentage'
label: roiMinTakeProfitPercentage
min: 3%
max: 10%
step: 1%
# path: '/exchangeStrategies/0/pivotshort/breakLow/stopEMARange'
# label: stopEMARange
# min: 0%
# max: 10%
# step: 1%
# - type: range
# path: '/exchangeStrategies/0/pivotshort/exit/roiStopLossPercentage'
# label: roiStopLossPercentage
# min: 0.5%
# max: 2%
# step: 0.5%
#
# - type: range
# path: '/exchangeStrategies/0/pivotshort/exit/roiTakeProfitPercentage'
# label: roiTakeProfitPercentage
# min: 10%
# max: 50%
# step: 5%
#
# - type: range
# path: '/exchangeStrategies/0/pivotshort/exit/roiMinTakeProfitPercentage'
# label: roiMinTakeProfitPercentage
# min: 3%
# max: 10%
# step: 1%
#

View File

@ -104,12 +104,15 @@ var optimizeCmd = &cobra.Command{
// print metrics JSON to stdout
fmt.Println(string(out))
} else {
if len(metrics) > 0 {
fmt.Printf("%v\n", metrics[0].Labels)
for n, values := range metrics {
if len(values) == 0 {
continue
}
for _, m := range metrics {
fmt.Printf("%v => %v\n", m.Params, m.Value)
fmt.Printf("%v => %s\n", values[0].Labels, n)
for _, m := range values {
fmt.Printf("%v => %s %v\n", m.Params, n, m.Value)
}
}
}

View File

@ -1,6 +1,7 @@
package optimizer
import (
"context"
"encoding/json"
"fmt"
"sort"
@ -23,6 +24,18 @@ type Metric struct {
Value fixedpoint.Value `json:"value,omitempty"`
}
func copyParams(params []interface{}) []interface{} {
var c = make([]interface{}, len(params))
copy(c, params)
return c
}
func copyLabels(labels []string) []string {
var c = make([]string, len(labels))
copy(c, labels)
return c
}
type GridOptimizer struct {
Config *Config
@ -149,31 +162,26 @@ func (o *GridOptimizer) buildOps() []OpFunc {
return ops
}
func (o *GridOptimizer) Run(executor Executor, configJson []byte) ([]Metric, error) {
func (o *GridOptimizer) Run(executor Executor, configJson []byte) (map[string][]Metric, error) {
o.CurrentParams = make([]interface{}, len(o.Config.Matrix))
var metrics []Metric
var valueFunctions = map[string]MetricValueFunc{
"totalProfit": TotalProfitMetricValueFunc,
}
var metrics = map[string][]Metric{}
var ops = o.buildOps()
var taskC = make(chan BacktestTask, 100)
var app = func(configJson []byte, next func(configJson []byte) error) error {
summaryReport, err := executor.Execute(configJson)
if err != nil {
return err
var labels = copyLabels(o.ParamLabels)
var params = copyParams(o.CurrentParams)
taskC <- BacktestTask{
ConfigJson: configJson,
Params: params,
Labels: labels,
}
// TODO: Add more metric value function
metricValue := TotalProfitMetricValueFunc(summaryReport)
var currentParams = make([]interface{}, len(o.CurrentParams))
copy(currentParams, o.CurrentParams)
metrics = append(metrics, Metric{
Params: currentParams,
Labels: o.ParamLabels,
Value: metricValue,
})
log.Infof("current params: %+v => %+v", currentParams, metricValue)
return nil
}
@ -191,13 +199,36 @@ func (o *GridOptimizer) Run(executor Executor, configJson []byte) ([]Metric, err
}
}
err := wrapper(configJson)
resultsC, err := executor.Run(context.Background(), taskC)
if err != nil {
return nil, err
}
sort.Slice(metrics, func(i, j int) bool {
a := metrics[i].Value
b := metrics[j].Value
if err := wrapper(configJson); err != nil {
return nil, err
}
close(taskC) // this will shut down the executor
for result := range resultsC {
for metricName, metricFunc := range valueFunctions {
var metricValue = metricFunc(result.Report)
log.Infof("params: %+v => %s %+v", result.Params, metricName, metricValue)
metrics[metricName] = append(metrics[metricName], Metric{
Params: result.Params,
Labels: result.Labels,
Value: metricValue,
})
}
}
for n := range metrics {
sort.Slice(metrics[n], func(i, j int) bool {
a := metrics[n][i].Value
b := metrics[n][j].Value
return a.Compare(b) > 0
})
}
return metrics, err
}

View File

@ -1,10 +1,12 @@
package optimizer
import (
"context"
"encoding/json"
"os"
"os/exec"
"strings"
"sync"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
@ -14,8 +16,23 @@ import (
var log = logrus.WithField("component", "optimizer")
type BacktestTask struct {
ConfigJson []byte
Params []interface{}
Labels []string
Report *backtest.SummaryReport
Error error
}
type Executor interface {
Execute(configJson []byte) (*backtest.SummaryReport, error)
// Execute(configJson []byte) (*backtest.SummaryReport, error)
Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error)
}
type AsyncHandle struct {
Error error
Report *backtest.SummaryReport
Done chan struct{}
}
type LocalProcessExecutor struct {
@ -25,34 +42,24 @@ type LocalProcessExecutor struct {
OutputDir string
}
func (e *LocalProcessExecutor) Execute(configJson []byte) (*backtest.SummaryReport, error) {
var o map[string]interface{}
if err := json.Unmarshal(configJson, &o); err != nil {
return nil, err
func (e *LocalProcessExecutor) ExecuteAsync(configJson []byte) *AsyncHandle {
handle := &AsyncHandle{
Done: make(chan struct{}),
}
yamlConfig, err := yaml.Marshal(o)
if err != nil {
return nil, err
}
tf, err := os.CreateTemp(e.ConfigDir, "bbgo-*.yaml")
if err != nil {
return nil, err
}
if _, err = tf.Write(yamlConfig); err != nil {
return nil, err
}
c := exec.Command(e.Bin, "backtest", "--config", tf.Name(), "--output", e.OutputDir, "--subdir")
output, err := c.Output()
if err != nil {
return nil, err
go func() {
defer close(handle.Done)
report, err := e.execute(configJson)
handle.Error = err
handle.Report = report
}()
return handle
}
func (e *LocalProcessExecutor) readReport(output []byte) (*backtest.SummaryReport, error) {
summaryReportFilepath := strings.TrimSpace(string(output))
_, err = os.Stat(summaryReportFilepath)
_, err := os.Stat(summaryReportFilepath)
if os.IsNotExist(err) {
return nil, err
}
@ -64,3 +71,97 @@ func (e *LocalProcessExecutor) Execute(configJson []byte) (*backtest.SummaryRepo
return summaryReport, nil
}
func (e *LocalProcessExecutor) Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error) {
var maxNumOfProcess = 5
var resultsC = make(chan BacktestTask, 10)
wg := sync.WaitGroup{}
wg.Add(maxNumOfProcess)
go func() {
wg.Wait()
close(resultsC)
}()
for i := 0; i < maxNumOfProcess; i++ {
// fork workers
go func(id int, taskC chan BacktestTask) {
taskCnt := 0
log.Infof("starting local worker #%d", id)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-taskC:
if !ok {
return
}
taskCnt++
log.Infof("local worker #%d received param task: %v", id, task.Params)
report, err := e.execute(task.ConfigJson)
if err != nil {
log.WithError(err).Errorf("execute error")
}
task.Error = err
task.Report = report
resultsC <- task
}
}
}(i+1, taskC)
}
return resultsC, nil
}
// execute runs the config json and returns the summary report
// this is a blocking operation
func (e *LocalProcessExecutor) execute(configJson []byte) (*backtest.SummaryReport, error) {
tf, err := jsonToYamlConfig(e.ConfigDir, configJson)
if err != nil {
return nil, err
}
c := exec.Command(e.Bin, "backtest", "--config", tf.Name(), "--output", e.OutputDir, "--subdir")
output, err := c.Output()
if err != nil {
return nil, err
}
return e.readReport(output)
}
// jsonToYamlConfig translate json format config into a YAML format config file
// The generated file is a temp file
func jsonToYamlConfig(dir string, configJson []byte) (*os.File, error) {
var o map[string]interface{}
if err := json.Unmarshal(configJson, &o); err != nil {
return nil, err
}
yamlConfig, err := yaml.Marshal(o)
if err != nil {
return nil, err
}
tf, err := os.CreateTemp(dir, "bbgo-*.yaml")
if err != nil {
return nil, err
}
if _, err = tf.Write(yamlConfig); err != nil {
return nil, err
}
if err := tf.Close(); err != nil {
return nil, err
}
return tf, nil
}

View File

@ -0,0 +1,21 @@
package optimizer
import (
"os"
"testing"
"github.com/stretchr/testify/assert"
)
func Test_jsonToYamlConfig(t *testing.T) {
err := os.Mkdir(".tmpconfig", 0755)
assert.NoError(t, err)
tf, err := jsonToYamlConfig(".tmpconfig", []byte(`{
}`))
assert.NoError(t, err)
assert.NotNil(t, tf)
assert.NotEmpty(t, tf.Name())
_ = os.RemoveAll(".tmpconfig")
}

View File

@ -90,7 +90,7 @@ func (f *file) Sys() interface{} { return nil }
func Embed(file string, dirs ...string) error {
var buf bytes.Buffer
// Execute template
// execute template
if err := tmpl.Execute(&buf, struct {
Package string
Tag string