mypy fixes

Joe Schr 2024-02-08 11:57:15 +01:00
parent 14fb29516a
commit 1a0610f3e4
4 changed files with 114 additions and 87 deletions

View File

@@ -3,7 +3,7 @@ Functions to convert data from one format to another
 """
 import logging
 import time
-from typing import Dict
+from typing import Dict, List

 import numpy as np
 import pandas as pd
@@ -31,7 +31,8 @@ def ohlcv_to_dataframe(ohlcv: list, timeframe: str, pair: str, *,
     :param drop_incomplete: Drop the last candle of the dataframe, assuming it's incomplete
     :return: DataFrame
     """
-    logger.debug(f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
+    logger.debug(
+        f"Converting candle (OHLCV) data to dataframe for pair {pair}.")
     cols = DEFAULT_DATAFRAME_COLUMNS
     df = DataFrame(ohlcv, columns=cols)
@@ -130,7 +131,7 @@ def populate_dataframe_with_trades(config: Config,
             # because that this candle isn't finished yet
             if candle_next not in trades_grouped_by_candle_start.groups:
                 logger.warning(
-                    f"candle at {candle_start} with {len(trades_grouped_df)} trades might be unfinished, because no finished trades at {candle_next}")  # noqa
+                    f"candle at {candle_start} with {len(trades_grouped_df)} trades might be unfinished, because no finished trades at {candle_next}")  # noqa
             # add trades to each candle
             df.loc[is_between, 'trades'] = df.loc[is_between,
@@ -200,11 +201,9 @@ def populate_dataframe_with_trades(config: Config,
     return dataframe


-def public_trades_to_dataframe(trades: list,
-                               timeframe: str,
-                               pair: str, *,
-                               fill_missing: bool = True,
-                               drop_incomplete: bool = True) -> DataFrame:
+def public_trades_to_dataframe(trades: List,
+                               pair: str,
+                               ) -> DataFrame:
     """
     Converts a list with candle (TRADES) data (in format returned by ccxt.fetch_trades)
     to a Dataframe
@@ -228,15 +227,6 @@ def public_trades_to_dataframe(trades: list,
     # and fail with exception...
     df = df.astype(dtype={'amount': 'float', 'cost': 'float',
                           'price': 'float'})
-    #
-    # df.columns
-    # df = clean_duplicate_trades(df, timeframe, pair,
-    #                             fill_missing=fill_missing,
-    #                             drop_incomplete=drop_incomplete)
-    # df = drop_incomplete_and_fill_missing_trades(df, timeframe, pair,
-    #                                              fill_missing=fill_missing,
-    #                                              drop_incomplete=drop_incomplete)
     return df
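Note: public_trades_to_dataframe() now takes only the raw trade rows and the pair name. A minimal sketch of the narrowed call; the import path and the exact column order are assumptions for illustration, not part of this commit:

    from freqtrade.data.converter import public_trades_to_dataframe

    # rows roughly in ccxt fetch_trades order:
    # timestamp (ms), id, type, side, price, amount, cost
    trades = [
        [1707382635000, '1', None, 'buy', 42000.5, 0.01, 420.005],
        [1707382636000, '2', None, 'sell', 42001.0, 0.02, 840.02],
    ]
    df = public_trades_to_dataframe(trades, pair='BTC/USDT')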
@@ -459,7 +449,8 @@ def ohlcv_fill_up_missing_data(dataframe: DataFrame, timeframe: str, pair: str)
     df.reset_index(inplace=True)
     len_before = len(dataframe)
     len_after = len(df)
-    pct_missing = (len_after - len_before) / len_before if len_before > 0 else 0
+    pct_missing = (len_after - len_before) / \
+        len_before if len_before > 0 else 0
     if len_before != len_after:
         message = (f"Missing data fillup for {pair}, {timeframe}: "
                    f"before: {len_before} - after: {len_after} - {pct_missing:.2%}")
@@ -504,7 +495,8 @@ def trim_dataframes(preprocessed: Dict[str, DataFrame], timerange,
     processed: Dict[str, DataFrame] = {}
     for pair, df in preprocessed.items():
-        trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles)
+        trimed_df = trim_dataframe(
+            df, timerange, startup_candles=startup_candles)
         if not trimed_df.empty:
             processed[pair] = trimed_df
         else:
@@ -560,15 +552,18 @@ def convert_ohlcv_format(
     candle_types = [CandleType.from_string(ct) for ct in config.get('candle_types', [
         c.value for c in CandleType])]
     logger.info(candle_types)
-    paircombs = src.ohlcv_get_available_data(config['datadir'], TradingMode.SPOT)
-    paircombs.extend(src.ohlcv_get_available_data(config['datadir'], TradingMode.FUTURES))
+    paircombs = src.ohlcv_get_available_data(
+        config['datadir'], TradingMode.SPOT)
+    paircombs.extend(src.ohlcv_get_available_data(
+        config['datadir'], TradingMode.FUTURES))

     if 'pairs' in config:
         # Filter pairs
         paircombs = [comb for comb in paircombs if comb[0] in config['pairs']]

     if 'timeframes' in config:
-        paircombs = [comb for comb in paircombs if comb[1] in config['timeframes']]
+        paircombs = [comb for comb in paircombs if comb[1]
+                     in config['timeframes']]
     paircombs = [comb for comb in paircombs if comb[2] in candle_types]

     paircombs = sorted(paircombs, key=lambda x: (x[0], x[1], x[2].value))
@@ -585,7 +580,8 @@ def convert_ohlcv_format(
                               drop_incomplete=False,
                               startup_candles=0,
                               candle_type=candle_type)
-        logger.info(f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
+        logger.info(
+            f"Converting {len(data)} {timeframe} {candle_type} candles for {pair}")
         if len(data) > 0:
             trg.ohlcv_store(
                 pair=pair,
@@ -595,7 +591,8 @@ def convert_ohlcv_format(
             )
             if erase and convert_from != convert_to:
                 logger.info(f"Deleting source data for {pair} / {timeframe}")
-                src.ohlcv_purge(pair=pair, timeframe=timeframe, candle_type=candle_type)
+                src.ohlcv_purge(pair=pair, timeframe=timeframe,
+                                candle_type=candle_type)


 def reduce_dataframe_footprint(df: DataFrame) -> DataFrame:

View File

@@ -46,23 +46,27 @@ class DataProvider:
         self._exchange = exchange
         self._pairlists = pairlists
         self.__rpc = rpc
-        self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
+        self.__cached_pairs: Dict[PairWithTimeframe,
+                                  Tuple[DataFrame, datetime]] = {}
         self.__slice_index: Optional[int] = None
         self.__slice_date: Optional[datetime] = None
-        self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
+        self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {
+        }
         self.__producer_pairs_df: Dict[str,
                                        Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]] = {}
         self.__producer_pairs: Dict[str, List[str]] = {}
         self._msg_queue: deque = deque()
-        self._default_candle_type = self._config.get('candle_type_def', CandleType.SPOT)
+        self._default_candle_type = self._config.get(
+            'candle_type_def', CandleType.SPOT)
         self._default_timeframe = self._config.get('timeframe', '1h')
         self.__msg_cache = PeriodicCache(
             maxsize=1000, ttl=timeframe_to_seconds(self._default_timeframe))
-        self.producers = self._config.get('external_message_consumer', {}).get('producers', [])
+        self.producers = self._config.get(
+            'external_message_consumer', {}).get('producers', [])
         self.external_data_enabled = len(self.producers) > 0

     def _set_dataframe_max_index(self, limit_index: int):
@@ -133,19 +137,19 @@ class DataProvider:
         """
         if self.__rpc:
             msg: RPCAnalyzedDFMsg = {
-                'type': RPCMessageType.ANALYZED_DF,
-                'data': {
-                    'key': pair_key,
-                    'df': dataframe.tail(1),
-                    'la': datetime.now(timezone.utc)
-                }
+                'type': RPCMessageType.ANALYZED_DF,
+                'data': {
+                    'key': pair_key,
+                    'df': dataframe.tail(1),
+                    'la': datetime.now(timezone.utc)
+                }
             }
             self.__rpc.send_msg(msg)
             if new_candle:
                 self.__rpc.send_msg({
-                    'type': RPCMessageType.NEW_CANDLE,
-                    'data': pair_key,
-                })
+                    'type': RPCMessageType.NEW_CANDLE,
+                    'data': pair_key,
+                })

     def _replace_external_df(
         self,
@@ -168,10 +172,13 @@ class DataProvider:
         if producer_name not in self.__producer_pairs_df:
             self.__producer_pairs_df[producer_name] = {}

-        _last_analyzed = datetime.now(timezone.utc) if not last_analyzed else last_analyzed
+        _last_analyzed = datetime.now(
+            timezone.utc) if not last_analyzed else last_analyzed

-        self.__producer_pairs_df[producer_name][pair_key] = (dataframe, _last_analyzed)
-        logger.debug(f"External DataFrame for {pair_key} from {producer_name} added.")
+        self.__producer_pairs_df[producer_name][pair_key] = (
+            dataframe, _last_analyzed)
+        logger.debug(
+            f"External DataFrame for {pair_key} from {producer_name} added.")

     def _add_external_df(
         self,
@@ -222,7 +229,8 @@ class DataProvider:
         # CHECK FOR MISSING CANDLES
         # Convert the timeframe to a timedelta for pandas
         timeframe_delta: Timedelta = to_timedelta(timeframe)
-        local_last: Timestamp = existing_df.iloc[-1]['date']  # We want the last date from our copy
+        # We want the last date from our copy
+        local_last: Timestamp = existing_df.iloc[-1]['date']
         # We want the first date from the incoming
         incoming_first: Timestamp = dataframe.iloc[0]['date']
@@ -245,13 +253,13 @@ class DataProvider:
                 # Everything is good, we appended
                 self._replace_external_df(
-                    pair,
-                    appended_df,
-                    last_analyzed=last_analyzed,
-                    timeframe=timeframe,
-                    candle_type=candle_type,
-                    producer_name=producer_name
-                )
+                    pair,
+                    appended_df,
+                    last_analyzed=last_analyzed,
+                    timeframe=timeframe,
+                    candle_type=candle_type,
+                    producer_name=producer_name
+                )
                 return (True, 0)

     def get_producer_df(
@@ -339,10 +347,13 @@ class DataProvider:
         startup_candles = self._config.get('startup_candle_count', 0)
         indicator_periods = freqai_config['feature_parameters']['indicator_periods_candles']
         # make sure the startupcandles is at least the set maximum indicator periods
-        self._config['startup_candle_count'] = max(startup_candles, max(indicator_periods))
+        self._config['startup_candle_count'] = max(
+            startup_candles, max(indicator_periods))
         tf_seconds = timeframe_to_seconds(timeframe)
-        train_candles = freqai_config['train_period_days'] * 86400 / tf_seconds
-        total_candles = int(self._config['startup_candle_count'] + train_candles)
+        train_candles = freqai_config['train_period_days'] * \
+            86400 / tf_seconds
+        total_candles = int(
+            self._config['startup_candle_count'] + train_candles)
         logger.info(
             f'Increasing startup_candle_count for freqai on {timeframe} to {total_candles}')
         return total_candles
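Worked example of the arithmetic above, with assumed values: for train_period_days=30 on a 1h timeframe, tf_seconds is 3600, so train_candles = 30 * 86400 / 3600 = 720.0; with startup_candle_count=40 that gives total_candles = int(40 + 720.0) = 760. The backslash continuation and the wrapped int(...) only re-flow the lines; the result is unchanged:

    tf_seconds = 3600                  # timeframe_to_seconds('1h')
    train_candles = 30 * \
        86400 / tf_seconds             # 720.0
    total_candles = int(
        40 + train_candles)            # 760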
@@ -365,18 +376,22 @@ class DataProvider:
         """
         if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
             # Get live OHLCV data.
-            data = self.ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
+            data = self.ohlcv(pair=pair, timeframe=timeframe,
+                              candle_type=candle_type)
         else:
             # Get historical OHLCV data (cached on disk).
             timeframe = timeframe or self._config['timeframe']
-            data = self.historic_ohlcv(pair=pair, timeframe=timeframe, candle_type=candle_type)
+            data = self.historic_ohlcv(
+                pair=pair, timeframe=timeframe, candle_type=candle_type)
             # Cut date to timeframe-specific date.
             # This is necessary to prevent lookahead bias in callbacks through informative pairs.
             if self.__slice_date:
-                cutoff_date = timeframe_to_prev_date(timeframe, self.__slice_date)
+                cutoff_date = timeframe_to_prev_date(
+                    timeframe, self.__slice_date)
                 data = data.loc[data['date'] < cutoff_date]
         if len(data) == 0:
-            logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
+            logger.warning(
+                f"No data found for ({pair}, {timeframe}, {candle_type}).")
         return data

     def get_analyzed_dataframe(self, pair: str, timeframe: str) -> Tuple[DataFrame, datetime]:
@@ -389,7 +404,8 @@ class DataProvider:
         combination.
         Returns empty dataframe and Epoch 0 (1970-01-01) if no dataframe was cached.
         """
-        pair_key = (pair, timeframe, self._config.get('candle_type_def', CandleType.SPOT))
+        pair_key = (pair, timeframe, self._config.get(
+            'candle_type_def', CandleType.SPOT))
         if pair_key in self.__cached_pairs:
             if self.runmode in (RunMode.DRY_RUN, RunMode.LIVE):
                 df, date = self.__cached_pairs[pair_key]
@@ -397,7 +413,8 @@ class DataProvider:
                 df, date = self.__cached_pairs[pair_key]
                 if self.__slice_index is not None:
                     max_index = self.__slice_index
-                    df = df.iloc[max(0, max_index - MAX_DATAFRAME_CANDLES):max_index]
+                    df = df.iloc[max(
+                        0, max_index - MAX_DATAFRAME_CANDLES):max_index]
             return df, date
         else:
             return (DataFrame(), datetime.fromtimestamp(0, tz=timezone.utc))
@@ -422,7 +439,8 @@ class DataProvider:
         if self._pairlists:
             return self._pairlists.whitelist.copy()
         else:
-            raise OperationalException("Dataprovider was not initialized with a pairlist provider.")
+            raise OperationalException(
+                "Dataprovider was not initialized with a pairlist provider.")

     def clear_cache(self):
         """
@@ -461,8 +479,8 @@ class DataProvider:
         if use_public_trades:
             datahandler = get_datahandler(
                 self._config['datadir'], data_format=self._config['dataformat_trades'])
-            return self._exchange.refresh_latest_trades(pairlist, datahandler)
-        return {}
+            if self._exchange:
+                self._exchange.refresh_latest_trades(pairlist, datahandler)

     @property
     def available_pairs(self) -> ListPairsWithTimeframes:
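Note: the rewrite guards on self._exchange, the usual way to satisfy mypy when an attribute is Optional. A self-contained sketch of the narrowing pattern (names hypothetical, not freqtrade's):

    from typing import Optional

    class Exchange:
        def refresh(self) -> dict:
            return {}

    class Provider:
        def __init__(self, exchange: Optional[Exchange] = None) -> None:
            self._exchange = exchange

        def refresh_latest_trades(self) -> None:
            # without the guard, mypy reports: Item "None" of
            # "Optional[Exchange]" has no attribute "refresh"
            if self._exchange:
                self._exchange.refresh()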
@@ -533,8 +551,8 @@ class DataProvider:
             data_handler = get_datahandler(
                 self._config['datadir'], data_format=self._config['dataformat_trades'])
             ticks = data_handler.trades_load(pair)
-            trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
-                                                   drop_incomplete=False)
+            trades_df = public_trades_to_dataframe(
+                ticks.values.tolist(), pair=pair)
             return trades_df
         else:

View File

@@ -107,9 +107,11 @@ def load_data(datadir: Path,
             result[pair] = hist
         else:
             if candle_type is CandleType.FUNDING_RATE and user_futures_funding_rate is not None:
-                logger.warn(f"{pair} using user specified [{user_futures_funding_rate}]")
+                logger.warn(
+                    f"{pair} using user specified [{user_futures_funding_rate}]")
             elif candle_type not in (CandleType.SPOT, CandleType.FUTURES):
-                result[pair] = DataFrame(columns=["date", "open", "close", "high", "low", "volume"])
+                result[pair] = DataFrame(
+                    columns=["date", "open", "close", "high", "low", "volume"])

     if fail_without_data and not result:
         raise OperationalException("No data found. Terminating.")
@@ -217,7 +219,8 @@ def _download_pair_history(pair: str, *,
     try:
         if erase:
             if data_handler.ohlcv_purge(pair, timeframe, candle_type=candle_type):
-                logger.info(f'Deleting existing data for pair {pair}, {timeframe}, {candle_type}.')
+                logger.info(
+                    f'Deleting existing data for pair {pair}, {timeframe}, {candle_type}.')

         data, since_ms, until_ms = _load_cached_data_for_updating(
             pair, timeframe, timerange,
@@ -266,7 +269,8 @@ def _download_pair_history(pair: str, *,
                     f"{data.iloc[-1]['date']:{DATETIME_PRINT_FORMAT}}"
                     if not data.empty else 'None')

-        data_handler.ohlcv_store(pair, timeframe, data=data, candle_type=candle_type)
+        data_handler.ohlcv_store(
+            pair, timeframe, data=data, candle_type=candle_type)
         return True

     except Exception:
@@ -299,7 +303,8 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
             continue
         for timeframe in timeframes:
-            logger.debug(f'Downloading pair {pair}, {candle_type}, interval {timeframe}.')
+            logger.debug(
+                f'Downloading pair {pair}, {candle_type}, interval {timeframe}.')
             process = f'{idx}/{len(pairs)}'
             _download_pair_history(pair=pair, process=process,
                                    datadir=datadir, exchange=exchange,
@@ -313,12 +318,15 @@ def refresh_backtest_ohlcv_data(exchange: Exchange, pairs: List[str], timeframes
             tf_mark = exchange.get_option('mark_ohlcv_timeframe')
             tf_funding_rate = exchange.get_option('funding_fee_timeframe')

-            fr_candle_type = CandleType.from_string(exchange.get_option('mark_ohlcv_price'))
+            fr_candle_type = CandleType.from_string(
+                exchange.get_option('mark_ohlcv_price'))
             # All exchanges need FundingRate for futures trading.
             # The timeframe is aligned to the mark-price timeframe.
-            combs = ((CandleType.FUNDING_RATE, tf_funding_rate), (fr_candle_type, tf_mark))
+            combs = ((CandleType.FUNDING_RATE, tf_funding_rate),
+                     (fr_candle_type, tf_mark))
             for candle_type_f, tf in combs:
-                logger.debug(f'Downloading pair {pair}, {candle_type_f}, interval {tf}.')
+                logger.debug(
+                    f'Downloading pair {pair}, {candle_type_f}, interval {tf}.')
                 _download_pair_history(pair=pair, process=process,
                                        datadir=datadir, exchange=exchange,
                                        timerange=timerange, data_handler=data_handler,
@@ -368,8 +376,9 @@ def _download_trades_history(exchange: Exchange,
             # Reset since to the last available point
             # - 5 seconds (to ensure we're getting all trades)
             since = trades.iloc[-1]['timestamp'] - (5 * 1000)
-            logger.info(f"Using last trade date -5s - Downloading trades for {pair} "
-                        f"since: {format_ms_time(since)}.")
+        if since:
+            logger.info(f"Using last trade date -5s - Downloading trades for {pair} "
+                        f"since: {format_ms_time(since)}.")

         if not since:
             since = dt_ts(dt_now() - timedelta(days=new_pairs_days))
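Note: the log line now runs only when since is set, presumably because format_ms_time() needs an actual timestamp. A tiny sketch of the guard order, with format_ms_time standing in for freqtrade's helper:

    from datetime import datetime, timezone

    def format_ms_time(ms: int) -> str:
        return datetime.fromtimestamp(ms / 1000, tz=timezone.utc).isoformat()

    since = None  # nothing cached yet
    if since:
        print(f"since: {format_ms_time(since)}")
    if not since:
        since = 1_707_000_000_000  # fall back to a default start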
@@ -443,7 +452,8 @@ def get_timerange(data: Dict[str, DataFrame]) -> Tuple[datetime, datetime]:
     :return: tuple containing min_date, max_date
     """
     timeranges = [
-        (frame['date'].min().to_pydatetime(), frame['date'].max().to_pydatetime())
+        (frame['date'].min().to_pydatetime(),
+         frame['date'].max().to_pydatetime())
         for frame in data.values()
     ]
     return (min(timeranges, key=operator.itemgetter(0))[0],
@@ -462,7 +472,8 @@ def validate_backtest_data(data: DataFrame, pair: str, min_date: datetime,
     :param timeframe_min: Timeframe in minutes
     """
     # total difference in minutes / timeframe-minutes
-    expected_frames = int((max_date - min_date).total_seconds() // 60 // timeframe_min)
+    expected_frames = int(
+        (max_date - min_date).total_seconds() // 60 // timeframe_min)
     found_missing = False
     dflen = len(data)
     if dflen < expected_frames:
@@ -476,7 +487,8 @@ def download_data_main(config: Config) -> None:
     timerange = TimeRange()
     if 'days' in config:
-        time_since = (datetime.now() - timedelta(days=config['days'])).strftime("%Y%m%d")
+        time_since = (datetime.now() -
+                      timedelta(days=config['days'])).strftime("%Y%m%d")
         timerange = TimeRange.parse_timerange(f'{time_since}-')

     if 'timerange' in config:
@@ -493,7 +505,7 @@ def download_data_main(config: Config) -> None:
     available_pairs = [
         p for p in exchange.get_markets(
             tradable_only=True, active_only=not config.get('include_inactive')
-        ).keys()
+        ).keys()
     ]

     expanded_pairs = dynamic_expand_pairlist(config, available_pairs)
@@ -526,7 +538,8 @@ def download_data_main(config: Config) -> None:
         # Convert downloaded trade data to different timeframes
         convert_trades_to_ohlcv(
             pairs=expanded_pairs, timeframes=config['timeframes'],
-            datadir=config['datadir'], timerange=timerange, erase=bool(config.get('erase')),
+            datadir=config['datadir'], timerange=timerange, erase=bool(
+                config.get('erase')),
             data_format_ohlcv=config['dataformat_ohlcv'],
             data_format_trades=config['dataformat_trades'],
         )
@@ -536,7 +549,7 @@ def download_data_main(config: Config) -> None:
                 f"Historic klines not available for {exchange.name}. "
                 "Please use `--dl-trades` instead for this exchange "
                 "(will unfortunately take a long time)."
-            )
+            )
         migrate_data(config, exchange)
         pairs_not_available = refresh_backtest_ohlcv_data(
             exchange, pairs=expanded_pairs, timeframes=config['timeframes'],

View File

@@ -2114,8 +2114,7 @@ class Exchange:
     def _process_trades_df(self, pair: str, timeframe: str, c_type: CandleType, ticks: List[List],
                            cache: bool, drop_incomplete: bool, first_required_candle_date: Optional[int]) -> DataFrame:
         # keeping parsed dataframe in cache
-        trades_df = public_trades_to_dataframe(ticks, timeframe, pair=pair, fill_missing=False,
-                                               drop_incomplete=drop_incomplete)
+        trades_df = public_trades_to_dataframe(ticks, pair=pair)
         # keeping last candle time as last refreshed time of the pair
         if ticks and cache:
             idx = -2 if drop_incomplete and len(ticks) > 1 else -1
@@ -2188,7 +2187,7 @@ class Exchange:
     def refresh_latest_trades(self,
                               pair_list: ListPairsWithTimeframes,
-                              data_handler: Callable,  # using IDataHandler ends with circular import,
+                              data_handler: Any,  # using IDataHandler ends with circular import
                               *,
                               cache: bool = True,
                               ) -> Dict[PairWithTimeframe, DataFrame]:
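Note: Any here sidesteps the circular import named in the comment. A common alternative (an assumption, not what this commit does) is a string annotation guarded by TYPE_CHECKING:

    from typing import TYPE_CHECKING

    if TYPE_CHECKING:  # evaluated by type checkers only, never at runtime
        from freqtrade.data.history.idatahandler import IDataHandler

    def refresh_latest_trades(data_handler: 'IDataHandler') -> None:
        ...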
@@ -2207,7 +2206,7 @@ class Exchange:
         since_ms = None
         results_df = {}
         for pair, timeframe, candle_type in set(pair_list):
-            new_ticks = []
+            new_ticks: List = []
             all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ['date'])
             first_candle_ms = self.needed_candle_ms(timeframe, candle_type)
             # refresh, if
@@ -2273,7 +2272,7 @@ class Exchange:
                     data_handler.trades_store(f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS])
                 else:
-                    raise "no new ticks"
+                    raise OperationalException("no new ticks")

         return results_df
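Note: raising a bare string fails at runtime ("TypeError: exceptions must derive from BaseException") and mypy rejects it statically, hence the switch to an Exception subclass. A minimal demonstration, with OperationalException stubbed in:

    class OperationalException(Exception):
        """Stand-in for freqtrade's OperationalException."""

    try:
        raise OperationalException('no new ticks')
    except OperationalException as err:
        print(err)  # no new ticks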
@@ -2442,7 +2441,7 @@ class Exchange:
         return trades[-1].get('timestamp')

     async def _async_get_trade_history_id(self, pair: str,
-                                          until: int,
+                                          until: Optional[int],
                                           since: Optional[int] = None,
                                           from_id: Optional[str] = None,
                                           stop_on_from_id: Optional[bool] = True) -> Tuple[str, List[List]]:
@@ -2464,7 +2463,7 @@ class Exchange:
         x = slice(None, -1) if has_overlap else slice(None)

         if not until and not stop_on_from_id:
-            raise "stop_on_from_id must be set if until is not set"
+            raise OperationalException("stop_on_from_id must be set if until is not set")
         if not from_id or not self._valid_trade_pagination_id(pair, from_id):
             # Fetch first elements using timebased method to get an ID to paginate on
             # Depending on the Exchange, this can introduce a drift at the start of the interval
@@ -2602,9 +2601,9 @@ class Exchange:
                                        new_pairs_days: int = 30,
                                        since: Optional[int] = None,
                                        until: Optional[int] = None,
-                                       from_id: Optional[int] = None,
+                                       from_id: Optional[str] = None,
                                        stop_on_from_id: Optional[bool] = False
-                                       ) -> bool:
+                                       ) -> Tuple[str, List]:
         """
         Download trade history from the exchange.
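Note: the annotation now matches what the coroutine actually returns, a (pair, trades) tuple rather than a bool. A hypothetical stub mirroring the corrected signature:

    from typing import List, Optional, Tuple

    async def download_trades(pair: str,
                              since: Optional[int] = None,
                              until: Optional[int] = None,
                              from_id: Optional[str] = None) -> Tuple[str, List]:
        trades: List = []  # rows accumulated from the exchange
        return pair, trades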