bring classifier/rl up to new paradigm. ensure tests pass. remove old code. add documentation, add new example transform

This commit is contained in:
robcaulk 2023-05-29 13:33:29 +02:00
parent 31e19add27
commit e572653616
18 changed files with 390 additions and 754 deletions

View File

@ -209,15 +209,67 @@ Another example, where the user wants to use live metrics from the trade databas
You need to set the standard dictionary in the config so that FreqAI can return proper dataframe shapes. These values will likely be overridden by the prediction model, but in the case where the model has yet to set them, or needs a default initial value, the pre-set values are what will be returned.
## Feature normalization
### Weighting features for temporal importance
FreqAI is strict when it comes to data normalization. The train features, $X^{train}$, are always normalized to [-1, 1] using a shifted min-max normalization:
FreqAI allows you to set a `weight_factor` to weight recent data more strongly than past data via an exponential function:
$$X^{train}_{norm} = 2 * \frac{X^{train} - X^{train}.min()}{X^{train}.max() - X^{train}.min()} - 1$$
$$ W_i = \exp(\frac{-i}{\alpha*n}) $$
All other data (test data and unseen prediction data in dry/live/backtest) is always automatically normalized to the training feature space according to industry standards. FreqAI stores all the metadata required to ensure that test and prediction features will be properly normalized and that predictions are properly denormalized. For this reason, it is not recommended to eschew industry standards and modify FreqAI internals; however, advanced users can do so by inheriting `train()` in their custom `IFreqaiModel` and using their own normalization functions.
where $W_i$ is the weight of data point $i$ in a total set of $n$ data points. Below is a figure showing the effect of different weight factors on the data points in a feature set.
## Data dimensionality reduction with Principal Component Analysis
![weight-factor](assets/freqai_weight-factor.jpg)
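As an illustration, here is a minimal numpy sketch of this weighting (assuming data ordered oldest to newest; `temporal_weights` is a hypothetical helper and `weight_factor` plays the role of $\alpha$):
```py
import numpy as np

def temporal_weights(n: int, weight_factor: float) -> np.ndarray:
    # W_i = exp(-i / (alpha * n)) with i = 0 the most recent point;
    # reversed so the array lines up with chronologically ordered data
    i = np.arange(n)
    return np.exp(-i / (weight_factor * n))[::-1]

weights = temporal_weights(1000, weight_factor=0.9)
# weights[-1] (most recent candle) is exactly 1.0; the oldest decays toward exp(-1/alpha)
```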
# Building the data pipeline
FreqAI uses the [`DataSieve`](https://github.com/emergentmethods/datasieve) pipeline, which follows the SKLearn pipeline API but adds, among other features, coherence of point removals across the X, y, and sample_weight vectors, as well as feature-name tracking after feature removal.
This means that users can use/customize any SKLearn modules and easily add them to their FreqAI data pipeline. By default, FreqAI builds the following pipeline:
```py
dk.feature_pipeline = Pipeline([
    ('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
    ('di', ds.DissimilarityIndex(di_threshold=1)),
])
```
But users will find that they can add PCA and other steps just by changing their configuration settings. For example, if you add `"principal_component_analysis": true` to the `feature_parameters` dict in the `freqai` config, FreqAI will add the PCA step for you, resulting in the following pipeline:
```py
dk.feature_pipeline = Pipeline([
    ('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
    ('pca', ds.DataSievePCA()),
    ('post-pca-scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
    ('di', ds.DissimilarityIndex(di_threshold=1)),
])
```
The same concept applies if users activate other config options like `"use_SVM_to_remove_outliers": true` or `"use_DBSCAN_to_remove_outliers": true`: FreqAI will add the appropriate steps to the pipeline for you.
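For instance, with `"use_SVM_to_remove_outliers": true` and a `DI_threshold` set, the resulting pipeline would look roughly like this (a sketch based on `define_data_pipeline` in this commit):
```py
dk.feature_pipeline = Pipeline([
    ('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1))),
    ('svm', ds.SVMOutlierExtractor()),
    ('di', ds.DissimilarityIndex(di_threshold=1)),
])
```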
## Customizing the pipeline
Users are encouraged to customize the data pipeline to their needs. This can be done by overriding `define_data_pipeline` in their `IFreqaiModel`. For example:
```py
def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
    """
    User defines their custom feature pipeline here (if they wish)
    """
    from freqtrade.freqai.transforms import FreqaiQuantileTransformer

    dk.feature_pipeline = Pipeline([
        ('qt', FreqaiQuantileTransformer(output_distribution='normal'))
    ])
    return
```
Here, you are defining the exact pipeline that will be used for your feature set during training and prediction. If you have a custom step that you would like to add to the pipeline, you simply create a class that follows the DataSieve/SKLearn API. That means your step must have `fit()`, `transform()`, `fit_transform()`, and `inverse_transform()` methods. You can see an example of this in the `freqtrade.freqai.transforms` module, where we wrap the SKLearn `QuantileTransformer` to create a new step for the pipeline.
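As a minimal skeleton of such a step (a hypothetical `MyPassthroughStep`, modeled on the `FreqaiQuantileTransformer` added in this commit):
```py
class MyPassthroughStep:
    """
    Skeleton custom step following the DataSieve/SKLearn API;
    it passes X, y, and sample_weight through unmodified.
    """
    def fit(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
        return X, y, sample_weight, feature_list

    def transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
        return X, y, sample_weight, feature_list

    def fit_transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
        self.fit(X, y, sample_weight, feature_list)
        return self.transform(X, y, sample_weight, feature_list)

    def inverse_transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
        return X, y, sample_weight, feature_list
```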
As there is the `feature_pipeline`, there also exists a definition for the `label_pipeline` which can be defined the same way as the `feature_pipeline`, by overriding `define_label_pipeline`.
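For example, a label pipeline using the new quantile transform could be sketched like this (mirroring the commented-out example left in `XGBoostRegressor` by this commit):
```py
def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
    from freqtrade.freqai.transforms import FreqaiQuantileTransformer

    dk.label_pipeline = Pipeline([
        ('qt', FreqaiQuantileTransformer(output_distribution='normal'))
    ])
```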
!!! note "Inheritance required"
    While most SKLearn methods are very easy to override, as shown in freqtrade/freqai/transforms/quantile_transform.py, they still need to pass X, y, and sample_weight through all `fit()`, `transform()`, `fit_transform()` and `inverse_transform()` functions, even if that means a direct pass-through without modifications.
<!-- ## Data dimensionality reduction with Principal Component Analysis
You can reduce the dimensionality of your features by activating the `principal_component_analysis` in the config:
@ -241,17 +293,7 @@ You define the lookback window by setting `inlier_metric_window` and FreqAI comp
FreqAI adds the `inlier_metric` to the training features and hence gives the model access to a novel type of temporal information.
This function does **not** remove outliers from the data set.
## Weighting features for temporal importance
FreqAI allows you to set a `weight_factor` to weight recent data more strongly than past data via an exponential function:
$$ W_i = \exp(\frac{-i}{\alpha*n}) $$
where $W_i$ is the weight of data point $i$ in a total set of $n$ data points. Below is a figure showing the effect of different weight factors on the data points in a feature set.
![weight-factor](assets/freqai_weight-factor.jpg)
This function does **not** remove outliers from the data set. -->
## Outlier detection
@ -259,7 +301,7 @@ Equity and crypto markets suffer from a high level of non-patterned noise in the
### Identifying outliers with the Dissimilarity Index (DI)
The Dissimilarity Index (DI) aims to quantify the uncertainty associated with each prediction made by the model.
You can tell FreqAI to remove outlier data points from the training/test data sets using the DI by including the following statement in the config:
@ -271,7 +313,7 @@ You can tell FreqAI to remove outlier data points from the training/test data se
}
```
The DI allows predictions which are outliers (not existent in the model feature space) to be thrown out due to low levels of certainty. To do so, FreqAI measures the distance between each training data point (feature vector), $X_{a}$, and all other training data points:
This will add the `DissimilarityIndex` step to your `feature_pipeline` and set the threshold to 1. The DI allows predictions which are outliers (not existent in the model feature space) to be thrown out due to low levels of certainty. To do so, FreqAI measures the distance between each training data point (feature vector), $X_{a}$, and all other training data points:
$$ d_{ab} = \sqrt{\sum_{j=1}^p(X_{a,j}-X_{b,j})^2} $$
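For intuition, here is a sketch of that distance computation (mirroring the removed `compute_distances` helper, with illustrative data):
```py
import numpy as np
from sklearn.metrics.pairwise import pairwise_distances

X_train = np.random.rand(100, 5)   # 100 feature vectors with 5 features each
d = pairwise_distances(X_train)    # d[a, b] is the Euclidean distance d_ab
np.fill_diagonal(d, np.nan)        # drop each point's zero distance to itself
avg_mean_dist = np.nanmean(d)      # baseline used to judge how far a prediction sits
```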
@ -305,9 +347,9 @@ You can tell FreqAI to remove outlier data points from the training/test data se
}
```
The SVM will be trained on the training data and any data point that the SVM deems to be beyond the feature space will be removed.
This will add the `SVMOutlierExtractor` step to your `feature_pipeline`. The SVM will be trained on the training data, and any data point that the SVM deems to be beyond the feature space will be removed.
FreqAI uses `sklearn.linear_model.SGDOneClassSVM` (details are available on scikit-learn's webpage [here](https://scikit-learn.org/stable/modules/generated/sklearn.linear_model.SGDOneClassSVM.html) (external website)) and you can elect to provide additional parameters for the SVM, such as `shuffle`, and `nu`.
You can elect to provide additional parameters for the SVM, such as `shuffle` and `nu`, via the `feature_parameters.svm_params` dictionary in the config.
The parameter `shuffle` is by default set to `False` to ensure consistent results. If it is set to `True`, running the SVM multiple times on the same data set might result in different outcomes due to `max_iter` being too low for the algorithm to reach the demanded `tol`. Increasing `max_iter` solves this issue but causes the procedure to take longer.
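Internally, this commit forwards those parameters straight from the config into the pipeline step, roughly:
```py
svm_params = ft_params.get("svm_params", {"shuffle": False, "nu": 0.01})
dk.feature_pipeline.steps += [('svm', ds.SVMOutlierExtractor(**svm_params))]
```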
@ -325,7 +367,7 @@ You can configure FreqAI to use DBSCAN to cluster and remove outliers from the t
}
```
DBSCAN is an unsupervised machine learning algorithm that clusters data without needing to know how many clusters there should be.
This will add the `DataSieveDBSCAN` step to your `feature_pipeline`. DBSCAN is an unsupervised machine learning algorithm that clusters data without needing to know how many clusters there should be.
Given a number of data points $N$, and a distance $\varepsilon$, DBSCAN clusters the data set by setting all data points that have $N-1$ other data points within a distance of $\varepsilon$ as *core points*. A data point that is within a distance of $\varepsilon$ from a *core point* but that does not have $N-1$ other data points within a distance of $\varepsilon$ from itself is considered an *edge point*. A cluster is then the collection of *core points* and *edge points*. Data points that have no other data points at a distance $<\varepsilon$ are considered outliers. The figure below shows a cluster with $N = 3$.
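A minimal scikit-learn sketch of the idea (`eps` and `min_samples` are chosen arbitrarily here for illustration):
```py
import numpy as np
from sklearn.cluster import DBSCAN

X_train = np.random.rand(200, 3)
clustering = DBSCAN(eps=0.5, min_samples=3).fit(X_train)
outliers = clustering.labels_ == -1   # DBSCAN labels noise points as -1
print(f"{outliers.sum()} of {len(X_train)} points flagged as outliers")
```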

View File

@ -82,6 +82,9 @@ class BaseReinforcementLearningModel(IFreqaiModel):
if self.ft_params.get('use_DBSCAN_to_remove_outliers', False):
self.ft_params.update({'use_DBSCAN_to_remove_outliers': False})
logger.warning('User tried to use DBSCAN with RL. Deactivating DBSCAN.')
if self.ft_params.get('DI_threshold', False):
self.ft_params.update({'DI_threshold': False})
logger.warning('User tried to use DI_threshold with RL. Deactivating DI_threshold.')
if self.freqai_info['data_split_parameters'].get('shuffle', False):
self.freqai_info['data_split_parameters'].update({'shuffle': False})
logger.warning('User tried to shuffle training data. Setting shuffle to False')
@ -107,27 +110,40 @@ class BaseReinforcementLearningModel(IFreqaiModel):
training_filter=True,
)
data_dictionary: Dict[str, Any] = dk.make_train_test_datasets(
d: Dict[str, Any] = dk.make_train_test_datasets(
features_filtered, labels_filtered)
self.df_raw = copy.deepcopy(data_dictionary["train_features"])
self.df_raw = copy.deepcopy(d["train_features"])
dk.fit_labels() # FIXME useless for now, but just satiating append methods
# normalize all data based on train_dataset only
prices_train, prices_test = self.build_ohlc_price_dataframes(dk.data_dictionary, pair, dk)
data_dictionary = dk.normalize_data(data_dictionary)
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
# data cleaning/analysis
self.data_cleaning_train(dk)
# d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
# d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
logger.info(
f'Training model on {len(dk.data_dictionary["train_features"].columns)}'
f' features and {len(data_dictionary["train_features"])} data points'
f' features and {len(d["train_features"])} data points'
)
self.set_train_and_eval_environments(data_dictionary, prices_train, prices_test, dk)
self.set_train_and_eval_environments(d, prices_train, prices_test, dk)
model = self.fit(data_dictionary, dk)
model = self.fit(d, dk)
logger.info(f"--------------------done training {pair}--------------------")
@ -236,18 +252,19 @@ class BaseReinforcementLearningModel(IFreqaiModel):
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_dataframe = self.drop_ohlc_from_df(filtered_dataframe, dk)
dk.data_dictionary["prediction_features"] = self.drop_ohlc_from_df(filtered_dataframe, dk)
filtered_dataframe = dk.normalize_data_from_metadata(filtered_dataframe)
dk.data_dictionary["prediction_features"] = filtered_dataframe
# optional additional data cleaning/analysis
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
pred_df = self.rl_model_predict(
dk.data_dictionary["prediction_features"], dk, self.model)
pred_df.fillna(0, inplace=True)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)
def rl_model_predict(self, dataframe: DataFrame,

View File

@ -50,21 +50,30 @@ class BaseClassifierModel(IFreqaiModel):
logger.info(f"-------------------- Training on data from {start_date} to "
f"{end_date} --------------------")
# split data into train/test data.
data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions_candles", 0) or not self.live:
dk.fit_labels()
# normalize all data based on train_dataset only
data_dictionary = dk.normalize_data(data_dictionary)
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
# optional additional data cleaning/analysis
self.data_cleaning_train(dk)
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
logger.info(f"Training model on {len(d['train_features'])} data points")
model = self.fit(data_dictionary, dk)
model = self.fit(d, dk)
end_time = time()
@ -89,10 +98,11 @@ class BaseClassifierModel(IFreqaiModel):
filtered_df, _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
if self.CONV_WIDTH == 1:
@ -107,4 +117,10 @@ class BaseClassifierModel(IFreqaiModel):
pred_df = pd.concat([pred_df, pred_df_prob], axis=1)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else:
dk.DI_values = np.zeros(len(outliers.index))
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)

View File

@ -1,5 +1,6 @@
import logging
from typing import Dict, List, Tuple
from time import time
from typing import Any, Dict, List, Tuple
import numpy as np
import numpy.typing as npt
@ -68,9 +69,12 @@ class BasePyTorchClassifier(BasePyTorchModel):
filtered_df, _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
x = self.data_convertor.convert_x(
dk.data_dictionary["prediction_features"],
device=self.device
@ -85,6 +89,13 @@ class BasePyTorchClassifier(BasePyTorchModel):
pred_df_prob = DataFrame(probs.detach().tolist(), columns=class_names)
pred_df = DataFrame(predicted_classes_str, columns=[dk.label_list[0]])
pred_df = pd.concat([pred_df, pred_df_prob], axis=1)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else:
dk.DI_values = np.zeros(len(outliers.index))
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)
def encode_class_names(
@ -149,3 +160,58 @@ class BasePyTorchClassifier(BasePyTorchModel):
)
return self.class_names
def train(
self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_df: Full dataframe for the current training period
:return:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info(f"-------------------- Starting training {pair} --------------------")
start_time = time()
features_filtered, labels_filtered = dk.filter_features(
unfiltered_df,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
model = self.fit(d, dk)
end_time = time()
logger.info(f"-------------------- Done training {pair} "
f"({end_time - start_time:.2f} secs) --------------------")
return model

View File

@ -1,21 +1,16 @@
import logging
from abc import ABC, abstractmethod
from time import time
from typing import Any
import torch
from pandas import DataFrame
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
# from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.base_models import BaseRegressionModel
from freqtrade.freqai.freqai_interface import IFreqaiModel
from freqtrade.freqai.torch.PyTorchDataConvertor import PyTorchDataConvertor
logger = logging.getLogger(__name__)
class BasePyTorchModel(BaseRegressionModel):
class BasePyTorchModel(IFreqaiModel, ABC):
"""
Base class for PyTorch type models.
User *must* inherit from this class and set fit() and predict() and
@ -30,51 +25,6 @@ class BasePyTorchModel(BaseRegressionModel):
self.splits = ["train", "test"] if test_size != 0 else ["train"]
self.window_size = self.freqai_info.get("conv_width", 1)
# def train(
# self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
# ) -> Any:
# """
# Filter the training data and train a model to it. Train makes heavy use of the datakitchen
# for storing, saving, loading, and analyzing the data.
# :param unfiltered_df: Full dataframe for the current training period
# :return:
# :model: Trained model which can be used to inference (self.predict)
# """
# logger.info(f"-------------------- Starting training {pair} --------------------")
# start_time = time()
# features_filtered, labels_filtered = dk.filter_features(
# unfiltered_df,
# dk.training_features_list,
# dk.label_list,
# training_filter=True,
# )
# # split data into train/test data.
# data_dictionary = dk.make_train_test_datasets(features_filtered, labels_filtered)
# if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
# dk.fit_labels()
# # normalize all data based on train_dataset only
# data_dictionary = dk.normalize_data(data_dictionary)
# # optional additional data cleaning/analysis
# self.data_cleaning_train(dk)
# logger.info(
# f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
# )
# logger.info(f"Training model on {len(data_dictionary['train_features'])} data points")
# model = self.fit(data_dictionary, dk)
# end_time = time()
# logger.info(f"-------------------- Done training {pair} "
# f"({end_time - start_time:.2f} secs) --------------------")
# return model
@property
@abstractmethod
def data_convertor(self) -> PyTorchDataConvertor:

View File

@ -1,5 +1,6 @@
import logging
from typing import Tuple
from time import time
from typing import Any, Tuple
import numpy as np
import numpy.typing as npt
@ -36,10 +37,11 @@ class BasePyTorchRegressor(BasePyTorchModel):
filtered_df, _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
x = self.data_convertor.convert_x(
dk.data_dictionary["prediction_features"],
device=self.device
@ -47,5 +49,69 @@ class BasePyTorchRegressor(BasePyTorchModel):
self.model.model.eval()
y = self.model.model(x)
pred_df = DataFrame(y.detach().tolist(), columns=[dk.label_list[0]])
pred_df = dk.denormalize_labels_from_metadata(pred_df)
pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else:
dk.DI_values = np.zeros(len(outliers.index))
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)
def train(
self, unfiltered_df: DataFrame, pair: str, dk: FreqaiDataKitchen, **kwargs
) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datakitchen
for storing, saving, loading, and analyzing the data.
:param unfiltered_df: Full dataframe for the current training period
:return:
:model: Trained model which can be used to inference (self.predict)
"""
logger.info(f"-------------------- Starting training {pair} --------------------")
start_time = time()
features_filtered, labels_filtered = dk.filter_features(
unfiltered_df,
dk.training_features_list,
dk.label_list,
training_filter=True,
)
# split data into train/test data.
d = dk.make_train_test_datasets(features_filtered, labels_filtered)
if not self.freqai_info.get("fit_live_predictions", 0) or not self.live:
dk.fit_labels()
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
)
logger.info(f"Training model on {len(d['train_features'])} data points")
model = self.fit(d, dk)
end_time = time()
logger.info(f"-------------------- Done training {pair} "
f"({end_time - start_time:.2f} secs) --------------------")
return model

View File

@ -56,20 +56,20 @@ class BaseRegressionModel(IFreqaiModel):
self.define_data_pipeline(dk)
self.define_label_pipeline(dk)
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
(d["train_features"],
d["train_labels"],
d["train_weights"]) = dk.pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
d["train_weights"]) = dk.feature_pipeline.fit_transform(d["train_features"],
d["train_labels"],
d["train_weights"])
(d["test_features"],
d["test_labels"],
d["test_weights"]) = dk.pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
d["test_weights"]) = dk.feature_pipeline.transform(d["test_features"],
d["test_labels"],
d["test_weights"])
d["train_labels"], _, _ = dk.label_pipeline.fit_transform(d["train_labels"])
d["test_labels"], _, _ = dk.label_pipeline.transform(d["test_labels"])
logger.info(
f"Training model on {len(dk.data_dictionary['train_features'].columns)} features"
@ -98,13 +98,11 @@ class BaseRegressionModel(IFreqaiModel):
"""
dk.find_features(unfiltered_df)
filtered_df, _ = dk.filter_features(
dk.data_dictionary["prediction_features"], _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
# filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
dk.data_dictionary["prediction_features"], outliers, _ = dk.pipeline.transform(
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
predictions = self.model.predict(dk.data_dictionary["prediction_features"])
@ -114,7 +112,10 @@ class BaseRegressionModel(IFreqaiModel):
pred_df = DataFrame(predictions, columns=dk.label_list)
pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)
dk.DI_values = dk.label_pipeline.get_step("di").di_values
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else:
dk.DI_values = np.zeros(len(outliers.index))
dk.do_predict = outliers.to_numpy()
return (pred_df, dk.do_predict)

View File

@ -449,9 +449,6 @@ class FreqaiDataDrawer:
elif self.model_type in ["stable_baselines3", "sb3_contrib", "pytorch"]:
model.save(save_path / f"{dk.model_filename}_model.zip")
if dk.svm_model is not None:
dump(dk.svm_model, save_path / f"{dk.model_filename}_svm_model.joblib")
dk.data["data_path"] = str(dk.data_path)
dk.data["model_filename"] = str(dk.model_filename)
dk.data["training_features_list"] = dk.training_features_list
@ -461,8 +458,8 @@ class FreqaiDataDrawer:
rapidjson.dump(dk.data, fp, default=self.np_encoder, number_mode=rapidjson.NM_NATIVE)
# save the pipelines to pickle files
with (save_path / f"{dk.model_filename}_pipeline.pkl").open("wb") as fp:
cloudpickle.dump(dk.pipeline, fp)
with (save_path / f"{dk.model_filename}_feature_pipeline.pkl").open("wb") as fp:
cloudpickle.dump(dk.feature_pipeline, fp)
with (save_path / f"{dk.model_filename}_label_pipeline.pkl").open("wb") as fp:
cloudpickle.dump(dk.label_pipeline, fp)
@ -476,11 +473,6 @@ class FreqaiDataDrawer:
save_path / f"{dk.model_filename}_trained_dates_df.pkl"
)
if self.freqai_info["feature_parameters"].get("principal_component_analysis"):
cloudpickle.dump(
dk.pca, (dk.data_path / f"{dk.model_filename}_pca_object.pkl").open("wb")
)
self.model_dictionary[coin] = model
self.pair_dict[coin]["model_filename"] = dk.model_filename
self.pair_dict[coin]["data_path"] = str(dk.data_path)
@ -489,7 +481,7 @@ class FreqaiDataDrawer:
self.meta_data_dictionary[coin] = {}
self.meta_data_dictionary[coin]["train_df"] = dk.data_dictionary["train_features"]
self.meta_data_dictionary[coin]["meta_data"] = dk.data
self.meta_data_dictionary[coin]["pipeline"] = dk.pipeline
self.meta_data_dictionary[coin]["feature_pipeline"] = dk.feature_pipeline
self.meta_data_dictionary[coin]["label_pipeline"] = dk.label_pipeline
self.save_drawer_to_disk()
@ -522,7 +514,7 @@ class FreqaiDataDrawer:
if coin in self.meta_data_dictionary:
dk.data = self.meta_data_dictionary[coin]["meta_data"]
dk.data_dictionary["train_features"] = self.meta_data_dictionary[coin]["train_df"]
dk.pipeline = self.meta_data_dictionary[coin]["pipeline"]
dk.feature_pipeline = self.meta_data_dictionary[coin]["feature_pipeline"]
dk.label_pipeline = self.meta_data_dictionary[coin]["label_pipeline"]
else:
with (dk.data_path / f"{dk.model_filename}_metadata.json").open("r") as fp:
@ -532,7 +524,7 @@ class FreqaiDataDrawer:
dk.data_path / f"{dk.model_filename}_trained_df.pkl"
)
with (dk.data_path / f"{dk.model_filename}_pipeline.pkl").open("rb") as fp:
dk.pipeline = cloudpickle.load(fp)
dk.feature_pipeline = cloudpickle.load(fp)
with (dk.data_path / f"{dk.model_filename}_label_pipeline.pkl").open("rb") as fp:
dk.label_pipeline = cloudpickle.load(fp)
@ -544,9 +536,6 @@ class FreqaiDataDrawer:
model = self.model_dictionary[coin]
elif self.model_type == 'joblib':
model = load(dk.data_path / f"{dk.model_filename}_model.joblib")
elif self.model_type == 'keras':
from tensorflow import keras
model = keras.models.load_model(dk.data_path / f"{dk.model_filename}_model.h5")
elif 'stable_baselines' in self.model_type or 'sb3_contrib' == self.model_type:
mod = importlib.import_module(
self.model_type, self.freqai_info['rl_config']['model_type'])
@ -558,9 +547,6 @@ class FreqaiDataDrawer:
model = zip["pytrainer"]
model = model.load_from_checkpoint(zip)
if Path(dk.data_path / f"{dk.model_filename}_svm_model.joblib").is_file():
dk.svm_model = load(dk.data_path / f"{dk.model_filename}_svm_model.joblib")
if not model:
raise OperationalException(
f"Unable to load model, ensure model exists at " f"{dk.data_path} "
@ -570,11 +556,6 @@ class FreqaiDataDrawer:
if coin not in self.model_dictionary:
self.model_dictionary[coin] = model
if self.config["freqai"]["feature_parameters"]["principal_component_analysis"]:
dk.pca = cloudpickle.load(
(dk.data_path / f"{dk.model_filename}_pca_object.pkl").open("rb")
)
return model
def update_historic_data(self, strategy: IStrategy, dk: FreqaiDataKitchen) -> None:

View File

@ -4,7 +4,6 @@ import logging
import random
import shutil
from datetime import datetime, timezone
from math import cos, sin
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
@ -12,13 +11,10 @@ import numpy as np
import numpy.typing as npt
import pandas as pd
import psutil
from datasieve.pipeline import Pipeline
from pandas import DataFrame
from scipy import stats
from sklearn import linear_model
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from freqtrade.configuration import TimeRange
from freqtrade.constants import Config
@ -27,7 +23,6 @@ from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy import merge_informative_pair
from freqtrade.strategy.interface import IStrategy
from datasieve.pipeline import Pipeline
SECONDS_IN_DAY = 86400
@ -83,11 +78,11 @@ class FreqaiDataKitchen:
self.live = live
self.pair = pair
self.svm_model: linear_model.SGDOneClassSVM = None
# self.svm_model: linear_model.SGDOneClassSVM = None
self.keras: bool = self.freqai_config.get("keras", False)
self.set_all_pairs()
self.backtest_live_models = config.get("freqai_backtest_live_models", False)
self.pipeline = Pipeline()
self.feature_pipeline = Pipeline()
self.label_pipeline = Pipeline()
if not self.live:
@ -230,13 +225,14 @@ class FreqaiDataKitchen:
drop_index = pd.isnull(filtered_df).any(axis=1) # get the rows that have NaNs,
drop_index = drop_index.replace(True, 1).replace(False, 0) # pep8 requirement.
if (training_filter):
const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
if const_cols:
filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
self.data['constant_features_list'] = const_cols
logger.warning(f"Removed features {const_cols} with constant values.")
else:
self.data['constant_features_list'] = []
# const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
# if const_cols:
# filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
# self.data['constant_features_list'] = const_cols
# logger.warning(f"Removed features {const_cols} with constant values.")
# else:
# self.data['constant_features_list'] = []
# we don't care about total row number (total no. datapoints) in training, we only care
# about removing any row with NaNs
# if labels has multiple columns (user wants to train multiple modelEs), we detect here
@ -267,8 +263,10 @@ class FreqaiDataKitchen:
self.data["filter_drop_index_training"] = drop_index
else:
if 'constant_features_list' in self.data and len(self.data['constant_features_list']):
filtered_df = self.check_pred_labels(filtered_df)
# if 'constant_features_list' in self.data and len(self.data['constant_features_list']):
# filtered_df = self.check_pred_labels(filtered_df)
# we are backtesting so we need to preserve row number to send back to strategy,
# so now we use do_predict to avoid any prediction based on a NaN
drop_index = pd.isnull(filtered_df).any(axis=1)
@ -488,415 +486,6 @@ class FreqaiDataKitchen:
return df
def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
"""
Check that prediction feature labels match training feature labels.
:param df_predictions: incoming predictions
"""
constant_labels = self.data['constant_features_list']
df_predictions = df_predictions.filter(
df_predictions.columns.difference(constant_labels)
)
logger.warning(
f"Removed {len(constant_labels)} features from prediction features, "
f"these were considered constant values during most recent training."
)
return df_predictions
# def principal_component_analysis(self) -> None:
# """
# Performs Principal Component Analysis on the data for dimensionality reduction
# and outlier detection (see self.remove_outliers())
# No parameters or returns, it acts on the data_dictionary held by the DataHandler.
# """
# from sklearn.decomposition import PCA # avoid importing if we dont need it
# pca = PCA(0.999)
# pca = pca.fit(self.data_dictionary["train_features"])
# n_keep_components = pca.n_components_
# self.data["n_kept_components"] = n_keep_components
# n_components = self.data_dictionary["train_features"].shape[1]
# logger.info("reduced feature dimension by %s", n_components - n_keep_components)
# logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))
# train_components = pca.transform(self.data_dictionary["train_features"])
# self.data_dictionary["train_features"] = pd.DataFrame(
# data=train_components,
# columns=["PC" + str(i) for i in range(0, n_keep_components)],
# index=self.data_dictionary["train_features"].index,
# )
# # normalising transformed training features
# self.data_dictionary["train_features"] = self.normalize_single_dataframe(
# self.data_dictionary["train_features"])
# # keeping a copy of the non-transformed features so we can check for errors during
# # model load from disk
# self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
# self.training_features_list = self.data_dictionary["train_features"].columns
# if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
# test_components = pca.transform(self.data_dictionary["test_features"])
# self.data_dictionary["test_features"] = pd.DataFrame(
# data=test_components,
# columns=["PC" + str(i) for i in range(0, n_keep_components)],
# index=self.data_dictionary["test_features"].index,
# )
# # normalise transformed test feature to transformed training features
# self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
# self.data_dictionary["test_features"])
# self.data["n_kept_components"] = n_keep_components
# self.pca = pca
# logger.info(f"PCA reduced total features from {n_components} to {n_keep_components}")
# if not self.data_path.is_dir():
# self.data_path.mkdir(parents=True, exist_ok=True)
# return None
# def pca_transform(self, filtered_dataframe: DataFrame) -> None:
# """
# Use an existing pca transform to transform data into components
# :param filtered_dataframe: DataFrame = the cleaned dataframe
# """
# pca_components = self.pca.transform(filtered_dataframe)
# self.data_dictionary["prediction_features"] = pd.DataFrame(
# data=pca_components,
# columns=["PC" + str(i) for i in range(0, self.data["n_kept_components"])],
# index=filtered_dataframe.index,
# )
# # normalise transformed predictions to transformed training features
# self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
# self.data_dictionary["prediction_features"])
# def compute_distances(self) -> float:
# """
# Compute distances between each training point and every other training
# point. This metric defines the neighborhood of trained data and is used
# for prediction confidence in the Dissimilarity Index
# """
# # logger.info("computing average mean distance for all training points")
# pairwise = pairwise_distances(
# self.data_dictionary["train_features"], n_jobs=self.thread_count)
# # remove the diagonal distances which are itself distances ~0
# np.fill_diagonal(pairwise, np.NaN)
# pairwise = pairwise.reshape(-1, 1)
# avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()
# return avg_mean_dist
# def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
# """
# Check if more than X% of points were dropped during outlier detection.
# """
# outlier_protection_pct = self.freqai_config["feature_parameters"].get(
# "outlier_protection_percentage", 30)
# outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
# if outlier_pct >= outlier_protection_pct:
# return outlier_pct
# else:
# return 0.0
# def use_SVM_to_remove_outliers(self, predict: bool) -> None:
# """
# Build/inference a Support Vector Machine to detect outliers
# in training data and prediction
# :param predict: bool = If true, inference an existing SVM model, else construct one
# """
# if self.keras:
# logger.warning(
# "SVM outlier removal not currently supported for Keras based models. "
# "Skipping user requested function."
# )
# if predict:
# self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
# return
# if predict:
# if not self.svm_model:
# logger.warning("No svm model available for outlier removal")
# return
# y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
# do_predict = np.where(y_pred == -1, 0, y_pred)
# if (len(do_predict) - do_predict.sum()) > 0:
# logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")
# self.do_predict += do_predict
# self.do_predict -= 1
# else:
# # use SGDOneClassSVM to increase speed?
# svm_params = self.freqai_config["feature_parameters"].get(
# "svm_params", {"shuffle": False, "nu": 0.1})
# self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
# self.data_dictionary["train_features"]
# )
# y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
# kept_points = np.where(y_pred == -1, 0, y_pred)
# # keep_index = np.where(y_pred == 1)
# outlier_pct = self.get_outlier_percentage(1 - kept_points)
# if outlier_pct:
# logger.warning(
# f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
# f"Keeping original dataset."
# )
# self.svm_model = None
# return
# self.data_dictionary["train_features"] = self.data_dictionary["train_features"][
# (y_pred == 1)
# ]
# self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
# (y_pred == 1)
# ]
# self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
# (y_pred == 1)
# ]
# logger.info(
# f"SVM tossed {len(y_pred) - kept_points.sum()}"
# f" train points from {len(y_pred)} total points."
# )
# # same for test data
# # TODO: This (and the part above) could be refactored into a separate function
# # to reduce code duplication
# if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
# y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
# kept_points = np.where(y_pred == -1, 0, y_pred)
# self.data_dictionary["test_features"] = self.data_dictionary["test_features"][
# (y_pred == 1)
# ]
# self.data_dictionary["test_labels"] = self.data_dictionary["test_labels"][(
# y_pred == 1)]
# self.data_dictionary["test_weights"] = self.data_dictionary["test_weights"][
# (y_pred == 1)
# ]
# logger.info(
# f"{self.pair}: SVM tossed {len(y_pred) - kept_points.sum()}"
# f" test points from {len(y_pred)} total points."
# )
# return
# def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
# """
# Use DBSCAN to cluster training data and remove "noisy" data (read outliers).
# User controls this via the config param `DBSCAN_outlier_pct` which indicates the
# pct of training data that they want to be considered outliers.
# :param predict: bool = If False (training), iterate to find the best hyper parameters
# to match user requested outlier percent target.
# If True (prediction), use the parameters determined from
# the previous training to estimate if the current prediction point
# is an outlier.
# """
# if predict:
# if not self.data['DBSCAN_eps']:
# return
# train_ft_df = self.data_dictionary['train_features']
# pred_ft_df = self.data_dictionary['prediction_features']
# num_preds = len(pred_ft_df)
# df = pd.concat([train_ft_df, pred_ft_df], axis=0, ignore_index=True)
# clustering = DBSCAN(eps=self.data['DBSCAN_eps'],
# min_samples=self.data['DBSCAN_min_samples'],
# n_jobs=self.thread_count
# ).fit(df)
# do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)
# if (len(do_predict) - do_predict.sum()) > 0:
# logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
# self.do_predict += do_predict
# self.do_predict -= 1
# else:
# def normalise_distances(distances):
# normalised_distances = (distances - distances.min()) / \
# (distances.max() - distances.min())
# return normalised_distances
# def rotate_point(origin, point, angle):
# # rotate a point counterclockwise by a given angle (in radians)
# # around a given origin
# x = origin[0] + cos(angle) * (point[0] - origin[0]) - \
# sin(angle) * (point[1] - origin[1])
# y = origin[1] + sin(angle) * (point[0] - origin[0]) + \
# cos(angle) * (point[1] - origin[1])
# return (x, y)
# MinPts = int(len(self.data_dictionary['train_features'].index) * 0.25)
# # measure pairwise distances to nearest neighbours
# neighbors = NearestNeighbors(
# n_neighbors=MinPts, n_jobs=self.thread_count)
# neighbors_fit = neighbors.fit(self.data_dictionary['train_features'])
# distances, _ = neighbors_fit.kneighbors(self.data_dictionary['train_features'])
# distances = np.sort(distances, axis=0).mean(axis=1)
# normalised_distances = normalise_distances(distances)
# x_range = np.linspace(0, 1, len(distances))
# line = np.linspace(normalised_distances[0],
# normalised_distances[-1], len(normalised_distances))
# deflection = np.abs(normalised_distances - line)
# max_deflection_loc = np.where(deflection == deflection.max())[0][0]
# origin = x_range[max_deflection_loc], line[max_deflection_loc]
# point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
# rot_angle = np.pi / 4
# elbow_loc = rotate_point(origin, point, rot_angle)
# epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]
# clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
# n_jobs=int(self.thread_count)).fit(
# self.data_dictionary['train_features']
# )
# logger.info(f'DBSCAN found eps of {epsilon:.2f}.')
# self.data['DBSCAN_eps'] = epsilon
# self.data['DBSCAN_min_samples'] = MinPts
# dropped_points = np.where(clustering.labels_ == -1, 1, 0)
# outlier_pct = self.get_outlier_percentage(dropped_points)
# if outlier_pct:
# logger.warning(
# f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
# f"Keeping original dataset."
# )
# self.data['DBSCAN_eps'] = 0
# return
# self.data_dictionary['train_features'] = self.data_dictionary['train_features'][
# (clustering.labels_ != -1)
# ]
# self.data_dictionary["train_labels"] = self.data_dictionary["train_labels"][
# (clustering.labels_ != -1)
# ]
# self.data_dictionary["train_weights"] = self.data_dictionary["train_weights"][
# (clustering.labels_ != -1)
# ]
# logger.info(
# f"DBSCAN tossed {dropped_points.sum()}"
# f" train points from {len(clustering.labels_)}"
# )
# return
# def compute_inlier_metric(self, set_='train') -> None:
# """
# Compute inlier metric from backwards distance distributions.
# This metric defines how well features from a timepoint fit
# into previous timepoints.
# """
# def normalise(dataframe: DataFrame, key: str) -> DataFrame:
# if set_ == 'train':
# min_value = dataframe.min()
# max_value = dataframe.max()
# self.data[f'{key}_min'] = min_value
# self.data[f'{key}_max'] = max_value
# else:
# min_value = self.data[f'{key}_min']
# max_value = self.data[f'{key}_max']
# return (dataframe - min_value) / (max_value - min_value)
# no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]
# if set_ == 'train':
# compute_df = copy.deepcopy(self.data_dictionary['train_features'])
# elif set_ == 'test':
# compute_df = copy.deepcopy(self.data_dictionary['test_features'])
# else:
# compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])
# compute_df_reindexed = compute_df.reindex(
# index=np.flip(compute_df.index)
# )
# pairwise = pd.DataFrame(
# np.triu(
# pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
# ),
# columns=compute_df_reindexed.index,
# index=compute_df_reindexed.index
# )
# pairwise = pairwise.round(5)
# column_labels = [
# '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
# ]
# distances = pd.DataFrame(
# columns=column_labels, index=compute_df.index
# )
# for index in compute_df.index[no_prev_pts:]:
# current_row = pairwise.loc[[index]]
# current_row_no_zeros = current_row.loc[
# :, (current_row != 0).any(axis=0)
# ]
# distances.loc[[index]] = current_row_no_zeros.iloc[
# :, :no_prev_pts
# ]
# distances = distances.replace([np.inf, -np.inf], np.nan)
# drop_index = pd.isnull(distances).any(axis=1)
# distances = distances[drop_index == 0]
# inliers = pd.DataFrame(index=distances.index)
# for key in distances.keys():
# current_distances = distances[key].dropna()
# current_distances = normalise(current_distances, key)
# if set_ == 'train':
# fit_params = stats.weibull_min.fit(current_distances)
# self.data[f'{key}_fit_params'] = fit_params
# else:
# fit_params = self.data[f'{key}_fit_params']
# quantiles = stats.weibull_min.cdf(current_distances, *fit_params)
# df_inlier = pd.DataFrame(
# {key: quantiles}, index=distances.index
# )
# inliers = pd.concat(
# [inliers, df_inlier], axis=1
# )
# inlier_metric = pd.DataFrame(
# data=inliers.sum(axis=1) / no_prev_pts,
# columns=['%-inlier_metric'],
# index=compute_df.index
# )
# inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
# (inlier_metric.max() - inlier_metric.min()) - 1)
# if set_ in ('train', 'test'):
# inlier_metric = inlier_metric.iloc[no_prev_pts:]
# compute_df = compute_df.iloc[no_prev_pts:]
# self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
# self.data_dictionary[f'{set_}_features'] = pd.concat(
# [compute_df, inlier_metric], axis=1)
# else:
# self.data_dictionary['prediction_features'] = pd.concat(
# [compute_df, inlier_metric], axis=1)
# self.data_dictionary['prediction_features'].fillna(0, inplace=True)
# logger.info('Inlier metric computed and added to features.')
# return None
# def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
# features = self.data_dictionary[f'{set_}_features']
# weights = self.data_dictionary[f'{set_}_weights']
# labels = self.data_dictionary[f'{set_}_labels']
# self.data_dictionary[f'{set_}_weights'] = weights[no_prev_pts:]
# self.data_dictionary[f'{set_}_features'] = features.iloc[no_prev_pts:]
# self.data_dictionary[f'{set_}_labels'] = labels.iloc[no_prev_pts:]
def add_noise_to_training_features(self) -> None:
"""
Add noise to train features to reduce the risk of overfitting.

View File

@ -7,9 +7,11 @@ from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Tuple
import datasieve.transforms as ds
import numpy as np
import pandas as pd
import psutil
from datasieve.pipeline import Pipeline
from numpy.typing import NDArray
from pandas import DataFrame
@ -23,8 +25,6 @@ from freqtrade.freqai.data_drawer import FreqaiDataDrawer
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.utils import get_tb_logger, plot_feature_importance, record_params
from freqtrade.strategy.interface import IStrategy
from datasieve.pipeline import Pipeline
import datasieve.transforms as ds
pd.options.mode.chained_assignment = None
@ -505,94 +505,39 @@ class IFreqaiModel(ABC):
"feature_engineering_* functions"
)
def data_cleaning_train(self, dk: FreqaiDataKitchen) -> None:
"""
Base data cleaning method for train.
Functions here improve/modify the input data by identifying outliers,
computing additional metrics, adding noise, reducing dimensionality etc.
"""
ft_params = self.freqai_info["feature_parameters"]
if ft_params.get('inlier_metric_window', 0):
dk.compute_inlier_metric(set_='train')
if self.freqai_info["data_split_parameters"]["test_size"] > 0:
dk.compute_inlier_metric(set_='test')
if ft_params.get(
"principal_component_analysis", False
):
dk.principal_component_analysis()
if ft_params.get("use_SVM_to_remove_outliers", False):
dk.use_SVM_to_remove_outliers(predict=False)
if ft_params.get("DI_threshold", 0):
dk.data["avg_mean_dist"] = dk.compute_distances()
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
if dk.pair in self.dd.old_DBSCAN_eps:
eps = self.dd.old_DBSCAN_eps[dk.pair]
else:
eps = None
dk.use_DBSCAN_to_remove_outliers(predict=False, eps=eps)
self.dd.old_DBSCAN_eps[dk.pair] = dk.data['DBSCAN_eps']
if self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0):
dk.add_noise_to_training_features()
def data_cleaning_predict(self, dk: FreqaiDataKitchen) -> None:
"""
Base data cleaning method for predict.
Functions here are complementary to the functions of data_cleaning_train.
"""
ft_params = self.freqai_info["feature_parameters"]
# ensure user is feeding the correct indicators to the model
self.check_if_feature_list_matches_strategy(dk)
if ft_params.get('inlier_metric_window', 0):
dk.compute_inlier_metric(set_='predict')
if ft_params.get(
"principal_component_analysis", False
):
dk.pca_transform(dk.data_dictionary['prediction_features'])
if ft_params.get("use_SVM_to_remove_outliers", False):
dk.use_SVM_to_remove_outliers(predict=True)
if ft_params.get("DI_threshold", 0):
dk.check_if_pred_in_training_spaces()
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
dk.use_DBSCAN_to_remove_outliers(predict=True)
def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
ft_params = self.freqai_info["feature_parameters"]
dk.pipeline = Pipeline([('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])
dk.feature_pipeline = Pipeline(
[('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])
if ft_params.get("principal_component_analysis", False):
dk.pipeline.steps += [('pca', ds.DataSievePCA())]
dk.pipeline.steps += [('post-pca-scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))]
dk.feature_pipeline.steps += [('pca', ds.DataSievePCA())]
dk.feature_pipeline.steps += [('post-pca-scaler',
ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))]
if ft_params.get("use_SVM_to_remove_outliers", False):
dk.pipeline.steps += [('svm', ds.SVMOutlierExtractor())]
svm_params = ft_params.get(
"svm_params", {"shuffle": False, "nu": 0.01})
dk.feature_pipeline.steps += [('svm', ds.SVMOutlierExtractor(**svm_params))]
if ft_params.get("DI_threshold", 0):
dk.pipeline.steps += [('di', ds.DissimilarityIndex())]
di = ft_params.get("DI_threshold", 0)
if di:
dk.feature_pipeline.steps += [('di', ds.DissimilarityIndex(di_threshold=di))]
if ft_params.get("use_DBSCAN_to_remove_outliers", False):
dk.pipeline.steps += [('dbscan', ds.DataSieveDBSCAN())]
dk.feature_pipeline.steps += [('dbscan', ds.DataSieveDBSCAN())]
dk.pipeline.fitparams = dk.pipeline._validate_fitparams({}, dk.pipeline.steps)
dk.feature_pipeline.fitparams = dk.feature_pipeline._validate_fitparams(
{}, dk.feature_pipeline.steps)
# if self.freqai_info["feature_parameters"].get('noise_standard_deviation', 0):
# dk.pipeline.extend(('noise', ds.Noise()))
def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
dk.label_pipeline = Pipeline([('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))])
dk.label_pipeline = Pipeline([
('scaler', ds.DataSieveMinMaxScaler(feature_range=(-1, 1)))
])
def model_exists(self, dk: FreqaiDataKitchen) -> bool:
"""

View File

@ -103,13 +103,13 @@ class PyTorchTransformerRegressor(BasePyTorchRegressor):
"""
dk.find_features(unfiltered_df)
filtered_df, _ = dk.filter_features(
dk.data_dictionary["prediction_features"], _ = dk.filter_features(
unfiltered_df, dk.training_features_list, training_filter=False
)
filtered_df = dk.normalize_data_from_metadata(filtered_df)
dk.data_dictionary["prediction_features"] = filtered_df
self.data_cleaning_predict(dk)
dk.data_dictionary["prediction_features"], outliers, _ = dk.feature_pipeline.transform(
dk.data_dictionary["prediction_features"], outlier_check=True)
x = self.data_convertor.convert_x(
dk.data_dictionary["prediction_features"],
device=self.device
@ -131,7 +131,13 @@ class PyTorchTransformerRegressor(BasePyTorchRegressor):
yb = yb.cpu().squeeze()
pred_df = pd.DataFrame(yb.detach().numpy(), columns=dk.label_list)
pred_df = dk.denormalize_labels_from_metadata(pred_df)
pred_df, _, _ = dk.label_pipeline.inverse_transform(pred_df)
if self.freqai_info.get("DI_threshold", 0) > 0:
dk.DI_values = dk.feature_pipeline["di"].di_values
else:
dk.DI_values = np.zeros(len(outliers.index))
dk.do_predict = outliers.to_numpy()
if x.shape[1] > 1:
zeros_df = pd.DataFrame(np.zeros((x.shape[1] - len(pred_df), len(pred_df.columns))),

View File

@ -5,6 +5,7 @@ from xgboost import XGBRFRegressor
from freqtrade.freqai.base_models.BaseRegressionModel import BaseRegressionModel
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.tensorboard import TBCallback
logger = logging.getLogger(__name__)
@ -44,7 +45,10 @@ class XGBoostRFRegressor(BaseRegressionModel):
model = XGBRFRegressor(**self.model_training_parameters)
model.set_params(callbacks=[TBCallback(dk.data_path)], activate=self.activate_tensorboard)
model.fit(X=X, y=y, sample_weight=sample_weight, eval_set=eval_set,
sample_weight_eval_set=eval_weights, xgb_model=xgb_model)
# set the callbacks to empty so that we can serialize to disk later
model.set_params(callbacks=[])
return model

View File

@ -8,6 +8,9 @@ from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from freqtrade.freqai.tensorboard import TBCallback
# from datasieve.pipeline import Pipeline
# from freqtrade.freqai.transforms import FreqaiQuantileTransformer
logger = logging.getLogger(__name__)
@ -52,3 +55,23 @@ class XGBoostRegressor(BaseRegressionModel):
model.set_params(callbacks=[])
return model
# def define_data_pipeline(self, dk: FreqaiDataKitchen) -> None:
# """
# User defines their custom feature pipeline here (if they wish)
# """
# dk.feature_pipeline = Pipeline([
# ('qt', FreqaiQuantileTransformer(output_distribution='normal'))
# ])
# return
# def define_label_pipeline(self, dk: FreqaiDataKitchen) -> None:
# """
# User defines their custom label pipeline here (if they wish)
# """
# dk.label_pipeline = Pipeline([
# ('qt', FreqaiQuantileTransformer(output_distribution='normal'))
# ])
# return

View File

@ -0,0 +1,6 @@
from freqtrade.freqai.transforms.quantile_transform import FreqaiQuantileTransformer
__all__ = (
"FreqaiQuantileTransformer",
)

View File

@ -0,0 +1,28 @@
from sklearn.preprocessing import QuantileTransformer
class FreqaiQuantileTransformer(QuantileTransformer):
"""
A subclass of the SKLearn QuantileTransformer that ensures fit, transform, fit_transform and
inverse_transform all take the full set of params X, y, sample_weight required to
benefit from the DataSieve features.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def fit_transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
super().fit(X)
X = super().transform(X)
return X, y, sample_weight, feature_list
def fit(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
super().fit(X)
return X, y, sample_weight, feature_list
def transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
X = super().transform(X)
return X, y, sample_weight, feature_list
def inverse_transform(self, X, y=None, sample_weight=None, feature_list=None, **kwargs):
return super().inverse_transform(X), y, sample_weight, feature_list
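A hypothetical usage sketch of the new transform inside a DataSieve pipeline (variable names illustrative; the three-tuple return mirrors how the base models call `fit_transform` in this commit):
```py
import numpy as np
from datasieve.pipeline import Pipeline
from freqtrade.freqai.transforms import FreqaiQuantileTransformer

X = np.random.rand(2000, 8)   # illustrative feature matrix
y = np.random.rand(2000, 1)   # illustrative labels
w = np.ones(2000)             # illustrative sample weights

pipeline = Pipeline([('qt', FreqaiQuantileTransformer(output_distribution='normal'))])
X_t, y_t, w_t = pipeline.fit_transform(X, y, w)
```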

View File

@ -34,7 +34,7 @@ class FreqaiModelResolver(IResolver):
Load the custom class from config parameter
:param config: configuration dictionary
"""
disallowed_models = ["BaseRegressionModel", "BaseTensorFlowModel"]
disallowed_models = ["BaseRegressionModel"]
freqaimodel_name = config.get("freqaimodel")
if not freqaimodel_name:

View File

@ -10,8 +10,8 @@ from freqtrade.data.dataprovider import DataProvider
from freqtrade.exceptions import OperationalException
from freqtrade.freqai.data_kitchen import FreqaiDataKitchen
from tests.conftest import get_patched_exchange # , log_has_re
from tests.freqai.conftest import (get_patched_data_kitchen, get_patched_freqai_strategy,
make_unfiltered_dataframe) # make_data_dictionary,
from tests.freqai.conftest import make_unfiltered_dataframe # make_data_dictionary,
from tests.freqai.conftest import get_patched_data_kitchen, get_patched_freqai_strategy
from tests.freqai.test_freqai_interface import is_mac
@ -72,68 +72,6 @@ def test_check_if_model_expired(mocker, freqai_conf):
shutil.rmtree(Path(dk.full_path))
# def test_use_DBSCAN_to_remove_outliers(mocker, freqai_conf, caplog):
# freqai = make_data_dictionary(mocker, freqai_conf)
# # freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 1})
# freqai.dk.use_DBSCAN_to_remove_outliers(predict=False)
# assert log_has_re(r"DBSCAN found eps of 1\.7\d\.", caplog)
# def test_compute_distances(mocker, freqai_conf):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai_conf['freqai']['feature_parameters'].update({"DI_threshold": 1})
# avg_mean_dist = freqai.dk.compute_distances()
# assert round(avg_mean_dist, 2) == 1.98
# def test_use_SVM_to_remove_outliers_and_outlier_protection(mocker, freqai_conf, caplog):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai_conf['freqai']['feature_parameters'].update({"outlier_protection_percentage": 0.1})
# freqai.dk.use_SVM_to_remove_outliers(predict=False)
# assert log_has_re(
# "SVM detected 7.83%",
# caplog,
# )
# def test_compute_inlier_metric(mocker, freqai_conf, caplog):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai_conf['freqai']['feature_parameters'].update({"inlier_metric_window": 10})
# freqai.dk.compute_inlier_metric(set_='train')
# assert log_has_re(
# "Inlier metric computed and added to features.",
# caplog,
# )
# def test_add_noise_to_training_features(mocker, freqai_conf):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai_conf['freqai']['feature_parameters'].update({"noise_standard_deviation": 0.1})
# freqai.dk.add_noise_to_training_features()
# def test_remove_beginning_points_from_data_dict(mocker, freqai_conf):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai.dk.remove_beginning_points_from_data_dict(set_='train')
# def test_principal_component_analysis(mocker, freqai_conf, caplog):
# freqai = make_data_dictionary(mocker, freqai_conf)
# freqai.dk.principal_component_analysis()
# assert log_has_re(
# "reduced feature dimension by",
# caplog,
# )
# def test_normalize_data(mocker, freqai_conf):
# freqai = make_data_dictionary(mocker, freqai_conf)
# data_dict = freqai.dk.data_dictionary
# freqai.dk.normalize_data(data_dict)
# assert any('_max' in entry for entry in freqai.dk.data.keys())
# assert any('_min' in entry for entry in freqai.dk.data.keys())
def test_filter_features(mocker, freqai_conf):
freqai, unfiltered_dataframe = make_unfiltered_dataframe(mocker, freqai_conf)
freqai.dk.find_features(unfiltered_dataframe)

View File

@ -74,6 +74,7 @@ def test_extract_data_and_train_model_Standard(mocker, freqai_conf, model, pca,
freqai_conf = make_rl_config(freqai_conf)
# test the RL guardrails
freqai_conf['freqai']['feature_parameters'].update({"use_SVM_to_remove_outliers": True})
freqai_conf['freqai']['feature_parameters'].update({"DI_threshold": 2})
freqai_conf['freqai']['data_split_parameters'].update({'shuffle': True})
if 'test_3ac' in model or 'test_4ac' in model:
@ -162,7 +163,6 @@ def test_extract_data_and_train_model_MultiTargets(mocker, freqai_conf, model, s
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_model.joblib").is_file()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_metadata.json").is_file()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_trained_df.pkl").is_file()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_svm_model.joblib").is_file()
assert len(freqai.dk.data['training_features_list']) == 14
shutil.rmtree(Path(freqai.dk.full_path))
@ -218,7 +218,6 @@ def test_extract_data_and_train_model_Classifiers(mocker, freqai_conf, model):
f"{freqai.dk.model_filename}_model{model_file_extension}").exists()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_metadata.json").exists()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_trained_df.pkl").exists()
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_svm_model.joblib").exists()
shutil.rmtree(Path(freqai.dk.full_path))
@ -283,9 +282,6 @@ def test_start_backtesting(mocker, freqai_conf, model, num_files, strat, caplog)
_, base_df = freqai.dd.get_base_and_corr_dataframes(sub_timerange, "LTC/BTC", freqai.dk)
df = base_df[freqai_conf["timeframe"]]
for i in range(5):
df[f'%-constant_{i}'] = i
metadata = {"pair": "LTC/BTC"}
freqai.dk.set_paths('LTC/BTC', None)
freqai.start_backtesting(df, metadata, freqai.dk, strategy)
@ -293,14 +289,6 @@ def test_start_backtesting(mocker, freqai_conf, model, num_files, strat, caplog)
assert len(model_folders) == num_files
Trade.use_db = True
assert log_has_re(
"Removed features ",
caplog,
)
assert log_has_re(
"Removed 5 features from prediction features, ",
caplog,
)
Backtesting.cleanup()
shutil.rmtree(Path(freqai.dk.full_path))
@ -425,36 +413,6 @@ def test_backtesting_fit_live_predictions(mocker, freqai_conf, caplog):
shutil.rmtree(Path(freqai.dk.full_path))
def test_principal_component_analysis(mocker, freqai_conf):
freqai_conf.update({"timerange": "20180110-20180130"})
freqai_conf.get("freqai", {}).get("feature_parameters", {}).update(
{"princpial_component_analysis": "true"})
strategy = get_patched_freqai_strategy(mocker, freqai_conf)
exchange = get_patched_exchange(mocker, freqai_conf)
strategy.dp = DataProvider(freqai_conf, exchange)
strategy.freqai_info = freqai_conf.get("freqai", {})
freqai = strategy.freqai
freqai.live = True
freqai.dk = FreqaiDataKitchen(freqai_conf)
freqai.dk.live = True
timerange = TimeRange.parse_timerange("20180110-20180130")
freqai.dd.load_all_pair_histories(timerange, freqai.dk)
freqai.dd.pair_dict = MagicMock()
data_load_timerange = TimeRange.parse_timerange("20180110-20180130")
new_timerange = TimeRange.parse_timerange("20180120-20180130")
freqai.dk.set_paths('ADA/BTC', None)
freqai.extract_data_and_train_model(
new_timerange, "ADA/BTC", strategy, freqai.dk, data_load_timerange)
assert Path(freqai.dk.data_path / f"{freqai.dk.model_filename}_pca_object.pkl")
shutil.rmtree(Path(freqai.dk.full_path))
def test_plot_feature_importance(mocker, freqai_conf):
from freqtrade.freqai.utils import plot_feature_importance