freqtrade_origin/freqtrade/freqai/data_kitchen.py

1492 lines
62 KiB
Python
Raw Normal View History

import copy
2022-05-04 15:53:40 +00:00
import logging
import shutil
from datetime import datetime, timedelta, timezone
from math import cos, sin
from pathlib import Path
from typing import Any, Dict, List, Tuple
import numpy as np
2022-05-06 14:20:52 +00:00
import numpy.typing as npt
import pandas as pd
from pandas import DataFrame
2022-09-06 18:10:12 +00:00
from scipy import stats
from sklearn import linear_model
2022-08-04 15:00:59 +00:00
from sklearn.cluster import DBSCAN
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.model_selection import train_test_split
from sklearn.neighbors import NearestNeighbors
from freqtrade.configuration import TimeRange
2022-09-18 11:20:36 +00:00
from freqtrade.constants import Config
from freqtrade.exceptions import OperationalException
from freqtrade.exchange import timeframe_to_seconds
from freqtrade.strategy.interface import IStrategy
2022-08-04 15:00:59 +00:00
# Number of seconds in one day; used to turn day-based periods into second offsets.
SECONDS_IN_DAY = 86400
# Number of seconds in one hour.
SECONDS_IN_HOUR = 3600
2022-05-04 15:53:40 +00:00
logger = logging.getLogger(__name__)
class FreqaiDataKitchen:
"""
Class designed to analyze data for a single pair. Employed by the IFreqaiModel class.
Functionalities include holding, saving, loading, and analyzing the data.

This object is not persistent, it is reinstantiated for each coin, each time the coin
model needs to be inferenced or trained.

Record of contribution:
FreqAI was developed by a group of individuals who all contributed specific skillsets to the
project.

Conception and software development:
Robert Caulk @robcaulk

Theoretical brainstorming:
Elin Törnquist @th0rntwig

Code review, software architecture brainstorming:
@xmatthias

Beta testing and bug reporting:
@bloodhunter4rc, Salah Lamkadem @ikonx, @ken11o2, @longyu, @paranoidandy, @smidelis, @smarm,
Juha Nykänen @suikula, Wagner Costa @wagnercosta, Johan Vlugt @Jooopieeert
"""
2022-07-03 08:59:38 +00:00
def __init__(
    self,
    config: Config,
    live: bool = False,
    pair: str = "",
):
    """
    Create the per-pair data containers and, when not running live, compute the
    training/backtesting time windows.

    :param config: full bot configuration; must contain a "freqai" section and, for
                   regular backtesting, a "timerange" entry.
    :param live: True for live/dry runs; the backtest window setup is skipped.
    :param pair: pair handled by this instance, e.g. "BTC/USDT".
    """
    self.data: Dict[str, Any] = {}
    self.data_dictionary: Dict[str, DataFrame] = {}
    self.config = config
    self.freqai_config: Dict[str, Any] = config["freqai"]
    self.full_df: DataFrame = DataFrame()
    self.append_df: DataFrame = DataFrame()
    self.data_path = Path()
    self.label_list: List = []
    self.training_features_list: List = []
    self.model_filename: str = ""
    self.backtesting_results_path = Path()
    self.backtest_predictions_folder: str = "backtesting_predictions"
    self.live = live
    self.pair = pair

    self.svm_model: linear_model.SGDOneClassSVM = None
    self.keras: bool = self.freqai_config.get("keras", False)
    self.set_all_pairs()
    self.backtest_live_models = config.get("freqai_backtest_live_models", False)
    # Has to exist before the backtest setup below: split_timerange_live_models()
    # reads it, and initialising it only afterwards would wipe whatever the setup
    # stored in it (presumably set_timerange_from_ready_models() fills it - confirm).
    self.backtest_live_models_data: Dict[str, Any] = {}

    if not self.live:
        self.full_path = self.get_full_models_path(self.config)

        if self.backtest_live_models:
            # windows are derived from the models already available for this pair
            if self.pair:
                self.set_timerange_from_ready_models()
                (self.training_timeranges,
                 self.backtesting_timeranges) = self.split_timerange_live_models()
        else:
            self.full_timerange = self.create_fulltimerange(
                self.config["timerange"], self.freqai_config.get("train_period_days", 0)
            )
            (self.training_timeranges, self.backtesting_timeranges) = self.split_timerange(
                self.full_timerange,
                config["freqai"]["train_period_days"],
                config["freqai"]["backtest_period_days"],
            )

    self.data['extra_returns_per_train'] = self.freqai_config.get('extra_returns_per_train', {})
    self.thread_count = self.freqai_config.get("data_kitchen_thread_count", -1)
    self.train_dates: DataFrame = pd.DataFrame()
    self.unique_classes: Dict[str, list] = {}
    self.unique_class_list: list = []
2022-07-03 08:59:38 +00:00
def set_paths(
    self,
    pair: str,
    trained_timestamp: int = None,
) -> None:
    """
    Point this kitchen at the model folder of the current coin / bot loop.

    :param pair: pair name such as "BTC/USDT"; only the part before "/" is used
    :param trained_timestamp: timestamp of the most recent training, used as the
                              suffix of the folder name
    """
    self.full_path = self.get_full_models_path(self.config)
    base_asset = pair.split('/')[0]
    folder_name = f"sub-train-{base_asset}_{trained_timestamp}"
    self.data_path = Path(self.full_path / folder_name)
def make_train_test_datasets(
    self, filtered_dataframe: DataFrame, labels: DataFrame
) -> Dict[Any, Any]:
    """
    Split the cleaned training history into train and test parts following the
    "data_split_parameters" of the freqai configuration.

    :param filtered_dataframe: cleaned features ready to be split.
    :param labels: cleaned labels ready to be split.
    :returns: the dictionary produced by self.build_data_dictionary()
    """
    feat_dict = self.freqai_config["feature_parameters"]
    split_params = self.freqai_config['data_split_parameters']

    # default to an unshuffled split unless the user decided otherwise
    if 'shuffle' not in split_params:
        split_params.update({'shuffle': False})

    n_rows = len(filtered_dataframe)
    weights: npt.ArrayLike = (
        self.set_weights_higher_recent(n_rows)
        if feat_dict.get("weight_factor", 0) > 0
        else np.ones(n_rows)
    )

    if split_params.get('test_size', 0.1) != 0:
        train_x, test_x, train_y, test_y, train_w, test_w = train_test_split(
            filtered_dataframe[: filtered_dataframe.shape[0]],
            labels,
            weights,
            **self.config["freqai"]["data_split_parameters"],
        )
    else:
        # no test split requested: everything is training data, test parts are placeholders
        train_x, train_y, train_w = filtered_dataframe, labels, weights
        test_x = pd.DataFrame()
        test_y = np.zeros(2)
        test_w = np.zeros(2)

    # Simplest way to reverse the order of training and test data:
    if feat_dict.get('reverse_train_test_order', False):
        return self.build_data_dictionary(
            test_x, train_x, test_y, train_y, test_w, train_w
        )
    return self.build_data_dictionary(
        train_x, test_x, train_y, test_y, train_w, test_w
    )
def filter_features(
    self,
    unfiltered_df: DataFrame,
    training_feature_list: List,
    label_list: List = None,
    training_filter: bool = True,
) -> Tuple[DataFrame, DataFrame]:
    """
    Filter the unfiltered dataframe to extract the user requested features/labels and
    take care of NaNs. In training mode every row holding a NaN (in features or labels)
    is removed. In prediction mode the row count is preserved, NaNs are replaced with 0
    and self.do_predict marks the affected rows (0) so those predictions can be ignored.

    :param unfiltered_df: the full dataframe for the present training period; needs a
                          'date' column in training mode
    :param training_feature_list: list, the names of the feature columns to keep
    :param label_list: list, the names of the label columns (training mode only)
    :param training_filter: boolean which lets the function know if it is training data or
                            prediction data to be filtered.
    :returns:
    :filtered_df: dataframe cleaned of NaNs and only containing the requested feature set.
    :labels: labels cleaned of NaNs (an empty list in prediction mode).
    """
    # never share a mutable default between calls
    if label_list is None:
        label_list = []

    filtered_df = unfiltered_df.filter(training_feature_list, axis=1)
    filtered_df = filtered_df.replace([np.inf, -np.inf], np.nan)

    # 1 for rows holding at least one NaN feature, 0 otherwise
    drop_index = pd.isnull(filtered_df).any(axis=1).astype(int)

    if training_filter:
        # features with a single unique value carry no information, remove them and
        # remember which ones so the prediction data can be treated the same way
        const_cols = list((filtered_df.nunique() == 1).loc[lambda x: x].index)
        if const_cols:
            filtered_df = filtered_df.filter(filtered_df.columns.difference(const_cols))
            self.data['constant_features_list'] = const_cols
            logger.warning(f"Removed features {const_cols} with constant values.")
        else:
            self.data['constant_features_list'] = []

        # we don't care about total row number (total no. datapoints) in training, we only
        # care about removing any row with NaNs. Labels may hold several columns.
        labels = unfiltered_df.filter(label_list, axis=1)
        drop_index_labels = pd.isnull(labels).any(axis=1).astype(int)
        dates = unfiltered_df['date']

        keep_rows = (drop_index == 0) & (drop_index_labels == 0)
        filtered_df = filtered_df[keep_rows]
        labels = labels[keep_rows]
        self.train_dates = dates[keep_rows]

        total_rows = len(unfiltered_df)
        logger.info(
            f"dropped {total_rows - len(filtered_df)} training points"
            f" due to NaNs in populated dataset {total_rows}."
        )
        # an empty input frame has nothing to drop; avoid dividing by zero
        dropped_fraction = (1 - len(filtered_df) / total_rows) if total_rows else 0.0
        if dropped_fraction > 0.1 and self.live:
            worst_indicator = str(unfiltered_df.count().idxmin())
            logger.warning(
                f" {dropped_fraction * 100:.0f} percent "
                " of training data dropped due to NaNs, model may perform inconsistent "
                f"with expectations. Verify {worst_indicator}"
            )
        self.data["filter_drop_index_training"] = drop_index
    else:
        if self.data.get('constant_features_list'):
            filtered_df = self.check_pred_labels(filtered_df)
        # we are backtesting so we need to preserve row number to send back to strategy,
        # so now we use do_predict to avoid any prediction based on a NaN
        drop_index = pd.isnull(filtered_df).any(axis=1)
        self.data["filter_drop_index_prediction"] = drop_index
        # replacing all NaNs with zeros to avoid issues in 'prediction', but any prediction
        # that was based on a single NaN is ultimately protected from buys with do_predict
        filtered_df = filtered_df.fillna(0)
        self.do_predict = np.array((~drop_index).astype(int))
        dropped_points = len(self.do_predict) - self.do_predict.sum()
        if dropped_points > 0:
            logger.info(
                "dropped %s of %s prediction data points due to NaNs.",
                dropped_points,
                len(filtered_df),
            )
        labels = []

    return filtered_df, labels
def build_data_dictionary(
    self,
    train_df: DataFrame,
    test_df: DataFrame,
    train_labels: DataFrame,
    test_labels: DataFrame,
    train_weights: Any,
    test_weights: Any,
) -> Dict:
    """
    Bundle the split features, labels and weights (plus the current self.train_dates)
    into one dictionary, store it on self.data_dictionary and return it.
    """
    bundle = dict(
        train_features=train_df,
        test_features=test_df,
        train_labels=train_labels,
        test_labels=test_labels,
        train_weights=train_weights,
        test_weights=test_weights,
        train_dates=self.train_dates,
    )
    self.data_dictionary = bundle
    return bundle
def normalize_data(self, data_dictionary: Dict) -> Dict[Any, Any]:
    """
    Rescale features and numeric labels to the [-1, 1] range spanned by the training
    data, and remember the training min/max of every column in self.data.

    :param data_dictionary: dictionary containing the cleaned and
                            split training/test data/labels
    :returns:
    :data_dictionary: the same dictionary with rescaled values.
    """
    def rescale(values, low, high):
        # min-max scaling to [-1, 1] relative to the training range
        return 2 * (values - low) / (high - low) - 1

    feature_max = data_dictionary["train_features"].max()
    feature_min = data_dictionary["train_features"].min()

    for split in ("train_features", "test_features"):
        data_dictionary[split] = rescale(data_dictionary[split], feature_min, feature_max)

    for feature in feature_max.keys():
        self.data[feature + "_max"] = feature_max[feature]
        self.data[feature + "_min"] = feature_min[feature]

    for item in data_dictionary["train_labels"].keys():
        # object columns (e.g. class names) cannot be rescaled
        if data_dictionary["train_labels"][item].dtype == object:
            continue
        label_max = data_dictionary["train_labels"][item].max()
        label_min = data_dictionary["train_labels"][item].min()

        data_dictionary["train_labels"][item] = rescale(
            data_dictionary["train_labels"][item], label_min, label_max
        )
        # test labels only exist when a test split was requested
        if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
            data_dictionary["test_labels"][item] = rescale(
                data_dictionary["test_labels"][item], label_min, label_max
            )

        self.data[f"{item}_max"] = label_max
        self.data[f"{item}_min"] = label_min

    return data_dictionary
def normalize_single_dataframe(self, df: DataFrame) -> DataFrame:
    """
    Rescale every column of df to [-1, 1] using its own min/max and store those
    bounds in self.data under "<column>_max" / "<column>_min".

    :param df: Dataframe to be rescaled
    :returns: the rescaled dataframe
    """
    col_max = df.max()
    col_min = df.min()
    scaled = 2 * (df - col_min) / (col_max - col_min) - 1

    for column in col_max.keys():
        self.data[column + "_max"] = col_max[column]
        self.data[column + "_min"] = col_min[column]

    return scaled
def normalize_data_from_metadata(self, df: DataFrame) -> DataFrame:
    """
    Rescale df to [-1, 1] using the per-column min/max values previously stored in
    self.data ("<column>_max" / "<column>_min") instead of the bounds of df itself.

    :param df: Dataframe to be rescaled
    :returns: the rescaled dataframe
    """
    columns = df.keys()
    stored_max = []
    stored_min = []
    for column in columns:
        stored_max.append(self.data[f"{column}_max"])
        stored_min.append(self.data[f"{column}_min"])

    upper = pd.Series(stored_max, index=columns)
    lower = pd.Series(stored_min, index=columns)

    return 2 * (df - lower) / (upper - lower) - 1
def denormalize_labels_from_metadata(self, df: DataFrame) -> DataFrame:
    """
    Map predictions from the [-1, 1] range back to the original label range using the
    min/max values stored in self.data. Object columns and columns listed in
    self.unique_class_list are left untouched. df is modified in place.

    :param df: Dataframe of predictions to be denormalized
    :returns: the same dataframe with denormalized label columns
    """
    for label in df.columns:
        rescalable = df[label].dtype != object and label not in self.unique_class_list
        if rescalable:
            label_max = self.data[f"{label}_max"]
            label_min = self.data[f"{label}_min"]
            df[label] = (df[label] + 1) * (label_max - label_min) / 2 + label_min

    return df
def split_timerange(
    self, tr: str, train_split: int = 28, bt_split: float = 7
) -> Tuple[list, list]:
    """
    Function which takes a single time range (tr) and splits it
    into sub timeranges to train and backtest on based on user input

    :param tr: str, full timerange to train on
    :param train_split: the period length for the each training (days). Specified in user
                        configuration file
    :param bt_split: the backtesting length (days). Specified in user configuration file
    :returns: (list of training TimeRanges, list of the associated backtest TimeRanges)
    :raises OperationalException: if train_split is not a positive integer or if the
                                  backtest period is shorter than one second.
    """
    if not isinstance(train_split, int) or train_split < 1:
        raise OperationalException(
            f"train_period_days must be an integer greater than 0. Got {train_split}."
        )
    train_period_days = train_split * SECONDS_IN_DAY
    bt_period = int(bt_split * SECONDS_IN_DAY)
    # The sliding window below advances by bt_period on every iteration; without a
    # positive step it would never reach the end of the timerange and loop forever.
    if bt_period < 1:
        raise OperationalException(
            f"backtest_period_days must be greater than 0. Got {bt_split}."
        )

    full_timerange = TimeRange.parse_timerange(tr)
    config_timerange = TimeRange.parse_timerange(self.config["timerange"])
    if config_timerange.stopts == 0:
        # open ended timerange: backtest up to now
        config_timerange.stopts = int(datetime.now(tz=timezone.utc).timestamp())

    timerange_train = copy.deepcopy(full_timerange)
    timerange_backtest = copy.deepcopy(full_timerange)

    tr_training_list_timerange = []
    tr_backtesting_list_timerange = []

    first = True
    while True:
        if not first:
            timerange_train.startts = timerange_train.startts + bt_period
        timerange_train.stopts = timerange_train.startts + train_period_days
        first = False
        tr_training_list_timerange.append(copy.deepcopy(timerange_train))

        # associated backtest period
        timerange_backtest.startts = timerange_train.stopts
        timerange_backtest.stopts = timerange_backtest.startts + bt_period
        if timerange_backtest.stopts > config_timerange.stopts:
            timerange_backtest.stopts = config_timerange.stopts
        tr_backtesting_list_timerange.append(copy.deepcopy(timerange_backtest))

        # ensure we are predicting on exactly same amount of data as requested by user
        # defined --timerange
        if timerange_backtest.stopts == config_timerange.stopts:
            break

    return tr_training_list_timerange, tr_backtesting_list_timerange
def split_timerange_live_models(
    self
) -> Tuple[list, list]:
    """
    Build the backtesting windows for "backtest live models" mode: the end dates of the
    models available for this pair, plus the end of the backtesting timerange, are sorted
    and every pair of consecutive dates becomes one TimeRange.

    :returns: the same list of TimeRanges twice (used as training and backtesting ranges)
    :raises OperationalException: if no model end dates are known for the pair's asset
    """
    asset = self.pair.split("/")[0]
    live_models_data = self.backtest_live_models_data
    if asset not in live_models_data["assets_end_dates"]:
        raise OperationalException(
            f"Model not available for pair {self.pair}. "
            "Please, try again after removing this pair from the configuration file."
        )

    asset_end_dates = live_models_data["assets_end_dates"][asset]
    backtesting_timerange = live_models_data["backtesting_timerange"]

    boundaries = list(asset_end_dates)
    boundaries.append(backtesting_timerange.stopts)
    boundaries.sort()

    windows = [
        TimeRange("date", "date", window_start, window_stop)
        for window_start, window_stop in zip(boundaries, boundaries[1:])
    ]
    return windows, windows
def slice_dataframe(self, timerange: TimeRange, df: DataFrame) -> DataFrame:
    """
    Given a full dataframe, extract the user desired window

    :param timerange: TimeRange whose startts/stopts bound the rows to extract from df
    :param df: Dataframe containing all candles to run the entire backtest. Here
               it is sliced down to just the present training period.
    :returns: rows with date >= start and, unless running live, date < stop
    """
    window_start = datetime.fromtimestamp(timerange.startts, tz=timezone.utc)
    window_stop = datetime.fromtimestamp(timerange.stopts, tz=timezone.utc)

    sliced = df.loc[df["date"] >= window_start, :]
    if self.live:
        # live runs keep everything from the start onwards
        return sliced
    return sliced.loc[sliced["date"] < window_stop, :]
2022-10-08 14:15:48 +00:00
def check_pred_labels(self, df_predictions: DataFrame) -> DataFrame:
    """
    Remove from the prediction features every column listed in
    self.data['constant_features_list'], so that they match the training features.

    :param df_predictions: incoming prediction features
    :returns: the prediction features without the constant columns
    """
    constant_labels = self.data['constant_features_list']
    remaining_columns = df_predictions.columns.difference(constant_labels)
    df_predictions = df_predictions.filter(remaining_columns)
    logger.warning(
        f"Removed {len(constant_labels)} features from prediction features, "
        f"these were considered constant values during most recent training."
    )
    return df_predictions
2022-10-06 17:26:33 +00:00
def principal_component_analysis(self) -> None:
    """
    Fit a PCA on the training features and replace the train (and, when a test split
    exists, test) features in self.data_dictionary with their normalised principal
    components. The fitted PCA is kept on self.pca and the number of kept components
    in self.data["n_kept_components"].
    No parameters or returns, it acts on the data_dictionary held by this object.
    """
    from sklearn.decomposition import PCA  # avoid importing if we dont need it

    raw_train = self.data_dictionary["train_features"]
    pca = PCA(0.999).fit(raw_train)

    n_keep_components = pca.n_components_
    n_components = raw_train.shape[1]
    self.data["n_kept_components"] = n_keep_components
    logger.info("reduced feature dimension by %s", n_components - n_keep_components)
    logger.info("explained variance %f", np.sum(pca.explained_variance_ratio_))

    self.data_dictionary["train_features"] = pd.DataFrame(
        data=pca.transform(raw_train),
        columns=["PC" + str(i) for i in range(0, n_keep_components)],
        index=raw_train.index,
    )
    # normalising transformed training features
    self.data_dictionary["train_features"] = self.normalize_single_dataframe(
        self.data_dictionary["train_features"])

    # keeping a copy of the non-transformed features so we can check for errors during
    # model load from disk
    self.data["training_features_list_raw"] = copy.deepcopy(self.training_features_list)
    self.training_features_list = self.data_dictionary["train_features"].columns

    if self.freqai_config.get('data_split_parameters', {}).get('test_size', 0.1) != 0:
        raw_test = self.data_dictionary["test_features"]
        self.data_dictionary["test_features"] = pd.DataFrame(
            data=pca.transform(raw_test),
            columns=["PC" + str(i) for i in range(0, n_keep_components)],
            index=raw_test.index,
        )
        # normalise transformed test feature to transformed training features
        self.data_dictionary["test_features"] = self.normalize_data_from_metadata(
            self.data_dictionary["test_features"])

    self.pca = pca

    logger.info(f"PCA reduced total features from {n_components} to {n_keep_components}")

    if not self.data_path.is_dir():
        self.data_path.mkdir(parents=True, exist_ok=True)

    return None
def pca_transform(self, filtered_dataframe: DataFrame) -> None:
    """
    Project the given features onto the components of the already fitted self.pca and
    store the normalised result in self.data_dictionary["prediction_features"].

    :param filtered_dataframe: DataFrame = the cleaned dataframe
    """
    component_names = ["PC" + str(i) for i in range(0, self.data["n_kept_components"])]
    projected = pd.DataFrame(
        data=self.pca.transform(filtered_dataframe),
        columns=component_names,
        index=filtered_dataframe.index,
    )
    self.data_dictionary["prediction_features"] = projected
    # normalise transformed predictions to transformed training features
    self.data_dictionary["prediction_features"] = self.normalize_data_from_metadata(
        self.data_dictionary["prediction_features"])
def compute_distances(self) -> float:
    """
    Compute distances between each training point and every other training
    point. This metric defines the neighborhood of trained data and is used
    for prediction confidence in the Dissimilarity Index

    :returns: mean of all pairwise distances between distinct training points
    """
    pairwise = pairwise_distances(
        self.data_dictionary["train_features"], n_jobs=self.thread_count)
    # remove the diagonal distances which are itself distances ~0
    # (np.nan rather than the np.NaN alias, which NumPy 2.0 removed)
    np.fill_diagonal(pairwise, np.nan)
    avg_mean_dist = pairwise[~np.isnan(pairwise)].mean()

    return avg_mean_dist
2022-08-27 10:44:55 +00:00
def get_outlier_percentage(self, dropped_pts: npt.NDArray) -> float:
    """
    Check if more than X% of points were dropped during outlier detection.

    :param dropped_pts: array holding 1 for every dropped point and 0 for every kept one
    :returns: the percentage of dropped points when it reaches the configured
              "outlier_protection_percentage" (default 30), otherwise 0.0
    """
    threshold_pct = self.freqai_config["feature_parameters"].get(
        "outlier_protection_percentage", 30)
    outlier_pct = (dropped_pts.sum() / len(dropped_pts)) * 100
    return outlier_pct if outlier_pct >= threshold_pct else 0.0
