From ed7eb01d1b947d9e5d2de934af222e3c9d7d366d Mon Sep 17 00:00:00 2001 From: Matthias Date: Fri, 4 Oct 2024 06:41:59 +0200 Subject: [PATCH] chore: update data to modern typing syntax --- freqtrade/data/btanalysis.py | 24 ++++++++--------- freqtrade/data/converter/converter.py | 7 +++-- freqtrade/data/converter/orderflow.py | 5 ++-- freqtrade/data/converter/trade_converter.py | 7 +++-- freqtrade/data/dataprovider.py | 26 +++++++++---------- freqtrade/data/entryexitanalysis.py | 13 +++++----- .../data/history/datahandlers/idatahandler.py | 14 +++++----- freqtrade/data/history/history_utils.py | 26 +++++++++---------- freqtrade/data/metrics.py | 13 +++++----- 9 files changed, 65 insertions(+), 70 deletions(-) diff --git a/freqtrade/data/btanalysis.py b/freqtrade/data/btanalysis.py index 580807a76..76670d0e9 100644 --- a/freqtrade/data/btanalysis.py +++ b/freqtrade/data/btanalysis.py @@ -6,7 +6,7 @@ import logging from copy import copy from datetime import datetime, timezone from pathlib import Path -from typing import Any, Dict, List, Literal, Optional, Union +from typing import Any, Literal, Optional, Union import numpy as np import pandas as pd @@ -137,7 +137,7 @@ def get_latest_hyperopt_file( return directory / get_latest_hyperopt_filename(directory) -def load_backtest_metadata(filename: Union[Path, str]) -> Dict[str, Any]: +def load_backtest_metadata(filename: Union[Path, str]) -> dict[str, Any]: """ Read metadata dictionary from backtest results file without reading and deserializing entire file. @@ -176,7 +176,7 @@ def load_backtest_stats(filename: Union[Path, str]) -> BacktestResultType: return data -def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: Dict[str, Any]): +def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: dict[str, Any]): """ Load one strategy from multi-strategy result and merge it with results :param strategy_name: Name of the strategy contained in the result @@ -195,12 +195,12 @@ def load_and_merge_backtest_result(strategy_name: str, filename: Path, results: break -def _get_backtest_files(dirname: Path) -> List[Path]: +def _get_backtest_files(dirname: Path) -> list[Path]: # Weird glob expression here avoids including .meta.json files. return list(reversed(sorted(dirname.glob("backtest-result-*-[0-9][0-9].json")))) -def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]: +def _extract_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]: metadata = load_backtest_metadata(filename) return [ { @@ -220,14 +220,14 @@ def _extract_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]: ] -def get_backtest_result(filename: Path) -> List[BacktestHistoryEntryType]: +def get_backtest_result(filename: Path) -> list[BacktestHistoryEntryType]: """ Get backtest result read from metadata file """ return _extract_backtest_result(filename) -def get_backtest_resultlist(dirname: Path) -> List[BacktestHistoryEntryType]: +def get_backtest_resultlist(dirname: Path) -> list[BacktestHistoryEntryType]: """ Get list of backtest results read from metadata files """ @@ -249,7 +249,7 @@ def delete_backtest_result(file_abs: Path): file_abs_meta.unlink() -def update_backtest_metadata(filename: Path, strategy: str, content: Dict[str, Any]): +def update_backtest_metadata(filename: Path, strategy: str, content: dict[str, Any]): """ Updates backtest metadata file with new content. :raises: ValueError if metadata file does not exist, or strategy is not in this file. @@ -275,8 +275,8 @@ def get_backtest_market_change(filename: Path, include_ts: bool = True) -> pd.Da def find_existing_backtest_stats( - dirname: Union[Path, str], run_ids: Dict[str, str], min_backtest_date: Optional[datetime] = None -) -> Dict[str, Any]: + dirname: Union[Path, str], run_ids: dict[str, str], min_backtest_date: Optional[datetime] = None +) -> dict[str, Any]: """ Find existing backtest stats that match specified run IDs and load them. :param dirname: pathlib.Path object, or string pointing to the file. @@ -287,7 +287,7 @@ def find_existing_backtest_stats( # Copy so we can modify this dict without affecting parent scope. run_ids = copy(run_ids) dirname = Path(dirname) - results: Dict[str, Any] = { + results: dict[str, Any] = { "metadata": {}, "strategy": {}, "strategy_comparison": [], @@ -438,7 +438,7 @@ def evaluate_result_multi( return df_final[df_final["open_trades"] > max_open_trades] -def trade_list_to_dataframe(trades: Union[List[Trade], List[LocalTrade]]) -> pd.DataFrame: +def trade_list_to_dataframe(trades: Union[list[Trade], list[LocalTrade]]) -> pd.DataFrame: """ Convert list of Trade objects to pandas Dataframe :param trades: List of trade objects diff --git a/freqtrade/data/converter/converter.py b/freqtrade/data/converter/converter.py index 0475ddee2..48a07082e 100644 --- a/freqtrade/data/converter/converter.py +++ b/freqtrade/data/converter/converter.py @@ -3,7 +3,6 @@ Functions to convert data from one format to another """ import logging -from typing import Dict import numpy as np import pandas as pd @@ -158,8 +157,8 @@ def trim_dataframe( def trim_dataframes( - preprocessed: Dict[str, DataFrame], timerange, startup_candles: int -) -> Dict[str, DataFrame]: + preprocessed: dict[str, DataFrame], timerange, startup_candles: int +) -> dict[str, DataFrame]: """ Trim startup period from analyzed dataframes :param preprocessed: Dict of pair: dataframe @@ -167,7 +166,7 @@ def trim_dataframes( :param startup_candles: Startup-candles that should be removed :return: Dict of trimmed dataframes """ - processed: Dict[str, DataFrame] = {} + processed: dict[str, DataFrame] = {} for pair, df in preprocessed.items(): trimed_df = trim_dataframe(df, timerange, startup_candles=startup_candles) diff --git a/freqtrade/data/converter/orderflow.py b/freqtrade/data/converter/orderflow.py index f0cc726d2..e64caa88b 100644 --- a/freqtrade/data/converter/orderflow.py +++ b/freqtrade/data/converter/orderflow.py @@ -7,7 +7,6 @@ import time import typing from collections import OrderedDict from datetime import datetime -from typing import Tuple import numpy as np import pandas as pd @@ -62,11 +61,11 @@ def _calculate_ohlcv_candle_start_and_end(df: pd.DataFrame, timeframe: str): def populate_dataframe_with_trades( - cached_grouped_trades: OrderedDict[Tuple[datetime, datetime], pd.DataFrame], + cached_grouped_trades: OrderedDict[tuple[datetime, datetime], pd.DataFrame], config: Config, dataframe: pd.DataFrame, trades: pd.DataFrame, -) -> Tuple[pd.DataFrame, OrderedDict[Tuple[datetime, datetime], pd.DataFrame]]: +) -> tuple[pd.DataFrame, OrderedDict[tuple[datetime, datetime], pd.DataFrame]]: """ Populates a dataframe with trades :param dataframe: Dataframe to populate diff --git a/freqtrade/data/converter/trade_converter.py b/freqtrade/data/converter/trade_converter.py index 9b8fe718e..4d34fa83f 100644 --- a/freqtrade/data/converter/trade_converter.py +++ b/freqtrade/data/converter/trade_converter.py @@ -4,7 +4,6 @@ Functions to convert data from one format to another import logging from pathlib import Path -from typing import Dict, List import pandas as pd from pandas import DataFrame, to_datetime @@ -34,7 +33,7 @@ def trades_df_remove_duplicates(trades: pd.DataFrame) -> pd.DataFrame: return trades.drop_duplicates(subset=["timestamp", "id"]) -def trades_dict_to_list(trades: List[Dict]) -> TradeList: +def trades_dict_to_list(trades: list[dict]) -> TradeList: """ Convert fetch_trades result into a List (to be more memory efficient). :param trades: List of trades, as returned by ccxt.fetch_trades. @@ -91,8 +90,8 @@ def trades_to_ohlcv(trades: DataFrame, timeframe: str) -> DataFrame: def convert_trades_to_ohlcv( - pairs: List[str], - timeframes: List[str], + pairs: list[str], + timeframes: list[str], datadir: Path, timerange: TimeRange, erase: bool, diff --git a/freqtrade/data/dataprovider.py b/freqtrade/data/dataprovider.py index e40228511..491728695 100644 --- a/freqtrade/data/dataprovider.py +++ b/freqtrade/data/dataprovider.py @@ -8,7 +8,7 @@ Common Interface for bot and strategy to access data. import logging from collections import deque from datetime import datetime, timezone -from typing import Any, Dict, List, Optional, Tuple +from typing import Any, Optional from pandas import DataFrame, Timedelta, Timestamp, to_timedelta @@ -48,15 +48,15 @@ class DataProvider: self._exchange = exchange self._pairlists = pairlists self.__rpc = rpc - self.__cached_pairs: Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] = {} + self.__cached_pairs: dict[PairWithTimeframe, tuple[DataFrame, datetime]] = {} self.__slice_index: Optional[int] = None self.__slice_date: Optional[datetime] = None - self.__cached_pairs_backtesting: Dict[PairWithTimeframe, DataFrame] = {} - self.__producer_pairs_df: Dict[ - str, Dict[PairWithTimeframe, Tuple[DataFrame, datetime]] + self.__cached_pairs_backtesting: dict[PairWithTimeframe, DataFrame] = {} + self.__producer_pairs_df: dict[ + str, dict[PairWithTimeframe, tuple[DataFrame, datetime]] ] = {} - self.__producer_pairs: Dict[str, List[str]] = {} + self.__producer_pairs: dict[str, list[str]] = {} self._msg_queue: deque = deque() self._default_candle_type = self._config.get("candle_type_def", CandleType.SPOT) @@ -101,7 +101,7 @@ class DataProvider: self.__cached_pairs[pair_key] = (dataframe, datetime.now(timezone.utc)) # For multiple producers we will want to merge the pairlists instead of overwriting - def _set_producer_pairs(self, pairlist: List[str], producer_name: str = "default"): + def _set_producer_pairs(self, pairlist: list[str], producer_name: str = "default"): """ Set the pairs received to later be used. @@ -109,7 +109,7 @@ class DataProvider: """ self.__producer_pairs[producer_name] = pairlist - def get_producer_pairs(self, producer_name: str = "default") -> List[str]: + def get_producer_pairs(self, producer_name: str = "default") -> list[str]: """ Get the pairs cached from the producer @@ -177,7 +177,7 @@ class DataProvider: timeframe: str, candle_type: CandleType, producer_name: str = "default", - ) -> Tuple[bool, int]: + ) -> tuple[bool, int]: """ Append a candle to the existing external dataframe. The incoming dataframe must have at least 1 candle. @@ -258,7 +258,7 @@ class DataProvider: timeframe: Optional[str] = None, candle_type: Optional[CandleType] = None, producer_name: str = "default", - ) -> Tuple[DataFrame, datetime]: + ) -> tuple[DataFrame, datetime]: """ Get the pair data from producers. @@ -377,7 +377,7 @@ class DataProvider: logger.warning(f"No data found for ({pair}, {timeframe}, {candle_type}).") return data - def get_analyzed_dataframe(self, pair: str, timeframe: str) -> Tuple[DataFrame, datetime]: + def get_analyzed_dataframe(self, pair: str, timeframe: str) -> tuple[DataFrame, datetime]: """ Retrieve the analyzed dataframe. Returns the full dataframe in trade mode (live / dry), and the last 1000 candles (up to the time evaluated at this moment) in all other modes. @@ -408,7 +408,7 @@ class DataProvider: """ return RunMode(self._config.get("runmode", RunMode.OTHER)) - def current_whitelist(self) -> List[str]: + def current_whitelist(self) -> list[str]: """ fetch latest available whitelist. @@ -529,7 +529,7 @@ class DataProvider: ) return trades_df - def market(self, pair: str) -> Optional[Dict[str, Any]]: + def market(self, pair: str) -> Optional[dict[str, Any]]: """ Return market data for the pair :param pair: Pair to get the data for diff --git a/freqtrade/data/entryexitanalysis.py b/freqtrade/data/entryexitanalysis.py index f7ab7836f..b4b5b966a 100644 --- a/freqtrade/data/entryexitanalysis.py +++ b/freqtrade/data/entryexitanalysis.py @@ -1,6 +1,5 @@ import logging from pathlib import Path -from typing import Dict, List import joblib import pandas as pd @@ -48,14 +47,14 @@ def _load_signal_candles(backtest_dir: Path): return _load_backtest_analysis_data(backtest_dir, "signals") -def _load_exit_signal_candles(backtest_dir: Path) -> Dict[str, Dict[str, pd.DataFrame]]: +def _load_exit_signal_candles(backtest_dir: Path) -> dict[str, dict[str, pd.DataFrame]]: return _load_backtest_analysis_data(backtest_dir, "exited") def _process_candles_and_indicators( pairlist, strategy_name, trades, signal_candles, date_col: str = "open_date" ): - analysed_trades_dict: Dict[str, Dict] = {strategy_name: {}} + analysed_trades_dict: dict[str, dict] = {strategy_name: {}} try: logger.info(f"Processing {strategy_name} : {len(pairlist)} pairs") @@ -261,8 +260,8 @@ def prepare_results( def print_results( res_df: pd.DataFrame, exit_df: pd.DataFrame, - analysis_groups: List[str], - indicator_list: List[str], + analysis_groups: list[str], + indicator_list: list[str], entry_only: bool, exit_only: bool, csv_path: Path, @@ -307,7 +306,7 @@ def print_results( def _merge_dfs( entry_df: pd.DataFrame, exit_df: pd.DataFrame, - available_inds: List[str], + available_inds: list[str], entry_only: bool, exit_only: bool, ): @@ -438,7 +437,7 @@ def _generate_dfs( pairlist: list, enter_reason_list: list, exit_reason_list: list, - signal_candles: Dict, + signal_candles: dict, strategy_name: str, timerange: TimeRange, trades: pd.DataFrame, diff --git a/freqtrade/data/history/datahandlers/idatahandler.py b/freqtrade/data/history/datahandlers/idatahandler.py index db1660dc8..940c4d71e 100644 --- a/freqtrade/data/history/datahandlers/idatahandler.py +++ b/freqtrade/data/history/datahandlers/idatahandler.py @@ -10,7 +10,7 @@ from abc import ABC, abstractmethod from copy import deepcopy from datetime import datetime, timezone from pathlib import Path -from typing import List, Optional, Tuple, Type +from typing import Optional from pandas import DataFrame, to_datetime @@ -71,7 +71,7 @@ class IDataHandler(ABC): ] @classmethod - def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> List[str]: + def ohlcv_get_pairs(cls, datadir: Path, timeframe: str, candle_type: CandleType) -> list[str]: """ Returns a list of all pairs with ohlcv data available in this datadir for the specified timeframe @@ -107,7 +107,7 @@ class IDataHandler(ABC): def ohlcv_data_min_max( self, pair: str, timeframe: str, candle_type: CandleType - ) -> Tuple[datetime, datetime, int]: + ) -> tuple[datetime, datetime, int]: """ Returns the min and max timestamp for the given pair and timeframe. :param pair: Pair to get min/max for @@ -168,7 +168,7 @@ class IDataHandler(ABC): """ @classmethod - def trades_get_available_data(cls, datadir: Path, trading_mode: TradingMode) -> List[str]: + def trades_get_available_data(cls, datadir: Path, trading_mode: TradingMode) -> list[str]: """ Returns a list of all pairs with ohlcv data available in this datadir :param datadir: Directory to search for ohlcv files @@ -191,7 +191,7 @@ class IDataHandler(ABC): self, pair: str, trading_mode: TradingMode, - ) -> Tuple[datetime, datetime, int]: + ) -> tuple[datetime, datetime, int]: """ Returns the min and max timestamp for the given pair's trades data. :param pair: Pair to get min/max for @@ -212,7 +212,7 @@ class IDataHandler(ABC): ) @classmethod - def trades_get_pairs(cls, datadir: Path) -> List[str]: + def trades_get_pairs(cls, datadir: Path) -> list[str]: """ Returns a list of all pairs for which trade data is available in this :param datadir: Directory to search for ohlcv files @@ -532,7 +532,7 @@ class IDataHandler(ABC): Path(old_name).rename(new_name) -def get_datahandlerclass(datatype: str) -> Type[IDataHandler]: +def get_datahandlerclass(datatype: str) -> type[IDataHandler]: """ Get datahandler class. Could be done using Resolvers, but since this may be called often and resolvers diff --git a/freqtrade/data/history/history_utils.py b/freqtrade/data/history/history_utils.py index 8a65db26a..274202e1b 100644 --- a/freqtrade/data/history/history_utils.py +++ b/freqtrade/data/history/history_utils.py @@ -2,7 +2,7 @@ import logging import operator from datetime import datetime, timedelta from pathlib import Path -from typing import Dict, List, Optional, Tuple +from typing import Optional from pandas import DataFrame, concat @@ -77,7 +77,7 @@ def load_pair_history( def load_data( datadir: Path, timeframe: str, - pairs: List[str], + pairs: list[str], *, timerange: Optional[TimeRange] = None, fill_up_missing: bool = True, @@ -86,7 +86,7 @@ def load_data( data_format: str = "feather", candle_type: CandleType = CandleType.SPOT, user_futures_funding_rate: Optional[int] = None, -) -> Dict[str, DataFrame]: +) -> dict[str, DataFrame]: """ Load ohlcv history data for a list of pairs. @@ -101,7 +101,7 @@ def load_data( :param candle_type: Any of the enum CandleType (must match trading mode!) :return: dict(:) """ - result: Dict[str, DataFrame] = {} + result: dict[str, DataFrame] = {} if startup_candles > 0 and timerange: logger.info(f"Using indicator startup period: {startup_candles} ...") @@ -135,7 +135,7 @@ def refresh_data( *, datadir: Path, timeframe: str, - pairs: List[str], + pairs: list[str], exchange: Exchange, data_format: Optional[str] = None, timerange: Optional[TimeRange] = None, @@ -172,7 +172,7 @@ def _load_cached_data_for_updating( data_handler: IDataHandler, candle_type: CandleType, prepend: bool = False, -) -> Tuple[DataFrame, Optional[int], Optional[int]]: +) -> tuple[DataFrame, Optional[int], Optional[int]]: """ Load cached data to download more data. If timerange is passed in, checks whether data from an before the stored data will be @@ -318,8 +318,8 @@ def _download_pair_history( def refresh_backtest_ohlcv_data( exchange: Exchange, - pairs: List[str], - timeframes: List[str], + pairs: list[str], + timeframes: list[str], datadir: Path, trading_mode: str, timerange: Optional[TimeRange] = None, @@ -327,7 +327,7 @@ def refresh_backtest_ohlcv_data( erase: bool = False, data_format: Optional[str] = None, prepend: bool = False, -) -> List[str]: +) -> list[str]: """ Refresh stored ohlcv data for backtesting and hyperopt operations. Used by freqtrade download-data subcommand. @@ -489,14 +489,14 @@ def _download_trades_history( def refresh_backtest_trades_data( exchange: Exchange, - pairs: List[str], + pairs: list[str], datadir: Path, timerange: TimeRange, trading_mode: TradingMode, new_pairs_days: int = 30, erase: bool = False, data_format: str = "feather", -) -> List[str]: +) -> list[str]: """ Refresh stored trades data for backtesting and hyperopt operations. Used by freqtrade download-data subcommand. @@ -531,7 +531,7 @@ def refresh_backtest_trades_data( return pairs_not_available -def get_timerange(data: Dict[str, DataFrame]) -> Tuple[datetime, datetime]: +def get_timerange(data: dict[str, DataFrame]) -> tuple[datetime, datetime]: """ Get the maximum common timerange for the given backtest data. @@ -588,7 +588,7 @@ def download_data_main(config: Config) -> None: # Remove stake-currency to skip checks which are not relevant for datadownload config["stake_currency"] = "" - pairs_not_available: List[str] = [] + pairs_not_available: list[str] = [] # Init exchange from freqtrade.resolvers.exchange_resolver import ExchangeResolver diff --git a/freqtrade/data/metrics.py b/freqtrade/data/metrics.py index 2e4673fb5..77f29080c 100644 --- a/freqtrade/data/metrics.py +++ b/freqtrade/data/metrics.py @@ -2,7 +2,6 @@ import logging import math from dataclasses import dataclass from datetime import datetime -from typing import Dict, Tuple import numpy as np import pandas as pd @@ -11,7 +10,7 @@ import pandas as pd logger = logging.getLogger(__name__) -def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close") -> float: +def calculate_market_change(data: dict[str, pd.DataFrame], column: str = "close") -> float: """ Calculate market change based on "column". Calculation is done by taking the first non-null and the last non-null element of each column @@ -32,7 +31,7 @@ def calculate_market_change(data: Dict[str, pd.DataFrame], column: str = "close" def combine_dataframes_by_column( - data: Dict[str, pd.DataFrame], column: str = "close" + data: dict[str, pd.DataFrame], column: str = "close" ) -> pd.DataFrame: """ Combine multiple dataframes "column" @@ -50,7 +49,7 @@ def combine_dataframes_by_column( def combined_dataframes_with_rel_mean( - data: Dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, column: str = "close" + data: dict[str, pd.DataFrame], fromdt: datetime, todt: datetime, column: str = "close" ) -> pd.DataFrame: """ Combine multiple dataframes "column" @@ -70,7 +69,7 @@ def combined_dataframes_with_rel_mean( def combine_dataframes_with_mean( - data: Dict[str, pd.DataFrame], column: str = "close" + data: dict[str, pd.DataFrame], column: str = "close" ) -> pd.DataFrame: """ Combine multiple dataframes "column" @@ -222,7 +221,7 @@ def calculate_max_drawdown( ) -def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> Tuple[float, float]: +def calculate_csum(trades: pd.DataFrame, starting_balance: float = 0) -> tuple[float, float]: """ Calculate min/max cumsum of trades, to show if the wallet/stake amount ratio is sane :param trades: DataFrame containing trades (requires columns close_date and profit_percent) @@ -255,7 +254,7 @@ def calculate_cagr(days_passed: int, starting_balance: float, final_balance: flo return (final_balance / starting_balance) ** (1 / (days_passed / 365)) - 1 -def calculate_expectancy(trades: pd.DataFrame) -> Tuple[float, float]: +def calculate_expectancy(trades: pd.DataFrame) -> tuple[float, float]: """ Calculate expectancy :param trades: DataFrame containing trades (requires columns close_date and profit_abs)