ruff format: util

This commit is contained in:
Matthias 2024-05-12 16:56:05 +02:00
parent 5f64cc8e76
commit 7ea5e40919
10 changed files with 73 additions and 63 deletions

View File

@ -8,6 +8,7 @@ class LoggingMixin:
Logging Mixin
Shows similar messages only once every `refresh_period`.
"""
# Disable output completely
show_output = True
@ -27,6 +28,7 @@ class LoggingMixin:
:param logmethod: Function that'll be called. Most likely `logger.info`.
:return: None.
"""
@cached(cache=self._log_cache)
def _log_once(message: str):
logmethod(message)

View File

@ -19,21 +19,21 @@ from freqtrade.util.template_renderer import render_template, render_template_wi
__all__ = [
'dt_floor_day',
'dt_from_ts',
'dt_humanize_delta',
'dt_now',
'dt_ts',
'dt_ts_def',
'dt_ts_none',
'dt_utc',
'format_date',
'format_ms_time',
'FtPrecise',
'PeriodicCache',
'shorten_date',
'decimals_per_coin',
'round_value',
'fmt_coin',
'MeasureTime',
"dt_floor_day",
"dt_from_ts",
"dt_humanize_delta",
"dt_now",
"dt_ts",
"dt_ts_def",
"dt_ts_none",
"dt_utc",
"format_date",
"format_ms_time",
"FtPrecise",
"PeriodicCache",
"shorten_date",
"decimals_per_coin",
"round_value",
"fmt_coin",
"MeasureTime",
]

View File

@ -13,8 +13,15 @@ def dt_now() -> datetime:
return datetime.now(timezone.utc)
def dt_utc(year: int, month: int, day: int, hour: int = 0, minute: int = 0, second: int = 0,
microsecond: int = 0) -> datetime:
def dt_utc(
year: int,
month: int,
day: int,
hour: int = 0,
minute: int = 0,
second: int = 0,
microsecond: int = 0,
) -> datetime:
"""Return a datetime in UTC."""
return datetime(year, month, day, hour, minute, second, microsecond, tzinfo=timezone.utc)
@ -69,11 +76,11 @@ def shorten_date(_date: str) -> str:
"""
Trim the date so it fits on small screens
"""
new_date = re.sub('seconds?', 'sec', _date)
new_date = re.sub('minutes?', 'min', new_date)
new_date = re.sub('hours?', 'h', new_date)
new_date = re.sub('days?', 'd', new_date)
new_date = re.sub('^an?', '1', new_date)
new_date = re.sub("seconds?", "sec", _date)
new_date = re.sub("minutes?", "min", new_date)
new_date = re.sub("hours?", "h", new_date)
new_date = re.sub("days?", "d", new_date)
new_date = re.sub("^an?", "1", new_date)
return new_date
@ -92,7 +99,7 @@ def format_date(date: Optional[datetime]) -> str:
"""
if date:
return date.strftime(DATETIME_PRINT_FORMAT)
return ''
return ""
def format_ms_time(date: Union[int, float]) -> str:
@ -100,4 +107,4 @@ def format_ms_time(date: Union[int, float]) -> str:
convert MS date to readable format.
: epoch-string in ms
"""
return dt_from_ts(date).strftime('%Y-%m-%dT%H:%M:%S')
return dt_from_ts(date).strftime("%Y-%m-%dT%H:%M:%S")

View File

@ -16,7 +16,7 @@ def strip_trailing_zeros(value: str) -> str:
:param value: Value to be stripped
:return: Stripped value
"""
return value.rstrip('0').rstrip('.')
return value.rstrip("0").rstrip(".")
def round_value(value: float, decimals: int, keep_trailing_zeros=False) -> str:
@ -33,8 +33,7 @@ def round_value(value: float, decimals: int, keep_trailing_zeros=False) -> str:
return val
def fmt_coin(
value: float, coin: str, show_coin_name=True, keep_trailing_zeros=False) -> str:
def fmt_coin(value: float, coin: str, show_coin_name=True, keep_trailing_zeros=False) -> str:
"""
Format price value for this coin
:param value: Value to be printed

View File

@ -2,6 +2,7 @@
Slim wrapper around ccxt's Precise (string math)
To have imports from freqtrade - and support float initializers
"""
from ccxt import Precise

View File

@ -12,8 +12,10 @@ class MeasureTime:
"""
Measure the time of a block of code and call a callback if the time limit is exceeded.
"""
def __init__(
self, callback: Callable[[float, float], None], time_limit: float, ttl: int = 3600 * 4):
self, callback: Callable[[float, float], None], time_limit: float, ttl: int = 3600 * 4
):
"""
:param callback: The callback to call if the time limit is exceeded.
This callback will be called once every "ttl" seconds,
@ -32,7 +34,7 @@ class MeasureTime:
def __exit__(self, *args):
end = time.time()
if self.__cache.get('value'):
if self.__cache.get("value"):
return
duration = end - self._start
@ -40,4 +42,4 @@ class MeasureTime:
return
self._callback(duration, self._time_limit)
self.__cache['value'] = True
self.__cache["value"] = True

View File