def use_SVM_to_remove_outliers(self, predict: bool) -> None:
    """
    Build/inference a Support Vector Machine to detect outliers
    in training data and prediction

    In training mode the train (and, if present, test) entries of self.data_dictionary
    are reduced to the points the SVM labels as inliers, unless too many points would be
    dropped (see self.get_outlier_percentage()). In prediction mode self.do_predict is
    lowered by one for every point the SVM labels as outlier.

    :param predict: bool = If true, inference an existing SVM model, else construct one
    """
    if self.keras:
        logger.warning(
            "SVM outlier removal not currently supported for Keras based models. "
            "Skipping user requested function."
        )
        if predict:
            self.do_predict = np.ones(len(self.data_dictionary["prediction_features"]))
        return

    if predict:
        if not self.svm_model:
            logger.warning("No svm model available for outlier removal")
            return
        y_pred = self.svm_model.predict(self.data_dictionary["prediction_features"])
        # the SVM answers 1 (inlier) / -1 (outlier); turn that into 1 / 0
        do_predict = np.where(y_pred == -1, 0, y_pred)

        if (len(do_predict) - do_predict.sum()) > 0:
            logger.info(f"SVM tossed {len(do_predict) - do_predict.sum()} predictions.")

        self.do_predict += do_predict
        self.do_predict -= 1
        return

    # use SGDOneClassSVM to increase speed?
    svm_params = self.freqai_config["feature_parameters"].get(
        "svm_params", {"shuffle": False, "nu": 0.1})
    self.svm_model = linear_model.SGDOneClassSVM(**svm_params).fit(
        self.data_dictionary["train_features"]
    )
    y_pred = self.svm_model.predict(self.data_dictionary["train_features"])
    kept_points = np.where(y_pred == -1, 0, y_pred)

    outlier_pct = self.get_outlier_percentage(1 - kept_points)
    if outlier_pct:
        logger.warning(
            f"SVM detected {outlier_pct:.2f}% of the points as outliers. "
            f"Keeping original dataset."
        )
        # without a model the prediction step skips the SVM check as well
        self.svm_model = None
        return

    inlier_mask = (y_pred == 1)
    for part in ("train_features", "train_labels", "train_weights"):
        self.data_dictionary[part] = self.data_dictionary[part][inlier_mask]

    logger.info(
        f"SVM tossed {len(y_pred) - kept_points.sum()}"
        f" train points from {len(y_pred)} total points."
    )

    # same for test data
    # TODO: This (and the part above) could be refactored into a separate function
    # to reduce code duplication
    if self.freqai_config['data_split_parameters'].get('test_size', 0.1) != 0:
        y_pred = self.svm_model.predict(self.data_dictionary["test_features"])
        kept_points = np.where(y_pred == -1, 0, y_pred)
        inlier_mask = (y_pred == 1)
        for part in ("test_features", "test_labels", "test_weights"):
            self.data_dictionary[part] = self.data_dictionary[part][inlier_mask]

        logger.info(
            f"SVM tossed {len(y_pred) - kept_points.sum()}"
            f" test points from {len(y_pred)} total points."
        )

    return
def use_DBSCAN_to_remove_outliers(self, predict: bool, eps=None) -> None:
    """
    Use DBSCAN to cluster training data and remove "noisy" data (read outliers).

    :param predict: bool = If False (training), estimate eps from the elbow of the sorted
                    nearest-neighbour distances, cluster the training features and drop
                    the points labelled as noise (unless too many would be dropped, see
                    self.get_outlier_percentage()).
                    If True (prediction), cluster training and prediction features
                    together with the parameters stored during training and lower
                    self.do_predict for prediction points labelled as noise.
    :param eps: not used by this method
    """
    if predict:
        # an eps of 0 means training decided to keep the original dataset
        if not self.data['DBSCAN_eps']:
            return
        known_points = self.data_dictionary['train_features']
        new_points = self.data_dictionary['prediction_features']
        num_preds = len(new_points)
        stacked = pd.concat([known_points, new_points], axis=0, ignore_index=True)
        clustering = DBSCAN(
            eps=self.data['DBSCAN_eps'],
            min_samples=self.data['DBSCAN_min_samples'],
            n_jobs=self.thread_count,
        ).fit(stacked)
        # only the trailing rows belong to the prediction set; label -1 means noise
        do_predict = np.where(clustering.labels_[-num_preds:] == -1, 0, 1)

        if (len(do_predict) - do_predict.sum()) > 0:
            logger.info(f"DBSCAN tossed {len(do_predict) - do_predict.sum()} predictions")
        self.do_predict += do_predict
        self.do_predict -= 1
        return

    def normalise_distances(values):
        lowest = values.min()
        highest = values.max()
        return (values - lowest) / (highest - lowest)

    def rotate_point(origin, point, angle):
        # rotate a point counterclockwise by a given angle (in radians)
        # around a given origin
        dx = point[0] - origin[0]
        dy = point[1] - origin[1]
        x = origin[0] + cos(angle) * dx - sin(angle) * dy
        y = origin[1] + sin(angle) * dx + cos(angle) * dy
        return (x, y)

    train_features = self.data_dictionary['train_features']
    MinPts = int(len(train_features.index) * 0.25)

    # measure pairwise distances to nearest neighbours
    neighbors = NearestNeighbors(
        n_neighbors=MinPts, n_jobs=self.thread_count)
    neighbors_fit = neighbors.fit(train_features)
    distances, _ = neighbors_fit.kneighbors(train_features)
    distances = np.sort(distances, axis=0).mean(axis=1)

    # locate the point where the normalised distance curve deviates most from the
    # straight line joining its end points
    normalised_distances = normalise_distances(distances)
    x_range = np.linspace(0, 1, len(distances))
    line = np.linspace(normalised_distances[0],
                       normalised_distances[-1], len(normalised_distances))
    deflection = np.abs(normalised_distances - line)
    max_deflection_loc = np.where(deflection == deflection.max())[0][0]
    origin = x_range[max_deflection_loc], line[max_deflection_loc]
    point = x_range[max_deflection_loc], normalised_distances[max_deflection_loc]
    rot_angle = np.pi / 4
    elbow_loc = rotate_point(origin, point, rot_angle)

    # map the elbow height back to the scale of the raw distances
    epsilon = elbow_loc[1] * (distances[-1] - distances[0]) + distances[0]

    clustering = DBSCAN(eps=epsilon, min_samples=MinPts,
                        n_jobs=int(self.thread_count)).fit(
                            self.data_dictionary['train_features']
    )

    logger.info(f'DBSCAN found eps of {epsilon:.2f}.')

    self.data['DBSCAN_eps'] = epsilon
    self.data['DBSCAN_min_samples'] = MinPts
    dropped_points = np.where(clustering.labels_ == -1, 1, 0)

    outlier_pct = self.get_outlier_percentage(dropped_points)
    if outlier_pct:
        logger.warning(
            f"DBSCAN detected {outlier_pct:.2f}% of the points as outliers. "
            f"Keeping original dataset."
        )
        self.data['DBSCAN_eps'] = 0
        return

    not_noise = (clustering.labels_ != -1)
    for part in ("train_features", "train_labels", "train_weights"):
        self.data_dictionary[part] = self.data_dictionary[part][not_noise]

    logger.info(
        f"DBSCAN tossed {dropped_points.sum()}"
        f" train points from {len(clustering.labels_)}"
    )

    return
2022-08-18 17:15:29 +00:00
def compute_inlier_metric(self, set_='train') -> None:
    """
    Compute inlier metric from backwards distance distributions.
    This metric defines how well features from a timepoint fit
    into previous timepoints.

    The result is appended to the features of the chosen data set as the
    column '%-inlier_metric'.

    :param set_: which entry of `self.data_dictionary` to process: 'train', 'test',
        or anything else for 'prediction_features'. Only 'train' fits and stores the
        normalisation bounds and distribution parameters in `self.data`; every other
        value reuses the stored ones.
    """
    def normalise(dataframe: DataFrame, key: str) -> DataFrame:
        # Min-max scale the given distances. The bounds are computed and remembered in
        # self.data for the train set, and looked up from self.data for any other set.
        if set_ == 'train':
            min_value = dataframe.min()
            max_value = dataframe.max()
            self.data[f'{key}_min'] = min_value
            self.data[f'{key}_max'] = max_value
        else:
            min_value = self.data[f'{key}_min']
            max_value = self.data[f'{key}_max']
        return (dataframe - min_value) / (max_value - min_value)

    # number of previous points every point is compared against
    no_prev_pts = self.freqai_config["feature_parameters"]["inlier_metric_window"]

    # work on a deep copy so the stored features are only replaced at the very end
    if set_ == 'train':
        compute_df = copy.deepcopy(self.data_dictionary['train_features'])
    elif set_ == 'test':
        compute_df = copy.deepcopy(self.data_dictionary['test_features'])
    else:
        compute_df = copy.deepcopy(self.data_dictionary['prediction_features'])

    # same rows, but in reversed index order
    compute_df_reindexed = compute_df.reindex(
        index=np.flip(compute_df.index)
    )

    # np.triu zeroes everything below the diagonal, so in the reversed ordering each row
    # only keeps the distances to the rows that follow it, i.e. the rows that precede it
    # in the original order.
    pairwise = pd.DataFrame(
        np.triu(
            pairwise_distances(compute_df_reindexed, n_jobs=self.thread_count)
        ),
        columns=compute_df_reindexed.index,
        index=compute_df_reindexed.index
    )
    pairwise = pairwise.round(5)

    # one column per look-back step: d1 ... d<no_prev_pts>
    column_labels = [
        '{}{}'.format('d', i) for i in range(1, no_prev_pts + 1)
    ]
    distances = pd.DataFrame(
        columns=column_labels, index=compute_df.index
    )

    # the first no_prev_pts index labels are skipped and keep an all-NaN row
    for index in compute_df.index[no_prev_pts:]:
        current_row = pairwise.loc[[index]]
        # drop the zeroed triangle and the self-distance (any exactly-zero distance
        # is dropped as well)
        current_row_no_zeros = current_row.loc[
            :, (current_row != 0).any(axis=0)
        ]
        # keep the first no_prev_pts remaining distances
        # NOTE(review): presumably pandas writes these values positionally into
        # d1..dN - confirm, the column labels of both sides differ.
        distances.loc[[index]] = current_row_no_zeros.iloc[
            :, :no_prev_pts
        ]
    # treat infinities as missing and drop every row that has a missing distance
    distances = distances.replace([np.inf, -np.inf], np.nan)
    drop_index = pd.isnull(distances).any(axis=1)
    distances = distances[drop_index == 0]

    inliers = pd.DataFrame(index=distances.index)
    for key in distances.keys():
        current_distances = distances[key].dropna()
        current_distances = normalise(current_distances, key)
        # fit a Weibull distribution on the train set only, reuse the stored parameters
        # for every other set
        if set_ == 'train':
            fit_params = stats.weibull_min.fit(current_distances)
            self.data[f'{key}_fit_params'] = fit_params
        else:
            fit_params = self.data[f'{key}_fit_params']
        # cumulative probability of each normalised distance under the fitted distribution
        quantiles = stats.weibull_min.cdf(current_distances, *fit_params)

        df_inlier = pd.DataFrame(
            {key: quantiles}, index=distances.index
        )
        inliers = pd.concat(
            [inliers, df_inlier], axis=1
        )

    # sum of the per-step quantiles divided by the window size, laid out over the
    # index of the full feature frame
    inlier_metric = pd.DataFrame(
        data=inliers.sum(axis=1) / no_prev_pts,
        columns=['%-inlier_metric'],
        index=compute_df.index
    )

    # min-max rescale the metric to the range [-1, 1]
    inlier_metric = (2 * (inlier_metric - inlier_metric.min()) /
                     (inlier_metric.max() - inlier_metric.min()) - 1)

    if set_ in ('train', 'test'):
        # the first no_prev_pts rows were never given distances: remove them from the
        # metric, the features and (via the helper) the stored features/labels/weights
        inlier_metric = inlier_metric.iloc[no_prev_pts:]
        compute_df = compute_df.iloc[no_prev_pts:]
        self.remove_beginning_points_from_data_dict(set_, no_prev_pts)
        self.data_dictionary[f'{set_}_features'] = pd.concat(
            [compute_df, inlier_metric], axis=1)
    else:
        # prediction data keeps all of its rows, missing metric values become 0
        self.data_dictionary['prediction_features'] = pd.concat(
            [compute_df, inlier_metric], axis=1)
        self.data_dictionary['prediction_features'].fillna(0, inplace=True)

    logger.info('Inlier metric computed and added to features.')

    return None
2022-08-18 17:15:29 +00:00
def remove_beginning_points_from_data_dict(self, set_='train', no_prev_pts: int = 10):
    """
    Drop the first `no_prev_pts` points from the features, labels and weights
    of one data set held in `self.data_dictionary`.
    :param set_: prefix of the entries to trim, e.g. 'train' or 'test'
    :param no_prev_pts: number of leading points to discard
    """
    store = self.data_dictionary
    features_key, weights_key, labels_key = (
        f'{set_}_{part}' for part in ('features', 'weights', 'labels')
    )
    # read all three entries before writing any of them back
    full_features = store[features_key]
    full_weights = store[weights_key]
    full_labels = store[labels_key]
    # weights are sliced directly, features and labels positionally through .iloc
    store[weights_key] = full_weights[no_prev_pts:]
    store[features_key] = full_features.iloc[no_prev_pts:]
    store[labels_key] = full_labels.iloc[no_prev_pts:]
2022-08-19 16:35:24 +00:00
def add_noise_to_training_features(self) -> None:
    """
    Perturb the training features with zero-mean gaussian noise to reduce the
    risk of overfitting. The standard deviation of the noise is taken from
    `feature_parameters.noise_standard_deviation`.
    """
    noise_std = self.freqai_config["feature_parameters"]["noise_standard_deviation"]
    feature_shape = self.data_dictionary['train_features'].shape
    # centred at 0: the features are jittered, not shifted
    gaussian_noise = np.random.normal(0, noise_std, [feature_shape[0], feature_shape[1]])
    self.data_dictionary['train_features'] += gaussian_noise
    return
2022-07-02 16:09:38 +00:00
def find_features(self, dataframe: DataFrame) -> None:
    """
    Collect the feature columns of the strategy provided dataframe.
    Every column whose name contains "%" counts as a feature. The resulting list
    is stored in `self.training_features_list`, nothing is returned.
    :param dataframe: DataFrame = strategy provided dataframe
    :raises OperationalException: if no column name contains "%"
    """
    feature_columns = [name for name in dataframe.columns if "%" in name]
    if len(feature_columns) == 0:
        raise OperationalException("Could not find any features!")
    self.training_features_list = feature_columns
def find_labels(self, dataframe: DataFrame) -> None:
    """
    Collect the label columns of the given dataframe.
    Every column whose name contains "&" counts as a label. The resulting list
    (possibly empty) is stored in `self.label_list`.
    :param dataframe: DataFrame = dataframe whose columns are inspected
    """
    self.label_list = [name for name in dataframe.columns if "&" in name]
def check_if_pred_in_training_spaces(self) -> None:
    """
    Estimate a Dissimilarity Index (DI) for every prediction point and flag the
    points that are too far away from the training data set.

    The DI of a prediction point is its distance to the closest training point
    divided by the stored `avg_mean_dist`. It is kept in `self.DI_values`.
    Points whose DI is not below `feature_parameters.DI_threshold` get their
    entry in `self.do_predict` lowered by one.
    """
    train_to_prediction = pairwise_distances(
        self.data_dictionary["train_features"],
        self.data_dictionary["prediction_features"],
        n_jobs=self.thread_count,
    )

    # axis 0 runs over the training points: keep the closest one per prediction point
    self.DI_values = train_to_prediction.min(axis=0) / self.data["avg_mean_dist"]

    di_threshold = self.freqai_config["feature_parameters"]["DI_threshold"]
    within_threshold = np.where(self.DI_values < di_threshold, 1, 0)

    n_tossed = len(within_threshold) - within_threshold.sum()
    if n_tossed > 0:
        logger.info(
            f"DI tossed {n_tossed} predictions for "
            "being too far from training data."
        )

    # +mask then -1: accepted points (mask 1) are unchanged, rejected ones drop by one
    self.do_predict += within_threshold
    self.do_predict -= 1
