This commit is contained in:
Aleksey Savin 2024-06-01 00:32:05 +03:00
parent 98ef62cb46
commit ac5c22d0bc

View File

@ -57,21 +57,22 @@ class IFreqaiModel(ABC):
"""
def __init__(self, config: Config) -> None:
self.config = config
self.assert_config(self.config)
self.freqai_info: Dict[str, Any] = config["freqai"]
self.data_split_parameters: Dict[str, Any] = config.get("freqai", {}).get(
"data_split_parameters", {})
"data_split_parameters", {}
)
self.model_training_parameters: Dict[str, Any] = config.get("freqai", {}).get(
"model_training_parameters", {})
"model_training_parameters", {}
)
self.identifier: str = self.freqai_info.get("identifier", "no_id_provided")
self.retrain = False
self.first = True
self.set_full_path()
self.save_backtest_models: bool = self.freqai_info.get("save_backtest_models", False)
if self.save_backtest_models:
logger.info('Backtesting module configured to save all models.')
logger.info("Backtesting module configured to save all models.")
self.dd = FreqaiDataDrawer(Path(self.full_path), self.config)
# set current candle to arbitrary historical date
@ -85,7 +86,7 @@ class IFreqaiModel(ABC):
self.ft_params["DI_threshold"] = 0
logger.warning("DI threshold is not configured for Keras models yet. Deactivating.")
self.CONV_WIDTH = self.freqai_info.get('conv_width', 1)
self.CONV_WIDTH = self.freqai_info.get("conv_width", 1)
self.class_names: List[str] = [] # used in classification subclasses
self.pair_it = 0
self.pair_it_train = 0
@ -95,8 +96,8 @@ class IFreqaiModel(ABC):
self.train_time: float = 0
self.begin_time: float = 0
self.begin_time_train: float = 0
self.base_tf_seconds = timeframe_to_seconds(self.config['timeframe'])
self.continual_learning = self.freqai_info.get('continual_learning', False)
self.base_tf_seconds = timeframe_to_seconds(self.config["timeframe"])
self.continual_learning = self.freqai_info.get("continual_learning", False)
self.plot_features = self.ft_params.get("plot_feature_importances", 0)
self.corr_dataframes: Dict[str, DataFrame] = {}
# get_corr_dataframes is controlling the caching of corr_dataframes
@ -108,10 +109,11 @@ class IFreqaiModel(ABC):
self.data_provider: Optional[DataProvider] = None
self.max_system_threads = max(int(psutil.cpu_count() * 2 - 2), 1)
self.can_short = True # overridden in start() with strategy.can_short
if self.ft_params.get('principal_component_analysis', False) and self.continual_learning:
self.ft_params.update({'principal_component_analysis': False})
logger.warning('User tried to use PCA with continual learning. Deactivating PCA.')
self.activate_tensorboard: bool = self.freqai_info.get('activate_tensorboard', True)
self.model: Any = None
if self.ft_params.get("principal_component_analysis", False) and self.continual_learning:
self.ft_params.update({"principal_component_analysis": False})
logger.warning("User tried to use PCA with continual learning. Deactivating PCA.")
self.activate_tensorboard: bool = self.freqai_info.get("activate_tensorboard", True)
record_params(config, self.full_path)
@ -119,10 +121,9 @@ class IFreqaiModel(ABC):
"""
Return an empty state to be pickled in hyperopt
"""
return ({})
return {}
def assert_config(self, config: Config) -> None:
if not config.get("freqai", {}):
raise OperationalException("No freqai parameters found in configuration file.")
@ -143,7 +144,7 @@ class IFreqaiModel(ABC):
self.can_short = strategy.can_short
if self.live:
self.inference_timer('start')
self.inference_timer("start")
self.dk = FreqaiDataKitchen(self.config, self.live, metadata["pair"])
dk = self.start_live(dataframe, metadata, strategy, self.dk)
dataframe = dk.remove_features_from_df(dk.return_dataframe)
@ -161,13 +162,12 @@ class IFreqaiModel(ABC):
dataframe = dk.remove_features_from_df(dk.return_dataframe)
else:
logger.info("Backtesting using historic predictions (live models)")
dk = self.start_backtesting_from_historic_predictions(
dataframe, metadata, self.dk)
dk = self.start_backtesting_from_historic_predictions(dataframe, metadata, self.dk)
dataframe = dk.return_dataframe
self.clean_up()
if self.live:
self.inference_timer('stop', metadata["pair"])
self.inference_timer("stop", metadata["pair"])
return dataframe
@ -224,7 +224,7 @@ class IFreqaiModel(ABC):
# ensure pair is available in dp
if pair not in strategy.dp.current_whitelist():
self.train_queue.popleft()
logger.warning(f'{pair} not in current whitelist, removing from train queue.')
logger.warning(f"{pair} not in current whitelist, removing from train queue.")
continue
(_, trained_timestamp) = self.dd.get_pair_dict_info(pair)
@ -237,23 +237,25 @@ class IFreqaiModel(ABC):
) = dk.check_if_new_training_required(trained_timestamp)
if retrain:
self.train_timer('start')
self.train_timer("start")
dk.set_paths(pair, new_trained_timerange.stopts)
try:
self.extract_data_and_train_model(
new_trained_timerange, pair, strategy, dk, data_load_timerange
)
except Exception as msg:
logger.exception(f"Training {pair} raised exception {msg.__class__.__name__}. "
f"Message: {msg}, skipping.")
logger.exception(
f"Training {pair} raised exception {msg.__class__.__name__}. "
f"Message: {msg}, skipping."
)
self.train_timer('stop', pair)
self.train_timer("stop", pair)
# only rotate the queue after the first has been trained.
self.train_queue.rotate(-1)
self.dd.save_historic_predictions_to_disk()
if self.freqai_info.get('write_metrics_to_disk', False):
if self.freqai_info.get("write_metrics_to_disk", False):
self.dd.save_metric_tracker_to_disk()
def _train_model(self, dataframe_train, pair, dk, tr_backtest):
@ -303,8 +305,13 @@ class IFreqaiModel(ABC):
train_it += 1
total_trains = len(dk.backtesting_timeranges)
self.training_timerange = tr_train
len_backtest_df = len(dataframe.loc[(dataframe["date"] >= tr_backtest.startdt) & (
dataframe["date"] < tr_backtest.stopdt), :])
len_backtest_df = len(
dataframe.loc[
(dataframe["date"] >= tr_backtest.startdt)
& (dataframe["date"] < tr_backtest.stopdt),
:,
]
)
if not self.ensure_data_exists(len_backtest_df, tr_backtest, pair):
continue
@ -340,10 +347,12 @@ class IFreqaiModel(ABC):
dataframe_base_train = dataframe.loc[dataframe["date"] < tr_train.stopdt, :]
dataframe_base_train = strategy.set_freqai_targets(
dataframe_base_train, metadata=metadata)
dataframe_base_train, metadata=metadata
)
dataframe_base_backtest = dataframe.loc[dataframe["date"] < tr_backtest.stopdt, :]
dataframe_base_backtest = strategy.set_freqai_targets(
dataframe_base_backtest, metadata=metadata)
dataframe_base_backtest, metadata=metadata
)
tr_train = dk.buffer_timerange(tr_train)
@ -398,9 +407,11 @@ class IFreqaiModel(ABC):
"""
if not strategy.process_only_new_candles:
raise OperationalException("You are trying to use a FreqAI strategy with "
raise OperationalException(
"You are trying to use a FreqAI strategy with "
"process_only_new_candles = False. This is not supported "
"by FreqAI, and it is therefore aborting.")
"by FreqAI, and it is therefore aborting."
)
# get the model metadata associated with the current pair
(_, trained_timestamp) = self.dd.get_pair_dict_info(metadata["pair"])
@ -428,8 +439,10 @@ class IFreqaiModel(ABC):
self.model = self.dd.load_data(metadata["pair"], dk)
dataframe = dk.use_strategy_to_populate_indicators(
strategy, prediction_dataframe=dataframe, pair=metadata["pair"],
do_corr_pairs=self.get_corr_dataframes
strategy,
prediction_dataframe=dataframe,
pair=metadata["pair"],
do_corr_pairs=self.get_corr_dataframes,
)
if not self.model:
@ -451,7 +464,6 @@ class IFreqaiModel(ABC):
def build_strategy_return_arrays(
self, dataframe: DataFrame, dk: FreqaiDataKitchen, pair: str, trained_timestamp: int
) -> None:
# hold the historical predictions in memory so we are sending back
# correct array to strategy
@ -477,18 +489,16 @@ class IFreqaiModel(ABC):
else:
# remaining predictions are made only on the most recent candles for performance and
# historical accuracy reasons.
pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH:], dk, first=False)
pred_df, do_preds = self.predict(dataframe.iloc[-self.CONV_WIDTH :], dk, first=False)
if self.freqai_info.get('fit_live_predictions_candles', 0) and self.live:
if self.freqai_info.get("fit_live_predictions_candles", 0) and self.live:
self.fit_live_predictions(dk, pair)
self.dd.append_model_predictions(pair, pred_df, do_preds, dk, dataframe)
dk.return_dataframe = self.dd.attach_return_values_to_return_dataframe(pair, dataframe)
return
def check_if_feature_list_matches_strategy(
self, dk: FreqaiDataKitchen
) -> None:
def check_if_feature_list_matches_strategy(self, dk: FreqaiDataKitchen) -> None:
"""
Ensure user is passing the proper feature set if they are reusing an `identifier` pointing
to a folder holding existing models.
@ -500,7 +510,7 @@ class IFreqaiModel(ABC):
if "training_features_list_raw" in dk.data:
feature_list = dk.data["training_features_list_raw"]
else:
feature_list = dk.data['training_features_list']
feature_list = dk.data["training_features_list"]
if dk.training_features_list != feature_list:
raise OperationalException(
@ -516,38 +526,35 @@ class IFreqaiModel(ABC):
def define_data_pipeline(self, threads=-1) -> Pipeline:
    """
    Assemble the feature pipeline from the `feature_parameters` section of the
    FreqAI configuration.

    The pipeline always starts with a `VarianceThreshold(threshold=0)` step and a
    min-max scaler to (-1, 1). Further steps are appended, in this order, when the
    matching option is set: PCA followed by a second scaler, SVM outlier extraction,
    dissimilarity index, DBSCAN and noise.
    :param threads: forwarded as `n_jobs` to the dissimilarity index and DBSCAN steps
    :return: Pipeline built from the collected steps
    """
    ft_params = self.freqai_info["feature_parameters"]
    pipe_steps = [
        ("const", ds.VarianceThreshold(threshold=0)),
        ("scaler", SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1)))),
    ]

    if ft_params.get("principal_component_analysis", False):
        pipe_steps.append(("pca", ds.PCA(n_components=0.999)))
        # the PCA output is scaled again before it reaches later steps
        pipe_steps.append(
            ("post-pca-scaler", SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))))
        )

    if ft_params.get("use_SVM_to_remove_outliers", False):
        svm_params = ft_params.get("svm_params", {"shuffle": False, "nu": 0.01})
        pipe_steps.append(("svm", ds.SVMOutlierExtractor(**svm_params)))

    di = ft_params.get("DI_threshold", 0)
    if di:
        pipe_steps.append(("di", ds.DissimilarityIndex(di_threshold=di, n_jobs=threads)))

    if ft_params.get("use_DBSCAN_to_remove_outliers", False):
        pipe_steps.append(("dbscan", ds.DBSCAN(n_jobs=threads)))

    sigma = ft_params.get("noise_standard_deviation", 0)
    if sigma:
        pipe_steps.append(("noise", ds.Noise(sigma=sigma)))

    return Pipeline(pipe_steps)
def define_label_pipeline(self, threads=-1) -> Pipeline:
    """
    Build the label pipeline: a single min-max scaler mapping to (-1, 1).
    :param threads: not used by this implementation
    :return: Pipeline containing only the "scaler" step
    """
    return Pipeline([("scaler", SKLearnWrapper(MinMaxScaler(feature_range=(-1, 1))))])
@ -559,7 +566,7 @@ class IFreqaiModel(ABC):
:return:
:boolean: whether the model file exists or not.
"""
if self.dd.model_type == 'joblib':
if self.dd.model_type == "joblib":
file_type = ".joblib"
elif self.dd.model_type in ["stable_baselines3", "sb3_contrib", "pytorch"]:
file_type = ".zip"
@ -576,9 +583,7 @@ class IFreqaiModel(ABC):
"""
Creates and sets the full path for the identifier
"""
self.full_path = Path(
self.config["user_data_dir"] / "models" / f"{self.identifier}"
)
self.full_path = Path(self.config["user_data_dir"] / "models" / f"{self.identifier}")
self.full_path.mkdir(parents=True, exist_ok=True)
def extract_data_and_train_model(
@ -619,8 +624,7 @@ class IFreqaiModel(ABC):
dk.find_features(unfiltered_dataframe)
dk.find_labels(unfiltered_dataframe)
self.tb_logger = get_tb_logger(self.dd.model_type, dk.data_path,
self.activate_tensorboard)
self.tb_logger = get_tb_logger(self.dd.model_type, dk.data_path, self.activate_tensorboard)
model = self.train(unfiltered_dataframe, pair, dk)
self.tb_logger.close()
@ -668,21 +672,21 @@ class IFreqaiModel(ABC):
for label in hist_preds_df.columns:
if hist_preds_df[label].dtype == object:
continue
hist_preds_df[f'{label}_mean'] = 0
hist_preds_df[f'{label}_std'] = 0
hist_preds_df[f"{label}_mean"] = 0
hist_preds_df[f"{label}_std"] = 0
hist_preds_df['do_predict'] = 0
hist_preds_df["do_predict"] = 0
if self.freqai_info['feature_parameters'].get('DI_threshold', 0) > 0:
hist_preds_df['DI_values'] = 0
if self.freqai_info["feature_parameters"].get("DI_threshold", 0) > 0:
hist_preds_df["DI_values"] = 0
for return_str in dk.data['extra_returns_per_train']:
hist_preds_df[return_str] = dk.data['extra_returns_per_train'][return_str]
for return_str in dk.data["extra_returns_per_train"]:
hist_preds_df[return_str] = dk.data["extra_returns_per_train"][return_str]
hist_preds_df['high_price'] = strat_df['high']
hist_preds_df['low_price'] = strat_df['low']
hist_preds_df['close_price'] = strat_df['close']
hist_preds_df['date_pred'] = strat_df['date']
hist_preds_df["high_price"] = strat_df["high"]
hist_preds_df["low_price"] = strat_df["low"]
hist_preds_df["close_price"] = strat_df["close"]
hist_preds_df["date_pred"] = strat_df["date"]
def fit_live_predictions(self, dk: FreqaiDataKitchen, pair: str) -> None:
"""
@ -698,52 +702,51 @@ class IFreqaiModel(ABC):
for label in full_labels:
if self.dd.historic_predictions[dk.pair][label].dtype == object:
continue
f = spy.stats.norm.fit(
self.dd.historic_predictions[dk.pair][label].tail(num_candles))
f = spy.stats.norm.fit(self.dd.historic_predictions[dk.pair][label].tail(num_candles))
dk.data["labels_mean"][label], dk.data["labels_std"][label] = f[0], f[1]
return
def inference_timer(self, do: Literal["start", "stop"] = "start", pair: str = "") -> None:
    """
    Track the cumulative time spent on inference across the pairlist.

    "start" increments the pair counter and stamps the begin time. "stop" adds the
    elapsed time to the running total and, when `write_metrics_to_disk` is set, hands
    it to the metric tracker. Once the pair counter reaches `self.total_pairs`, the
    total is logged and both the counter and the total are reset.
    :param do: "start" to begin timing, "stop" to finish it; other values do nothing
    :param pair: pair passed along to the metric tracker
    """
    if do == "start":
        self.pair_it += 1
        self.begin_time = time.time()
    elif do == "stop":
        time_spent = time.time() - self.begin_time
        if self.freqai_info.get("write_metrics_to_disk", False):
            self.dd.update_metric_tracker("inference_time", time_spent, pair)
        self.inference_time += time_spent
        if self.pair_it == self.total_pairs:
            logger.info(
                f"Total time spent inferencing pairlist {self.inference_time:.2f} seconds"
            )
            self.pair_it = 0
            self.inference_time = 0
def train_timer(self, do: Literal["start", "stop"] = "start", pair: str = "") -> None:
    """
    Track the cumulative time spent training the full pairlist in FreqAI.

    "start" increments the training pair counter and stamps the begin time. "stop"
    adds the elapsed time to the running total and, when `write_metrics_to_disk` is
    set, passes it to `self.dd.collect_metrics`. Once the counter reaches
    `self.total_pairs`, the total is logged and both the counter and the total are
    reset.
    :param do: "start" to begin timing, "stop" to finish it; other values do nothing
    :param pair: pair passed along when collecting metrics
    """
    if do == "start":
        self.pair_it_train += 1
        self.begin_time_train = time.time()
    elif do == "stop":
        time_spent = time.time() - self.begin_time_train
        if self.freqai_info.get("write_metrics_to_disk", False):
            self.dd.collect_metrics(time_spent, pair)
        self.train_time += time_spent
        if self.pair_it_train == self.total_pairs:
            logger.info(f"Total time spent training pairlist {self.train_time:.2f} seconds")
            self.pair_it_train = 0
            self.train_time = 0
@ -763,14 +766,14 @@ class IFreqaiModel(ABC):
"""
current_pairlist = self.config.get("exchange", {}).get("pair_whitelist")
if not self.dd.pair_dict:
# The f-prefix is required: without it the literal text "{current_pairlist}" is
# logged instead of the whitelist.
logger.info(f"Set fresh train queue from whitelist. Queue: {current_pairlist}")
return deque(current_pairlist)
best_queue = deque()
pair_dict_sorted = sorted(self.dd.pair_dict.items(),
key=lambda k: k[1]['trained_timestamp'])
pair_dict_sorted = sorted(
self.dd.pair_dict.items(), key=lambda k: k[1]["trained_timestamp"]
)
for pair in pair_dict_sorted:
if pair[0] in current_pairlist:
best_queue.append(pair[0])
@ -778,8 +781,9 @@ class IFreqaiModel(ABC):
if pair not in best_queue:
best_queue.appendleft(pair)
# The f-prefix is required: without it the literal text "{best_queue}" is logged
# instead of the queue contents.
logger.info(
    f"Set existing queue from trained timestamps. Best approximation queue: {best_queue}"
)
return best_queue
def cache_corr_pairlist_dfs(self, dataframe: DataFrame, dk: FreqaiDataKitchen) -> DataFrame:
@ -794,14 +798,15 @@ class IFreqaiModel(ABC):
if self.get_corr_dataframes:
self.corr_dataframes = dk.extract_corr_pair_columns_from_populated_indicators(dataframe)
if not self.corr_dataframes:
logger.warning("Couldn't cache corr_pair dataframes for improved performance. "
logger.warning(
"Couldn't cache corr_pair dataframes for improved performance. "
"Consider ensuring that the full coin/stake, e.g. XYZ/USD, "
"is included in the column names when you are creating features "
"in `feature_engineering_*` functions.")
"in `feature_engineering_*` functions."
)
self.get_corr_dataframes = not bool(self.corr_dataframes)
elif self.corr_dataframes:
dataframe = dk.attach_corr_pair_columns(
dataframe, self.corr_dataframes, dk.pair)
dataframe = dk.attach_corr_pair_columns(dataframe, self.corr_dataframes, dk.pair)
return dataframe
@ -817,8 +822,9 @@ class IFreqaiModel(ABC):
self.pair_it = 1
self.current_candle = self.dd.current_candle
def ensure_data_exists(self, len_dataframe_backtest: int,
tr_backtest: TimeRange, pair: str) -> bool:
def ensure_data_exists(
self, len_dataframe_backtest: int, tr_backtest: TimeRange, pair: str
) -> bool:
"""
Check if the dataframe is empty, if not, report useful information to user.
:param len_dataframe_backtest: the len of backtesting dataframe
@ -833,8 +839,9 @@ class IFreqaiModel(ABC):
return False
return True
def log_backtesting_progress(self, tr_train: TimeRange, pair: str,
train_it: int, total_trains: int):
def log_backtesting_progress(
self, tr_train: TimeRange, pair: str, train_it: int, total_trains: int
):
"""
Log the backtesting progress so user knows how many pairs have been trained and
how many more pairs/trains remain.
@ -861,30 +868,37 @@ class IFreqaiModel(ABC):
fit_live_predictions_candles = self.freqai_info.get("fit_live_predictions_candles", 0)
if fit_live_predictions_candles:
logger.info("Applying fit_live_predictions in backtesting")
label_columns = [col for col in dk.full_df.columns if (
col.startswith("&") and
not (col.startswith("&") and col.endswith("_mean")) and
not (col.startswith("&") and col.endswith("_std")) and
col not in self.dk.data["extra_returns_per_train"])
label_columns = [
col
for col in dk.full_df.columns
if (
col.startswith("&")
and not (col.startswith("&") and col.endswith("_mean"))
and not (col.startswith("&") and col.endswith("_std"))
and col not in self.dk.data["extra_returns_per_train"]
)
]
for index in range(len(dk.full_df)):
if index >= fit_live_predictions_candles:
self.dd.historic_predictions[self.dk.pair] = (
dk.full_df.iloc[index - fit_live_predictions_candles:index])
self.dd.historic_predictions[self.dk.pair] = dk.full_df.iloc[
index - fit_live_predictions_candles : index
]
self.fit_live_predictions(self.dk, self.dk.pair)
for label in label_columns:
if dk.full_df[label].dtype == object:
continue
if "labels_mean" in self.dk.data:
dk.full_df.at[index, f"{label}_mean"] = (
self.dk.data["labels_mean"][label])
dk.full_df.at[index, f"{label}_mean"] = self.dk.data["labels_mean"][
label
]
if "labels_std" in self.dk.data:
dk.full_df.at[index, f"{label}_std"] = self.dk.data["labels_std"][label]
for extra_col in self.dk.data["extra_returns_per_train"]:
dk.full_df.at[index, f"{extra_col}"] = (
self.dk.data["extra_returns_per_train"][extra_col])
dk.full_df.at[index, f"{extra_col}"] = self.dk.data[
"extra_returns_per_train"
][extra_col]
return
@ -901,7 +915,8 @@ class IFreqaiModel(ABC):
if key_name not in self.metadata:
metadata = self.metadata
metadata[key_name] = int(
pd.to_datetime(live_dataframe.tail(1)["date"].values[0]).timestamp())
pd.to_datetime(live_dataframe.tail(1)["date"].values[0]).timestamp()
)
self.update_metadata(metadata)
def start_backtesting_from_historic_predictions(
@ -917,19 +932,20 @@ class IFreqaiModel(ABC):
pair = metadata["pair"]
dk.return_dataframe = dataframe
saved_dataframe = self.dd.historic_predictions[pair]
columns_to_drop = list(set(saved_dataframe.columns).intersection(
dk.return_dataframe.columns))
columns_to_drop = list(
set(saved_dataframe.columns).intersection(dk.return_dataframe.columns)
)
dk.return_dataframe = dk.return_dataframe.drop(columns=list(columns_to_drop))
dk.return_dataframe = pd.merge(
dk.return_dataframe, saved_dataframe, how='left', left_on='date', right_on="date_pred")
dk.return_dataframe, saved_dataframe, how="left", left_on="date", right_on="date_pred"
)
return dk
# Following methods which are overridden by user made prediction models.
# See freqai/prediction_models/CatboostPredictionModel.py for an example.
@abstractmethod
def train(self, unfiltered_df: DataFrame, pair: str,
dk: FreqaiDataKitchen, **kwargs) -> Any:
def train(self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahandler
for storing, saving, loading, and analyzing the data.
@ -970,23 +986,25 @@ class IFreqaiModel(ABC):
"""
throw deprecation warning if this function is called
"""
logger.warning(f"Your model {self.__class__.__name__} relies on the deprecated"
logger.warning(
f"Your model {self.__class__.__name__} relies on the deprecated"
" data pipeline. Please update your model to use the new data pipeline."
" This can be achieved by following the migration guide at "
f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline")
f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline"
)
dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
dd = dk.data_dictionary
(dd["train_features"],
dd["train_labels"],
dd["train_weights"]) = dk.feature_pipeline.fit_transform(dd["train_features"],
dd["train_labels"],
dd["train_weights"])
(dd["train_features"], dd["train_labels"], dd["train_weights"]) = (
dk.feature_pipeline.fit_transform(
dd["train_features"], dd["train_labels"], dd["train_weights"]
)
)
(dd["test_features"],
dd["test_labels"],
dd["test_weights"]) = dk.feature_pipeline.transform(dd["test_features"],
dd["test_labels"],
dd["test_weights"])
(dd["test_features"], dd["test_labels"], dd["test_weights"]) = (
dk.feature_pipeline.transform(
dd["test_features"], dd["test_labels"], dd["test_weights"]
)
)
dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)
@ -998,13 +1016,16 @@ class IFreqaiModel(ABC):
"""
throw deprecation warning if this function is called
"""
logger.warning(f"Your model {self.__class__.__name__} relies on the deprecated"
logger.warning(
f"Your model {self.__class__.__name__} relies on the deprecated"
" data pipeline. Please update your model to use the new data pipeline."
" This can be achieved by following the migration guide at "
f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline")
f"{DOCS_LINK}/strategy_migration/#freqai-new-data-pipeline"
)
dd = dk.data_dictionary
dd["predict_features"], outliers, _ = dk.feature_pipeline.transform(
dd["predict_features"], outlier_check=True)
dd["predict_features"], outlier_check=True
)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else: