freqtrade_origin/freqtrade/exchange/exchange_utils.py

348 lines
12 KiB
Python

"""
Exchange support utils
"""
import inspect
from datetime import datetime, timedelta, timezone
from math import ceil, floor
from typing import Any, Dict, List, Optional, Tuple
import ccxt
from ccxt import (
DECIMAL_PLACES,
ROUND,
ROUND_DOWN,
ROUND_UP,
SIGNIFICANT_DIGITS,
TICK_SIZE,
TRUNCATE,
decimal_to_precision,
)
from freqtrade.exchange.common import (
BAD_EXCHANGES,
EXCHANGE_HAS_OPTIONAL,
EXCHANGE_HAS_REQUIRED,
SUPPORTED_EXCHANGES,
)
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_minutes, timeframe_to_prev_date
from freqtrade.types import ValidExchangesType
from freqtrade.util import FtPrecise
CcxtModuleType = Any
def is_exchange_known_ccxt(
exchange_name: str, ccxt_module: Optional[CcxtModuleType] = None
) -> bool:
return exchange_name in ccxt_exchanges(ccxt_module)
def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
"""
Return the list of all exchanges known to ccxt
"""
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
"""
Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list
"""
exchanges = ccxt_exchanges(ccxt_module)
return [x for x in exchanges if validate_exchange(x)[0]]
def validate_exchange(exchange: str) -> Tuple[bool, str, Optional[ccxt.Exchange]]:
"""
returns: can_use, reason, exchange_object
with Reason including both missing and missing_opt
"""
try:
ex_mod = getattr(ccxt.pro, exchange.lower())()
except AttributeError:
ex_mod = getattr(ccxt.async_support, exchange.lower())()
if not ex_mod or not ex_mod.has:
return False, "", None
result = True
reason = ""
missing = [
k
for k, v in EXCHANGE_HAS_REQUIRED.items()
if ex_mod.has.get(k) is not True and not (all(ex_mod.has.get(x) for x in v))
]
if missing:
result = False
reason += f"missing: {', '.join(missing)}"
missing_opt = [k for k in EXCHANGE_HAS_OPTIONAL if not ex_mod.has.get(k)]
if exchange.lower() in BAD_EXCHANGES:
result = False
reason = BAD_EXCHANGES.get(exchange.lower(), "")
if missing_opt:
reason += f"{'. ' if reason else ''}missing opt: {', '.join(missing_opt)}. "
return result, reason, ex_mod
def _build_exchange_list_entry(
exchange_name: str, exchangeClasses: Dict[str, Any]
) -> ValidExchangesType:
valid, comment, ex_mod = validate_exchange(exchange_name)
result: ValidExchangesType = {
"name": getattr(ex_mod, "name", exchange_name),
"classname": exchange_name,
"valid": valid,
"supported": exchange_name.lower() in SUPPORTED_EXCHANGES,
"comment": comment,
"dex": getattr(ex_mod, "dex", False),
"is_alias": getattr(ex_mod, "alias", False),
"alias_for": inspect.getmro(ex_mod.__class__)[1]().id
if getattr(ex_mod, "alias", False)
else None,
"trade_modes": [{"trading_mode": "spot", "margin_mode": ""}],
}
if resolved := exchangeClasses.get(exchange_name.lower()):
supported_modes = [{"trading_mode": "spot", "margin_mode": ""}] + [
{"trading_mode": tm.value, "margin_mode": mm.value}
for tm, mm in resolved["class"]._supported_trading_mode_margin_pairs
]
result.update(
{
"trade_modes": supported_modes,
}
)
return result
def list_available_exchanges(all_exchanges: bool) -> List[ValidExchangesType]:
"""
:return: List of tuples with exchangename, valid, reason.
"""
exchanges = ccxt_exchanges() if all_exchanges else available_exchanges()
from freqtrade.resolvers.exchange_resolver import ExchangeResolver
subclassed = {e["name"].lower(): e for e in ExchangeResolver.search_all_objects({}, False)}
exchanges_valid: List[ValidExchangesType] = [
_build_exchange_list_entry(e, subclassed) for e in exchanges
]
return exchanges_valid
def date_minus_candles(
timeframe: str, candle_count: int, date: Optional[datetime] = None
) -> datetime:
"""
subtract X candles from a date.
:param timeframe: timeframe in string format (e.g. "5m")
:param candle_count: Amount of candles to subtract.
:param date: date to use. Defaults to now(utc)
"""
if not date:
date = datetime.now(timezone.utc)
tf_min = timeframe_to_minutes(timeframe)
new_date = timeframe_to_prev_date(timeframe, date) - timedelta(minutes=tf_min * candle_count)
return new_date
def market_is_active(market: Dict) -> bool:
"""
Return True if the market is active.
"""
# "It's active, if the active flag isn't explicitly set to false. If it's missing or
# true then it's true. If it's undefined, then it's most likely true, but not 100% )"
# See https://github.com/ccxt/ccxt/issues/4874,
# https://github.com/ccxt/ccxt/issues/4075#issuecomment-434760520
return market.get("active", True) is not False
def amount_to_contracts(amount: float, contract_size: Optional[float]) -> float:
"""
Convert amount to contracts.
:param amount: amount to convert
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: num-contracts
"""
if contract_size and contract_size != 1:
return float(FtPrecise(amount) / FtPrecise(contract_size))
else:
return amount
def contracts_to_amount(num_contracts: float, contract_size: Optional[float]) -> float:
"""
Takes num-contracts and converts it to contract size
:param num_contracts: number of contracts
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: Amount
"""
if contract_size and contract_size != 1:
return float(FtPrecise(num_contracts) * FtPrecise(contract_size))
else:
return num_contracts
def amount_to_precision(
amount: float, amount_precision: Optional[float], precisionMode: Optional[int]
) -> float:
"""
Returns the amount to buy or sell to a precision the Exchange accepts
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
based on our definitions.
:param amount: amount to truncate
:param amount_precision: amount precision to use.
should be retrieved from markets[pair]['precision']['amount']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:return: truncated amount
"""
if amount_precision is not None and precisionMode is not None:
precision = int(amount_precision) if precisionMode != TICK_SIZE else amount_precision
# precision must be an int for non-ticksize inputs.
amount = float(
decimal_to_precision(
amount,
rounding_mode=TRUNCATE,
precision=precision,
counting_mode=precisionMode,
)
)
return amount
def amount_to_contract_precision(
amount,
amount_precision: Optional[float],
precisionMode: Optional[int],
contract_size: Optional[float],
) -> float:
"""
Returns the amount to buy or sell to a precision the Exchange accepts
including calculation to and from contracts.
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
based on our definitions.
:param amount: amount to truncate
:param amount_precision: amount precision to use.
should be retrieved from markets[pair]['precision']['amount']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: truncated amount
"""
if amount_precision is not None and precisionMode is not None:
contracts = amount_to_contracts(amount, contract_size)
amount_p = amount_to_precision(contracts, amount_precision, precisionMode)
return contracts_to_amount(amount_p, contract_size)
return amount
def __price_to_precision_significant_digits(
price: float,
price_precision: float,
*,
rounding_mode: int = ROUND,
) -> float:
"""
Implementation of ROUND_UP/Round_down for significant digits mode.
"""
from decimal import ROUND_DOWN as dec_ROUND_DOWN
from decimal import ROUND_UP as dec_ROUND_UP
from decimal import Decimal
dec = Decimal(str(price))
string = f"{dec:f}"
precision = round(price_precision)
q = precision - dec.adjusted() - 1
sigfig = Decimal("10") ** -q
if q < 0:
string_to_precision = string[:precision]
# string_to_precision is '' when we have zero precision
below = sigfig * Decimal(string_to_precision if string_to_precision else "0")
above = below + sigfig
res = above if rounding_mode == ROUND_UP else below
precise = f"{res:f}"
else:
precise = "{:f}".format(
dec.quantize(
sigfig, rounding=dec_ROUND_DOWN if rounding_mode == ROUND_DOWN else dec_ROUND_UP
)
)
return float(precise)
def price_to_precision(
price: float,
price_precision: Optional[float],
precisionMode: Optional[int],
*,
rounding_mode: int = ROUND,
) -> float:
"""
Returns the price rounded to the precision the Exchange accepts.
Partial Re-implementation of ccxt internal method decimal_to_precision(),
which does not support rounding up.
For stoploss calculations, must use ROUND_UP for longs, and ROUND_DOWN for shorts.
TODO: If ccxt supports ROUND_UP for decimal_to_precision(), we could remove this and
align with amount_to_precision().
:param price: price to convert
:param price_precision: price precision to use. Used from markets[pair]['precision']['price']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:param rounding_mode: rounding mode to use. Defaults to ROUND
:return: price rounded up to the precision the Exchange accepts
"""
if price_precision is not None and precisionMode is not None:
if rounding_mode not in (ROUND_UP, ROUND_DOWN):
# Use CCXT code where possible.
return float(
decimal_to_precision(
price,
rounding_mode=rounding_mode,
precision=price_precision,
counting_mode=precisionMode,
)
)
if precisionMode == TICK_SIZE:
precision = FtPrecise(price_precision)
price_str = FtPrecise(price)
missing = price_str % precision
if not missing == FtPrecise("0"):
if rounding_mode == ROUND_UP:
res = price_str - missing + precision
elif rounding_mode == ROUND_DOWN:
res = price_str - missing
return round(float(str(res)), 14)
return price
elif precisionMode == DECIMAL_PLACES:
ndigits = round(price_precision)
ticks = price * (10**ndigits)
if rounding_mode == ROUND_UP:
return ceil(ticks) / (10**ndigits)
if rounding_mode == ROUND_DOWN:
return floor(ticks) / (10**ndigits)
raise ValueError(f"Unknown rounding_mode {rounding_mode}")
elif precisionMode == SIGNIFICANT_DIGITS:
if rounding_mode in (ROUND_UP, ROUND_DOWN):
return __price_to_precision_significant_digits(
price, price_precision, rounding_mode=rounding_mode
)
raise ValueError(f"Unknown precisionMode {precisionMode}")
return price