@ -14,27 +14,28 @@ logger = logging.getLogger(__name__)
def migrate_binance_futures_names(config: Config):
if (
not (config.get('trading_mode', TradingMode.SPOT) == TradingMode.FUTURES
and config['exchange']['name'] == 'binance')
if not (
config.get("trading_mode", TradingMode.SPOT) == TradingMode.FUTURES
and config["exchange"]["name"] == "binance"
):
# only act on new futures
return
import ccxt
if version.parse("2.6.26") > version.parse(ccxt.__version__):
raise OperationalException(
"Please follow the update instructions in the docs "
f"({DOCS_LINK}/updating/) to install a compatible ccxt version.")
f"({DOCS_LINK}/updating/) to install a compatible ccxt version."
)
_migrate_binance_futures_db(config)
migrate_binance_futures_data(config)
def _migrate_binance_futures_db(config: Config):
logger.warning('Migrating binance futures pairs in database.')
trades = Trade.get_trades([Trade.exchange == 'binance', Trade.trading_mode == 'FUTURES']).all()
logger.warning("Migrating binance futures pairs in database.")
trades = Trade.get_trades([Trade.exchange == "binance", Trade.trading_mode == "FUTURES"]).all()
for trade in trades:
if ':' in trade.pair:
if ":" in trade.pair:
# already migrated
continue
new_pair = f"{trade.pair}:{trade.stake_currency}"
@ -45,34 +46,33 @@ def _migrate_binance_futures_db(config: Config):
# Should symbol be migrated too?
# order.symbol = new_pair
Trade.commit()
pls = PairLock.session.scalars(select(PairLock).filter(PairLock.pair.notlike('%:%'))).all()
pls = PairLock.session.scalars(select(PairLock).filter(PairLock.pair.notlike("%:%"))).all()
for pl in pls:
pl.pair = f"{pl.pair}:{config['stake_currency']}"
# print(pls)
# pls.update({'pair': concat(PairLock.pair,':USDT')})
Trade.commit()
logger.warning('Done migrating binance futures pairs in database.')
logger.warning("Done migrating binance futures pairs in database.")
def migrate_binance_futures_data(config: Config):
if (
not (config.get('trading_mode', TradingMode.SPOT) == TradingMode.FUTURES
and config['exchange']['name'] == 'binance')
if not (
config.get("trading_mode", TradingMode.SPOT) == TradingMode.FUTURES
and config["exchange"]["name"] == "binance"
):
# only act on new futures
return
from freqtrade.data.history import get_datahandler
dhc = get_datahandler(config['datadir'], config['dataformat_ohlcv'])
dhc = get_datahandler(config["datadir"], config["dataformat_ohlcv"])
paircombs = dhc.ohlcv_get_available_data(
config['datadir'],
config.get('trading_mode', TradingMode.SPOT)
)
config["datadir"], config.get("trading_mode", TradingMode.SPOT)
)
for pair, timeframe, candle_type in paircombs:
if ':' in pair:
if ":" in pair:
# already migrated
continue
new_pair = f"{pair}:{config['stake_currency']}"

View File

@ -11,17 +11,16 @@ logger = logging.getLogger(__name__)
def migrate_funding_fee_timeframe(config: Config, exchange: Optional[Exchange]):
if (
config.get('trading_mode', TradingMode.SPOT) != TradingMode.FUTURES
):
if config.get("trading_mode", TradingMode.SPOT) != TradingMode.FUTURES:
# only act on futures
return
if not exchange:
from freqtrade.resolvers import ExchangeResolver
exchange = ExchangeResolver.load_exchange(config, validate=False)
ff_timeframe = exchange.get_option('funding_fee_timeframe')
ff_timeframe = exchange.get_option("funding_fee_timeframe")
dhc = get_datahandler(config['datadir'], config['dataformat_ohlcv'])
dhc = get_datahandler(config["datadir"], config["dataformat_ohlcv"])
dhc.fix_funding_fee_timeframe(ff_timeframe)

View File

@ -12,7 +12,7 @@ class PeriodicCache(TTLCache):
def __init__(self, maxsize, ttl, getsizeof=None):
def local_timer():
ts = datetime.now(timezone.utc).timestamp()
offset = (ts % ttl)
offset = ts % ttl
return ts - offset
# Init with smlight offset

View File

@ -2,28 +2,28 @@
Jinja2 rendering utils, used to generate new strategy and configurations.
"""
from typing import Dict, Optional
def render_template(templatefile: str, arguments: Dict) -> str:
from jinja2 import Environment, PackageLoader, select_autoescape
env = Environment(
loader=PackageLoader('freqtrade', 'templates'),
autoescape=select_autoescape(['html', 'xml'])
loader=PackageLoader("freqtrade", "templates"),
autoescape=select_autoescape(["html", "xml"]),
)
template = env.get_template(templatefile)
return template.render(**arguments)
def render_template_with_fallback(templatefile: str, templatefallbackfile: str,
arguments: Optional[Dict] = None) -> str:
def render_template_with_fallback(
templatefile: str, templatefallbackfile: str, arguments: Optional[Dict] = None
) -> str:
"""
Use templatefile if possible, otherwise fall back to templatefallbackfile
"""
from jinja2.exceptions import TemplateNotFound
if arguments is None:
arguments = {}
try: