2022-05-19 10:23:12 +00:00
package optimizer
import (
2022-07-29 08:22:36 +00:00
"bufio"
"bytes"
2022-06-20 09:18:05 +00:00
"context"
2022-05-19 10:23:12 +00:00
"encoding/json"
2022-06-29 08:17:43 +00:00
"fmt"
2022-05-19 10:23:12 +00:00
"os"
"os/exec"
2022-05-19 12:31:25 +00:00
"strings"
2022-06-20 09:18:05 +00:00
"sync"
2022-05-19 10:23:12 +00:00
2022-07-06 17:16:53 +00:00
"github.com/cheggaaa/pb/v3"
2022-09-15 17:53:23 +00:00
"github.com/pkg/errors"
2022-07-06 17:16:53 +00:00
2022-05-19 10:23:12 +00:00
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
2022-05-19 12:31:25 +00:00
"github.com/c9s/bbgo/pkg/backtest"
2022-05-19 10:23:12 +00:00
)
// log is the package-level logger; every entry it emits carries the field
// "component" set to "optimizer".
var log = logrus . WithField ( "component" , "optimizer" )
2022-06-20 09:18:05 +00:00
// BacktestTask is one unit of work travelling through an executor: it goes in
// with a configuration and comes back with the outcome attached.
type BacktestTask struct {
	// ConfigJson is the JSON-encoded configuration passed to Execute.
	ConfigJson []byte

	// Params holds the parameter values of this task; workers print them in
	// their progress log line.
	Params []interface{}

	// Labels presumably names the entries of Params. It is not read in this
	// file; confirm against the code that builds the tasks.
	Labels []string

	// Report is the summary report returned by Execute for this task.
	Report *backtest.SummaryReport

	// Error is the error returned by Execute for this task, nil on success.
	Error error
}
2022-05-19 10:23:12 +00:00
type Executor interface {
2022-07-29 08:22:36 +00:00
Execute ( configJson [ ] byte ) ( * backtest . SummaryReport , error )
2022-06-29 08:17:43 +00:00
Run ( ctx context . Context , taskC chan BacktestTask , bar * pb . ProgressBar ) ( chan BacktestTask , error )
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:20:26 +00:00
// AsyncHandle is the handle returned by ExecuteAsync for a backtest running in
// the background.
type AsyncHandle struct {
	// Error is the error returned by the backtest; read it only after Done is
	// closed.
	Error error

	// Report is the summary report returned by the backtest; read it only
	// after Done is closed.
	Report *backtest.SummaryReport

	// Done is closed once the background execution has stored its results.
	Done chan struct{}
}
2022-05-19 10:23:12 +00:00
type LocalProcessExecutor struct {
2022-06-21 04:31:42 +00:00
Config * LocalExecutorConfig
2022-05-19 12:31:25 +00:00
Bin string
WorkDir string
ConfigDir string
OutputDir string
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:20:26 +00:00
func ( e * LocalProcessExecutor ) ExecuteAsync ( configJson [ ] byte ) * AsyncHandle {
handle := & AsyncHandle {
Done : make ( chan struct { } ) ,
}
go func ( ) {
2022-06-20 03:54:55 +00:00
defer close ( handle . Done )
2022-07-29 08:22:36 +00:00
report , err := e . Execute ( configJson )
2022-06-20 03:20:26 +00:00
handle . Error = err
handle . Report = report
} ( )
return handle
}
2022-07-29 08:22:36 +00:00
func ( e * LocalProcessExecutor ) readReport ( reportPath string ) ( * backtest . SummaryReport , error ) {
summaryReportFilepath := strings . TrimSpace ( reportPath )
2022-06-20 03:54:55 +00:00
_ , err := os . Stat ( summaryReportFilepath )
if os . IsNotExist ( err ) {
2022-05-19 12:31:25 +00:00
return nil , err
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:54:55 +00:00
summaryReport , err := backtest . ReadSummaryReport ( summaryReportFilepath )
2022-05-19 10:23:12 +00:00
if err != nil {
2022-05-19 12:31:25 +00:00
return nil , err
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:54:55 +00:00
return summaryReport , nil
}
2022-07-13 07:28:11 +00:00
// Prepare prepares the environment for the following back tests
// this is a blocking operation
func ( e * LocalProcessExecutor ) Prepare ( configJson [ ] byte ) error {
2022-09-15 17:53:23 +00:00
log . Debugln ( "syncing backtest data before starting backtests..." )
2022-07-13 07:28:11 +00:00
tf , err := jsonToYamlConfig ( e . ConfigDir , configJson )
if err != nil {
return err
}
c := exec . Command ( e . Bin , "backtest" , "--sync" , "--sync-only" , "--config" , tf . Name ( ) )
2022-09-15 17:53:23 +00:00
output , err := c . Output ( )
if err != nil {
return errors . Wrapf ( err , "failed to sync backtest data: %s" , string ( output ) )
}
return nil
2022-07-13 07:28:11 +00:00
}
2022-06-29 08:17:43 +00:00
func ( e * LocalProcessExecutor ) Run ( ctx context . Context , taskC chan BacktestTask , bar * pb . ProgressBar ) ( chan BacktestTask , error ) {
2022-06-21 04:31:42 +00:00
var maxNumOfProcess = e . Config . MaxNumberOfProcesses
2022-06-21 03:51:20 +00:00
var resultsC = make ( chan BacktestTask , maxNumOfProcess * 2 )
2022-06-20 09:18:05 +00:00
wg := sync . WaitGroup { }
wg . Add ( maxNumOfProcess )
go func ( ) {
wg . Wait ( )
close ( resultsC )
} ( )
for i := 0 ; i < maxNumOfProcess ; i ++ {
// fork workers
go func ( id int , taskC chan BacktestTask ) {
taskCnt := 0
2022-06-29 08:17:43 +00:00
bar . Set ( "log" , fmt . Sprintf ( "starting local worker #%d" , id ) )
bar . Write ( )
2022-06-20 09:18:05 +00:00
defer wg . Done ( )
for {
select {
case <- ctx . Done ( ) :
return
case task , ok := <- taskC :
if ! ok {
return
}
taskCnt ++
2022-06-29 08:17:43 +00:00
bar . Set ( "log" , fmt . Sprintf ( "local worker #%d received param task: %v" , id , task . Params ) )
bar . Write ( )
2022-06-20 09:18:05 +00:00
2022-07-29 08:22:36 +00:00
report , err := e . Execute ( task . ConfigJson )
2022-06-20 09:18:05 +00:00
if err != nil {
2022-07-06 17:16:53 +00:00
if err2 , ok := err . ( * exec . ExitError ) ; ok {
log . WithError ( err ) . Errorf ( "execute error: %s" , err2 . Stderr )
} else {
log . WithError ( err ) . Errorf ( "execute error" )
}
2022-06-20 09:18:05 +00:00
}
task . Error = err
task . Report = report
resultsC <- task
}
}
} ( i + 1 , taskC )
}
return resultsC , nil
}
2022-07-29 08:22:36 +00:00
// Execute runs the config json and returns the summary report. This is a blocking operation.
func ( e * LocalProcessExecutor ) Execute ( configJson [ ] byte ) ( * backtest . SummaryReport , error ) {
2022-06-20 03:54:55 +00:00
tf , err := jsonToYamlConfig ( e . ConfigDir , configJson )
if err != nil {
2022-06-20 03:20:26 +00:00
return nil , err
}
2022-06-20 03:54:55 +00:00
c := exec . Command ( e . Bin , "backtest" , "--config" , tf . Name ( ) , "--output" , e . OutputDir , "--subdir" )
output , err := c . Output ( )
2022-05-19 10:23:12 +00:00
if err != nil {
2022-07-29 08:22:36 +00:00
log . WithError ( err ) . WithField ( "command" , [ ] string { e . Bin , "backtest" , "--config" , tf . Name ( ) , "--output" , e . OutputDir , "--subdir" } ) . Errorf ( "failed to execute backtest" )
2022-05-19 12:31:25 +00:00
return nil , err
2022-05-19 10:23:12 +00:00
}
2022-07-29 08:22:36 +00:00
// the last line is the report path
scanner := bufio . NewScanner ( bytes . NewBuffer ( output ) )
var reportFilePath string
for scanner . Scan ( ) {
reportFilePath = scanner . Text ( )
}
return e . readReport ( reportFilePath )
2022-06-20 03:20:26 +00:00
}
2022-06-20 06:52:40 +00:00
// jsonToYamlConfig translate json format config into a YAML format config file
// The generated file is a temp file
2022-06-20 03:20:26 +00:00
func jsonToYamlConfig ( dir string , configJson [ ] byte ) ( * os . File , error ) {
var o map [ string ] interface { }
if err := json . Unmarshal ( configJson , & o ) ; err != nil {
2022-05-19 12:31:25 +00:00
return nil , err
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:20:26 +00:00
yamlConfig , err := yaml . Marshal ( o )
if err != nil {
2022-06-20 03:07:48 +00:00
return nil , err
}
2022-06-20 03:20:26 +00:00
tf , err := os . CreateTemp ( dir , "bbgo-*.yaml" )
2022-05-19 12:31:25 +00:00
if err != nil {
return nil , err
}
2022-06-20 03:20:26 +00:00
if _ , err = tf . Write ( yamlConfig ) ; err != nil {
2022-05-19 12:31:25 +00:00
return nil , err
}
2022-06-20 03:20:26 +00:00
if err := tf . Close ( ) ; err != nil {
2022-05-19 12:31:25 +00:00
return nil , err
2022-05-19 10:23:12 +00:00
}
2022-06-20 03:20:26 +00:00
return tf , nil
2022-05-19 10:23:12 +00:00
}