2022-05-06 14:20:52 +00:00
def set_weights_higher_recent(self, num_weights: int) -> npt.ArrayLike:
    """
    Build exponentially decaying sample weights so that recent data is weighted
    more heavily during training than older data.
    :param num_weights: number of weights to create
    :return: array of `num_weights` weights, increasing towards the last entry
    """
    wfactor = self.config["freqai"]["feature_parameters"]["weight_factor"]
    ages = np.arange(num_weights)
    decay = np.exp(-ages / (wfactor * num_weights))
    # decay is largest (1.0) at age 0: flip it so the last, most recent point gets it
    return decay[::-1]
2022-08-31 14:23:48 +00:00
def get_predictions_to_append(self, predictions: DataFrame,
                              do_predict: npt.ArrayLike) -> DataFrame:
    """
    Assemble the prediction dataframe of the current backtest period.

    Each prediction column is copied over. Non-object columns are followed by
    `<label>_mean` / `<label>_std` columns when the corresponding entries exist
    in `self.data`. After that come the entries of `extra_returns_per_train`,
    the `do_predict` column and, if `DI_threshold` is above 0, `DI_values`.
    :param predictions: DataFrame = model predictions, one column per label
    :param do_predict: values stored in the `do_predict` column
    :return: DataFrame = dataframe to be appended to the backtest results
    """
    append_df = DataFrame()
    has_means = "labels_mean" in self.data
    has_stds = "labels_std" in self.data

    for label in predictions.columns:
        append_df[label] = predictions[label]
        # object columns get no mean/std companions
        if append_df[label].dtype != object:
            if has_means:
                append_df[f"{label}_mean"] = self.data["labels_mean"][label]
            if has_stds:
                append_df[f"{label}_std"] = self.data["labels_std"][label]

    extra_returns = self.data["extra_returns_per_train"]
    for extra_col in extra_returns:
        append_df[f"{extra_col}"] = extra_returns[extra_col]

    append_df["do_predict"] = do_predict

    di_threshold = self.freqai_config["feature_parameters"].get("DI_threshold", 0)
    if di_threshold > 0:
        append_df["DI_values"] = self.DI_values

    return append_df
def append_predictions(self, append_df: DataFrame) -> None:
    """
    Add the predictions of the current backtest period to those of all previous
    periods, which are accumulated in `self.full_df`.
    :param append_df: DataFrame = predictions of the current backtest period
    """
    if not self.full_df.empty:
        # stack below what has been collected so far, row labels are kept as they are
        append_df = pd.concat([self.full_df, append_df], axis=0)
    self.full_df = append_df
def fill_predictions(self, dataframe):
    """
    Back fill values to before the backtesting range so that the dataframe matches size
    when it goes back to the strategy. These rows are not included in the backtest.

    The accumulated predictions in `self.full_df` are padded at the top with rows of
    zeros and joined column-wise with every column of `dataframe` whose name does not
    start with "&". The result is stored in `self.return_dataframe` and `self.full_df`
    is reset to an empty dataframe.
    :param dataframe: strategy dataframe the predictions have to line up with
    """
    prediction_columns = self.full_df.columns
    # rows of `dataframe` that have no prediction (original note: startup_candle_count)
    n_missing_rows = len(dataframe) - len(self.full_df.index)
    zero_padding = pd.DataFrame(
        np.zeros((n_missing_rows, len(prediction_columns))), columns=prediction_columns
    )

    self.full_df = pd.concat([zero_padding, self.full_df], axis=0, ignore_index=True)

    non_label_columns = [col for col in dataframe.columns if not col.startswith("&")]
    self.return_dataframe = pd.concat([dataframe[non_label_columns], self.full_df], axis=1)

    self.full_df = DataFrame()
    return
def create_fulltimerange(self, backtest_tr: str, backtest_period_days: int) -> str:
    """
    Build the timerange string that covers the backtest plus the days before it.

    The start of `backtest_tr` is moved back by `backtest_period_days` days and the
    result is formatted as "YYYYMMDD-YYYYMMDD" (UTC). If `self.full_path` does not
    exist yet, it is created and the first configuration file is copied into it.
    :param backtest_tr: timerange string of the backtest, must have an end date
    :param backtest_period_days: number of days to prepend to the backtest timerange
    :return: the extended timerange string
    :raises OperationalException: if `backtest_period_days` is not an integer or is
        negative, or if `backtest_tr` is open ended
    """
    if not isinstance(backtest_period_days, int):
        raise OperationalException("backtest_period_days must be an integer")

    if backtest_period_days < 0:
        raise OperationalException("backtest_period_days must be positive")

    backtest_timerange = TimeRange.parse_timerange(backtest_tr)

    if backtest_timerange.stopts == 0:
        # typically open ended time ranges do work, however, there are some edge cases where
        # it does not. accommodating these kinds of edge cases just to allow open-ended
        # timerange is not high enough priority to warrant the effort. It is safer for now
        # to simply ask user to add their end date
        raise OperationalException("FreqAI backtesting does not allow open ended timeranges. "
                                   "Please indicate the end date of your desired backtesting "
                                   "timerange.")

    backtest_timerange.startts = (
        backtest_timerange.startts - backtest_period_days * SECONDS_IN_DAY
    )
    start = datetime.fromtimestamp(backtest_timerange.startts, tz=timezone.utc)
    stop = datetime.fromtimestamp(backtest_timerange.stopts, tz=timezone.utc)
    full_timerange = start.strftime("%Y%m%d") + "-" + stop.strftime("%Y%m%d")

    config_path = Path(self.config["config_files"][0])

    # the configuration file is only copied when the model folder is first created
    if not self.full_path.is_dir():
        self.full_path.mkdir(parents=True, exist_ok=True)
        shutil.copy(
            config_path.resolve(),
            Path(self.full_path / config_path.parts[-1]),
        )

    return full_timerange
def check_if_model_expired(self, trained_timestamp: int) -> bool:
    """
    A model age checker to determine if the model is trustworthy based on user defined
    `expiration_hours` in the configuration file.
    :param trained_timestamp: int = The time of training for the most recent model.
    :return:
        bool = If the model is expired or not. Always False when `expiration_hours`
        is missing or not above 0.
    """
    now = datetime.now(tz=timezone.utc).timestamp()
    model_age_hours = (now - trained_timestamp) / 3600

    expiration_hours = self.freqai_config.get("expiration_hours", 0)
    if not expiration_hours > 0:
        # expiration is switched off
        return False
    return model_age_hours > expiration_hours
2022-07-03 08:59:38 +00:00
def check_if_new_training_required(
    self, trained_timestamp: int
) -> Tuple[bool, TimeRange, TimeRange]:
    """
    Decide whether a model has to be (re)trained now and build the timeranges for it.

    A `trained_timestamp` of 0 always requests training. Otherwise training is
    requested once more than `live_retrain_hours` have passed since `trained_timestamp`.
    :param trained_timestamp: int = timestamp of the most recent training, 0 for none
    :return: tuple of (retrain flag, timerange to train on, timerange to load data
        for). Both timeranges are left at their defaults when no training is needed.
    """
    now = datetime.now(tz=timezone.utc).timestamp()
    trained_timerange = TimeRange()
    data_load_timerange = TimeRange()

    timeframes = self.freqai_config["feature_parameters"].get("include_timeframes")
    # longest configured timeframe in seconds, never below 0
    max_tf_seconds = max([0, *(timeframe_to_seconds(tf) for tf in timeframes)])

    # We notice that users like to use exotic indicators where
    # they do not know the required timeperiod. Here we include a factor
    # of safety by multiplying the user considered "max" by 2.
    max_period = self.config.get('startup_candle_count', 20) * 2
    additional_seconds = max_period * max_tf_seconds

    if trained_timestamp != 0:
        elapsed_hours = (now - trained_timestamp) / SECONDS_IN_HOUR
        retrain = elapsed_hours > self.freqai_config.get("live_retrain_hours", 0)
    else:
        # no previous training timestamp: always train
        retrain = True

    if retrain:
        train_period_seconds = self.freqai_config.get("train_period_days", 0) * SECONDS_IN_DAY
        trained_timerange.startts = int(now - train_period_seconds)
        trained_timerange.stopts = int(now)
        # we want to load/populate indicators on more data than we plan to train on
        # because most of the indicators have a rolling timeperiod, and are thus NaNs
        # unless they have data further back in time before the start of the train period
        data_load_timerange.startts = int(now - train_period_seconds - additional_seconds)
        data_load_timerange.stopts = int(now)

    return retrain, trained_timerange, data_load_timerange
