bbgo_origin/pkg/optimizer/local.go

169 lines
3.4 KiB
Go
Raw Normal View History

package optimizer
import (
"context"
"encoding/json"
"os"
"os/exec"
2022-05-19 12:31:25 +00:00
"strings"
"sync"
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
2022-05-19 12:31:25 +00:00
"github.com/c9s/bbgo/pkg/backtest"
)
var log = logrus.WithField("component", "optimizer")
type BacktestTask struct {
ConfigJson []byte
Params []interface{}
Labels []string
Report *backtest.SummaryReport
Error error
}
type Executor interface {
// Execute(configJson []byte) (*backtest.SummaryReport, error)
Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error)
}
type AsyncHandle struct {
Error error
Report *backtest.SummaryReport
Done chan struct{}
}
type LocalProcessExecutor struct {
Config *LocalExecutorConfig
2022-05-19 12:31:25 +00:00
Bin string
WorkDir string
ConfigDir string
OutputDir string
}
func (e *LocalProcessExecutor) ExecuteAsync(configJson []byte) *AsyncHandle {
handle := &AsyncHandle{
Done: make(chan struct{}),
}
go func() {
defer close(handle.Done)
report, err := e.execute(configJson)
handle.Error = err
handle.Report = report
}()
return handle
}
func (e *LocalProcessExecutor) readReport(output []byte) (*backtest.SummaryReport, error) {
summaryReportFilepath := strings.TrimSpace(string(output))
_, err := os.Stat(summaryReportFilepath)
if os.IsNotExist(err) {
2022-05-19 12:31:25 +00:00
return nil, err
}
summaryReport, err := backtest.ReadSummaryReport(summaryReportFilepath)
if err != nil {
2022-05-19 12:31:25 +00:00
return nil, err
}
return summaryReport, nil
}
func (e *LocalProcessExecutor) Run(ctx context.Context, taskC chan BacktestTask) (chan BacktestTask, error) {
var maxNumOfProcess = e.Config.MaxNumberOfProcesses
var resultsC = make(chan BacktestTask, maxNumOfProcess*2)
wg := sync.WaitGroup{}
wg.Add(maxNumOfProcess)
go func() {
wg.Wait()
close(resultsC)
}()
for i := 0; i < maxNumOfProcess; i++ {
// fork workers
go func(id int, taskC chan BacktestTask) {
taskCnt := 0
log.Infof("starting local worker #%d", id)
defer wg.Done()
for {
select {
case <-ctx.Done():
return
case task, ok := <-taskC:
if !ok {
return
}
taskCnt++
log.Infof("local worker #%d received param task: %v", id, task.Params)
report, err := e.execute(task.ConfigJson)
if err != nil {
log.WithError(err).Errorf("execute error")
}
task.Error = err
task.Report = report
resultsC <- task
}
}
}(i+1, taskC)
}
return resultsC, nil
}
// execute runs the config json and returns the summary report
// this is a blocking operation
func (e *LocalProcessExecutor) execute(configJson []byte) (*backtest.SummaryReport, error) {
tf, err := jsonToYamlConfig(e.ConfigDir, configJson)
if err != nil {
return nil, err
}
c := exec.Command(e.Bin, "backtest", "--config", tf.Name(), "--output", e.OutputDir, "--subdir")
output, err := c.Output()
if err != nil {
2022-05-19 12:31:25 +00:00
return nil, err
}
return e.readReport(output)
}
// jsonToYamlConfig translate json format config into a YAML format config file
// The generated file is a temp file
func jsonToYamlConfig(dir string, configJson []byte) (*os.File, error) {
var o map[string]interface{}
if err := json.Unmarshal(configJson, &o); err != nil {
2022-05-19 12:31:25 +00:00
return nil, err
}
yamlConfig, err := yaml.Marshal(o)
if err != nil {
2022-06-20 03:07:48 +00:00
return nil, err
}
tf, err := os.CreateTemp(dir, "bbgo-*.yaml")
2022-05-19 12:31:25 +00:00
if err != nil {
return nil, err
}
if _, err = tf.Write(yamlConfig); err != nil {
2022-05-19 12:31:25 +00:00
return nil, err
}
if err := tf.Close(); err != nil {
2022-05-19 12:31:25 +00:00
return nil, err
}
return tf, nil
}