Resolve various issues and comments

This commit is contained in:
Joe Schr 2024-02-12 13:09:28 +01:00
parent 6e7a536c7a
commit d7b88194e0
2 changed files with 59 additions and 50 deletions

View File

@ -3,15 +3,16 @@ Functions to convert data from one format to another
"""
import logging
import time
from typing import Dict, List
from typing import Dict
import numpy as np
import pandas as pd
from pandas import DataFrame, to_datetime
from freqtrade.constants import (DEFAULT_DATAFRAME_COLUMNS, DEFAULT_ORDERFLOW_COLUMNS,
DEFAULT_TRADES_COLUMNS, Config)
Config)
from freqtrade.enums import CandleType, TradingMode
from freqtrade.exchange.exchange_utils import timeframe_to_resample_freq
logger = logging.getLogger(__name__)
@ -30,7 +31,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
:param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
:return: DataFrame
"""
logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
logger.debug(
f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
cols = DEFAULT_DATAFRAME_COLUMNS
df = DataFrame(ohlcv, columns=cols)
@ -72,13 +74,16 @@ def _convert_timeframe_to_pandas_frequency(timeframe: str):
def _calculate_ohlcv_candle_start_and_end(df: DataFrame, timeframe: str):
timeframe_frequency, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
_, timeframe_minutes = _convert_timeframe_to_pandas_frequency(
timeframe)
timeframe_frequency = timeframe_to_resample_freq(timeframe)
# calculate ohlcv candle start and end
if df is not None and not df.empty:
df['datetime'] = pd.to_datetime(df['date'], unit='ms')
df['candle_start'] = df['datetime'].dt.floor(timeframe_frequency)
df['candle_end'] = df['candle_start'] + pd.Timedelta(timeframe_minutes)
df['candle_end'] = df['candle_start'] + \
pd.Timedelta(minutes=timeframe_minutes)
df.drop(columns=['datetime'], inplace=True)
@ -103,25 +108,26 @@ def populate_dataframe_with_trades(config: Config,
try:
start_time = time.time()
# calculate ohlcv candle start and end
# TODO: check if call is necessary for df.
_calculate_ohlcv_candle_start_and_end(df, timeframe)
_calculate_ohlcv_candle_start_and_end(trades, timeframe)
# slice of trades that are before current ohlcv candles to make groupby faster
# TODO: maybe use df.date instead of df.candle_start at comparision below
trades = trades.loc[trades.candle_start >= df.candle_start[0]]
trades.reset_index(inplace=True, drop=True)
# group trades by candle start
trades_grouped_by_candle_start = trades.groupby('candle_start', group_keys=False)
# repair 'date' datetime type (otherwise crashes on each compare)
if "date" in dataframe.columns:
dataframe['date'] = pd.to_datetime(dataframe['date'])
trades_grouped_by_candle_start = trades.groupby(
'candle_start', group_keys=False)
for candle_start in trades_grouped_by_candle_start.groups:
trades_grouped_df = trades[candle_start == trades['candle_start']]
is_between = (candle_start == df['candle_start'])
if np.any(is_between == True): # noqa: E712
(_, timeframe_minutes) = _convert_timeframe_to_pandas_frequency(timeframe)
candle_next = candle_start + pd.Timedelta(minutes=timeframe_minutes)
candle_next = candle_start + \
pd.Timedelta(minutes=timeframe_minutes)
# skip if there are no trades at next candle
# because that this candle isn't finished yet
if candle_next not in trades_grouped_by_candle_start.groups:
@ -153,6 +159,7 @@ def populate_dataframe_with_trades(config: Config,
is_between, 'imbalances'].apply(
lambda x: stacked_imbalance_ask(x, stacked_imbalance_range=_stacked_imb))
# TODO: maybe use simple np.where instead
buy = df.loc[is_between, 'bid'].apply(lambda _: np.where(
trades_grouped_df['side'].str.contains('buy'), 0, trades_grouped_df['amount']))
sell = df.loc[is_between, 'ask'].apply(lambda _: np.where(
@ -184,10 +191,13 @@ def populate_dataframe_with_trades(config: Config,
# copy to avoid memory leaks
dataframe.loc[is_between] = df.loc[is_between].copy()
else:
logger.debug(f"Found NO candles for trades starting with {candle_start}")
logger.debug(f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(
f"Found NO candles for trades starting with {candle_start}")
logger.debug(
f"trades.groups_keys in {time.time() - start_time} seconds")
logger.debug(f"trades.singleton_iterate in {time.time() - start_time} seconds")
logger.debug(
f"trades.singleton_iterate in {time.time() - start_time} seconds")
except Exception as e:
logger.exception("Error populating dataframe with trades:", e)
@ -203,13 +213,16 @@ def trades_to_volumeprofile_with_total_delta_bid_ask(trades: DataFrame, scale: f
"""
df = pd.DataFrame([], columns=DEFAULT_ORDERFLOW_COLUMNS)
# create bid, ask where side is sell or buy
df['bid_amount'] = np.where(trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid_amount'] = np.where(
trades['side'].str.contains('buy'), 0, trades['amount'])
df['ask_amount'] = np.where(
trades['side'].str.contains('sell'), 0, trades['amount'])
df['bid'] = np.where(trades['side'].str.contains('buy'), 0, 1)
df['ask'] = np.where(trades['side'].str.contains('sell'), 0, 1)
# round the prices to the nearest multiple of the scale
df['price'] = ((trades['price'] / scale).round() * scale).astype('float64').values
df['price'] = ((trades['price'] / scale).round()
* scale).astype('float64').values
if df.empty:
df['total'] = np.nan
df['delta'] = np.nan
@ -235,23 +248,25 @@ def trades_orderflow_to_imbalances(df: DataFrame, imbalance_ratio: int, imbalanc
ask = df.ask.shift(-1)
bid_imbalance = (bid / ask) > (imbalance_ratio / 100)
# overwrite bid_imbalance with False if volume is not big enough
bid_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, bid_imbalance)
bid_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, bid_imbalance)
ask_imbalance = (ask / bid) > (imbalance_ratio / 100)
# overwrite ask_imbalance with False if volume is not big enough
ask_imbalance_filtered = np.where(df.total_volume < imbalance_volume, False, ask_imbalance)
ask_imbalance_filtered = np.where(
df.total_volume < imbalance_volume, False, ask_imbalance)
dataframe = DataFrame({
"bid_imbalance": bid_imbalance_filtered,
"ask_imbalance": ask_imbalance_filtered
}, index=df.index,
}, index=df.index,
)
return dataframe
def stacked_imbalance(df: DataFrame,
label: str = "bid",
stacked_imbalance_range: int = 3,
should_reverse: bool = False):
label: str,
stacked_imbalance_range: int,
should_reverse: bool):
"""
y * (y.groupby((y != y.shift()).cumsum()).cumcount() + 1)
https://stackoverflow.com/questions/27626542/counting-consecutive-positive-values-in-python-pandas-array
@ -260,11 +275,13 @@ def stacked_imbalance(df: DataFrame,
int_series = pd.Series(np.where(imbalance, 1, 0))
stacked = (
int_series * (
int_series.groupby((int_series != int_series.shift()).cumsum()).cumcount() + 1
)
int_series.groupby(
(int_series != int_series.shift()).cumsum()).cumcount() + 1
)
)
max_stacked_imbalance_idx = stacked.index[stacked >= stacked_imbalance_range]
max_stacked_imbalance_idx = stacked.index[stacked >=
stacked_imbalance_range]
stacked_imbalance_price = np.nan
if not max_stacked_imbalance_idx.empty:
idx = max_stacked_imbalance_idx[0] if not should_reverse else np.flipud(
@ -273,11 +290,11 @@ def stacked_imbalance(df: DataFrame,
return stacked_imbalance_price
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int = 3):
return stacked_imbalance(df, 'bid', stacked_imbalance_range)
def stacked_imbalance_bid(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'bid', stacked_imbalance_range, should_reverse=False)
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int = 3):
def stacked_imbalance_ask(df: DataFrame, stacked_imbalance_range: int):
return stacked_imbalance(df, 'ask', stacked_imbalance_range, should_reverse=True)
@ -328,20 +345,6 @@ def clean_ohlcv_dataframe(data: DataFrame, timeframe: str, pair: str, *,
return data
def drop_incomplete_and_fill_missing_trades(data: DataFrame, timeframe: str, pair: str, *,
fill_missing: bool, drop_incomplete: bool) -> DataFrame:
# eliminate partial candle
if drop_incomplete:
# TODO: this is not correct, as it drops the last trade only
# but we need to drop the last candle until closed
pass
data.drop(data.tail(1).index, inplace=True)
logger.debug('Dropping last trade')
return data
def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str) -> DataFrame:
"""
Fills up missing data with 0 volume rows,
@ -372,7 +375,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str)
df.reset_index(inplace=True)
len_before = len(dataframe)
len_after = len(df)
pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0
pct_missing = (len_after - len_before) / \
len_before if len_before > 0 else 0
if len_before != len_after:
message = (f"Missing data fillup for {pair}, {timeframe}: "
f"before: {len_before} - after: {len_after} - {pct_missing:.2%}")
@ -417,7 +421,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,
processed: Dict[str, DataFrame] = {}
for pair, df in preprocessed.items():
trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles)
trimed_df = trim_dataframe(
df, timerange, startup_candles=startup_candles)
if not trimed_df.empty:
processed[pair] = trimed_df
else:
@ -473,15 +478,18 @@ def convert_ohlcv_format(
candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [
c.value for c in CandleType])]
logger.info(candle_types)
paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT)
paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES))
paircombs = src.ohlcv_get_available_data(
config['datadir'], TradingMode.SPOT)
paircombs.extend(src.ohlcv_get_available_data(
config['datadir'], TradingMode.FUTURES))
if 'pairs' in config:
# Filter pairs
paircombs = [comb for comb in paircombs if comb[0] in config['pairs']]
if 'timeframes' in config:
paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']]
paircombs = [comb for comb in paircombs if comb[1]
in config['timeframes']]
paircombs = [comb for comb in paircombs if comb[2] in candle_types]
paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value))
@ -498,7 +506,8 @@ def convert_ohlcv_format(
drop_incomplete=False,
startup_candles=0,
candle_type=candle_type)
logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
logger.info(
f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
if len(data) > 0:
trg.ohlcv_store(
pair=pair,
@ -508,7 +517,8 @@ def convert_ohlcv_format(
)
if erase and convert_from != convert_to:
logger.info(f"Deleting source data for {pair} / {timeframe}")
src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type)
src.ohlcv_purge(pair=pair, timeframe=timeframe,
candle_type=candle_type)
def reduce_dataframe_footprint(df: DataFrame) -> DataFrame:

View File

@ -2257,7 +2257,6 @@ class Exchange:
candle_type,
all_stored_ticks_list,
cache,
drop_incomplete=False,
first_required_candle_date=first_candle_ms)
results_df[(pair, timeframe, candle_type)] = trades_df
data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS])