freqtrade_origin/freqtrade/data/converter/converter.py

622 lines
26 KiB
Python
Raw Normal View History

2017-11-18 07:34:32 +00:00
"""
2018-12-12 18:57:25 +00:00
Functions to convert data from one format to another
2017-11-18 07:34:32 +00:00
"""
2018-03-25 19:37:14 +00:00
import logging
from typing import Dict
import numpy as np
2018-08-05 04:41:06 +00:00
import pandas as pd
2018-03-02 15:22:00 +00:00
from pandas import DataFrame, to_datetime
2018-03-17 21:44:47 +00:00
from freqtrade.constants import DEFAULT_DATAFRAME_COLUMNS, Config
2023-07-09 13:28:05 +00:00
from freqtrade.enums import CandleType, TradingMode
2020-09-28 17:39:41 +00:00
2018-12-30 15:07:47 +00:00
2018-03-25 19:37:14 +00:00
logger = logging.getLogger(__name__)
def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
                       fill_missing: bool = True, drop_incomplete: bool = True) -> DataFrame:
    """
    Build a cleaned candle DataFrame from a list of raw candle (OHLCV) rows.

    The rows are loaded using the DEFAULT_DATAFRAME_COLUMNS layout, 'date' is parsed as
    millisecond timestamps in UTC, price/volume columns are cast to float and the result
    is passed through clean_ohlcv_dataframe.
    :param ohlcv: list with candle (OHLCV) data
    :param timeframe: timeframe (e.g. 5m), forwarded to clean_ohlcv_dataframe
    :param pair: Pair this data is for (used in log output)
    :param fill_missing: forwarded to clean_ohlcv_dataframe
    :param drop_incomplete: forwarded to clean_ohlcv_dataframe
    :return: DataFrame as returned by clean_ohlcv_dataframe
    """
    logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")

    candles = DataFrame(ohlcv, columns=DEFAULT_DATAFRAME_COLUMNS)
    candles['date'] = to_datetime(candles['date'], unit='ms', utc=True)

    # Exchanges may deliver integers for volume or even prices; cast everything to float
    # up front because the indicators applied later expect floats.
    float_columns = ('open', 'high', 'low', 'close', 'volume')
    candles = candles.astype(dtype={column: 'float' for column in float_columns})

    return clean_ohlcv_dataframe(
        candles, timeframe, pair,
        fill_missing=fill_missing,
        drop_incomplete=drop_incomplete,
    )
def _init_dataframe_with_trades_columns(dataframe: DataFrame):
"""
Populates a dataframe with trades columns
:param dataframe: Dataframe to populate
"""
dataframe['trades'] = dataframe.apply(lambda _: [], axis=1)
dataframe['orderflow'] = dataframe.apply(lambda _: {}, axis=1)
dataframe['bid'] = np.nan
dataframe['ask'] = np.nan
dataframe['delta'] = np.nan
dataframe['min_delta'] = np.nan
dataframe['max_delta'] = np.nan
dataframe['total_trades'] = np.nan
dataframe['stacked_imbalances_bid'] = np.nan
dataframe['stacked_imbalances_ask'] = np.nan
2023-05-11 14:42:59 +00:00
def _convert_timeframe_to_pandas_frequency(timeframe: str):
    """
    Translate a timeframe string into a minute-based pandas frequency string.
    :param timeframe: timeframe (e.g. 5m)
    :return: tuple (frequency, minutes) where frequency is f'{minutes}min' and minutes is
        the value returned by timeframe_to_minutes
    """
    from freqtrade.exchange import timeframe_to_minutes

    minutes = timeframe_to_minutes(timeframe)
    return f'{minutes}min', minutes
def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
    """
    Add 'candle_start' and 'candle_end' columns to df (the dataframe is modified in place)

    'candle_start' is the 'date' column floored to the timeframe, 'candle_end' is
    'candle_start' plus one timeframe.
    :param df: Dataframe with a 'date' column (parsed with unit='ms' if not yet datetime)
    :param timeframe: timeframe (e.g. 5m)
    """
    timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
        timeframe)

    # Work on a local series instead of a temporary 'datetime' column, so a 'datetime'
    # column that already exists in df is neither overwritten nor dropped.
    candle_start = pd.to_datetime(df['date'], unit='ms').dt.floor(timeframe_frequency)
    df['candle_start'] = candle_start
    # The unit must be given explicitly: a bare integer passed to pd.Timedelta is
    # interpreted as nanoseconds, which would place candle_end right at candle_start.
    df['candle_end'] = candle_start + pd.Timedelta(minutes=timeframe_minutes)
def populate_dataframe_with_trades(config: Config, dataframe: DataFrame, trades: DataFrame, *,
                                   pair: str) -> DataFrame:
    """
    Populates a dataframe with trades
    :param config: Config dictionary. Reads config['timeframe'] and config['orderflow']
        (keys 'scale', 'imbalance_ratio', 'imbalance_volume', 'stacked_imbalance_range')
    :param dataframe: Dataframe to populate (modified in place and returned)
    :param trades: Trades to populate with ('candle_start' / 'candle_end' columns are added
        to the passed object in place)
    :param pair: Pair the data is for (currently not used by this function)
    :return: Dataframe with trades populated. Errors are logged, not raised; in that case
        the dataframe is returned in whatever state it reached.
    """
    # Imported locally so the timing code below cannot fail on a missing module-level import.
    import time

    config_orderflow = config['orderflow']
    timeframe = config['timeframe']
    # create columns for trades
    _init_dataframe_with_trades_columns(dataframe)
    df = dataframe.copy()

    try:
        start_time = time.time()
        # calculate ohlcv candle start and end
        _calculate_ohlcv_candle_start_and_end(df, timeframe)
        _calculate_ohlcv_candle_start_and_end(trades, timeframe)

        # The candle length does not change between iterations - resolve it once.
        _, timeframe_minutes = _convert_timeframe_to_pandas_frequency(timeframe)
        candle_length = pd.Timedelta(minutes=timeframe_minutes)

        # slice of trades that are before current ohlcv candles to make groupby faster
        # (.iloc[0] takes the first row by position, whatever the index labels are)
        first_candle_start = df['candle_start'].iloc[0]
        trades = trades.loc[trades['candle_start'] >= first_candle_start].reset_index(drop=True)

        # group trades by candle start
        trades_grouped_by_candle_start = trades.groupby('candle_start', group_keys=False)
        candle_starts_with_trades = trades_grouped_by_candle_start.groups

        # repair 'date' datetime type (otherwise crashes on each compare)
        if "date" in dataframe.columns:
            dataframe['date'] = pd.to_datetime(dataframe['date'])

        # Iterating the groupby yields each group's rows directly, so the trades do not
        # have to be re-filtered with a full-length mask for every candle.
        for candle_start, trades_grouped_df in trades_grouped_by_candle_start:
            is_between = (candle_start == df['candle_start'])
            if not is_between.any():
                logger.debug(
                    f"Found NO candles for trades starting with {candle_start}")
                continue

            candle_next = candle_start + candle_length
            # no trades in the following candle - this candle may still be forming
            if candle_next not in candle_starts_with_trades:
                logger.warning(
                    f"candle at {candle_start} with {len(trades_grouped_df)} trades "
                    f"might be unfinished, because no finished trades at {candle_next}")

            # add trades to each candle
            df.loc[is_between, 'trades'] = df.loc[is_between, 'trades'].apply(
                lambda _: trades_grouped_df)
            # calculate orderflow for each candle
            df.loc[is_between, 'orderflow'] = df.loc[is_between, 'orderflow'].apply(
                lambda _: trades_to_volumeprofile_with_total_delta_bid_ask(
                    pd.DataFrame(trades_grouped_df), scale=config_orderflow['scale']))
            # calculate imbalances for each candle's orderflow
            df.loc[is_between, 'imbalances'] = df.loc[is_between, 'orderflow'].apply(
                lambda x: trades_orderflow_to_imbalances(
                    x,
                    imbalance_ratio=config_orderflow['imbalance_ratio'],
                    imbalance_volume=config_orderflow['imbalance_volume']))
            df.loc[is_between, 'stacked_imbalances_bid'] = df.loc[is_between, 'imbalances'].apply(
                lambda x: stacked_imbalance_bid(
                    x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))
            df.loc[is_between, 'stacked_imbalances_ask'] = df.loc[is_between, 'imbalances'].apply(
                lambda x: stacked_imbalance_ask(
                    x, stacked_imbalance_range=config_orderflow['stacked_imbalance_range']))

            # Per-trade amounts: zero where the side contains 'buy' (resp. 'sell'),
            # the trade amount otherwise.
            side = trades_grouped_df['side']
            amount = trades_grouped_df['amount']
            bid_amounts = np.where(side.str.contains('buy'), 0, amount)
            ask_amounts = np.where(side.str.contains('sell'), 0, amount)
            deltas = ask_amounts - bid_amounts

            # Running delta over the trades (once per matching dataframe row), tracking the
            # lowest and highest value reached. Both extremes start at 0.
            min_delta = 0
            max_delta = 0
            delta = 0
            for _ in range(int(is_between.sum())):
                for d in deltas:
                    delta += d
                    if delta > max_delta:
                        max_delta = delta
                    if delta < min_delta:
                        min_delta = delta
            df.loc[is_between, 'max_delta'] = max_delta
            df.loc[is_between, 'min_delta'] = min_delta

            df.loc[is_between, 'bid'] = bid_amounts.sum()
            df.loc[is_between, 'ask'] = ask_amounts.sum()
            df.loc[is_between, 'delta'] = df.loc[is_between, 'ask'] - df.loc[is_between, 'bid']
            df.loc[is_between, 'total_trades'] = len(trades_grouped_df)

            # copy to avoid memory leaks
            dataframe.loc[is_between] = df.loc[is_between].copy()

        logger.debug(
            f"trades.groups_keys in {time.time() - start_time} seconds")
        logger.debug(
            f"trades.singleton_iterate in {time.time() - start_time} seconds")
    except Exception as e:
        # Deliberately best-effort: log with traceback and return what we have.
        # The %s placeholder is required, otherwise the logging call itself fails to format.
        logger.exception("Error populating dataframe with trades: %s", e)

    return dataframe
