freqtrade_origin/freqtrade/exchange/exchange_utils.py

348 lines
12 KiB
Python
Raw Normal View History

"""
Exchange support utils
"""
2024-05-12 14:58:33 +00:00
import inspect
from datetime import datetime, timedelta, timezone
from math import ceil, floor
from typing import Any, Optional
import ccxt
from ccxt import (
DECIMAL_PLACES,
ROUND,
ROUND_DOWN,
ROUND_UP,
SIGNIFICANT_DIGITS,
TICK_SIZE,
TRUNCATE,
decimal_to_precision,
)
from freqtrade.exchange.common import (
BAD_EXCHANGES,
EXCHANGE_HAS_OPTIONAL,
EXCHANGE_HAS_REQUIRED,
SUPPORTED_EXCHANGES,
)
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_minutes, timeframe_to_prev_date
2024-09-04 04:44:48 +00:00
from freqtrade.ft_types import ValidExchangesType
from freqtrade.util import FtPrecise
CcxtModuleType = Any
2023-01-21 14:01:56 +00:00
def is_exchange_known_ccxt(
    exchange_name: str, ccxt_module: Optional[CcxtModuleType] = None
) -> bool:
    """
    Check whether an exchange name appears in the ccxt exchange list.
    :param exchange_name: exchange name to look up (compared as given, no case folding here)
    :param ccxt_module: optional ccxt-like module, forwarded to ccxt_exchanges()
    :return: True if the name is contained in the list returned by ccxt_exchanges()
    """
    known_exchanges = ccxt_exchanges(ccxt_module)
    return exchange_name in known_exchanges
def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> list[str]:
    """
    Return the list of all exchanges known to ccxt.
    :param ccxt_module: optional module to read ``.exchanges`` from;
        the imported ``ccxt`` module is used when this is None
    :return: the ``exchanges`` attribute of the selected module
    """
    if ccxt_module is None:
        return ccxt.exchanges
    return ccxt_module.exchanges
def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> list[str]:
    """
    Return exchanges available to the bot, i.e. the entries of the ccxt list
    for which validate_exchange() reports "can use".
    :param ccxt_module: optional ccxt-like module, forwarded to ccxt_exchanges()
    :return: list of exchange names that passed validation, in ccxt list order
    """
    usable: list[str] = []
    for name in ccxt_exchanges(ccxt_module):
        # validate_exchange() returns (can_use, reason, exchange_object)
        can_use = validate_exchange(name)[0]
        if can_use:
            usable.append(name)
    return usable
def validate_exchange(exchange: str) -> tuple[bool, str, Optional[ccxt.Exchange]]:
    """
    Instantiate the ccxt exchange class and check it against the required and
    optional feature lists.
    :param exchange: exchange name (lower-cased before the ccxt lookup)
    :return: can_use, reason, exchange_object
        with reason including both missing and missing_opt
    """
    exchange_id = exchange.lower()
    # Prefer the ccxt.pro class, fall back to the async_support class.
    try:
        ex_mod = getattr(ccxt.pro, exchange_id)()
    except AttributeError:
        ex_mod = getattr(ccxt.async_support, exchange_id)()

    if not ex_mod or not ex_mod.has:
        return False, "", None

    can_use = True
    reason = ""

    # A required feature counts as missing unless it is flagged exactly True,
    # or every alternative listed for it is available.
    missing = []
    for feature, alternatives in EXCHANGE_HAS_REQUIRED.items():
        if ex_mod.has.get(feature) is True:
            continue
        if all(ex_mod.has.get(alt) for alt in alternatives):
            continue
        missing.append(feature)

    if missing:
        can_use = False
        reason += f"missing: {', '.join(missing)}"

    missing_opt = [feature for feature in EXCHANGE_HAS_OPTIONAL if not ex_mod.has.get(feature)]

    if exchange_id in BAD_EXCHANGES:
        # The bad-exchange reason replaces any "missing" text collected so far.
        can_use = False
        reason = BAD_EXCHANGES.get(exchange_id, "")

    if missing_opt:
        separator = ". " if reason else ""
        reason += f"{separator}missing opt: {', '.join(missing_opt)}. "

    return can_use, reason, ex_mod
