freqtrade_origin/freqtrade/freqai/base_models/BasePyTorchRegressor.py
2024-05-13 07:10:25 +02:00

121 lines
4.5 KiB
Python

import logging
from time import time
from typing import Any, Tuple
import numpy as np
import numpy.typing as npt
from pandas import DataFrame
from freqtrade.freqai.base_models.BasePyTorchModel import BasePyTorchModel
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
logger = logging.getLogger(__name__)
class BasePyTorchRegressor(BasePyTorchModel):
    """
    A PyTorch implementation of a regressor.
    User must implement fit method
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def predict(
        self, unfiltered_df: DataFrame, dk: FreqaiDataKitchen, **kwargs
    ) -> Tuple[DataFrame, npt.NDArray[np.int_]]:
        """
        Filter the prediction features data and predict with it.
        :param unfiltered_df: Full dataframe for the current backtest period.
        :param dk: Datakitchen holding the fitted feature/label pipelines; its
            `data_dictionary`, `DI_values` and `do_predict` are updated in place.
        :return:
        :pred_df: dataframe containing the predictions
        :do_predict: np.array of 1s and 0s to indicate places where freqai needed to remove
        data (NaNs) or felt uncertain about data (PCA and DI index)
        """
        dk.find_features(unfiltered_df)
        filtered_df, _ = dk.filter_features(
            unfiltered_df, dk.training_features_list, training_filter=False
        )
        dk.data_dictionary["prediction_features"] = filtered_df

        # outlier_check=True makes the pipeline hand back its outlier flags
        # instead of only the transformed features.
        dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
            dk.data_dictionary["prediction_features"], outlier_check=True
        )

        x = self.data_convertor.convert_x(
            dk.data_dictionary["prediction_features"], device=self.device
        )
        # Switch the wrapped network to evaluation mode before the forward pass.
        self.model.model.eval()
        y = self.model.model(x)

        # Only the first entry of the label list is used as the output column.
        pred_df = DataFrame(y.detach().tolist(), columns=[dk.label_list[0]])
        # Undo the label transformation that was fitted during `train`.
        pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)

        if dk.feature_pipeline["di"]:
            dk.DI_values = dk.feature_pipeline["di"].di_values
        else:
            # No "di" step available: report zeros, one per prediction row.
            dk.DI_values = np.zeros(outliers.shape[0])
        dk.do_predict = outliers
        return (pred_df, dk.do_predict)

    def train(self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs) -> Any:
        """
        Filter the training data and train a model to it. Train makes heavy use of the datakitchen
        for storing, saving, loading, and analyzing the data.
        :param unfiltered_df: Full dataframe for the current training period
        :param pair: Name of the pair being trained; only used for log messages here.
        :param dk: Datakitchen for the pair; receives freshly built feature and
            label pipelines.
        :return:
        :model: Trained model which can be used to inference (self.predict)
        """
        logger.info(f"-------------------- Starting training {pair} --------------------")

        start_time = time()

        features_filtered, labels_filtered = dk.filter_features(
            unfiltered_df,
            dk.training_features_list,
            dk.label_list,
            training_filter=True,
        )

        # split data into train/test data.
        dd = dk.make_train_test_datasets(features_filtered, labels_filtered)
        if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
            dk.fit_labels()
        dk.feature_pipeline = self.define_data_pipeline(threads=dk.thread_count)
        dk.label_pipeline = self.define_label_pipeline(threads=dk.thread_count)

        (dd["train_features"], dd["train_labels"], dd["train_weights"]) = (
            dk.feature_pipeline.fit_transform(
                dd["train_features"], dd["train_labels"], dd["train_weights"]
            )
        )
        # Fit the label pipeline exactly once, on labels that have not been
        # label-transformed yet. Re-fitting it on already transformed labels
        # would leave the pipeline holding only that second mapping, and
        # `inverse_transform` in `predict` could then no longer restore the
        # original label units.
        dd["train_labels"], _, _ = dk.label_pipeline.fit_transform(dd["train_labels"])

        # Test data is only processed when a test split was actually requested.
        if self.freqai_info.get("data_split_parameters", {}).get("test_size", 0.1) != 0:
            (dd["test_features"], dd["test_labels"], dd["test_weights"]) = (
                dk.feature_pipeline.transform(
                    dd["test_features"], dd["test_labels"], dd["test_weights"]
                )
            )
            dd["test_labels"], _, _ = dk.label_pipeline.transform(dd["test_labels"])

        logger.info(
            f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
        )
        logger.info(f"Training model on {len(dd['train_features'])} data points")

        model = self.fit(dd, dk)
        end_time = time()

        logger.info(
            f"-------------------- Done training {pair} "
            f"({end_time - start_time:.2f} secs) --------------------"
        )

        return model