chore: update data to modern typing syntax

Matthias 2024-10-04 06:41:59 +02:00
parent 6601127693
commit ed7eb01d1b
9 changed files with 65 additions and 70 deletions
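
The commit applies PEP 585 (Python 3.9+): the Dict, List, Tuple and Type aliases imported from typing are replaced with the built-in generics dict, list, tuple and type, while Union and Optional are left untouched. A minimal sketch of the pattern, assuming Python 3.9+; the function below is illustrative and not part of freqtrade:

# Before: aliases imported from `typing`
#   from typing import Dict, List, Tuple
#   def last_prices(data: Dict[str, List[float]]) -> Tuple[str, float]: ...

# After: built-in containers are subscriptable directly, no typing import needed.
def last_prices(data: dict[str, list[float]]) -> tuple[str, float]:
    """Return the key whose price series has the highest final value."""
    pair, series = max(data.items(), key=lambda kv: kv[1][-1])
    return pair, series[-1]

print(last_prices({"BTC/USDT": [100.0, 101.5], "ETH/USDT": [10.0, 9.8]}))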

View File

@ -6,7 +6,7 @@ import logging
from copy import copy
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Literal, Optional, Union
from typing import Any, Literal, Optional, Union
import numpy as np
import pandas as pd
@ -137,7 +137,7 @@ def get_latest_hyperopt_file(
return directory / get_latest_hyperopt_filename(directory)
def load_backtest_metadata(filename: Union[Path, str]) -> Dict[str, Any]:
def load_backtest_metadata(filename: Union[Path, str]) -> dict[str, Any]:
"""
Read metadata dictionary from backtest results file without reading and deserializing entire
file.
@ -176,7 +176,7 @@ def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType:
return data
def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: Dict[str, Any]):
def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: dict[str, Any]):
"""
Load one strategy from multi-strategy result and merge it with results
:param strategy_name: Name of the strategy contained in the result
@ -195,12 +195,12 @@ def load_and_merge_backtest_result(strategy_name: str, filename: Path, results:
break
def _get_backtest_files(dirname: Path) -> List[Path]:
def _get_backtest_files(dirname: Path) -> list[Path]:
# Weird glob expression here avoids including .meta.json files.
return list(reversed(sorted(dirname.glob("backtest-result-*-[0-9][0-9].json"))))
def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]:
def _extract_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]:
metadata = load_backtest_metadata(filename)
return [
{
@ -220,14 +220,14 @@ def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]:
]
def get_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]:
def get_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]:
"""
Get backtest result read from metadata file
"""
return _extract_backtest_result(filename)
def get_backtest_resultlist(dirname: Path) -> List[BacktestHistoryEntryType]:
def get_backtest_resultlist(dirname: Path) -> list[BacktestHistoryEntryType]:
"""
Get list of backtest results read from metadata files
"""
@ -249,7 +249,7 @@ def delete_backtest_result(file_abs: Path):
file_abs_meta.unlink()
def update_backtest_metadata(filename: Path, strategy: str, content: Dict[str, Any]):
def update_backtest_metadata(filename: Path, strategy: str, content: dict[str, Any]):
"""
Updates backtest metadata file with new content.
:raises: ValueError if metadata file does not exist, or strategy is not in this file.
@ -275,8 +275,8 @@ def get_backtest_market_change(filename: Path, include_ts: bool = True) -> pd.Da
def find_existing_backtest_stats(
dirname: Union[Path, str], run_ids: Dict[str, str], min_backtest_date: Optional[datetime] = None
) -> Dict[str, Any]:
dirname: Union[Path, str], run_ids: dict[str, str], min_backtest_date: Optional[datetime] = None
) -> dict[str, Any]:
"""
Find existing backtest stats that match specified run IDs and load them.
:param dirname: pathlib.Path object, or string pointing to the file.
@ -287,7 +287,7 @@ def find_existing_backtest_stats(
# Copy so we can modify this dict without affecting parent scope.
run_ids = copy(run_ids)
dirname = Path(dirname)
results: Dict[str, Any] = {
results: dict[str, Any] = {
"metadata": {},
"strategy": {},
"strategy_comparison": [],
@ -438,7 +438,7 @@ def evaluate_result_multi(
return df_final[df_final["open_trades"] > max_open_trades]
def trade_list_to_dataframe(trades: Union[List[Trade], List[LocalTrade]]) -> pd.DataFrame:
def trade_list_to_dataframe(trades: Union[list[Trade], list[LocalTrade]]) -> pd.DataFrame:
"""
Convert list of Trade objects to pandas Dataframe
:param trades: List of trade objects

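Note that only the container aliases are swapped; Union and Optional stay, as in trade_list_to_dataframe above. On Python 3.10+ (PEP 604) those could also be written with the | operator, or earlier with a __future__ import. A hedged sketch of that possible follow-up, using stand-in classes rather than the actual freqtrade types:

# Sketch only: not part of this commit. Trade/LocalTrade are stand-ins.
from __future__ import annotations  # lets `X | Y` annotations parse before 3.10

from datetime import datetime

class Trade: ...
class LocalTrade: ...

def trade_rows(trades: list[Trade] | list[LocalTrade],
               since: datetime | None = None) -> list[dict]:
    """Illustrative helper: wrap each trade in a plain dict row."""
    return [{"trade": t} for t in trades]
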
View File

@ -3,7 +3,6 @@ Functions to convert data from one format to another
"""
import logging
from typing import Dict
import numpy as np
import pandas as pd
@ -158,8 +157,8 @@ def trim_dataframe(
def trim_dataframes(
preprocessed: Dict[str, DataFrame], timerange, startup_candles: int
) -> Dict[str, DataFrame]:
preprocessed: dict[str, DataFrame], timerange, startup_candles: int
) -> dict[str, DataFrame]:
"""
Trim startup period from analyzed dataframes
:param preprocessed: Dict of pair: dataframe
@ -167,7 +166,7 @@ def trim_dataframes(
:param startup_candles: Startup-candles that should be removed
:return: Dict of trimmed dataframes
"""
processed: Dict[str, DataFrame] = {}
processed: dict[str, DataFrame] = {}
for pair, df in preprocessed.items():
trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles)

View File

@ -7,7 +7,6 @@ import time
import typing
from collections import OrderedDict
from datetime import datetime
from typing import Tuple
import numpy as np
import pandas as pd
@ -62,11 +61,11 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str):
def populate_dataframe_with_trades(
cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame],
cached_grouped_trades: OrderedDict[tuple[datetime, datetime], pd.DataFrame],
config: Config,
dataframe: pd.DataFrame,
trades: pd.DataFrame,
) -> Tuple[pd.DataFrame, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]]:
) -> tuple[pd.DataFrame, OrderedDict[tuple[datetime, datetime], pd.DataFrame]]:
"""
Populates a dataframe with trades
:param dataframe: Dataframe to populate

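The cached_grouped_trades annotation above also shows that PEP 585 generics extend to the collections module, so collections.OrderedDict can be parameterized directly instead of via typing.OrderedDict. A small illustrative cache keyed by a (start, end) window, assuming Python 3.9+; the str payload stands in for the DataFrame freqtrade actually caches:

# Illustrative only: OrderedDict subscripted at runtime (Python 3.9+).
from collections import OrderedDict
from datetime import datetime

TradeWindowCache = OrderedDict[tuple[datetime, datetime], str]

def remember(cache: TradeWindowCache, start: datetime, end: datetime, payload: str) -> None:
    """Store payload under its (start, end) window, evicting the oldest entries."""
    cache[(start, end)] = payload
    while len(cache) > 100:  # arbitrary illustrative bound
        cache.popitem(last=False)
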
View File

@ -4,7 +4,6 @@ Functions to convert data from one format to another
import logging
from pathlib import Path
from typing import Dict, List
import pandas as pd
from pandas import DataFrame, to_datetime
@ -34,7 +33,7 @@ def trades_df_remove_duplicates(trades: pd.DataFrame) -> pd.DataFrame:
return trades.drop_duplicates(subset=["timestamp", "id"])
def trades_dict_to_list(trades: List[Dict]) -> TradeList:
def trades_dict_to_list(trades: list[dict]) -> TradeList:
"""
Convert fetch_trades result into a List (to be more memory efficient).
:param trades: List of trades, as returned by ccxt.fetch_trades.
@ -91,8 +90,8 @@ def trades_to_ohlcv(trades: DataFrame, timeframe: str) -> DataFrame:
def convert_trades_to_ohlcv(
pairs: List[str],
timeframes: List[str],
pairs: list[str],
timeframes: list[str],
datadir: Path,
timerange: TimeRange,
erase: bool,

View File

@ -8,7 +8,7 @@ Common Interface for bot and strategy to access data.
import logging
from collections import deque
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple
from typing import Any, Optional
from pandas import DataFrame, Timedelta, Timestamp, to_timedelta
@ -48,15 +48,15 @@ class DataProvider:
self._exchange = exchange
self._pairlists = pairlists
self.__rpc = rpc
self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {}
self.__cached_pairs: dict[PairWithTimeframe, tuple[DataFrame, datetime]] = {}
self.__slice_index: Optional[int] = None
self.__slice_date: Optional[datetime] = None
self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {}
self.__producer_pairs_df: Dict[
str, Dict[PairWithTimeframe, Tuple[DataFrame, datetime]]
self.__cached_pairs_backtesting: dict[PairWithTimeframe, DataFrame] = {}
self.__producer_pairs_df: dict[
str, dict[PairWithTimeframe, tuple[DataFrame, datetime]]
] = {}
self.__producer_pairs: Dict[str, List[str]] = {}
self.__producer_pairs: dict[str, list[str]] = {}
self._msg_queue: deque = deque()
self._default_candle_type = self._config.get("candle_type_def", CandleType.SPOT)
@ -101,7 +101,7 @@ class DataProvider:
self.__cached_pairs[pair_key] = (dataframe, datetime.now(timezone.utc))
# For multiple producers we will want to merge the pairlists instead of overwriting
def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"):
def _set_producer_pairs(self, pairlist: list[str], producer_name: str = "default"):
"""
Set the pairs received to later be used.
@ -109,7 +109,7 @@ class DataProvider:
"""
self.__producer_pairs[producer_name] = pairlist
def get_producer_pairs(self, producer_name: str = "default") -> List[str]:
def get_producer_pairs(self, producer_name: str = "default") -> list[str]:
"""
Get the pairs cached from the producer
@ -177,7 +177,7 @@ class DataProvider:
timeframe: str,
candle_type: CandleType,
producer_name: str = "default",
) -> Tuple[bool, int]:
) -> tuple[bool, int]:
"""
Append a candle to the existing external dataframe. The incoming dataframe
must have at least 1 candle.
@ -258,7 +258,7 @@ class DataProvider:
timeframe: Optional[str] = None,
candle_type: Optional[CandleType] = None,
producer_name: str = "default",
) -> Tuple[DataFrame, datetime]:
) -> tuple[DataFrame, datetime]:
"""
Get the pair data from producers.
@ -377,7 +377,7 @@ class DataProvider:
logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).")
return data
def get_analyzed_dataframe(self, pair: str, timeframe: str) -> Tuple[DataFrame, datetime]:
def get_analyzed_dataframe(self, pair: str, timeframe: str) -> tuple[DataFrame, datetime]:
"""
Retrieve the analyzed dataframe. Returns the full dataframe in trade mode (live / dry),
and the last 1000 candles (up to the time evaluated at this moment) in all other modes.
@ -408,7 +408,7 @@ class DataProvider:
"""
return RunMode(self._config.get("runmode", RunMode.OTHER))
def current_whitelist(self) -> List[str]:
def current_whitelist(self) -> list[str]:
"""
fetch latest available whitelist.
@ -529,7 +529,7 @@ class DataProvider:
)
return trades_df
def market(self, pair: str) -> Optional[Dict[str, Any]]:
def market(self, pair: str) -> Optional[dict[str, Any]]:
"""
Return market data for the pair
:param pair: Pair to get the data for

View File

@ -1,6 +1,5 @@
import logging
from pathlib import Path
from typing import Dict, List
import joblib
import pandas as pd
@ -48,14 +47,14 @@ def _load_signal_candles(backtest_dir: Path):
return _load_backtest_analysis_data(backtest_dir, "signals")
def _load_exit_signal_candles(backtest_dir: Path) -> Dict[str, Dict[str, pd.DataFrame]]:
def _load_exit_signal_candles(backtest_dir: Path) -> dict[str, dict[str, pd.DataFrame]]:
return _load_backtest_analysis_data(backtest_dir, "exited")
def _process_candles_and_indicators(
pairlist, strategy_name, trades, signal_candles, date_col: str = "open_date"
):
analysed_trades_dict: Dict[str, Dict] = {strategy_name: {}}
analysed_trades_dict: dict[str, dict] = {strategy_name: {}}
try:
logger.info(f"Processing {strategy_name} : {len(pairlist)} pairs")
@ -261,8 +260,8 @@ def prepare_results(
def print_results(
res_df: pd.DataFrame,
exit_df: pd.DataFrame,
analysis_groups: List[str],
indicator_list: List[str],
analysis_groups: list[str],
indicator_list: list[str],
entry_only: bool,
exit_only: bool,
csv_path: Path,
@ -307,7 +306,7 @@ def print_results(
def _merge_dfs(
entry_df: pd.DataFrame,
exit_df: pd.DataFrame,
available_inds: List[str],
available_inds: list[str],
entry_only: bool,
exit_only: bool,
):
@ -438,7 +437,7 @@ def _generate_dfs(
pairlist: list,
enter_reason_list: list,
exit_reason_list: list,
signal_candles: Dict,
signal_candles: dict,
strategy_name: str,
timerange: TimeRange,
trades: pd.DataFrame,

View File

@ -10,7 +10,7 @@ from abc import ABC, abstractmethod
from copy import deepcopy
from datetime import datetime, timezone
from pathlib import Path
from typing import List, Optional, Tuple, Type
from typing import Optional
from pandas import DataFrame, to_datetime
@ -71,7 +71,7 @@ class IDataHandler(ABC):
]
@classmethod
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> List[str]:
def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> list[str]:
"""
Returns a list of all pairs with ohlcv data available in this datadir
for the specified timeframe
@ -107,7 +107,7 @@ class IDataHandler(ABC):
def ohlcv_data_min_max(
self, pair: str, timeframe: str, candle_type: CandleType
) -> Tuple[datetime, datetime, int]:
) -> tuple[datetime, datetime, int]:
"""
Returns the min and max timestamp for the given pair and timeframe.
:param pair: Pair to get min/max for
@ -168,7 +168,7 @@ class IDataHandler(ABC):
"""
@classmethod
def trades_get_available_data(cls, datadir: Path, trading_mode: TradingMode) -> List[str]:
def trades_get_available_data(cls, datadir: Path, trading_mode: TradingMode) -> list[str]:
"""
Returns a list of all pairs with trades data available in this datadir
:param datadir: Directory to search for trades files
@ -191,7 +191,7 @@ class IDataHandler(ABC):
self,
pair: str,
trading_mode: TradingMode,
) -> Tuple[datetime, datetime, int]:
) -> tuple[datetime, datetime, int]:
"""
Returns the min and max timestamp for the given pair's trades data.
:param pair: Pair to get min/max for
@ -212,7 +212,7 @@ class IDataHandler(ABC):
)
@classmethod
def trades_get_pairs(cls, datadir: Path) -> List[str]:
def trades_get_pairs(cls, datadir: Path) -> list[str]:
"""
Returns a list of all pairs for which trade data is available in this datadir
:param datadir: Directory to search for trades files
@ -532,7 +532,7 @@ class IDataHandler(ABC):
Path(old_name).rename(new_name)
def get_datahandlerclass(datatype: str) -> Type[IDataHandler]:
def get_datahandlerclass(datatype: str) -> type[IDataHandler]:
"""
Get datahandler class.
Could be done using Resolvers, but since this may be called often and resolvers

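For get_datahandlerclass above, typing.Type becomes the built-in type: type[IDataHandler] annotates the class object itself rather than an instance. A minimal sketch with a hypothetical registry, not the freqtrade resolver logic:

# Illustrative registry; BaseHandler/FeatherHandler are stand-ins, and the
# lookup is far simpler than the real get_datahandlerclass.
class BaseHandler:
    pass

class FeatherHandler(BaseHandler):
    pass

_HANDLERS: dict[str, type[BaseHandler]] = {"feather": FeatherHandler}

def get_handler_class(datatype: str) -> type[BaseHandler]:
    """Return the registered class (not an instance) for the given datatype."""
    return _HANDLERS[datatype]
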
View File

@ -2,7 +2,7 @@ import logging
import operator
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from typing import Optional
from pandas import DataFrame, concat
@ -77,7 +77,7 @@ def load_pair_history(
def load_data(
datadir: Path,
timeframe: str,
pairs: List[str],
pairs: list[str],
*,
timerange: Optional[TimeRange] = None,
fill_up_missing: bool = True,
@ -86,7 +86,7 @@ def load_data(
data_format: str = "feather",
candle_type: CandleType = CandleType.SPOT,
user_futures_funding_rate: Optional[int] = None,
) -> Dict[str, DataFrame]:
) -> dict[str, DataFrame]:
"""
Load ohlcv history data for a list of pairs.
@ -101,7 +101,7 @@ def load_data(
:param candle_type: Any of the enum CandleType (must match trading mode!)
:return: dict(<pair>:<Dataframe>)
"""
result: Dict[str, DataFrame] = {}
result: dict[str, DataFrame] = {}
if startup_candles > 0 and timerange:
logger.info(f"Using indicator startup period: {startup_candles} ...")
@ -135,7 +135,7 @@ def refresh_data(
*,
datadir: Path,
timeframe: str,
pairs: List[str],
pairs: list[str],
exchange: Exchange,
data_format: Optional[str] = None,
timerange: Optional[TimeRange] = None,
@ -172,7 +172,7 @@ def _load_cached_data_for_updating(
data_handler: IDataHandler,
candle_type: CandleType,
prepend: bool = False,
) -> Tuple[DataFrame, Optional[int], Optional[int]]:
) -> tuple[DataFrame, Optional[int], Optional[int]]:
"""
Load cached data to download more data.
If timerange is passed in, checks whether data from before the stored data will be
@ -318,8 +318,8 @@ def _download_pair_history(
def refresh_backtest_ohlcv_data(
exchange: Exchange,
pairs: List[str],
timeframes: List[str],
pairs: list[str],
timeframes: list[str],
datadir: Path,
trading_mode: str,
timerange: Optional[TimeRange] = None,
@ -327,7 +327,7 @@ def refresh_backtest_ohlcv_data(
erase: bool = False,
data_format: Optional[str] = None,
prepend: bool = False,
) -> List[str]:
) -> list[str]:
"""
Refresh stored ohlcv data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand.
@ -489,14 +489,14 @@ def _download_trades_history(
def refresh_backtest_trades_data(
exchange: Exchange,
pairs: List[str],
pairs: list[str],
datadir: Path,
timerange: TimeRange,
trading_mode: TradingMode,
new_pairs_days: int = 30,
erase: bool = False,
data_format: str = "feather",
) -> List[str]:
) -> list[str]:
"""
Refresh stored trades data for backtesting and hyperopt operations.
Used by freqtrade download-data subcommand.
@ -531,7 +531,7 @@ def refresh_backtest_trades_data(
return pairs_not_available
def get_timerange(data: Dict[str, DataFrame]) -> Tuple[datetime, datetime]:
def get_timerange(data: dict[str, DataFrame]) -> tuple[datetime, datetime]:
"""
Get the maximum common timerange for the given backtest data.
@ -588,7 +588,7 @@ def download_data_main(config: Config) -> None:
# Remove stake-currency to skip checks which are not relevant for datadownload
config["stake_currency"] = ""
pairs_not_available: List[str] = []
pairs_not_available: list[str] = []
# Init exchange
from freqtrade.resolvers.exchange_resolver import ExchangeResolver

View File

@ -2,7 +2,6 @@ import logging
import math
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Tuple
import numpy as np
import pandas as pd
@ -11,7 +10,7 @@ import pandas as pd
logger = logging.getLogger(__name__)
def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close") -> float:
def calculate_market_change(data: dict[str, pd.DataFrame], column: str = "close") -> float:
"""
Calculate market change based on "column".
Calculation is done by taking the first non-null and the last non-null element of each column
@ -32,7 +31,7 @@ def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close"
def combine_dataframes_by_column(
data: Dict[str, pd.DataFrame], column: str = "close"
data: dict[str, pd.DataFrame], column: str = "close"
) -> pd.DataFrame:
"""
Combine multiple dataframes "column"
@ -50,7 +49,7 @@ def combine_dataframes_by_column(
def combined_dataframes_with_rel_mean(
data: Dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, column: str = "close"
data: dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, column: str = "close"
) -> pd.DataFrame:
"""
Combine multiple dataframes "column"
@ -70,7 +69,7 @@ def combined_dataframes_with_rel_mean(
def combine_dataframes_with_mean(
data: Dict[str, pd.DataFrame], column: str = "close"
data: dict[str, pd.DataFrame], column: str = "close"
) -> pd.DataFrame:
"""
Combine multiple dataframes "column"
@ -222,7 +221,7 @@ def calculate_max_drawdown(
)
def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> Tuple[float, float]:
def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> tuple[float, float]:
"""
Calculate min/max cumsum of trades, to show if the wallet/stake amount ratio is sane
:param trades: DataFrame containing trades (requires columns close_date and profit_percent)
@ -255,7 +254,7 @@ def calculate_cagr(days_passed: int, starting_balance: float, final_balance: flo
return (final_balance / starting_balance) ** (1 / (days_passed / 365)) - 1
def calculate_expectancy(trades: pd.DataFrame) -> Tuple[float, float]:
def calculate_expectancy(trades: pd.DataFrame) -> tuple[float, float]:
"""
Calculate expectancy
:param trades: DataFrame containing trades (requires columns close_date and profit_abs)