def _build_exchange_list_entry(
    exchange_name: str, exchangeClasses: dict[str, Any]
) -> ValidExchangesType:
    """
    Build the description entry for one exchange.
    :param exchange_name: exchange name as listed by ccxt
    :param exchangeClasses: resolved exchange subclasses keyed by lower-case name;
        each value is read via its "class" key
    :return: entry with name, validity, support flag, comment, alias info and trade modes
    """
    is_valid, remark, ex_mod = validate_exchange(exchange_name)
    spot_only = {"trading_mode": "spot", "margin_mode": ""}

    if getattr(ex_mod, "alias", False):
        # For aliases, instantiate the second class in the MRO and report its id.
        alias_target = inspect.getmro(ex_mod.__class__)[1]().id
    else:
        alias_target = None

    entry: ValidExchangesType = {
        "name": getattr(ex_mod, "name", exchange_name),
        "classname": exchange_name,
        "valid": is_valid,
        "supported": exchange_name.lower() in SUPPORTED_EXCHANGES,
        "comment": remark,
        "dex": getattr(ex_mod, "dex", False),
        "is_alias": getattr(ex_mod, "alias", False),
        "alias_for": alias_target,
        "trade_modes": [spot_only],
    }

    resolved = exchangeClasses.get(exchange_name.lower())
    if resolved:
        # Spot is always listed first, followed by the pairs declared on the subclass.
        extra_modes = [
            {"trading_mode": tm.value, "margin_mode": mm.value}
            for tm, mm in resolved["class"]._supported_trading_mode_margin_pairs
        ]
        entry["trade_modes"] = [{"trading_mode": "spot", "margin_mode": ""}] + extra_modes

    return entry
def list_available_exchanges(all_exchanges: bool) -> list[ValidExchangesType]:
    """
    Describe the exchanges known to ccxt.
    :param all_exchanges: if True, describe every ccxt exchange;
        otherwise only those returned by available_exchanges()
    :return: List of exchange entries as built by _build_exchange_list_entry().
    """
    if all_exchanges:
        names = ccxt_exchanges()
    else:
        names = available_exchanges()

    # Imported inside the function, as in the original code.
    from freqtrade.resolvers.exchange_resolver import ExchangeResolver

    # Index the resolved subclasses by lower-case name for lookup per exchange.
    subclassed = {}
    for found in ExchangeResolver.search_all_objects({}, False):
        subclassed[found["name"].lower()] = found

    return [_build_exchange_list_entry(name, subclassed) for name in names]
def date_minus_candles(
    timeframe: str, candle_count: int, date: Optional[datetime] = None
) -> datetime:
    """
    Subtract X candles from a date.
    The date is first passed through timeframe_to_prev_date() and the
    candle span is subtracted from that result.
    :param timeframe: timeframe in string format (e.g. "5m")
    :param candle_count: Amount of candles to subtract.
    :param date: date to use. Defaults to now(utc)
    :return: the resulting datetime
    """
    reference = date if date else datetime.now(timezone.utc)
    minutes_per_candle = timeframe_to_minutes(timeframe)
    candle_start = timeframe_to_prev_date(timeframe, reference)
    return candle_start - timedelta(minutes=minutes_per_candle * candle_count)
def market_is_active(market: dict) -> bool:
    """
    Return True if the market is active.
    :param market: market dict; only its "active" key is inspected
    :return: False only when "active" is explicitly the value False
    """
    # "It's active, if the active flag isn't explicitly set to false. If it's missing or
    # true then it's true. If it's undefined, then it's most likely true, but not 100% )"
    # See https://github.com/ccxt/ccxt/issues/4874,
    # https://github.com/ccxt/ccxt/issues/4075#issuecomment-434760520
    active_flag = market.get("active", True)
    if active_flag is False:
        return False
    return True