# TODO: remove timeframe and pair
def public_trades_to_dataframe(trades: list, timeframe: str, pair: str, *,
                               fill_missing: bool = True,
                               drop_incomplete: bool = True) -> DataFrame:
    """
    Build a DataFrame from a list of public trades.

    The rows are loaded using the DEFAULT_TRADES_COLUMNS layout, a UTC 'date' column is
    derived from the millisecond 'timestamp' column and 'amount', 'cost' and 'price' are
    cast to float. No de-duplication, dropping or filling is applied here.
    :param trades: list with trades data
    :param timeframe: timeframe (e.g. 5m) - currently not used by this function
    :param pair: Pair this data is for (used in log output)
    :param fill_missing: currently not used by this function
    :param drop_incomplete: currently not used by this function
    :return: DataFrame
    """
    # NOTE(review): DEFAULT_TRADES_COLUMNS is not among the imports visible at the top of
    # this file - confirm it is in scope.
    logger.debug(f"Converting candle (TRADES) data to dataframe for pair {pair}.")

    trades_frame = DataFrame(trades, columns=DEFAULT_TRADES_COLUMNS)
    trades_frame['date'] = pd.to_datetime(trades_frame['timestamp'], unit='ms', utc=True)

    # Exchanges may deliver integers here; downstream calculations expect floats.
    float_columns = ('amount', 'cost', 'price')
    return trades_frame.astype(dtype={column: 'float' for column in float_columns})
