qbtrade/pkg/cmd/run.go

335 lines
7.9 KiB
Go
Raw Normal View History

2024-06-27 14:42:38 +00:00
package cmd
import (
"context"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"syscall"
"time"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
flag "github.com/spf13/pflag"
"git.qtrade.icu/lychiyu/qbtrade/pkg/cmd/cmdutil"
"git.qtrade.icu/lychiyu/qbtrade/pkg/grpc"
"git.qtrade.icu/lychiyu/qbtrade/pkg/qbtrade"
"git.qtrade.icu/lychiyu/qbtrade/pkg/server"
)
// init registers the flags of the run command and attaches the command to
// RootCmd.
func init() {
	flags := RunCmd.Flags()

	// build / startup switches
	flags.Bool("no-compile", false, "do not compile wrapper binary")
	flags.Bool("no-sync", false, "do not sync on startup")

	// time-based one-time password options
	flags.String("totp-key-url", "", "time-based one-time password key URL, if defined, it will be used for restoring the otp key")
	flags.String("totp-issuer", "", "")
	flags.String("totp-account-name", "", "")

	// web server options; --enable-web-server is the legacy spelling
	flags.Bool("enable-webserver", false, "enable webserver")
	flags.Bool("enable-web-server", false, "legacy option, this is renamed to --enable-webserver")
	flags.String("webserver-bind", ":8080", "webserver binding")

	flags.Bool("lightweight", false, "lightweight mode")

	// grpc server options
	flags.Bool("enable-grpc", false, "enable grpc server")
	flags.String("grpc-bind", ":50051", "grpc server binding")

	flags.Bool("setup", false, "use setup mode")

	RootCmd.AddCommand(RunCmd)
}
// RunCmd is the "run" sub-command. Its RunE handler is run.
var RunCmd = &cobra.Command{
	Use:   "run",
	Short: "run strategies from config file",
	RunE:  run,

	// Silence the usage output when an error occurs.
	SilenceUsage: true,
}
// runSetup runs the command in setup mode.
//
// It creates a fresh environment and trader and, when enableApiServer is
// true, starts server.Server in a background goroutine with a Setup section
// that carries the cancellable context and its cancel function. It then waits
// in cmdutil.WaitForSignal for SIGINT or SIGTERM, cancels the context and
// calls qbtrade.Shutdown with a 30 second timeout. It always returns nil.
func runSetup(baseCtx context.Context, userConfig *qbtrade.Config, enableApiServer bool) error {
	setupCtx, stopSetup := context.WithCancel(baseCtx)
	defer stopSetup()

	environ := qbtrade.NewEnvironment()
	trader := qbtrade.NewTrader(environ)

	if enableApiServer {
		serve := func() {
			srv := &server.Server{
				Config:        userConfig,
				Environ:       environ,
				Trader:        trader,
				OpenInBrowser: true,
				Setup: &server.Setup{
					Context: setupCtx,
					Cancel:  stopSetup,
					Token:   "",
				},
			}

			// A server failure is only logged; it does not end setup mode.
			if runErr := srv.Run(setupCtx); runErr != nil {
				log.WithError(runErr).Errorf("server error")
			}
		}
		go serve()
	}

	// NOTE(review): the server is handed stopSetup, so presumably it can end
	// this wait by cancelling setupCtx — confirm against WaitForSignal.
	cmdutil.WaitForSignal(setupCtx, syscall.SIGINT, syscall.SIGTERM)
	stopSetup()

	// setupCtx is already cancelled here, so the shutdown context is derived
	// from a new context built by NewTodoContextWithExistingIsolation.
	const gracePeriod = 30 * time.Second
	shutdownCtx, releaseShutdown := context.WithTimeout(qbtrade.NewTodoContextWithExistingIsolation(setupCtx), gracePeriod)
	qbtrade.Shutdown(shutdownCtx)
	releaseShutdown()

	return nil
}
// runConfig runs the strategies described by userConfig in the current
// process.
//
// Steps, in order:
//  1. read the run flags (sync, web server, grpc, lightweight);
//  2. bootstrap and initialise a new environment, optionally syncing it;
//  3. configure, initialise, load the state of and run the trader;
//  4. optionally start the web and grpc servers in background goroutines;
//  5. wait in cmdutil.WaitForSignal for SIGINT or SIGTERM, then shut down
//     with a 30 second timeout, save the trader state and close the market
//     and user data streams of every session.
//
// Errors from steps 1-3 are returned. Errors from the servers and from the
// shutdown phase are only logged, and the function then returns nil.
func runConfig(basectx context.Context, cmd *cobra.Command, userConfig *qbtrade.Config) error {
	flags := cmd.Flags()

	noSync, err := flags.GetBool("no-sync")
	if err != nil {
		return err
	}

	enableWebServer, err := flags.GetBool("enable-webserver")
	if err != nil {
		return err
	}

	webServerBind, err := flags.GetString("webserver-bind")
	if err != nil {
		return err
	}

	// The legacy flag still works: it warns and turns the web server on.
	legacyWebServer, err := flags.GetBool("enable-web-server")
	if err != nil {
		return err
	}
	if legacyWebServer {
		log.Warn("command option --enable-web-server is renamed to --enable-webserver")
		enableWebServer = true
	}

	enableGrpc, err := flags.GetBool("enable-grpc")
	if err != nil {
		return err
	}

	grpcBind, err := flags.GetString("grpc-bind")
	if err != nil {
		return err
	}

	tradingCtx, cancelTrading := context.WithCancel(basectx)
	defer cancelTrading()

	environ := qbtrade.NewEnvironment()

	lightweight, err := flags.GetBool("lightweight")
	if err != nil {
		return err
	}

	if lightweight {
		err = qbtrade.BootstrapEnvironmentLightweight(tradingCtx, environ, userConfig)
	} else {
		err = qbtrade.BootstrapEnvironment(tradingCtx, environ, userConfig)
	}
	if err != nil {
		return err
	}

	if err = environ.Init(tradingCtx); err != nil {
		return err
	}

	if !noSync {
		if err = environ.Sync(tradingCtx, userConfig); err != nil {
			return err
		}

		if syncConfig := userConfig.Sync; syncConfig != nil {
			environ.BindSync(syncConfig)
		}
	}

	// Trader start-up: each step must succeed before the next one runs.
	// NOTE(review): trader.Run is followed by the server start and the signal
	// wait, so it presumably returns without blocking — confirm.
	trader := qbtrade.NewTrader(environ)
	startup := []func() error{
		func() error { return trader.Configure(userConfig) },
		func() error { return trader.Initialize(tradingCtx) },
		func() error { return trader.LoadState(tradingCtx) },
		func() error { return trader.Run(tradingCtx) },
	}
	for _, step := range startup {
		if stepErr := step(); stepErr != nil {
			return stepErr
		}
	}

	if enableWebServer {
		go func() {
			httpServer := &server.Server{
				Config:  userConfig,
				Environ: environ,
				Trader:  trader,
			}

			if serveErr := httpServer.Run(tradingCtx, webServerBind); serveErr != nil {
				log.WithError(serveErr).Errorf("http server bind error")
			}
		}()
	}

	if enableGrpc {
		go func() {
			grpcServer := &grpc.Server{
				Config:  userConfig,
				Environ: environ,
				Trader:  trader,
			}

			if serveErr := grpcServer.ListenAndServe(grpcBind); serveErr != nil {
				log.WithError(serveErr).Errorf("grpc server bind error")
			}
		}()
	}

	cmdutil.WaitForSignal(tradingCtx, syscall.SIGINT, syscall.SIGTERM)
	cancelTrading()

	// tradingCtx is cancelled at this point; shutdown work runs on a new
	// context with its own 30 second timeout.
	const gracePeriod = 30 * time.Second
	shutdownCtx, releaseShutdown := context.WithTimeout(qbtrade.NewTodoContextWithExistingIsolation(tradingCtx), gracePeriod)
	qbtrade.Shutdown(shutdownCtx)

	if saveErr := trader.SaveState(shutdownCtx); saveErr != nil {
		log.WithError(saveErr).Errorf("can not save strategy persistence states")
	}
	releaseShutdown()

	// Close both streams of every session; failures are logged, not returned.
	for _, session := range environ.Sessions() {
		if closeErr := session.MarketDataStream.Close(); closeErr != nil {
			log.WithError(closeErr).Errorf("[%s] market data stream close error", session.Name)
		}

		if closeErr := session.UserDataStream.Close(); closeErr != nil {
			log.WithError(closeErr).Errorf("[%s] user data stream close error", session.Name)
		}
	}

	return nil
}
// run is the RunE handler of RunCmd.
//
// It reads the --setup, --no-compile and --config flags. Outside setup mode
// the config file must be given and must pass os.Stat. The strategies are run
// in the current process when this is the wrapper binary, when the config has
// a Build section with no imports, or when --no-compile is set. Otherwise the
// work is handed to runWrapperBinary.
func run(cmd *cobra.Command, args []string) error {
	flags := cmd.Flags()

	setup, err := flags.GetBool("setup")
	if err != nil {
		return err
	}

	noCompile, err := flags.GetBool("no-compile")
	if err != nil {
		return err
	}

	configFile, err := flags.GetString("config")
	if err != nil {
		return err
	}

	// Only setup mode may run without an existing config file.
	if !setup {
		if configFile == "" {
			return errors.New("--config option is required")
		}

		if _, statErr := os.Stat(configFile); statErr != nil {
			return statErr
		}
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// NOTE(review): userConfig is not declared in this function; presumably
	// it is a package-level variable populated before run is called — verify
	// that it is non-nil here, including in setup mode without --config.
	//
	// The operands are evaluated left to right, so userConfig.Build is not
	// touched when this is the wrapper binary.
	inProcess := qbtrade.IsWrapperBinary ||
		(userConfig.Build != nil && len(userConfig.Build.Imports) == 0) ||
		noCompile
	if !inProcess {
		return runWrapperBinary(ctx, cmd, userConfig, args)
	}

	if qbtrade.IsWrapperBinary {
		log.Infof("running wrapper binary...")
	}

	if setup {
		return runSetup(ctx, userConfig, true)
	}

	// Load the config again, this time passing true as the second argument
	// of Load (the default setting is false).
	userConfig, err = qbtrade.Load(configFile, true)
	if err != nil {
		return err
	}

	return runConfig(ctx, cmd, userConfig)
}
// runWrapperBinary builds the wrapper binary through buildAndRun and starts
// it as a child process running the "run" sub-command.
//
// Every flag that was set on cmd is forwarded to the child, followed by the
// positional args. The function then waits in cmdutil.WaitForSignal for
// SIGTERM or SIGINT. When a signal is returned it is relayed to the child and
// the function waits for the child to exit.
//
// It returns the error from buildAndRun, from signalling the child, or from
// waiting for the child; otherwise nil.
func runWrapperBinary(ctx context.Context, cmd *cobra.Command, userConfig *qbtrade.Config, args []string) error {
	runArgs := []string{"run"}

	// Forward each flag as a single "--name=value" argument. The two-argument
	// form "--name value" is wrong for boolean flags: pflag does not consume a
	// separate value for them, so "--no-sync false" would reach the child as
	// "--no-sync" (true) plus a stray positional "false".
	cmd.Flags().Visit(func(f *flag.Flag) {
		runArgs = append(runArgs, "--"+f.Name+"="+f.Value.String())
	})
	runArgs = append(runArgs, args...)

	runCmd, err := buildAndRun(ctx, userConfig, runArgs...)
	if err != nil {
		return err
	}

	if sig := cmdutil.WaitForSignal(ctx, syscall.SIGTERM, syscall.SIGINT); sig != nil {
		log.Infof("sending signal to the child process...")
		if err := runCmd.Process.Signal(sig); err != nil {
			return err
		}

		if err := runCmd.Wait(); err != nil {
			return err
		}
	}

	return nil
}
// buildAndRun builds the package for the native build target and starts the
// resulting binary with the given args.
//
// The binary path returned by qbtrade.Build is joined onto the current
// working directory. The child's stdout and stderr are wired to this
// process's. The command is started but not waited for; the caller owns the
// returned *exec.Cmd. If Start fails, the command is still returned together
// with the error.
func buildAndRun(ctx context.Context, userConfig *qbtrade.Config, args ...string) (*exec.Cmd, error) {
	// NOTE(review): this temporary directory under "build" is created and
	// removed again on return, but nothing in this function uses it.
	tmpDir, err := ioutil.TempDir("build", "qbtradew")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(tmpDir)

	binary, err := qbtrade.Build(ctx, userConfig, qbtrade.GetNativeBuildTargetConfig())
	if err != nil {
		return nil, err
	}

	workDir, err := os.Getwd()
	if err != nil {
		return nil, err
	}

	child := exec.Command(filepath.Join(workDir, binary), args...)
	child.Stdout, child.Stderr = os.Stdout, os.Stderr

	if startErr := child.Start(); startErr != nil {
		return child, startErr
	}
	return child, nil
}