merging datarehaul into scanning branch

This commit is contained in:
robcaulk 2022-06-07 22:14:01 -06:00
parent c981ad4608
commit 4d472a0ea1
3 changed files with 94 additions and 74 deletions

View File

@ -71,7 +71,7 @@ class FreqaiDataKitchen:
self.data_drawer = data_drawer
def set_paths(self, metadata: dict, trained_timestamp: int = None,) -> None:
def set_paths(self, pair: str, trained_timestamp: int = None,) -> None:
"""
Set the paths to the data for the present coin/botloop
:params:
@ -83,7 +83,7 @@ class FreqaiDataKitchen:
str(self.freqai_config.get('identifier')))
self.data_path = Path(self.full_path / str("sub-train" + "-" +
metadata['pair'].split("/")[0] +
pair.split("/")[0] +
str(trained_timestamp)))
return
@ -796,12 +796,12 @@ class FreqaiDataKitchen:
return retrain, trained_timerange, data_load_timerange
def set_new_model_names(self, metadata: dict, trained_timerange: TimeRange):
def set_new_model_names(self, pair: str, trained_timerange: TimeRange):
coin, _ = metadata['pair'].split("/")
coin, _ = pair.split("/")
# set the new data_path
self.data_path = Path(self.full_path / str("sub-train" + "-" +
metadata['pair'].split("/")[0] +
pair.split("/")[0] +
str(int(trained_timerange.stopts))))
self.model_filename = "cb_" + coin.lower() + "_" + str(int(trained_timerange.stopts))
@ -918,7 +918,7 @@ class FreqaiDataKitchen:
'trading_mode', 'spot'))
def get_base_and_corr_dataframes(self, timerange: TimeRange,
metadata: dict) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
pair: str) -> Tuple[Dict[Any, Any], Dict[Any, Any]]:
"""
Searches through our historic_data in memory and returns the dataframes relevant
to the present pair.
@ -927,6 +927,7 @@ class FreqaiDataKitchen:
for training according to user defined train_period
pair: str = the pair for which the relevant dataframes are being gathered
"""
with self.data_drawer.history_lock:
corr_dataframes: Dict[Any, Any] = {}
base_dataframes: Dict[Any, Any] = {}
@ -940,7 +941,7 @@ class FreqaiDataKitchen:
)
if pairs:
for p in pairs:
if metadata['pair'] in p:
if pair in p:
continue # dont repeat anything from whitelist
if p not in corr_dataframes:
corr_dataframes[p] = {}
@ -984,7 +985,7 @@ class FreqaiDataKitchen:
def use_strategy_to_populate_indicators(self, strategy: IStrategy,
corr_dataframes: dict,
base_dataframes: dict,
metadata: dict) -> DataFrame:
pair: str) -> DataFrame:
"""
Use the user defined strategy for populating indicators during
retrain
@ -1003,19 +1004,19 @@ class FreqaiDataKitchen:
for tf in self.freqai_config.get("timeframes"):
dataframe = strategy.populate_any_indicators(
metadata,
metadata['pair'],
pair,
pair,
dataframe.copy(),
tf,
base_dataframes[tf],
coin=metadata['pair'].split("/")[0] + "-"
coin=pair.split("/")[0] + "-"
)
if pairs:
for i in pairs:
if metadata['pair'] in i:
if pair in i:
continue # dont repeat anything from whitelist
dataframe = strategy.populate_any_indicators(
metadata,
pair,
i,
dataframe.copy(),
tf,

View File

@ -63,6 +63,8 @@ class IFreqaiModel(ABC):
self.lock = threading.Lock()
self.follow_mode = self.freqai_info.get('follow_mode', False)
self.identifier = self.freqai_info.get('identifier', 'no_id_provided')
self.scanning = False
self.ready_to_scan = False
def assert_config(self, config: Dict[str, Any]) -> None:
@ -91,17 +93,9 @@ class IFreqaiModel(ABC):
# and we keep the flag self.training_on_separate_thread in the current object to help
# determine what the current pair will do
if self.live:
if (not self.training_on_separate_thread and
self.data_drawer.pair_dict[metadata['pair']]['priority'] == 1):
self.dh = FreqaiDataKitchen(self.config, self.data_drawer,
self.live, metadata["pair"])
dh = self.start_live(dataframe, metadata, strategy, self.dh, trainable=True)
else:
# we will have at max 2 separate instances of the kitchen at once.
self.dh_fg = FreqaiDataKitchen(self.config, self.data_drawer,
self.live, metadata["pair"])
dh = self.start_live(dataframe, metadata, strategy, self.dh_fg, trainable=False)
self.dh = FreqaiDataKitchen(self.config, self.data_drawer,
self.live, metadata["pair"])
dh = self.start_live(dataframe, metadata, strategy, self.dh)
# For backtesting, each pair enters and then gets trained for each window along the
# sliding window defined by "train_period" (training window) and "backtest_period"
@ -114,8 +108,36 @@ class IFreqaiModel(ABC):
dh = self.start_backtesting(dataframe, metadata, self.dh)
return self.return_values(dataframe, dh)
# return (dh.full_predictions, dh.full_do_predict,
# dh.full_target_mean, dh.full_target_std)
@threaded
def start_scanning(self, strategy: IStrategy) -> None:
    """
    Continuously scan the pair whitelist and (re)train the model of whichever
    pair currently holds priority 1 in the training queue.
    For every pass over the whitelist, each pair with priority 1 gets a fresh
    FreqaiDataKitchen, its stored model filename and trained timestamp are
    looked up, and a training run is started when the kitchen reports that
    retraining is required or no model file exists.
    NOTE(review): this method never returns; the @threaded decorator is
    presumably what moves it off the caller's thread - confirm.
    :params:
    strategy: IStrategy = user strategy, forwarded to train_model_in_series
    """
    # local import keeps this block self-contained
    import time

    while True:
        for pair in self.config.get('exchange', {}).get('pair_whitelist'):
            # only the pair at the front of the training queue is handled
            if self.data_drawer.pair_dict[pair]['priority'] != 1:
                continue

            dh = FreqaiDataKitchen(self.config, self.data_drawer,
                                   self.live, pair)

            (model_filename,
             trained_timestamp,
             _, _) = self.data_drawer.get_pair_dict_info(pair)

            file_exists = self.model_exists(pair,
                                            dh,
                                            trained_timestamp=trained_timestamp,
                                            model_filename=model_filename)

            (self.retrain,
             new_trained_timerange,
             data_load_timerange) = dh.check_if_new_training_required(trained_timestamp)
            dh.set_paths(pair, new_trained_timerange.stopts)

            if self.retrain or not file_exists:
                self.train_model_in_series(new_trained_timerange, pair,
                                           strategy, dh, data_load_timerange)

        # Pause between passes: without this the loop spins at full speed
        # whenever no pair needs training, burning CPU and contending with
        # the other threads of the bot.
        time.sleep(1)
def start_backtesting(self, dataframe: DataFrame, metadata: dict,
dh: FreqaiDataKitchen) -> FreqaiDataKitchen:
@ -142,7 +164,7 @@ class IFreqaiModel(ABC):
for tr_train, tr_backtest in zip(
dh.training_timeranges, dh.backtesting_timeranges
):
(_, _, _, _) = self.data_drawer.get_pair_dict_info(metadata)
(_, _, _, _) = self.data_drawer.get_pair_dict_info(metadata['pair'])
gc.collect()
dh.data = {} # clean the pair specific data between training window sliding
self.training_timerange = tr_train
@ -163,7 +185,7 @@ class IFreqaiModel(ABC):
str(int(trained_timestamp.stopts))))
if not self.model_exists(metadata["pair"], dh,
trained_timestamp=trained_timestamp.stopts):
self.model = self.train(dataframe_train, metadata, dh)
self.model = self.train(dataframe_train, metadata['pair'], dh)
self.data_drawer.pair_dict[metadata['pair']][
'trained_timestamp'] = trained_timestamp.stopts
dh.set_new_model_names(metadata, trained_timestamp)
@ -184,8 +206,7 @@ class IFreqaiModel(ABC):
return dh
def start_live(self, dataframe: DataFrame, metadata: dict,
strategy: IStrategy, dh: FreqaiDataKitchen,
trainable: bool) -> FreqaiDataKitchen:
strategy: IStrategy, dh: FreqaiDataKitchen) -> FreqaiDataKitchen:
"""
The main broad execution for dry/live. This function will check if a retraining should be
performed, and if so, retrain and reset the model.
@ -203,10 +224,10 @@ class IFreqaiModel(ABC):
self.data_drawer.update_follower_metadata()
# get the model metadata associated with the current pair
(model_filename,
(_,
trained_timestamp,
coin_first,
return_null_array) = self.data_drawer.get_pair_dict_info(metadata)
return_null_array) = self.data_drawer.get_pair_dict_info(metadata['pair'])
# if the metadata doesn't exist, the follower returns null arrays to strategy
if self.follow_mode and return_null_array:
@ -222,20 +243,18 @@ class IFreqaiModel(ABC):
# if trainable, check if model needs training, if so compute new timerange,
# then save model and metadata.
# if not trainable, load existing data
if (trainable or coin_first) and not self.follow_mode:
file_exists = False
if trained_timestamp != 0: # historical model available
dh.set_paths(metadata, trained_timestamp)
file_exists = self.model_exists(metadata['pair'],
dh,
trained_timestamp=trained_timestamp,
model_filename=model_filename)
if not self.follow_mode:
# if trained_timestamp != 0: # historical model available
# dh.set_paths(metadata['pair'], trained_timestamp)
# # file_exists = self.model_exists(metadata['pair'],
# # dh,
# # trained_timestamp=trained_timestamp,
# # model_filename=model_filename)
(self.retrain,
new_trained_timerange,
data_load_timerange) = dh.check_if_new_training_required(trained_timestamp)
dh.set_paths(metadata, new_trained_timerange.stopts)
dh.set_paths(metadata['pair'], new_trained_timerange.stopts)
# download candle history if it is not already in memory
if not self.data_drawer.historic_data:
@ -246,20 +265,17 @@ class IFreqaiModel(ABC):
dh.load_all_pair_histories(data_load_timerange)
# train the model on the trained timerange
if self.retrain or not file_exists:
if coin_first:
self.train_model_in_series(new_trained_timerange, metadata,
strategy, dh, data_load_timerange)
else:
self.training_on_separate_thread = True # acts like a lock
self.retrain_model_on_separate_thread(new_trained_timerange,
metadata, strategy,
dh, data_load_timerange)
if coin_first and not self.scanning:
self.train_model_in_series(new_trained_timerange, metadata['pair'],
strategy, dh, data_load_timerange)
elif not coin_first and not self.scanning:
self.scanning = True
self.start_scanning(strategy)
elif not trainable and not self.follow_mode:
logger.info(f'{metadata["pair"]} holds spot '
f'{self.data_drawer.pair_dict[metadata["pair"]]["priority"]} '
'in training queue')
# elif not trainable and not self.follow_mode:
# logger.info(f'{metadata["pair"]} holds spot '
# f'{self.data_drawer.pair_dict[metadata["pair"]]["priority"]} '
# 'in training queue')
elif self.follow_mode:
dh.set_paths(metadata, trained_timestamp)
logger.info('FreqAI instance set to follow_mode, finding existing pair'
@ -382,7 +398,7 @@ class IFreqaiModel(ABC):
str(self.freqai_info.get('identifier')))
@threaded
def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, metadata: dict,
def retrain_model_on_separate_thread(self, new_trained_timerange: TimeRange, pair: str,
strategy: IStrategy, dh: FreqaiDataKitchen,
data_load_timerange: TimeRange):
"""
@ -403,14 +419,14 @@ class IFreqaiModel(ABC):
# metadata)
corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange,
metadata)
pair)
# protecting from common benign errors associated with grabbing new data from exchange:
try:
unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
corr_dataframes,
base_dataframes,
metadata)
pair)
unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
except Exception as err:
@ -420,23 +436,23 @@ class IFreqaiModel(ABC):
return
try:
model = self.train(unfiltered_dataframe, metadata, dh)
model = self.train(unfiltered_dataframe, pair, dh)
except ValueError:
logger.warning('Value error encountered during training')
self.training_on_separate_thread = False
self.retrain = False
return
self.data_drawer.pair_dict[metadata['pair']][
self.data_drawer.pair_dict[pair][
'trained_timestamp'] = new_trained_timerange.stopts
dh.set_new_model_names(metadata, new_trained_timerange)
dh.set_new_model_names(pair, new_trained_timerange)
# logger.info('Training queue'
# f'{sorted(self.data_drawer.pair_dict.items(), key=lambda item: item[1])}')
if self.data_drawer.pair_dict[metadata['pair']]['priority'] == 1:
if self.data_drawer.pair_dict[pair]['priority'] == 1:
with self.lock:
self.data_drawer.pair_to_end_of_training_queue(metadata['pair'])
dh.save_data(model, coin=metadata['pair'])
self.data_drawer.pair_to_end_of_training_queue(pair)
dh.save_data(model, coin=pair)
self.training_on_separate_thread = False
self.retrain = False
@ -446,7 +462,7 @@ class IFreqaiModel(ABC):
return
def train_model_in_series(self, new_trained_timerange: TimeRange, metadata: dict,
def train_model_in_series(self, new_trained_timerange: TimeRange, pair: str,
strategy: IStrategy, dh: FreqaiDataKitchen,
data_load_timerange: TimeRange):
"""
@ -464,29 +480,32 @@ class IFreqaiModel(ABC):
# corr_dataframes, base_dataframes = dh.load_pairs_histories(data_load_timerange,
# metadata)
corr_dataframes, base_dataframes = dh.get_base_and_corr_dataframes(data_load_timerange,
metadata)
pair)
unfiltered_dataframe = dh.use_strategy_to_populate_indicators(strategy,
corr_dataframes,
base_dataframes,
metadata)
pair)
unfiltered_dataframe = dh.slice_dataframe(new_trained_timerange, unfiltered_dataframe)
model = self.train(unfiltered_dataframe, metadata, dh)
model = self.train(unfiltered_dataframe, pair, dh)
self.data_drawer.pair_dict[metadata['pair']][
self.data_drawer.pair_dict[pair][
'trained_timestamp'] = new_trained_timerange.stopts
dh.set_new_model_names(metadata, new_trained_timerange)
self.data_drawer.pair_dict[metadata['pair']]['first'] = False
dh.save_data(model, coin=metadata['pair'])
dh.set_new_model_names(pair, new_trained_timerange)
self.data_drawer.pair_dict[pair]['first'] = False
if self.data_drawer.pair_dict[pair]['priority'] == 1 and self.scanning:
with self.lock:
self.data_drawer.pair_to_end_of_training_queue(pair)
dh.save_data(model, coin=pair)
self.retrain = False
# The following methods are overridden by user-made prediction models.
# See freqai/prediction_models/CatboostPredictionModlel.py for an example.
@abstractmethod
def train(self, unfiltered_dataframe: DataFrame, metadata: dict, dh: FreqaiDataKitchen) -> Any:
def train(self, unfiltered_dataframe: DataFrame, pair: str, dh: FreqaiDataKitchen) -> Any:
"""
Filter the training data and train a model to it. Train makes heavy use of the datahandler
for storing, saving, loading, and analyzing the data.

View File

@ -532,7 +532,7 @@ class IStrategy(ABC, HyperStrategyMixin):
"""
return None
def populate_any_indicators(self, metadata: dict, pair: str, df: DataFrame, tf: str,
def populate_any_indicators(self, basepair: str, pair: str, df: DataFrame, tf: str,
informative: DataFrame = None, coin: str = "") -> DataFrame:
"""
Function designed to automatically generate, name and merge features