def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: float):
    """
    Bin trades into price levels and aggregate bid/ask statistics per level.
    :param trades: dataframe with 'side', 'amount' and 'price' columns
    :param scale: scale aka bin size e.g. 0.5
    :return: trades binned to levels according to scale aka orderflow.
        For non-empty input the result is indexed by the binned price and holds the summed
        numeric columns. For empty input an empty, un-grouped frame is returned that carries
        the same statistic columns (plus the legacy 'total' column).
    """
    df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)

    # Evaluate the side masks once; each is needed for an amount and a count column.
    side_has_buy = trades['side'].str.contains('buy')
    side_has_sell = trades['side'].str.contains('sell')

    # create bid, ask where side is sell or buy
    df['bid_amount'] = np.where(side_has_buy, 0, trades['amount'])
    df['ask_amount'] = np.where(side_has_sell, 0, trades['amount'])
    df['bid'] = np.where(side_has_buy, 0, 1)
    df['ask'] = np.where(side_has_sell, 0, 1)

    # round the prices to the nearest multiple of the scale
    df['price'] = ((trades['price'] / scale).round() * scale).astype('float64').values

    if df.empty:
        # 'total' is kept for backward compatibility; 'total_volume' / 'total_trades' are
        # added so the empty result exposes the same statistic columns as the regular one.
        df['total'] = np.nan
        df['delta'] = np.nan
        df['total_volume'] = np.nan
        df['total_trades'] = np.nan
        return df

    df['delta'] = df['ask_amount'] - df['bid_amount']
    df['total_volume'] = df['ask_amount'] + df['bid_amount']
    df['total_trades'] = df['ask'] + df['bid']

    # group to bins aka apply scale
    return df.groupby('price').sum(numeric_only=True)
def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalance_volume: int):
    """
    Flag bid/ask imbalances per row of an orderflow dataframe.

    Each row's 'bid' is compared with the 'ask' of the following row. A flag is set when
    the respective quotient exceeds imbalance_ratio / 100; rows whose 'total_volume' is
    below imbalance_volume are always flagged False.
    :param df: dataframe with 'bid', 'ask' and 'total_volume' columns
    :param imbalance_ratio: imbalance_ratio e.g. 300
    :param imbalance_volume: imbalance volume e.g. 3
    :return: dataframe with 'bid_imbalance' and 'ask_imbalance' columns, indexed like df
    """
    threshold = imbalance_ratio / 100
    bid_here = df.bid
    ask_next_row = df.ask.shift(-1)
    volume_too_small = df.total_volume < imbalance_volume

    def _flag(numerator, denominator):
        # Force False wherever the row's volume is too small to count.
        return np.where(volume_too_small, False, (numerator / denominator) > threshold)

    return DataFrame(
        {
            'bid_imbalance': _flag(bid_here, ask_next_row),
            'ask_imbalance': _flag(ask_next_row, bid_here),
        },
        index=df.index,
    )
def stacked_imbalance(df: DataFrame, label: str = "bid", stacked_imbalance_range: int = 3,
                      should_reverse: bool = False):
    """
    Find an index label at which a run of consecutive imbalances reaches the given length.

    Run lengths are counted with
    y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
    https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
    :param df: dataframe with a f'{label}_imbalance' column
    :param label: column prefix, "bid" or "ask"
    :param stacked_imbalance_range: minimum run length that counts as stacked
    :param should_reverse: pick the last qualifying position instead of the first
    :return: index label of df at the chosen position, NaN if no run is long enough
    """
    flags = df[f'{label}_imbalance']
    # Positional 0/1 series (default RangeIndex), so positions map onto flags.index below.
    ones = pd.Series(np.where(flags, 1, 0))
    run_id = (ones != ones.shift()).cumsum()
    run_length = ones * (ones.groupby(run_id).cumcount() + 1)

    qualifying = run_length.index[run_length >= stacked_imbalance_range]
    if qualifying.empty:
        return np.nan

    # TODO: do better than just take first
    position = qualifying[-1] if should_reverse else qualifying[0]
    return flags.index[position]
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int = 3):
    """
    Shortcut for stacked_imbalance on the 'bid' column, picking the first qualifying position.
    :param df: dataframe with a 'bid_imbalance' column
    :param stacked_imbalance_range: minimum run length that counts as stacked
    :return: result of stacked_imbalance
    """
    return stacked_imbalance(
        df, label='bid', stacked_imbalance_range=stacked_imbalance_range)
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int = 3):
    """
    Shortcut for stacked_imbalance on the 'ask' column, picking the last qualifying position.
    :param df: dataframe with an 'ask_imbalance' column
    :param stacked_imbalance_range: minimum run length that counts as stacked
    :return: result of stacked_imbalance
    """
    return stacked_imbalance(
        df, label='ask', stacked_imbalance_range=stacked_imbalance_range, should_reverse=True)
