import logging from time import time from typing import Any, Tuple import numpy as np import numpy.typing as npt from pandas import DataFrame from freqtrade.freqai.base_models.BasePyTorchModel import BasePyTorchModel from freqtrade.freqai.data_kitchen import FreqaiDataKitchen logger = logging.getLogger(__name__) class BasePyTorchRegressor(BasePyTorchModel): """ A PyTorch implementation of a regressor. User must implement fit method """ def __init__(self, **kwargs): super().__init__(**kwargs) def predict( self, unfiltered_df: DataFrame, dk: FreqaiDataKitchen, **kwargs ) -> Tuple[DataFrame, npt.NDArray[np.int_]]: """ Filter the prediction features data and predict with it. :param unfiltered_df: Full dataframe for the current backtest period. :return: :pred_df: dataframe containing the predictions :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove data (NaNs) or felt uncertain about data (PCA and DI index) """ dk.find_features(unfiltered_df) filtered_df, _ = dk.filter_features( unfiltered_df, dk.training_features_list, training_filter=False ) dk.data_dictionary["prediction_features"] = filtered_df dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform( dk.data_dictionary["prediction_features"], outlier_check=True ) x = self.data_convertor.convert_x( dk.data_dictionary["prediction_features"], device=self.device ) self.model.model.eval() y = self.model.model(x) pred_df = DataFrame(y.detach().tolist(), columns=[dk.label_list[0]]) pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df) if dk.feature_pipeline["di"]: dk.DI_values = dk.feature_pipeline["di"].di_values else: dk.DI_values = np.zeros(outliers.shape[0]) dk.do_predict = outliers return (pred_df, dk.do_predict) def train(self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs) -> Any: """ Filter the training data and train a model to it. Train makes heavy use of the datakitchen for storing, saving, loading, and analyzing the data. :param unfiltered_df: Full dataframe for the current training period :return: :model: Trained model which can be used to inference (self.predict) """ logger.info(f"-------------------- Starting training {pair} --------------------") start_time = time() features_filtered, labels_filtered = dk.filter_features( unfiltered_df, dk.training_features_list, dk.label_list, training_filter=True, ) # split data into train/test data. dd = dk.make_train_test_datasets(features_filtered, labels_filtered) if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live: dk.fit_labels() dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count) dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count) dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"]) dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"]) (dd["train_features"], dd["train_labels"], dd["train_weights"]) = ( dk.feature_pipeline.fit_transform( dd["train_features"], dd["train_labels"], dd["train_weights"] ) ) dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"]) if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) != 0: (dd["test_features"], dd["test_labels"], dd["test_weights"]) = ( dk.feature_pipeline.transform( dd["test_features"], dd["test_labels"], dd["test_weights"] ) ) dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"]) logger.info( f"Training model on {len(dk.data_dictionary['train_features'].columns)} features" ) logger.info(f"Training model on {len(dd['train_features'])} data points") model = self.fit(dd, dk) end_time = time() logger.info( f"-------------------- Done training {pair} " f"({end_time - start_time:.2f} secs) --------------------" ) return model