freqtrade_origin/freqtrade/plugins/pairlist/RemotePairList.py

324 lines
12 KiB
Python

"""
Remote PairList provider
Provides pair list fetched from a remote source
"""
import logging
from pathlib import Path
from typing import Any, Dict, List, Tuple
import rapidjson
import requests
from cachetools import TTLCache
from freqtrade import __version__
from freqtrade.configuration.load_config import CONFIG_PARSE_MODE
from freqtrade.constants import Config
from freqtrade.exceptions import OperationalException
from freqtrade.exchange.types import Tickers
from freqtrade.plugins.pairlist.IPairList import IPairList, PairlistParameter
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
logger = logging.getLogger(__name__)
class RemotePairList(IPairList):
is_pairlist_generator = True
def __init__(self, exchange, pairlistmanager,
config: Config, pairlistconfig: Dict[str, Any],
pairlist_pos: int) -> None:
super().__init__(exchange, pairlistmanager, config, pairlistconfig, pairlist_pos)
if 'number_assets' not in self._pairlistconfig:
raise OperationalException(
'`number_assets` not specified. Please check your configuration '
'for "pairlist.config.number_assets"')
if 'pairlist_url' not in self._pairlistconfig:
raise OperationalException(
'`pairlist_url` not specified. Please check your configuration '
'for "pairlist.config.pairlist_url"')
self._mode = self._pairlistconfig.get('mode', 'whitelist')
self._processing_mode = self._pairlistconfig.get('processing_mode', 'filter')
self._number_pairs = self._pairlistconfig['number_assets']
self._refresh_period: int = self._pairlistconfig.get('refresh_period', 1800)
self._keep_pairlist_on_failure = self._pairlistconfig.get('keep_pairlist_on_failure', True)
self._pair_cache: TTLCache = TTLCache(maxsize=1, ttl=self._refresh_period)
self._pairlist_url = self._pairlistconfig.get('pairlist_url', '')
self._read_timeout = self._pairlistconfig.get('read_timeout', 60)
self._bearer_token = self._pairlistconfig.get('bearer_token', '')
self._init_done = False
self._save_to_file = self._pairlistconfig.get('save_to_file', None)
self._last_pairlist: List[Any] = list()
if self._mode not in ['whitelist', 'blacklist']:
raise OperationalException(
'`mode` not configured correctly. Supported Modes '
'are "whitelist","blacklist"')
if self._processing_mode not in ['filter', 'append']:
raise OperationalException(
'`processing_mode` not configured correctly. Supported Modes '
'are "filter","append"')
if self._pairlist_pos == 0 and self._mode == 'blacklist':
raise OperationalException(
'A `blacklist` mode RemotePairList can not be on the first '
'position of your pairlist.')
@property
def needstickers(self) -> bool:
"""
Boolean property defining if tickers are necessary.
If no Pairlist requires tickers, an empty Dict is passed
as tickers argument to filter_pairlist
"""
return False
def short_desc(self) -> str:
"""
Short whitelist method description - used for startup-messages
"""
return f"{self.name} - {self._pairlistconfig['number_assets']} pairs from RemotePairlist."
@staticmethod
def description() -> str:
return "Retrieve pairs from a remote API or local file."
@staticmethod
def available_parameters() -> Dict[str, PairlistParameter]:
return {
"pairlist_url": {
"type": "string",
"default": "",
"description": "URL to fetch pairlist from",
"help": "URL to fetch pairlist from",
},
"number_assets": {
"type": "number",
"default": 30,
"description": "Number of assets",
"help": "Number of assets to use from the pairlist.",
},
"mode": {
"type": "option",
"default": "whitelist",
"options": ["whitelist", "blacklist"],
"description": "Pairlist mode",
"help": "Should this pairlist operate as a whitelist or blacklist?",
},
"processing_mode": {
"type": "option",
"default": "filter",
"options": ["filter", "append"],
"description": "Processing mode",
"help": "Append pairs to incomming pairlist or filter them?",
},
**IPairList.refresh_period_parameter(),
"keep_pairlist_on_failure": {
"type": "boolean",
"default": True,
"description": "Keep last pairlist on failure",
"help": "Keep last pairlist on failure",
},
"read_timeout": {
"type": "number",
"default": 60,
"description": "Read timeout",
"help": "Request timeout for remote pairlist",
},
"bearer_token": {
"type": "string",
"default": "",
"description": "Bearer token",
"help": "Bearer token - used for auth against the upstream service.",
},
}
def process_json(self, jsonparse) -> List[str]:
pairlist = jsonparse.get('pairs', [])
remote_refresh_period = int(jsonparse.get('refresh_period', self._refresh_period))
if self._refresh_period < remote_refresh_period:
self.log_once(f'Refresh Period has been increased from {self._refresh_period}'
f' to minimum allowed: {remote_refresh_period} from Remote.', logger.info)
self._refresh_period = remote_refresh_period
self._pair_cache = TTLCache(maxsize=1, ttl=remote_refresh_period)
self._init_done = True
return pairlist
def return_last_pairlist(self) -> List[str]:
if self._keep_pairlist_on_failure:
pairlist = self._last_pairlist
self.log_once('Keeping last fetched pairlist', logger.info)
else:
pairlist = []
return pairlist
def fetch_pairlist(self) -> Tuple[List[str], float]:
headers = {
'User-Agent': 'Freqtrade/' + __version__ + ' Remotepairlist'
}
if self._bearer_token:
headers['Authorization'] = f'Bearer {self._bearer_token}'
try:
response = requests.get(self._pairlist_url, headers=headers,
timeout=self._read_timeout)
content_type = response.headers.get('content-type')
time_elapsed = response.elapsed.total_seconds()
if "application/json" in str(content_type):
jsonparse = response.json()
try:
pairlist = self.process_json(jsonparse)
except Exception as e:
if self._init_done:
pairlist = self.return_last_pairlist()
logger.warning(f'Error while processing JSON data: {type(e)}')
else:
raise OperationalException(f'Error while processing JSON data: {type(e)}')
else:
if self._init_done:
self.log_once(f'Error: RemotePairList is not of type JSON: '
f' {self._pairlist_url}', logger.info)
pairlist = self.return_last_pairlist()
else:
raise OperationalException('RemotePairList is not of type JSON, abort.')
except requests.exceptions.RequestException:
self.log_once(f'Was not able to fetch pairlist from:'
f' {self._pairlist_url}', logger.info)
pairlist = self.return_last_pairlist()
time_elapsed = 0
return pairlist, time_elapsed
def gen_pairlist(self, tickers: Tickers) -> List[str]:
"""
Generate the pairlist
:param tickers: Tickers (from exchange.get_tickers). May be cached.
:return: List of pairs
"""
if self._init_done:
pairlist = self._pair_cache.get('pairlist')
if pairlist == [None]:
# Valid but empty pairlist.
return []
else:
pairlist = []
time_elapsed = 0.0
if pairlist:
# Item found - no refresh necessary
return pairlist.copy()
else:
if self._pairlist_url.startswith("file:///"):
filename = self._pairlist_url.split("file:///", 1)[1]
file_path = Path(filename)
if file_path.exists():
with file_path.open() as json_file:
try:
# Load the JSON data into a dictionary
jsonparse = rapidjson.load(json_file, parse_mode=CONFIG_PARSE_MODE)
pairlist = self.process_json(jsonparse)
except Exception as e:
if self._init_done:
pairlist = self.return_last_pairlist()
logger.warning(f'Error while processing JSON data: {type(e)}')
logger.debug(f'Error while processing JSON data: {e}')
else:
raise OperationalException('Error while processing'
f'JSON data: {type(e)}')
else:
raise ValueError(f"{self._pairlist_url} does not exist.")
else:
# Fetch Pairlist from Remote URL
pairlist, time_elapsed = self.fetch_pairlist()
self.log_once(f"Fetched pairs: {pairlist}", logger.debug)
pairlist = expand_pairlist(pairlist, list(self._exchange.get_markets().keys()))
pairlist = self._whitelist_for_active_markets(pairlist)
pairlist = pairlist[:self._number_pairs]
if pairlist:
self._pair_cache['pairlist'] = pairlist.copy()
else:
# If pairlist is empty, set a dummy value to avoid fetching again
self._pair_cache['pairlist'] = [None]
if time_elapsed != 0.0:
self.log_once(f'Pairlist Fetched in {time_elapsed} seconds.', logger.info)
else:
self.log_once('Fetched Pairlist.', logger.info)
self._last_pairlist = list(pairlist)
if self._save_to_file:
self.save_pairlist(pairlist, self._save_to_file)
return pairlist
def save_pairlist(self, pairlist: List[str], filename: str) -> None:
pairlist_data = {
"pairs": pairlist,
"refresh_period": self._refresh_period
}
try:
file_path = Path(filename)
with file_path.open('w') as json_file:
rapidjson.dump(pairlist_data, json_file)
logger.info(f"Processed pairlist saved to {filename}")
except Exception as e:
logger.error(f"Error saving processed pairlist to {filename}: {e}")
def filter_pairlist(self, pairlist: List[str], tickers: Dict) -> List[str]:
"""
Filters and sorts pairlist and returns the whitelist again.
Called on each bot iteration - please use internal caching if necessary
:param pairlist: pairlist to filter or sort
:param tickers: Tickers (from exchange.get_tickers). May be cached.
:return: new whitelist
"""
rpl_pairlist = self.gen_pairlist(tickers)
merged_list = []
filtered = []
if self._mode == "whitelist":
if self._processing_mode == "filter":
merged_list = [pair for pair in pairlist if pair in rpl_pairlist]
elif self._processing_mode == "append":
merged_list = pairlist + rpl_pairlist
merged_list = sorted(set(merged_list), key=merged_list.index)
else:
for pair in pairlist:
if pair not in rpl_pairlist:
merged_list.append(pair)
else:
filtered.append(pair)
if filtered:
self.log_once(f"Blacklist - Filtered out pairs: {filtered}", logger.info)
merged_list = merged_list[:self._number_pairs]
return merged_list