def orderflow_to_volume_profile(orderflow: DataFrame):
    """
    Aggregate an orderflow dataframe into a volume profile.
    :param orderflow: dataframe with 'level', 'bid' and 'ask' columns
    :return: volume profile dataframe indexed by 'level' with the summed 'bid' and 'ask'
        per level and 'delta' (ask minus bid)
    """
    # Group once and reuse the sums, rather than rebuilding the groupby for every column.
    per_level = orderflow.groupby('level')
    bid = per_level['bid'].sum()
    ask = per_level['ask'].sum()
    return pd.DataFrame({'bid': bid, 'ask': ask, 'delta': ask - bid})
def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
                          fill_missing: bool, drop_incomplete: bool) -> DataFrame:
    """
    Cleanse a OHLCV dataframe by
      * Grouping it by date (removes duplicate tics)
      * dropping the last candle if requested
      * Filling up missing data (if requested)
    :param data: DataFrame containing candle (OHLCV) data.
    :param timeframe: timeframe (e.g. 5m). Used to fill up eventual missing data
    :param pair: Pair this data is for (used to warn if fillup was necessary)
    :param fill_missing: fill up missing candles via ohlcv_fill_up_missing_data
    :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
    :return: DataFrame
    """
    # One row per date: duplicates are merged column by column.
    merge_rules = {
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'max',
    }
    candles = data.groupby(by='date', as_index=False, sort=True).agg(merge_rules)

    if drop_incomplete:
        # eliminate partial candle
        candles = candles.drop(index=candles.index[-1:])
        logger.debug('Dropping last candle')

    if not fill_missing:
        return candles
    return ohlcv_fill_up_missing_data(candles, timeframe, pair)
2018-08-05 04:41:06 +00:00
def warn_of_tick_duplicates(data: DataFrame, pair: str) -> None:
    """
    Log how many duplicated values the 'id', 'timestamp' and 'datetime' columns contain.

    Columns missing from data are skipped. Duplicated ids are logged as warning,
    duplicates in the other columns only at debug level.
    :param data: DataFrame to inspect
    :param pair: Pair this data is for (used in the log message)
    """
    for col in ('id', 'timestamp', 'datetime'):
        if col not in data.columns:
            continue
        # Evaluate duplicated() once and keep the builtin `sum` unshadowed.
        duplicate_count = data[col].duplicated().sum()
        if not duplicate_count:
            continue
        message = f'{duplicate_count} duplicated ticks for {pair} in {col} detected.'
        if col == 'id':
            logger.warning(message)
        else:
            logger.debug(message)
def clean_duplicate_trades(trades: DataFrame, timeframe: str, pair: str, *,
                           fill_missing: bool, drop_incomplete: bool) -> DataFrame:
    """
    Cleanse a TRADES dataframe by removing duplicate rows.

    The rows are passed as plain lists through trades_remove_duplicates and rebuilt into
    a new DataFrame with the original column labels.
    :param trades: DataFrame containing trades data.
    :param timeframe: timeframe (e.g. 5m) - currently not used by this function
    :param pair: Pair this data is for (used in log output)
    :param fill_missing: currently not used by this function
    :param drop_incomplete: currently not used by this function
    :return: DataFrame
    """
    logger.debug(f"Clean duplicated ticks from Trades data {pair}")
    unique_rows = trades_remove_duplicates(trades.values.tolist())
    return pd.DataFrame(unique_rows, columns=trades.columns)
def drop_incomplete_and_fill_missing_trades(data: DataFrame, timeframe: str, pair: str, *,
                                            fill_missing: bool,
                                            drop_incomplete: bool) -> DataFrame:
    """
    Optionally drop the last row of a trades dataframe and fill up missing data.
    :param data: DataFrame containing trades data (the last row is dropped in place)
    :param timeframe: timeframe (e.g. 5m), forwarded to trades_fill_up_missing_data
    :param pair: Pair this data is for, forwarded to trades_fill_up_missing_data
    :param fill_missing: return the result of trades_fill_up_missing_data instead of data
    :param drop_incomplete: drop the last row before filling
    :return: DataFrame
    """
    if drop_incomplete:
        # TODO: this is not correct, as it drops the last trade only
        # but we need to drop the last candle until closed
        last_row = data.tail(1).index
        data.drop(last_row, inplace=True)
        logger.debug('Dropping last trade')

    if not fill_missing:
        return data
    return trades_fill_up_missing_data(data, timeframe, pair)
def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame:
    """
    Fills up missing data with 0 volume rows,
    using the previous close as price for "open", "high" "low" and "close", volume is set to 0

    :param dataframe: DataFrame containing candle (OHLCV) data with a 'date' column
    :param timeframe: timeframe (e.g. 5m) determining the resample interval
    :param pair: Pair this data is for (used in the log message)
    :return: resampled DataFrame with the gaps filled
    """
    from freqtrade.exchange import timeframe_to_minutes

    timeframe_minutes = timeframe_to_minutes(timeframe)
    if 43200 <= timeframe_minutes < 525600:
        # Monthly candles need special treatment to stick to the 1st of the month
        resample_interval = f'{timeframe}S'
    elif timeframe_minutes > 43200:
        resample_interval = timeframe
    else:
        resample_interval = f'{timeframe_minutes}min'

    # Resample to create "NAN" values
    resampled = dataframe.resample(resample_interval, on='date').agg({
        'open': 'first',
        'high': 'max',
        'low': 'min',
        'close': 'last',
        'volume': 'sum',
    })

    # Forwardfill close for missing columns
    resampled['close'] = resampled['close'].ffill()
    # Use close for "open, high, low"
    price_columns = ['open', 'high', 'low']
    resampled.loc[:, price_columns] = resampled[price_columns].fillna(
        value={column: resampled['close'] for column in price_columns})
    resampled.reset_index(inplace=True)

    len_before = len(dataframe)
    len_after = len(resampled)
    if len_before > 0:
        pct_missing = (len_after - len_before) / len_before
    else:
        pct_missing = 0

    if len_before != len_after:
        message = (f"Missing data fillup for {pair}: before: {len_before} - after: {len_after}"
                   f" - {pct_missing:.2%}")
        # Don't be verbose if only a small amount is missing
        report = logger.info if pct_missing > 0.01 else logger.debug
        report(message)

    return resampled
def trim_dataframe(df: DataFrame, timerange, *, df_date_col: str = 'date',
                   startup_candles: int = 0) -> DataFrame:
    """
    Trim dataframe based on given timerange
    :param df: Dataframe to trim
    :param timerange: timerange (start and end date are applied when their type is 'date')
    :param df_date_col: Column in the dataframe to use as Date column
    :param startup_candles: When not 0, this many leading rows are cut off and the
        timerange start date is ignored. The end date is applied either way.
    :return: trimmed dataframe
    """
    if startup_candles:
        # Trim candles instead of timeframe in case of given startup_candle count
        df = df.iloc[startup_candles:, :]
    elif timerange.starttype == 'date':
        not_too_early = df[df_date_col] >= timerange.startdt
        df = df.loc[not_too_early, :]

    if timerange.stoptype == 'date':
        not_too_late = df[df_date_col] <= timerange.stopdt
        df = df.loc[not_too_late, :]

    return df
2021-05-21 06:52:56 +00:00
def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,
                    startup_candles: int) -> Dict[str, DataFrame]:
    """
    Trim startup period from analyzed dataframes
    :param preprocessed: Dict of pair: dataframe
    :param timerange: timerange (use start and end date if available)
    :param startup_candles: Startup-candles that should be removed
    :return: Dict of trimmed dataframes; pairs without remaining data are left out
    """
    processed: Dict[str, DataFrame] = {}
    for pair, pair_df in preprocessed.items():
        trimmed = trim_dataframe(pair_df, timerange, startup_candles=startup_candles)
        if trimmed.empty:
            logger.warning(f'{pair} has no data left after adjusting for startup candles, '
                           f'skipping.')
            continue
        processed[pair] = trimmed
    return processed
2018-08-05 13:08:07 +00:00
def order_book_to_dataframe(bids: list, asks: list) -> DataFrame:
    """
    TODO: This should get a dedicated test
    Gets order book list, returns dataframe with below format per suggested by creslin
    -------------------------------------------------------------------
     b_sum       b_size       bids       asks       a_size       a_sum
    -------------------------------------------------------------------
    :param bids: list of rows in the order (bids, b_size)
    :param asks: list of rows in the order (asks, a_size)
    :return: DataFrame with both sides side by side, including cumulative size sums
    """
    # Each side gets its cumulative size sum before both are joined on the row index.
    bid_side = DataFrame(bids, columns=['bids', 'b_size']).assign(
        b_sum=lambda side: side['b_size'].cumsum())
    ask_side = DataFrame(asks, columns=['asks', 'a_size']).assign(
        a_sum=lambda side: side['a_size'].cumsum())

    column_order = ['b_sum', 'b_size', 'bids', 'asks', 'a_size', 'a_sum']
    return pd.concat([bid_side, ask_side], axis=1)[column_order]
2019-10-13 17:21:27 +00:00
def convert_ohlcv_format(
    config: Config,
    convert_from: str,
    convert_to: str,
    erase: bool,
):
    """
    Convert OHLCV from one format to another
    :param config: Config dictionary. Reads 'datadir' and, when present, 'pairs',
        'timeframes' / 'timeframe' and 'candle_types'
    :param convert_from: Source format
    :param convert_to: Target format
    :param erase: Erase source data (does not apply if source and target format are identical)
    """
    from freqtrade.data.history.idatahandler import get_datahandler

    datadir = config['datadir']
    src = get_datahandler(datadir, convert_from)
    trg = get_datahandler(datadir, convert_to)

    timeframes = config.get('timeframes', [config.get('timeframe')])
    logger.info(f"Converting candle (OHLCV) for timeframe {timeframes}")

    # Without an explicit selection every CandleType is converted.
    all_candle_type_values = [c.value for c in CandleType]
    candle_types = [
        CandleType.from_string(ct)
        for ct in config.get('candle_types', all_candle_type_values)
    ]
    logger.info(candle_types)

    # Collect what the source handler has on disk for both trading modes.
    paircombs = []
    for trading_mode in (TradingMode.SPOT, TradingMode.FUTURES):
        paircombs.extend(src.ohlcv_get_available_data(datadir, trading_mode))

    if 'pairs' in config:
        # Filter pairs
        paircombs = [comb for comb in paircombs if comb[0] in config['pairs']]
    if 'timeframes' in config:
        paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']]
    paircombs = [comb for comb in paircombs if comb[2] in candle_types]
    paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value))

    formatted_paircombs = '\n'.join(
        f"{pair}, {timeframe}, {candle_type}" for pair, timeframe, candle_type in paircombs)
    logger.info(f"Converting candle (OHLCV) data for the following pair combinations:\n"
                f"{formatted_paircombs}")

    for pair, timeframe, candle_type in paircombs:
        data = src.ohlcv_load(
            pair=pair,
            timeframe=timeframe,
            timerange=None,
            fill_missing=False,
            drop_incomplete=False,
            startup_candles=0,
            candle_type=candle_type,
        )
        logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
        if len(data) == 0:
            # Nothing to store - and the source is left untouched in that case.
            continue

        trg.ohlcv_store(pair=pair, timeframe=timeframe, data=data, candle_type=candle_type)
        if erase and convert_from != convert_to:
            logger.info(f"Deleting source data for {pair} / {timeframe}")
            src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type)
def reduce_dataframe_footprint(df: DataFrame) -> DataFrame:
    """
    Downcast float64 columns to float32 and int64 columns to int32.
    The 'open', 'high', 'low', 'close' and 'volume' columns keep their dtype.
    :param df: Dataframe to be converted to float/int 32s
    :return: Dataframe converted to float/int 32s
    """
    logger.debug(f"Memory usage of dataframe is "
                 f"{df.memory_usage().sum() / 1024**2:.2f} MB")

    untouched_columns = ['open', 'high', 'low', 'close', 'volume']
    downcasts = ((np.float64, np.float32), (np.int64, np.int32))

    target_dtypes = df.dtypes
    for column, current_dtype in target_dtypes.items():
        if column in untouched_columns:
            continue
        for wide, narrow in downcasts:
            if current_dtype == wide:
                target_dtypes[column] = narrow
                break

    df = df.astype(target_dtypes)

    logger.debug(f"Memory usage after optimization is: "
                 f"{df.memory_usage().sum() / 1024**2:.2f} MB")
    return df