def amount_to_contracts(amount: float, contract_size: Optional[float]) -> float:
    """
    Convert amount to contracts.
    :param amount: amount to convert
    :param contract_size: contract size - taken from exchange.get_contract_size(pair)
    :return: num-contracts; the amount is returned unchanged when contract_size
        is falsy (None, 0) or equal to 1
    """
    if not contract_size or contract_size == 1:
        return amount
    # Divide through FtPrecise rather than on the raw floats.
    return float(FtPrecise(amount) / FtPrecise(contract_size))
def contracts_to_amount(num_contracts: float, contract_size: Optional[float]) -> float:
    """
    Convert a number of contracts to an amount.
    :param num_contracts: number of contracts
    :param contract_size: contract size - taken from exchange.get_contract_size(pair)
    :return: Amount; num_contracts is returned unchanged when contract_size
        is falsy (None, 0) or equal to 1
    """
    if not contract_size or contract_size == 1:
        return num_contracts
    # Multiply through FtPrecise rather than on the raw floats.
    return float(FtPrecise(num_contracts) * FtPrecise(contract_size))
2024-05-12 14:58:33 +00:00
def amount_to_precision(
    amount: float, amount_precision: Optional[float], precisionMode: Optional[int]
) -> float:
    """
    Returns the amount to buy or sell to a precision the Exchange accepts
    Re-implementation of ccxt internal methods - ensuring we can test the result is correct
    based on our definitions.
    :param amount: amount to truncate
    :param amount_precision: amount precision to use.
                             should be retrieved from markets[pair]['precision']['amount']
    :param precisionMode: precision mode to use. Should be used from precisionMode
                          one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
    :return: truncated amount (the unchanged amount if either precision input is None)
    """
    if amount_precision is None or precisionMode is None:
        return amount

    # precision must be an int for non-ticksize inputs.
    if precisionMode == TICK_SIZE:
        precision = amount_precision
    else:
        precision = int(amount_precision)

    truncated = decimal_to_precision(
        amount,
        rounding_mode=TRUNCATE,
        precision=precision,
        counting_mode=precisionMode,
    )
    return float(truncated)
def amount_to_contract_precision(
    amount,
    amount_precision: Optional[float],
    precisionMode: Optional[int],
    contract_size: Optional[float],
) -> float:
    """
    Returns the amount to buy or sell to a precision the Exchange accepts
    including calculation to and from contracts.
    Re-implementation of ccxt internal methods - ensuring we can test the result is correct
    based on our definitions.
    :param amount: amount to truncate
    :param amount_precision: amount precision to use.
                             should be retrieved from markets[pair]['precision']['amount']
    :param precisionMode: precision mode to use. Should be used from precisionMode
                          one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
    :param contract_size: contract size - taken from exchange.get_contract_size(pair)
    :return: truncated amount (the unchanged amount if either precision input is None)
    """
    if amount_precision is None or precisionMode is None:
        return amount

    # amount -> contracts -> truncate to precision -> back to amount
    as_contracts = amount_to_contracts(amount, contract_size)
    truncated_contracts = amount_to_precision(as_contracts, amount_precision, precisionMode)
    return contracts_to_amount(truncated_contracts, contract_size)
def __price_to_precision_significant_digits(
    price: float,
    price_precision: float,
    *,
    rounding_mode: int = ROUND,
) -> float:
    """
    Implementation of ROUND_UP/ROUND_DOWN for significant digits mode.
    :param price: price to convert
    :param price_precision: number of significant digits (rounded to an int)
    :param rounding_mode: ccxt rounding mode; ROUND_UP and ROUND_DOWN are distinguished,
        see the inline comments for how other values are treated in each branch
    :return: price expressed with the requested number of significant digits
    """
    from decimal import ROUND_DOWN as dec_ROUND_DOWN
    from decimal import ROUND_UP as dec_ROUND_UP
    from decimal import Decimal

    value = Decimal(str(price))
    digits = round(price_precision)

    # Exponent of the last kept digit: >= 0 means it sits at or right of the
    # units position, < 0 means it sits left of the units position.
    exponent = digits - value.adjusted() - 1
    step = Decimal("10") ** -exponent

    if exponent >= 0:
        # Anything other than ROUND_DOWN is rounded up here.
        decimal_rounding = dec_ROUND_DOWN if rounding_mode == ROUND_DOWN else dec_ROUND_UP
        return float(f"{value.quantize(step, rounding=decimal_rounding):f}")

    # Keep the leading characters of the plain (non-exponent) representation.
    # The slice is '' when we have zero precision.
    leading = f"{value:f}"[:digits]
    lower = step * Decimal(leading or "0")
    # Anything other than ROUND_UP takes the lower bound here.
    chosen = lower + step if rounding_mode == ROUND_UP else lower
    return float(f"{chosen:f}")
def price_to_precision(
    price: float,
    price_precision: Optional[float],
    precisionMode: Optional[int],
    *,
    rounding_mode: int = ROUND,
) -> float:
    """
    Returns the price rounded to the precision the Exchange accepts.
    Partial Re-implementation of ccxt internal method decimal_to_precision(),
    which does not support rounding up.
    For stoploss calculations, must use ROUND_UP for longs, and ROUND_DOWN for shorts.

    TODO: If ccxt supports ROUND_UP for decimal_to_precision(), we could remove this and
    align with amount_to_precision().
    :param price: price to convert
    :param price_precision: price precision to use. Used from markets[pair]['precision']['price']
    :param precisionMode: precision mode to use. Should be used from precisionMode
                          one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
    :param rounding_mode: rounding mode to use. Defaults to ROUND
    :return: price rounded to the precision the Exchange accepts
        (the unchanged price if price_precision or precisionMode is None)
    :raises ValueError: if ROUND_UP/ROUND_DOWN is requested with an unknown precisionMode
    """
    if price_precision is None or precisionMode is None:
        return price

    if rounding_mode not in (ROUND_UP, ROUND_DOWN):
        # Use CCXT code where possible.
        return float(
            decimal_to_precision(
                price,
                rounding_mode=rounding_mode,
                precision=price_precision,
                counting_mode=precisionMode,
            )
        )

    # From here on rounding_mode is guaranteed to be ROUND_UP or ROUND_DOWN.
    if precisionMode == TICK_SIZE:
        tick = FtPrecise(price_precision)
        exact_price = FtPrecise(price)
        remainder = exact_price % tick
        if remainder == FtPrecise("0"):
            # Already a multiple of the tick size - nothing to round.
            return price
        floored = exact_price - remainder
        res = floored + tick if rounding_mode == ROUND_UP else floored
        return round(float(str(res)), 14)

    if precisionMode == DECIMAL_PLACES:
        # Scale and round with exact decimal arithmetic. Doing this on floats
        # (ceil(price * 10**ndigits)) is wrong for values that are exact in
        # decimal but not in binary: 0.07 * 100 == 7.000000000000001, which
        # would round *up* to 0.08 although 0.07 already has 2 decimal places.
        from decimal import ROUND_CEILING, ROUND_FLOOR, Decimal

        ndigits = round(price_precision)
        ticks = Decimal(str(price)).scaleb(ndigits)
        # ROUND_UP/ROUND_DOWN keep their former ceil/floor meaning.
        decimal_rounding = ROUND_CEILING if rounding_mode == ROUND_UP else ROUND_FLOOR
        rounded_ticks = ticks.to_integral_value(rounding=decimal_rounding)
        return float(rounded_ticks.scaleb(-ndigits))

    if precisionMode == SIGNIFICANT_DIGITS:
        return __price_to_precision_significant_digits(
            price, price_precision, rounding_mode=rounding_mode
        )

    raise ValueError(f"Unknown precisionMode {precisionMode}")