2024-06-27 14:42:38 +00:00
package optimizer
import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"github.com/pkg/errors"
2024-06-27 15:14:21 +00:00
"git.qtrade.icu/lychiyu/qbtrade/pkg/backtest"
"github.com/cheggaaa/pb/v3"
2024-06-27 14:42:38 +00:00
"github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
var log = logrus . WithField ( "component" , "optimizer" )
type BacktestTask struct {
ConfigJson [ ] byte
Params [ ] interface { }
Labels [ ] string
Report * backtest . SummaryReport
Error error
}
type Executor interface {
Execute ( configJson [ ] byte ) ( * backtest . SummaryReport , error )
Run ( ctx context . Context , taskC chan BacktestTask , bar * pb . ProgressBar ) ( chan BacktestTask , error )
}
type AsyncHandle struct {
Error error
Report * backtest . SummaryReport
Done chan struct { }
}
type LocalProcessExecutor struct {
Config * LocalExecutorConfig
Bin string
WorkDir string
ConfigDir string
OutputDir string
}
func ( e * LocalProcessExecutor ) ExecuteAsync ( configJson [ ] byte ) * AsyncHandle {
handle := & AsyncHandle {
Done : make ( chan struct { } ) ,
}
go func ( ) {
defer close ( handle . Done )
report , err := e . Execute ( configJson )
handle . Error = err
handle . Report = report
} ( )
return handle
}
func ( e * LocalProcessExecutor ) readReport ( reportPath string ) ( * backtest . SummaryReport , error ) {
summaryReportFilepath := strings . TrimSpace ( reportPath )
_ , err := os . Stat ( summaryReportFilepath )
if os . IsNotExist ( err ) {
return nil , err
}
summaryReport , err := backtest . ReadSummaryReport ( summaryReportFilepath )
if err != nil {
return nil , err
}
return summaryReport , nil
}
// Prepare prepares the environment for the following back tests
// this is a blocking operation
func ( e * LocalProcessExecutor ) Prepare ( configJson [ ] byte ) error {
log . Debugln ( "syncing backtest data before starting backtests..." )
tf , err := jsonToYamlConfig ( e . ConfigDir , configJson )
if err != nil {
return err
}
c := exec . Command ( e . Bin , "backtest" , "--sync" , "--sync-only" , "--config" , tf . Name ( ) )
output , err := c . Output ( )
if err != nil {
return errors . Wrapf ( err , "failed to sync backtest data: %s" , string ( output ) )
}
return nil
}
func ( e * LocalProcessExecutor ) Run ( ctx context . Context , taskC chan BacktestTask , bar * pb . ProgressBar ) ( chan BacktestTask , error ) {
var maxNumOfProcess = e . Config . MaxNumberOfProcesses
var resultsC = make ( chan BacktestTask , maxNumOfProcess * 2 )
wg := sync . WaitGroup { }
wg . Add ( maxNumOfProcess )
go func ( ) {
wg . Wait ( )
close ( resultsC )
} ( )
for i := 0 ; i < maxNumOfProcess ; i ++ {
// fork workers
go func ( id int , taskC chan BacktestTask ) {
taskCnt := 0
bar . Set ( "log" , fmt . Sprintf ( "starting local worker #%d" , id ) )
bar . Write ( )
defer wg . Done ( )
for {
select {
case <- ctx . Done ( ) :
return
case task , ok := <- taskC :
if ! ok {
return
}
taskCnt ++
bar . Set ( "log" , fmt . Sprintf ( "local worker #%d received param task: %v" , id , task . Params ) )
bar . Write ( )
report , err := e . Execute ( task . ConfigJson )
if err != nil {
if err2 , ok := err . ( * exec . ExitError ) ; ok {
log . WithError ( err ) . Errorf ( "execute error: %s" , err2 . Stderr )
} else {
log . WithError ( err ) . Errorf ( "execute error" )
}
}
task . Error = err
task . Report = report
resultsC <- task
}
}
} ( i + 1 , taskC )
}
return resultsC , nil
}
// Execute runs the config json and returns the summary report. This is a blocking operation.
func ( e * LocalProcessExecutor ) Execute ( configJson [ ] byte ) ( * backtest . SummaryReport , error ) {
tf , err := jsonToYamlConfig ( e . ConfigDir , configJson )
if err != nil {
return nil , err
}
c := exec . Command ( e . Bin , "backtest" , "--config" , tf . Name ( ) , "--output" , e . OutputDir , "--subdir" )
output , err := c . Output ( )
if err != nil {
log . WithError ( err ) . WithField ( "command" , [ ] string { e . Bin , "backtest" , "--config" , tf . Name ( ) , "--output" , e . OutputDir , "--subdir" } ) . Errorf ( "failed to execute backtest" )
return nil , err
}
// the last line is the report path
scanner := bufio . NewScanner ( bytes . NewBuffer ( output ) )
var reportFilePath string
for scanner . Scan ( ) {
reportFilePath = scanner . Text ( )
}
return e . readReport ( reportFilePath )
}
// jsonToYamlConfig translate json format config into a YAML format config file
// The generated file is a temp file
func jsonToYamlConfig ( dir string , configJson [ ] byte ) ( * os . File , error ) {
var o map [ string ] interface { }
if err := json . Unmarshal ( configJson , & o ) ; err != nil {
return nil , err
}
yamlConfig , err := yaml . Marshal ( o )
if err != nil {
return nil , err
}
tf , err := os . CreateTemp ( dir , "qbtrade-*.yaml" )
if err != nil {
return nil , err
}
if _ , err = tf . Write ( yamlConfig ) ; err != nil {
return nil , err
}
if err := tf . Close ( ) ; err != nil {
return nil , err
}
return tf , nil
}