freqtrade_origin/freqtrade/exchange/exchange_utils.py

343 lines
13 KiB
Python

"""
Exchange support utils
"""
from datetime import datetime, timedelta, timezone
from math import ceil, floor
from typing import Any, Dict, List, Optional, Tuple
import ccxt
from ccxt import (DECIMAL_PLACES, ROUND, ROUND_DOWN, ROUND_UP, SIGNIFICANT_DIGITS, TICK_SIZE,
TRUNCATE, decimal_to_precision)
from freqtrade.exchange.common import (BAD_EXCHANGES, EXCHANGE_HAS_OPTIONAL, EXCHANGE_HAS_REQUIRED,
SUPPORTED_EXCHANGES)
from freqtrade.types import ValidExchangesType
from freqtrade.util import FtPrecise
from freqtrade.util.datetime_helpers import dt_from_ts, dt_ts
CcxtModuleType = Any
def is_exchange_known_ccxt(
exchange_name: str, ccxt_module: Optional[CcxtModuleType] = None) -> bool:
return exchange_name in ccxt_exchanges(ccxt_module)
def ccxt_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
"""
Return the list of all exchanges known to ccxt
"""
return ccxt_module.exchanges if ccxt_module is not None else ccxt.exchanges
def available_exchanges(ccxt_module: Optional[CcxtModuleType] = None) -> List[str]:
"""
Return exchanges available to the bot, i.e. non-bad exchanges in the ccxt list
"""
exchanges = ccxt_exchanges(ccxt_module)
return [x for x in exchanges if validate_exchange(x)[0]]
def validate_exchange(exchange: str) -> Tuple[bool, str]:
ex_mod = getattr(ccxt, exchange.lower())()
if not ex_mod or not ex_mod.has:
return False, ''
missing = [k for k in EXCHANGE_HAS_REQUIRED if ex_mod.has.get(k) is not True]
if missing:
return False, f"missing: {', '.join(missing)}"
missing_opt = [k for k in EXCHANGE_HAS_OPTIONAL if not ex_mod.has.get(k)]
if exchange.lower() in BAD_EXCHANGES:
return False, BAD_EXCHANGES.get(exchange.lower(), '')
if missing_opt:
return True, f"missing opt: {', '.join(missing_opt)}"
return True, ''
def _build_exchange_list_entry(
exchange_name: str, exchangeClasses: Dict[str, Any]) -> ValidExchangesType:
valid, comment = validate_exchange(exchange_name)
result: ValidExchangesType = {
'name': exchange_name,
'valid': valid,
'supported': exchange_name.lower() in SUPPORTED_EXCHANGES,
'comment': comment,
'trade_modes': [{'trading_mode': 'spot', 'margin_mode': ''}],
}
if resolved := exchangeClasses.get(exchange_name.lower()):
supported_modes = [{'trading_mode': 'spot', 'margin_mode': ''}] + [
{'trading_mode': tm.value, 'margin_mode': mm.value}
for tm, mm in resolved['class']._supported_trading_mode_margin_pairs
]
result.update({
'trade_modes': supported_modes,
})
return result
def list_available_exchanges(all_exchanges: bool) -> List[ValidExchangesType]:
"""
:return: List of tuples with exchangename, valid, reason.
"""
exchanges = ccxt_exchanges() if all_exchanges else available_exchanges()
from freqtrade.resolvers.exchange_resolver import ExchangeResolver
subclassed = {e['name'].lower(): e for e in ExchangeResolver.search_all_objects({}, False)}
exchanges_valid: List[ValidExchangesType] = [
_build_exchange_list_entry(e, subclassed) for e in exchanges
]
return exchanges_valid
def timeframe_to_seconds(timeframe: str) -> int:
"""
Translates the timeframe interval value written in the human readable
form ('1m', '5m', '1h', '1d', '1w', etc.) to the number
of seconds for one timeframe interval.
"""
return ccxt.Exchange.parse_timeframe(timeframe)
def timeframe_to_minutes(timeframe: str) -> int:
"""
Same as timeframe_to_seconds, but returns minutes.
"""
return ccxt.Exchange.parse_timeframe(timeframe) // 60
def timeframe_to_msecs(timeframe: str) -> int:
"""
Same as timeframe_to_seconds, but returns milliseconds.
"""
return ccxt.Exchange.parse_timeframe(timeframe) * 1000
def timeframe_to_prev_date(timeframe: str, date: Optional[datetime] = None) -> datetime:
"""
Use Timeframe and determine the candle start date for this date.
Does not round when given a candle start date.
:param timeframe: timeframe in string format (e.g. "5m")
:param date: date to use. Defaults to now(utc)
:returns: date of previous candle (with utc timezone)
"""
if not date:
date = datetime.now(timezone.utc)
new_timestamp = ccxt.Exchange.round_timeframe(timeframe, dt_ts(date), ROUND_DOWN) // 1000
return dt_from_ts(new_timestamp)
def timeframe_to_next_date(timeframe: str, date: Optional[datetime] = None) -> datetime:
"""
Use Timeframe and determine next candle.
:param timeframe: timeframe in string format (e.g. "5m")
:param date: date to use. Defaults to now(utc)
:returns: date of next candle (with utc timezone)
"""
if not date:
date = datetime.now(timezone.utc)
new_timestamp = ccxt.Exchange.round_timeframe(timeframe, dt_ts(date), ROUND_UP) // 1000
return dt_from_ts(new_timestamp)
def date_minus_candles(
timeframe: str, candle_count: int, date: Optional[datetime] = None) -> datetime:
"""
subtract X candles from a date.
:param timeframe: timeframe in string format (e.g. "5m")
:param candle_count: Amount of candles to subtract.
:param date: date to use. Defaults to now(utc)
"""
if not date:
date = datetime.now(timezone.utc)
tf_min = timeframe_to_minutes(timeframe)
new_date = timeframe_to_prev_date(timeframe, date) - timedelta(minutes=tf_min * candle_count)
return new_date
def market_is_active(market: Dict) -> bool:
"""
Return True if the market is active.
"""
# "It's active, if the active flag isn't explicitly set to false. If it's missing or
# true then it's true. If it's undefined, then it's most likely true, but not 100% )"
# See https://github.com/ccxt/ccxt/issues/4874,
# https://github.com/ccxt/ccxt/issues/4075#issuecomment-434760520
return market.get('active', True) is not False
def amount_to_contracts(amount: float, contract_size: Optional[float]) -> float:
"""
Convert amount to contracts.
:param amount: amount to convert
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: num-contracts
"""
if contract_size and contract_size != 1:
return float(FtPrecise(amount) / FtPrecise(contract_size))
else:
return amount
def contracts_to_amount(num_contracts: float, contract_size: Optional[float]) -> float:
"""
Takes num-contracts and converts it to contract size
:param num_contracts: number of contracts
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: Amount
"""
if contract_size and contract_size != 1:
return float(FtPrecise(num_contracts) * FtPrecise(contract_size))
else:
return num_contracts
def amount_to_precision(amount: float, amount_precision: Optional[float],
precisionMode: Optional[int]) -> float:
"""
Returns the amount to buy or sell to a precision the Exchange accepts
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
based on our definitions.
:param amount: amount to truncate
:param amount_precision: amount precision to use.
should be retrieved from markets[pair]['precision']['amount']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:return: truncated amount
"""
if amount_precision is not None and precisionMode is not None:
precision = int(amount_precision) if precisionMode != TICK_SIZE else amount_precision
# precision must be an int for non-ticksize inputs.
amount = float(decimal_to_precision(amount, rounding_mode=TRUNCATE,
precision=precision,
counting_mode=precisionMode,
))
return amount
def amount_to_contract_precision(
amount, amount_precision: Optional[float], precisionMode: Optional[int],
contract_size: Optional[float]) -> float:
"""
Returns the amount to buy or sell to a precision the Exchange accepts
including calculation to and from contracts.
Re-implementation of ccxt internal methods - ensuring we can test the result is correct
based on our definitions.
:param amount: amount to truncate
:param amount_precision: amount precision to use.
should be retrieved from markets[pair]['precision']['amount']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:param contract_size: contract size - taken from exchange.get_contract_size(pair)
:return: truncated amount
"""
if amount_precision is not None and precisionMode is not None:
contracts = amount_to_contracts(amount, contract_size)
amount_p = amount_to_precision(contracts, amount_precision, precisionMode)
return contracts_to_amount(amount_p, contract_size)
return amount
def __price_to_precision_significant_digits(
price: float,
price_precision: float,
*,
rounding_mode: int = ROUND,
) -> float:
"""
Implementation of ROUND_UP/Round_down for significant digits mode.
"""
from decimal import ROUND_DOWN as dec_ROUND_DOWN
from decimal import ROUND_UP as dec_ROUND_UP
from decimal import Decimal
dec = Decimal(str(price))
string = f'{dec:f}'
precision = round(price_precision)
q = precision - dec.adjusted() - 1
sigfig = Decimal('10') ** -q
if q < 0:
string_to_precision = string[:precision]
# string_to_precision is '' when we have zero precision
below = sigfig * Decimal(string_to_precision if string_to_precision else '0')
above = below + sigfig
res = above if rounding_mode == ROUND_UP else below
precise = f'{res:f}'
else:
precise = '{:f}'.format(dec.quantize(
sigfig,
rounding=dec_ROUND_DOWN if rounding_mode == ROUND_DOWN else dec_ROUND_UP)
)
return float(precise)
def price_to_precision(
price: float,
price_precision: Optional[float],
precisionMode: Optional[int],
*,
rounding_mode: int = ROUND,
) -> float:
"""
Returns the price rounded to the precision the Exchange accepts.
Partial Re-implementation of ccxt internal method decimal_to_precision(),
which does not support rounding up.
For stoploss calculations, must use ROUND_UP for longs, and ROUND_DOWN for shorts.
TODO: If ccxt supports ROUND_UP for decimal_to_precision(), we could remove this and
align with amount_to_precision().
:param price: price to convert
:param price_precision: price precision to use. Used from markets[pair]['precision']['price']
:param precisionMode: precision mode to use. Should be used from precisionMode
one of ccxt's DECIMAL_PLACES, SIGNIFICANT_DIGITS, or TICK_SIZE
:param rounding_mode: rounding mode to use. Defaults to ROUND
:return: price rounded up to the precision the Exchange accepts
"""
if price_precision is not None and precisionMode is not None:
if rounding_mode not in (ROUND_UP, ROUND_DOWN):
# Use CCXT code where possible.
return float(decimal_to_precision(price, rounding_mode=rounding_mode,
precision=price_precision,
counting_mode=precisionMode
))
if precisionMode == TICK_SIZE:
precision = FtPrecise(price_precision)
price_str = FtPrecise(price)
missing = price_str % precision
if not missing == FtPrecise("0"):
if rounding_mode == ROUND_UP:
res = price_str - missing + precision
elif rounding_mode == ROUND_DOWN:
res = price_str - missing
return round(float(str(res)), 14)
return price
elif precisionMode == DECIMAL_PLACES:
ndigits = round(price_precision)
ticks = price * (10**ndigits)
if rounding_mode == ROUND_UP:
return ceil(ticks) / (10**ndigits)
if rounding_mode == ROUND_DOWN:
return floor(ticks) / (10**ndigits)
raise ValueError(f"Unknown rounding_mode {rounding_mode}")
elif precisionMode == SIGNIFICANT_DIGITS:
if rounding_mode in (ROUND_UP, ROUND_DOWN):
return __price_to_precision_significant_digits(
price, price_precision, rounding_mode=rounding_mode
)
raise ValueError(f"Unknown precisionMode {precisionMode}")
return price