def set_new_model_names(self, pair: str, timestamp_id: int):
    """
    Set the data folder and the model file name of a new model.
    :param pair: pair in "BASE/QUOTE" form, only the part before the "/" is used
    :param timestamp_id: identifier appended to both names
    """
    base_currency, _ = pair.split("/")

    self.data_path = Path(self.full_path / f"sub-train-{base_currency}_{timestamp_id}")
    self.model_filename = f"cb_{base_currency.lower()}_{timestamp_id}"
def set_all_pairs(self) -> None:
    """
    Build `self.all_pairs`: the correlated pairs from
    `feature_parameters.include_corr_pairlist` followed by every whitelisted pair
    of the exchange configuration that is not already in the list.

    A missing "exchange" section or a missing/empty "pair_whitelist" is treated as
    an empty whitelist.
    """
    corr_pairs = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
    # copy, so that appending whitelist pairs never alters the configuration itself
    self.all_pairs = copy.deepcopy(corr_pairs or [])

    # fall back to empty containers: a `str` default has no `.get` and `None` cannot
    # be iterated
    whitelist = self.config.get("exchange", {}).get("pair_whitelist") or []
    for pair in whitelist:
        if pair not in self.all_pairs:
            self.all_pairs.append(pair)
def extract_corr_pair_columns_from_populated_indicators(
    self,
    dataframe: DataFrame
) -> Dict[str, DataFrame]:
    """
    Find the columns of the dataframe corresponding to the corr_pairlist, save them
    in a dictionary to be reused and attached to other pairs.

    A column belongs to a pair when its name contains "%-<pair>", "%<pair>" or
    "%_<pair>", with any ":" removed from the pair name first. Pairs without a single
    matching column get no entry.
    :param dataframe: fully populated dataframe (current pair + corr_pairs)
    :return: corr_dataframes, dictionary of dataframes to be attached
             to other pairs in same candle. Keys are the pair names without ":".
    """
    corr_dataframes: Dict[str, DataFrame] = {}
    corr_pairlist = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])

    for raw_pair in corr_pairlist:
        pair = raw_pair.replace(':', '')  # lightgbm doesnt like colons
        markers = (f"%-{pair}", f"%{pair}", f"%_{pair}")
        matching_cols = [
            col for col in dataframe.columns if any(marker in col for marker in markers)
        ]
        if not matching_cols:
            continue
        # 'date' goes first
        corr_dataframes[pair] = dataframe.filter(['date', *matching_cols], axis=1)

    return corr_dataframes
def attach_corr_pair_columns(self, dataframe: DataFrame,
                             corr_dataframes: Dict[str, DataFrame],
                             current_pair: str) -> DataFrame:
    """
    Attach the existing corr_pair dataframes to the current pair dataframe before training

    Every pair of `include_corr_pairlist`, except the current pair itself, is
    left-merged onto the dataframe on the 'date' column. Pair names are compared and
    looked up with any ":" removed.
    :param dataframe: current pair strategy dataframe, indicators populated already
    :param corr_dataframes: dictionary of saved dataframes from earlier in the same candle
    :param current_pair: current pair to which we will attach corr pair dataframe
    :return:
    :dataframe: current pair dataframe of populated indicators, concatenated with corr_pairs
                ready for training
    """
    corr_pairlist = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])
    own_pair = current_pair.replace(':', '')

    for raw_pair in corr_pairlist:
        corr_pair = raw_pair.replace(':', '')  # lightgbm doesnt work with colons
        if corr_pair == own_pair:
            # the current pair already carries its own columns
            continue
        dataframe = dataframe.merge(corr_dataframes[corr_pair], how='left', on='date')

    return dataframe
2022-07-03 08:59:38 +00:00
def use_strategy_to_populate_indicators(
    self,
    strategy: IStrategy,
    corr_dataframes: dict = {},
    base_dataframes: dict = {},
    pair: str = "",
    prediction_dataframe: DataFrame = pd.DataFrame(),
    do_corr_pairs: bool = True,
) -> DataFrame:
    """
    Use the user defined strategy for populating indicators during retrain

    The dictionary and dataframe arguments are only read, never modified, so the
    shared default values stay empty between calls.
    :param strategy: IStrategy = user defined strategy object
    :param corr_dataframes: dict = dict containing the informative pair dataframes
                            (for user defined timeframes)
    :param base_dataframes: dict = dict containing the current pair dataframes
                            (for user defined timeframes)
    :param pair: str = pair whose indicators are populated
    :param prediction_dataframe: DataFrame = if not empty, it is used as the starting
                                 dataframe and `None` is passed as informative data
                                 for every pair and timeframe
    :param do_corr_pairs: bool = whether the indicators of the correlated pairs are
                          populated as well
    :return:
    dataframe: DataFrame = dataframe containing populated indicators
    """

    # for prediction dataframe creation, we let dataprovider handle everything in the strategy
    # so we create empty dictionaries, which allows us to pass None to
    # `populate_any_indicators()`. Signaling we want the dp to give us the live dataframe.
    tfs: List[str] = self.freqai_config["feature_parameters"].get("include_timeframes")
    pairs: List[str] = self.freqai_config["feature_parameters"].get("include_corr_pairlist", [])

    if not prediction_dataframe.empty:
        dataframe = prediction_dataframe.copy()
        # Build fresh lookups instead of writing None into the arguments: writing into
        # them would alter the shared default dictionaries (and caller owned ones) and
        # leak the None placeholders into later calls.
        base_dataframes = {tf: None for tf in tfs}
        corr_dataframes = {p: {tf: None for tf in tfs} for p in pairs}
    else:
        dataframe = base_dataframes[self.config["timeframe"]].copy()

    sgi = False
    for tf in tfs:
        if tf == tfs[-1]:
            sgi = True  # doing this last allows user to use all tf raw prices in labels
        dataframe = strategy.populate_any_indicators(
            pair,
            dataframe.copy(),
            tf,
            informative=base_dataframes[tf],
            set_generalized_indicators=sgi
        )

    # ensure corr pairs are always last
    if do_corr_pairs:
        for corr_pair in pairs:
            if pair == corr_pair:
                continue  # dont repeat anything from whitelist
            for tf in tfs:
                dataframe = strategy.populate_any_indicators(
                    corr_pair,
                    dataframe.copy(),
                    tf,
                    informative=corr_dataframes[corr_pair][tf]
                )

    self.get_unique_classes_from_labels(dataframe)

    dataframe = self.remove_special_chars_from_feature_names(dataframe)

    return dataframe
def fit_labels(self) -> None:
    """
    Fit the labels with a gaussian distribution

    The fitted mean and standard deviation of every non-object training label are
    stored in `self.data["labels_mean"]` / `self.data["labels_std"]`. Every entry
    of `self.unique_class_list` gets a mean and standard deviation of 0.
    """
    label_means, label_stds = {}, {}
    self.data["labels_mean"], self.data["labels_std"] = label_means, label_stds

    train_labels = self.data_dictionary["train_labels"]
    for label in train_labels.columns:
        column = train_labels[label]
        if column.dtype == object:
            # object columns are not fitted
            continue
        fitted = stats.norm.fit(column)
        label_means[label], label_stds[label] = fitted[0], fitted[1]

    # incase targets are classifications
    for class_name in self.unique_class_list:
        label_means[class_name], label_stds[class_name] = 0, 0

    return
def remove_features_from_df(self, dataframe: DataFrame) -> DataFrame:
    """
    Remove the features from the dataframe before returning it to strategy. This keeps it
    compact for FreqUI purposes.

    Columns starting with "%" are dropped, unless they start with "%%".
    :param dataframe: DataFrame = dataframe to strip the feature columns from
    :return: DataFrame = the remaining columns of the dataframe
    """
    kept_columns = []
    for col in dataframe.columns:
        if col.startswith("%%") or not col.startswith("%"):
            kept_columns.append(col)
    return dataframe[kept_columns]
def get_unique_classes_from_labels(self, dataframe: DataFrame) -> None:
    """
    Record the distinct values of every object typed label column.

    The label columns are looked up through `find_labels`. For each label with
    object dtype the unique non-null values are stored in `self.unique_classes`,
    and the values of all entries of `self.unique_classes` are then added to
    `self.unique_class_list`.
    :param dataframe: DataFrame = dataframe holding the label columns
    """
    self.find_labels(dataframe)

    for label in self.label_list:
        column = dataframe[label]
        if column.dtype == object:
            self.unique_classes[label] = column.dropna().unique()

    for class_values in self.unique_classes.values():
        self.unique_class_list += list(class_values)
2022-08-31 14:23:48 +00:00
def save_backtesting_prediction(
    self, append_df: DataFrame
) -> None:
    """
    Save prediction dataframe from backtesting to h5 file format

    The predictions folder below `self.full_path` is created if needed and the
    dataframe is written to `self.backtesting_results_path`, replacing an existing file.
    :param append_df: dataframe for backtesting period
    """
    predictions_dir = Path(self.full_path / self.backtest_predictions_folder)
    # exist_ok makes this a no-op when the folder is already there
    predictions_dir.mkdir(parents=True, exist_ok=True)

    append_df.to_hdf(self.backtesting_results_path, key='append_df', mode='w')
def get_backtesting_prediction(
    self
) -> DataFrame:
    """
    Get prediction dataframe from h5 file format
    :return: dataframe read from `self.backtesting_results_path`
    """
    return pd.read_hdf(self.backtesting_results_path)
def check_if_backtest_prediction_is_valid(
    self,
    len_backtest_df: int
) -> bool:
    """
    Check if a backtesting prediction already exists and if the predictions
    to append have the same size as the backtesting dataframe slice

    The expected file location is stored in `self.backtesting_results_path`
    whether or not the file exists.
    :param len_backtest_df: Length of backtesting dataframe slice
    :return:
    :boolean: whether the prediction file is valid.
    """
    prediction_file = Path(self.full_path /
                           self.backtest_predictions_folder /
                           f"{self.model_filename}_prediction.h5")
    self.backtesting_results_path = prediction_file

    if not prediction_file.is_file():
        logger.info(
            f"Could not find backtesting prediction file at {prediction_file}"
        )
        return False

    stored_predictions = self.get_backtesting_prediction()
    if len(stored_predictions) != len_backtest_df:
        logger.info("A new backtesting prediction file is required. "
                    "(Number of predictions is different from dataframe length).")
        return False

    logger.info(f"Found backtesting prediction file at {prediction_file}")
    return True
2022-09-24 15:28:52 +00:00
def set_timerange_from_ready_models(self):
    """
    Store the backtesting timerange and the per asset training end dates of the
    models found in `self.full_path` in `self.backtest_live_models_data`.
    """
    timerange, end_dates = self.get_timerange_and_assets_end_dates_from_ready_models(
        self.full_path)

    self.backtest_live_models_data = dict(
        backtesting_timerange=timerange,
        assets_end_dates=end_dates,
    )
    return
def get_full_models_path(self, config: Config) -> Path:
    """
    Returns default FreqAI model path
    :param config: Configuration dictionary
    :return: `<user_data_dir>/models/<identifier>`, where the identifier is the
        string form of the "identifier" entry of the "freqai" section
    """
    identifier = str(config["freqai"].get("identifier"))
    return Path(config["user_data_dir"] / "models" / identifier)
def get_timerange_and_assets_end_dates_from_ready_models(
        self, models_path: Path) -> Tuple[TimeRange, Dict[str, Any]]:
    """
    Returns timerange information based on a FreqAI model directory

    The timerange starts at the day of the earliest training end date. With several
    distinct end dates it ends one training interval (the gap between the two most
    recent end dates) after the latest one, with a single end date it ends now.
    Both ends are truncated to midnight UTC and one day is added to the end.
    :param models_path: FreqAI model path
    :return: a Tuple with (Timerange calculated from directory and
             a Dict with pair and model end training dates info)
    :raises OperationalException: if no saved model is found
    """
    assets_end_dates: Dict[str, Any] = self.get_assets_timestamps_training_from_ready_models(
        models_path)

    # distinct end dates of all assets, oldest first
    end_dates = sorted(
        {end_date for dates in assets_end_dates.values() for end_date in dates}
    )

    if not end_dates:
        raise OperationalException(
            'At least 1 saved model is required to '
            'run backtest with the freqai-backtest-live-models option'
        )

    if len(end_dates) == 1:
        logger.warning(
            "Only 1 model was found. Backtesting will run with the "
            "timerange from the end of the training date to the current date"
        )
        finish_timestamp = int(datetime.now(tz=timezone.utc).timestamp())
    else:
        # After last model end date, use the same period from previous model
        # to finish the backtest
        newest, previous = end_dates[-1], end_dates[-2]
        finish_timestamp = newest + (newest - previous)

    # the finish timestamp takes part in the min/max like any other end date
    first_timestamp = min(end_dates[0], finish_timestamp)
    last_timestamp = max(end_dates[-1], finish_timestamp)

    start_date = datetime.fromtimestamp(first_timestamp, timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)
    end_date = datetime.fromtimestamp(last_timestamp, timezone.utc).replace(
        hour=0, minute=0, second=0, microsecond=0)

    # add 1 day to string timerange to ensure BT module will load all dataframe data
    end_date = end_date + timedelta(days=1)

    backtesting_timerange = TimeRange(
        'date', 'date', int(start_date.timestamp()), int(end_date.timestamp())
    )
    return backtesting_timerange, assets_end_dates
def get_assets_timestamps_training_from_ready_models(
        self, models_path: Path) -> Dict[str, Any]:
    """
    Scan the models path and returns all assets end training dates (timestamp)

    Only entries named "sub-train-<asset>_<timestamp>" that contain the model file
    "cb_<asset>_<timestamp>_model.joblib" (asset in lower case) are reported. The
    timestamp is read from the end of the name, so asset names may contain "_".
    Entries starting with "sub-train" whose name does not end in "_<integer>" are
    skipped.
    :param models_path: FreqAI model path
    :return: a Dict with asset and model end training dates info
    :raises OperationalException: if `models_path` is not a directory
    """
    assets_end_dates: Dict[str, Any] = {}
    if not models_path.is_dir():
        raise OperationalException(
            'Model folders not found. Saved models are required '
            'to run backtest with the freqai-backtest-live-models option'
        )
    for model_dir in models_path.iterdir():
        dir_name = str(model_dir.name)
        if not dir_name.startswith("sub-train"):
            continue

        model_id = dir_name.replace("sub-train-", "")
        # split at the LAST underscore: the timestamp is always the final part
        asset, _, end_date_str = model_id.rpartition("_")
        try:
            model_end_date = int(end_date_str)
        except ValueError:
            # not a "<asset>_<timestamp>" name, so it cannot hold a saved model
            continue

        model_path_file = Path(model_dir / f"cb_{model_id.lower()}_model.joblib")
        if model_path_file.is_file():
            assets_end_dates.setdefault(asset, []).append(model_end_date)

    return assets_end_dates
def remove_special_chars_from_feature_names(self, dataframe: pd.DataFrame) -> pd.DataFrame:
    """
    Remove all special characters from feature strings (:)

    The columns are renamed on the dataframe that was passed in.
    :param dataframe: the dataframe that just finished indicator population. (unfiltered)
    :return: the same dataframe with cleaned column names
    """
    cleaned_columns = dataframe.columns
    for special_char in (':',):
        cleaned_columns = cleaned_columns.str.replace(special_char, "")
    dataframe.columns = cleaned_columns

    return dataframe