mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
348 lines
12 KiB
Python
348 lines
12 KiB
Python
"""
|
|
Exchange support utils
|
|
"""
|
|
|
|
import inspect
|
|
from datetime import datetime, timedelta, timezone
|
|
from math import ceil, floor
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
|
|
import ccxt
|
|
from ccxt import (
|
|
DECIMAL_PLACES,
|
|
ROUND,
|
|
ROUND_DOWN,
|
|
ROUND_UP,
|
|
SIGNIFICANT_DIGITS,
|
|
TICK_SIZE,
|
|
TRUNCATE,
|
|
decimal_to_precision,
|
|
)
|
|
|
|
from freqtrade.exchange.common import (
|
|
BAD_EXCHANGES,
|
|
EXCHANGE_HAS_OPTIONAL,
|
|
EXCHANGE_HAS_REQUIRED,
|
|
SUPPORTED_EXCHANGES,
|
|
)
|
|
from freqtrade.exchange.exchange_utils_timeframe import timeframe_to_minutes, timeframe_to_prev_date
|
|
from freqtrade.ft_types import ValidExchangesType
|
|
from freqtrade.util import FtPrecise
|
|
|
|
|
|
CcxtModuleType = Any
|
|
|
|
|
|
def is_exchange_known_ccxt(
|
|
exchange_name: str, ccxt_module: Optional[CcxtModuleType] = None
|
|
) -> bool:
|
|
return exchange_name in ccxt_exchanges(ccxt_module)
|
|
|
|
|
|
def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
|
|
"""
|
|
Return the list of all exchanges known to ccxt
|
|
"""
|
|
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
|
|
|
|
|
|
def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
|
|
"""
|
|
Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list
|
|
"""
|
|
exchanges = ccxt_exchanges(ccxt_module)
|
|
return [x for x in exchanges if validate_exchange(x)[0]]
|
|
|
|
|
|
def validate_exchange(exchange: str) -> Tuple[bool, str, Optional[ccxt.Exchange]]:
|
|
"""
|
|
returns: can_use, reason, exchange_object
|
|
with Reason including both missing and missing_opt
|
|
"""
|
|
try:
|
|
ex_mod = getattr(ccxt.pro, exchange.lower())()
|
|
except AttributeError:
|
|
ex_mod = getattr(ccxt.async_support, exchange.lower())()
|
|
|
|
if not ex_mod or not ex_mod.has:
|
|
return False, "", None
|
|
|
|
result = True
|
|
reason = ""
|
|
missing = [
|
|
k
|
|
for k, v in EXCHANGE_HAS_REQUIRED.items()
|
|
if ex_mod.has.get(k) is not True and not (all(ex_mod.has.get(x) for x in v))
|
|
]
|
|
if missing:
|
|
result = False
|
|
reason += f"missing: {', '.join(missing)}"
|
|
|
|
missing_opt = [k for k in EXCHANGE_HAS_OPTIONAL if not ex_mod.has.get(k)]
|
|
|
|
if exchange.lower() in BAD_EXCHANGES:
|
|
result = False
|
|
reason = BAD_EXCHANGES.get(exchange.lower(), "")
|
|
|
|
if missing_opt:
|
|
reason += f"{'. ' if reason else ''}missing opt: {', '.join(missing_opt)}. "
|
|
|
|
return result, reason, ex_mod
|
|
|
|
|
|
def _build_exchange_list_entry(
|
|
exchange_name: str, exchangeClasses: Dict[str, Any]
|
|
) -> ValidExchangesType:
|
|
valid, comment, ex_mod = validate_exchange(exchange_name)
|
|
result: ValidExchangesType = {
|
|
"name": getattr(ex_mod, "name", exchange_name),
|
|
"classname": exchange_name,
|
|
"valid": valid,
|
|
"supported": exchange_name.lower() in SUPPORTED_EXCHANGES,
|
|
"comment": comment,
|
|
"dex": getattr(ex_mod, "dex", False),
|
|
"is_alias": getattr(ex_mod, "alias", False),
|
|
"alias_for": inspect.getmro(ex_mod.__class__)[1]().id
|
|
if getattr(ex_mod, "alias", False)
|
|
else None,
|
|
"trade_modes": [{"trading_mode": "spot", "margin_mode": ""}],
|
|
}
|
|
if resolved := exchangeClasses.get(exchange_name.lower()):
|
|
supported_modes = [{"trading_mode": "spot", "margin_mode": ""}] + [
|
|
{"trading_mode": tm.value, "margin_mode": mm.value}
|
|
for tm, mm in resolved["class"]._supported_trading_mode_margin_pairs
|
|
]
|
|
result.update(
|
|
{
|
|
"trade_modes": supported_modes,
|
|
}
|
|
)
|
|
|
|
return result
|
|
|
|
|
|
def list_available_exchanges(all_exchanges: bool) -> List[ValidExchangesType]:
|
|
"""
|
|
:return: List of tuples with exchangename, valid, reason.
|
|
"""
|
|
exchanges = ccxt_exchanges() if all_exchanges else available_exchanges()
|
|
from freqtrade.resolvers.exchange_resolver import ExchangeResolver
|
|
|
|
subclassed = {e["name"].lower(): e for e in ExchangeResolver.search_all_objects({}, False)}
|
|
|
|
exchanges_valid: List[ValidExchangesType] = [
|
|
_build_exchange_list_entry(e, subclassed) for e in exchanges
|
|
]
|
|
|
|
return exchanges_valid
|
|
|
|
|
|
def date_minus_candles(
|
|
timeframe: str, candle_count: int, date: Optional[datetime] = None
|
|
) -> datetime:
|
|
"""
|
|
subtract X candles from a date.
|
|
:param timeframe: timeframe in string format (e.g. "5m")
|
|
:param candle_count: Amount of candles to subtract.
|
|
:param date: date to use. Defaults to now(utc)
|
|
|
|
"""
|
|
if not date:
|
|
date = datetime.now(timezone.utc)
|
|
|
|
tf_min = timeframe_to_minutes(timeframe)
|
|
new_date = timeframe_to_prev_date(timeframe, date) - timedelta(minutes=tf_min * candle_count)
|
|
return new_date
|
|
|
|
|
|
def market_is_active(market: Dict) -> bool:
|
|
"""
|
|
Return True if the market is active.
|
|
"""
|
|
# "It's active, if the active flag isn't explicitly set to false. If it's missing or
|
|
# true then it's true. If it's undefined, then it's most likely true, but not 100% )"
|
|
# See https://github.com/ccxt/ccxt/issues/4874,
|
|
# https://github.com/ccxt/ccxt/issues/4075#issuecomment-434760520
|
|
return market.get("active", True) is not False
|
|
|
|
|
|
def amount_to_contracts(amount: float, contract_size: Optional[float]) -> float:
|
|
"""
|
|
Convert amount to contracts.
|
|
:param amount: amount to convert
|
|
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
|
|
:return: num-contracts
|
|
"""
|
|
if contract_size and contract_size != 1:
|
|
return float(FtPrecise(amount) / FtPrecise(contract_size))
|
|
else:
|
|
return amount
|
|
|
|
|
|
def contracts_to_amount(num_contracts: float, contract_size: Optional[float]) -> float:
|
|
"""
|
|
Takes num-contracts and converts it to contract size
|
|
:param num_contracts: number of contracts
|
|
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
|
|
:return: Amount
|
|
"""
|
|
|
|
if contract_size and contract_size != 1:
|
|
return float(FtPrecise(num_contracts) * FtPrecise(contract_size))
|
|
else:
|
|
return num_contracts
|
|
|
|
|
|
def amount_to_precision(
|
|
amount: float, amount_precision: Optional[float], precisionMode: Optional[int]
|
|
) -> float:
|
|
"""
|
|
Returns the amount to buy or sell to a precision the Exchange accepts
|
|
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
|
|
based on our definitions.
|
|
:param amount: amount to truncate
|
|
:param amount_precision: amount precision to use.
|
|
should be retrieved from markets[pair]['precision']['amount']
|
|
:param precisionMode: precision mode to use. Should be used from precisionMode
|
|
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
|
|
:return: truncated amount
|
|
"""
|
|
if amount_precision is not None and precisionMode is not None:
|
|
precision = int(amount_precision) if precisionMode != TICK_SIZE else amount_precision
|
|
# precision must be an int for non-ticksize inputs.
|
|
amount = float(
|
|
decimal_to_precision(
|
|
amount,
|
|
rounding_mode=TRUNCATE,
|
|
precision=precision,
|
|
counting_mode=precisionMode,
|
|
)
|
|
)
|
|
|
|
return amount
|
|
|
|
|
|
def amount_to_contract_precision(
|
|
amount,
|
|
amount_precision: Optional[float],
|
|
precisionMode: Optional[int],
|
|
contract_size: Optional[float],
|
|
) -> float:
|
|
"""
|
|
Returns the amount to buy or sell to a precision the Exchange accepts
|
|
including calculation to and from contracts.
|
|
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
|
|
based on our definitions.
|
|
:param amount: amount to truncate
|
|
:param amount_precision: amount precision to use.
|
|
should be retrieved from markets[pair]['precision']['amount']
|
|
:param precisionMode: precision mode to use. Should be used from precisionMode
|
|
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
|
|
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
|
|
:return: truncated amount
|
|
"""
|
|
if amount_precision is not None and precisionMode is not None:
|
|
contracts = amount_to_contracts(amount, contract_size)
|
|
amount_p = amount_to_precision(contracts, amount_precision, precisionMode)
|
|
return contracts_to_amount(amount_p, contract_size)
|
|
return amount
|
|
|
|
|
|
def __price_to_precision_significant_digits(
|
|
price: float,
|
|
price_precision: float,
|
|
*,
|
|
rounding_mode: int = ROUND,
|
|
) -> float:
|
|
"""
|
|
Implementation of ROUND_UP/Round_down for significant digits mode.
|
|
"""
|
|
from decimal import ROUND_DOWN as dec_ROUND_DOWN
|
|
from decimal import ROUND_UP as dec_ROUND_UP
|
|
from decimal import Decimal
|
|
|
|
dec = Decimal(str(price))
|
|
string = f"{dec:f}"
|
|
precision = round(price_precision)
|
|
|
|
q = precision - dec.adjusted() - 1
|
|
sigfig = Decimal("10") ** -q
|
|
if q < 0:
|
|
string_to_precision = string[:precision]
|
|
# string_to_precision is '' when we have zero precision
|
|
below = sigfig * Decimal(string_to_precision if string_to_precision else "0")
|
|
above = below + sigfig
|
|
res = above if rounding_mode == ROUND_UP else below
|
|
precise = f"{res:f}"
|
|
else:
|
|
precise = "{:f}".format(
|
|
dec.quantize(
|
|
sigfig, rounding=dec_ROUND_DOWN if rounding_mode == ROUND_DOWN else dec_ROUND_UP
|
|
)
|
|
)
|
|
return float(precise)
|
|
|
|
|
|
def price_to_precision(
|
|
price: float,
|
|
price_precision: Optional[float],
|
|
precisionMode: Optional[int],
|
|
*,
|
|
rounding_mode: int = ROUND,
|
|
) -> float:
|
|
"""
|
|
Returns the price rounded to the precision the Exchange accepts.
|
|
Partial Re-implementation of ccxt internal method decimal_to_precision(),
|
|
which does not support rounding up.
|
|
For stoploss calculations, must use ROUND_UP for longs, and ROUND_DOWN for shorts.
|
|
|
|
TODO: If ccxt supports ROUND_UP for decimal_to_precision(), we could remove this and
|
|
align with amount_to_precision().
|
|
:param price: price to convert
|
|
:param price_precision: price precision to use. Used from markets[pair]['precision']['price']
|
|
:param precisionMode: precision mode to use. Should be used from precisionMode
|
|
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
|
|
:param rounding_mode: rounding mode to use. Defaults to ROUND
|
|
:return: price rounded up to the precision the Exchange accepts
|
|
"""
|
|
if price_precision is not None and precisionMode is not None:
|
|
if rounding_mode not in (ROUND_UP, ROUND_DOWN):
|
|
# Use CCXT code where possible.
|
|
return float(
|
|
decimal_to_precision(
|
|
price,
|
|
rounding_mode=rounding_mode,
|
|
precision=price_precision,
|
|
counting_mode=precisionMode,
|
|
)
|
|
)
|
|
|
|
if precisionMode == TICK_SIZE:
|
|
precision = FtPrecise(price_precision)
|
|
price_str = FtPrecise(price)
|
|
missing = price_str % precision
|
|
if not missing == FtPrecise("0"):
|
|
if rounding_mode == ROUND_UP:
|
|
res = price_str - missing + precision
|
|
elif rounding_mode == ROUND_DOWN:
|
|
res = price_str - missing
|
|
return round(float(str(res)), 14)
|
|
return price
|
|
elif precisionMode == DECIMAL_PLACES:
|
|
ndigits = round(price_precision)
|
|
ticks = price * (10**ndigits)
|
|
if rounding_mode == ROUND_UP:
|
|
return ceil(ticks) / (10**ndigits)
|
|
if rounding_mode == ROUND_DOWN:
|
|
return floor(ticks) / (10**ndigits)
|
|
|
|
raise ValueError(f"Unknown rounding_mode {rounding_mode}")
|
|
elif precisionMode == SIGNIFICANT_DIGITS:
|
|
if rounding_mode in (ROUND_UP, ROUND_DOWN):
|
|
return __price_to_precision_significant_digits(
|
|
price, price_precision, rounding_mode=rounding_mode
|
|
)
|
|
|
|
raise ValueError(f"Unknown precisionMode {precisionMode}")
|
|
return price
|