mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
Merge branch 'develop' into feature/fetch-public-trades
This commit is contained in:
commit
c891e38f2b
|
@ -204,9 +204,10 @@ Mandatory parameters are marked as **Required**, which means that they are requi
|
|||
| `exchange.uid` | API uid to use for the exchange. Only required when you are in production mode and for exchanges that use uid for API requests.<br>**Keep it in secret, do not disclose publicly.** <br> **Datatype:** String
|
||||
| `exchange.pair_whitelist` | List of pairs to use by the bot for trading and to check for potential trades during backtesting. Supports regex pairs as `.*/BTC`. Not used by VolumePairList. [More information](plugins.md#pairlists-and-pairlist-handlers). <br> **Datatype:** List
|
||||
| `exchange.pair_blacklist` | List of pairs the bot must absolutely avoid for trading and backtesting. [More information](plugins.md#pairlists-and-pairlist-handlers). <br> **Datatype:** List
|
||||
| `exchange.ccxt_config` | Additional CCXT parameters passed to both ccxt instances (sync and async). This is usually the correct place for additional ccxt configurations. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://ccxt.readthedocs.io/en/latest/manual.html#instantiation). Please avoid adding exchange secrets here (use the dedicated fields instead), as they may be contained in logs. <br> **Datatype:** Dict
|
||||
| `exchange.ccxt_sync_config` | Additional CCXT parameters passed to the regular (sync) ccxt instance. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://ccxt.readthedocs.io/en/latest/manual.html#instantiation) <br> **Datatype:** Dict
|
||||
| `exchange.ccxt_async_config` | Additional CCXT parameters passed to the async ccxt instance. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://ccxt.readthedocs.io/en/latest/manual.html#instantiation) <br> **Datatype:** Dict
|
||||
| `exchange.ccxt_config` | Additional CCXT parameters passed to both ccxt instances (sync and async). This is usually the correct place for additional ccxt configurations. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://docs.ccxt.com/#/README?id=overriding-exchange-properties-upon-instantiation). Please avoid adding exchange secrets here (use the dedicated fields instead), as they may be contained in logs. <br> **Datatype:** Dict
|
||||
| `exchange.ccxt_sync_config` | Additional CCXT parameters passed to the regular (sync) ccxt instance. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://docs.ccxt.com/#/README?id=overriding-exchange-properties-upon-instantiation) <br> **Datatype:** Dict
|
||||
| `exchange.ccxt_async_config` | Additional CCXT parameters passed to the async ccxt instance. Parameters may differ from exchange to exchange and are documented in the [ccxt documentation](https://docs.ccxt.com/#/README?id=overriding-exchange-properties-upon-instantiation) <br> **Datatype:** Dict
|
||||
| `exchange.enable_ws` | Enable the usage of Websockets for the exchange. <br>[More information](#consuming-exchange-websockets).<br>*Defaults to `true`.* <br> **Datatype:** Boolean
|
||||
| `exchange.markets_refresh_interval` | The interval in minutes in which markets are reloaded. <br>*Defaults to `60` minutes.* <br> **Datatype:** Positive Integer
|
||||
| `exchange.skip_pair_validation` | Skip pairlist validation on startup.<br>*Defaults to `false`*<br> **Datatype:** Boolean
|
||||
| `exchange.skip_open_order_update` | Skips open order updates on startup should the exchange cause problems. Only relevant in live conditions.<br>*Defaults to `false`*<br> **Datatype:** Boolean
|
||||
|
@ -409,6 +410,8 @@ Or another example if your position adjustment assumes it can do 1 additional bu
|
|||
|
||||
--8<-- "includes/pricing.md"
|
||||
|
||||
## Further Configuration details
|
||||
|
||||
### Understand minimal_roi
|
||||
|
||||
The `minimal_roi` configuration parameter is a JSON object where the key is a duration
|
||||
|
@ -614,6 +617,30 @@ Freqtrade supports both Demo and Pro coingecko API keys.
|
|||
The Coingecko API key is NOT required for the bot to function correctly.
|
||||
It is only used for the conversion of coin to fiat in the Telegram reports, which usually also work without API key.
|
||||
|
||||
## Consuming exchange Websockets
|
||||
|
||||
Freqtrade can consume websockets through ccxt.pro.
|
||||
|
||||
Freqtrade aims ensure data is available at all times.
|
||||
Should the websocket connection fail (or be disabled), the bot will fall back to REST API calls.
|
||||
|
||||
Should you experience problems you suspect are caused by websockets, you can disable these via the setting `exchange.enable_ws`, which defaults to true.
|
||||
|
||||
```jsonc
|
||||
"exchange": {
|
||||
// ...
|
||||
"enable_ws": false,
|
||||
// ...
|
||||
}
|
||||
```
|
||||
|
||||
Should you be required to use a proxy, please refer to the [proxy section](#using-proxy-with-freqtrade) for more information.
|
||||
|
||||
!!! Info "Rollout"
|
||||
We're implementing this out slowly, ensuring stability of your bots.
|
||||
Currently, usage is limited to ohlcv data streams.
|
||||
It's also limited to a few exchanges, with new exchanges being added on an ongoing basis.
|
||||
|
||||
## Using Dry-run mode
|
||||
|
||||
We recommend starting the bot in the Dry-run mode to see how your bot will
|
||||
|
@ -702,7 +729,7 @@ You should also make sure to read the [Exchanges](exchanges.md) section of the d
|
|||
|
||||
**NEVER** share your private configuration file or your exchange keys with anyone!
|
||||
|
||||
### Using proxy with Freqtrade
|
||||
## Using a proxy with Freqtrade
|
||||
|
||||
To use a proxy with freqtrade, export your proxy settings using the variables `"HTTP_PROXY"` and `"HTTPS_PROXY"` set to the appropriate values.
|
||||
This will have the proxy settings applied to everything (telegram, coingecko, ...) **except** for exchange requests.
|
||||
|
@ -713,7 +740,7 @@ export HTTPS_PROXY="http://addr:port"
|
|||
freqtrade
|
||||
```
|
||||
|
||||
#### Proxy exchange requests
|
||||
### Proxy exchange requests
|
||||
|
||||
To use a proxy for exchange connections - you will have to define the proxies as part of the ccxt configuration.
|
||||
|
||||
|
@ -722,6 +749,7 @@ To use a proxy for exchange connections - you will have to define the proxies as
|
|||
"exchange": {
|
||||
"ccxt_config": {
|
||||
"httpsProxy": "http://addr:port",
|
||||
"wsProxy": "http://addr:port",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -558,6 +558,7 @@ CONF_SCHEMA = {
|
|||
"type": "object",
|
||||
"properties": {
|
||||
"name": {"type": "string"},
|
||||
"enable_ws": {"type": "boolean", "default": True},
|
||||
"key": {"type": "string", "default": ""},
|
||||
"secret": {"type": "string", "default": ""},
|
||||
"password": {"type": "string", "default": ""},
|
||||
|
|
|
@ -30,6 +30,7 @@ class Binance(Exchange):
|
|||
"trades_pagination_arg": "fromId",
|
||||
"trades_has_history": True,
|
||||
"l2_limit_range": [5, 10, 20, 50, 100, 500, 1000],
|
||||
"ws.enabled": True,
|
||||
}
|
||||
_ft_has_futures: Dict = {
|
||||
"stoploss_order_types": {"limit": "stop", "market": "stop_market"},
|
||||
|
@ -42,6 +43,7 @@ class Binance(Exchange):
|
|||
PriceType.LAST: "CONTRACT_PRICE",
|
||||
PriceType.MARK: "MARK_PRICE",
|
||||
},
|
||||
"ws.enabled": False,
|
||||
}
|
||||
|
||||
_supported_trading_mode_margin_pairs: List[Tuple[TradingMode, MarginMode]] = [
|
||||
|
|
|
@ -33,6 +33,7 @@ class Bybit(Exchange):
|
|||
"ohlcv_candle_limit": 1000,
|
||||
"ohlcv_has_history": True,
|
||||
"order_time_in_force": ["GTC", "FOK", "IOC", "PO"],
|
||||
"ws.enabled": True,
|
||||
"trades_has_history": False, # Endpoint doesn't support pagination
|
||||
}
|
||||
_ft_has_futures: Dict = {
|
||||
|
|
|
@ -92,6 +92,8 @@ EXCHANGE_HAS_OPTIONAL = [
|
|||
# 'fetchMarketLeverageTiers', # Futures initialization
|
||||
# 'fetchOpenOrder', 'fetchClosedOrder', # replacement for fetchOrder
|
||||
# 'fetchOpenOrders', 'fetchClosedOrders', # 'fetchOrders', # Refinding balance...
|
||||
# ccxt.pro
|
||||
"watchOHLCV",
|
||||
]
|
||||
|
||||
|
||||
|
|
|
@ -14,7 +14,7 @@ from threading import Lock
|
|||
from typing import Any, Coroutine, Dict, List, Literal, Optional, Tuple, Union
|
||||
|
||||
import ccxt
|
||||
import ccxt.async_support as ccxt_async
|
||||
import ccxt.pro as ccxt_pro
|
||||
from cachetools import TTLCache
|
||||
from ccxt import TICK_SIZE
|
||||
from dateutil import parser
|
||||
|
@ -41,7 +41,15 @@ from freqtrade.data.converter import (
|
|||
trades_dict_to_list,
|
||||
trades_list_to_df,
|
||||
)
|
||||
from freqtrade.enums import OPTIMIZE_MODES, CandleType, MarginMode, PriceType, RunMode, TradingMode
|
||||
from freqtrade.enums import (
|
||||
OPTIMIZE_MODES,
|
||||
TRADE_MODES,
|
||||
CandleType,
|
||||
MarginMode,
|
||||
PriceType,
|
||||
RunMode,
|
||||
TradingMode,
|
||||
)
|
||||
from freqtrade.exceptions import (
|
||||
ConfigurationError,
|
||||
DDosProtection,
|
||||
|
@ -63,7 +71,6 @@ from freqtrade.exchange.exchange_utils import (
|
|||
ROUND,
|
||||
ROUND_DOWN,
|
||||
ROUND_UP,
|
||||
CcxtModuleType,
|
||||
amount_to_contract_precision,
|
||||
amount_to_contracts,
|
||||
amount_to_precision,
|
||||
|
@ -80,6 +87,7 @@ from freqtrade.exchange.exchange_utils_timeframe import (
|
|||
timeframe_to_prev_date,
|
||||
timeframe_to_seconds,
|
||||
)
|
||||
from freqtrade.exchange.exchange_ws import ExchangeWS
|
||||
from freqtrade.exchange.types import OHLCVResponse, OrderBook, Ticker, Tickers
|
||||
from freqtrade.misc import (
|
||||
chunks,
|
||||
|
@ -90,7 +98,7 @@ from freqtrade.misc import (
|
|||
)
|
||||
from freqtrade.plugins.pairlist.pairlist_helpers import expand_pairlist
|
||||
from freqtrade.util import dt_from_ts, dt_now
|
||||
from freqtrade.util.datetime_helpers import dt_humanize_delta, dt_ts
|
||||
from freqtrade.util.datetime_helpers import dt_humanize_delta, dt_ts, format_ms_time
|
||||
from freqtrade.util.periodic_cache import PeriodicCache
|
||||
|
||||
|
||||
|
@ -138,6 +146,7 @@ class Exchange:
|
|||
"marketOrderRequiresPrice": False,
|
||||
"exchange_has_overrides": {}, # Dictionary overriding ccxt's "has".
|
||||
# Expected to be in the format {"fetchOHLCV": True} or {"fetchOHLCV": False}
|
||||
"ws.enabled": False, # Set to true for exchanges with tested websocket support
|
||||
}
|
||||
_ft_has: Dict = {}
|
||||
_ft_has_futures: Dict = {}
|
||||
|
@ -160,7 +169,9 @@ class Exchange:
|
|||
:return: None
|
||||
"""
|
||||
self._api: ccxt.Exchange
|
||||
self._api_async: ccxt_async.Exchange
|
||||
self._api_async: ccxt_pro.Exchange
|
||||
self._ws_async: ccxt_pro.Exchange = None
|
||||
self._exchange_ws: Optional[ExchangeWS] = None
|
||||
self._markets: Dict = {}
|
||||
self._trading_fees: Dict[str, Any] = {}
|
||||
self._leverage_tiers: Dict[str, List[Dict]] = {}
|
||||
|
@ -232,7 +243,7 @@ class Exchange:
|
|||
ccxt_config = deep_merge_dicts(exchange_conf.get("ccxt_config", {}), ccxt_config)
|
||||
ccxt_config = deep_merge_dicts(exchange_conf.get("ccxt_sync_config", {}), ccxt_config)
|
||||
|
||||
self._api = self._init_ccxt(exchange_conf, ccxt_kwargs=ccxt_config)
|
||||
self._api = self._init_ccxt(exchange_conf, True, ccxt_config)
|
||||
|
||||
ccxt_async_config = self._ccxt_config
|
||||
ccxt_async_config = deep_merge_dicts(
|
||||
|
@ -241,7 +252,15 @@ class Exchange:
|
|||
ccxt_async_config = deep_merge_dicts(
|
||||
exchange_conf.get("ccxt_async_config", {}), ccxt_async_config
|
||||
)
|
||||
self._api_async = self._init_ccxt(exchange_conf, ccxt_async, ccxt_kwargs=ccxt_async_config)
|
||||
self._api_async = self._init_ccxt(exchange_conf, False, ccxt_async_config)
|
||||
self._has_watch_ohlcv = self.exchange_has("watchOHLCV") and self._ft_has["ws.enabled"]
|
||||
if (
|
||||
self._config["runmode"] in TRADE_MODES
|
||||
and exchange_conf.get("enable_ws", True)
|
||||
and self._has_watch_ohlcv
|
||||
):
|
||||
self._ws_async = self._init_ccxt(exchange_conf, False, ccxt_async_config)
|
||||
self._exchange_ws = ExchangeWS(self._config, self._ws_async)
|
||||
|
||||
logger.info(f'Using Exchange "{self.name}"')
|
||||
self.required_candle_call_count = 1
|
||||
|
@ -270,6 +289,8 @@ class Exchange:
|
|||
self.close()
|
||||
|
||||
def close(self):
|
||||
if self._exchange_ws:
|
||||
self._exchange_ws.cleanup()
|
||||
logger.debug("Exchange object destroyed, closing async loop")
|
||||
if (
|
||||
getattr(self, "_api_async", None)
|
||||
|
@ -278,6 +299,14 @@ class Exchange:
|
|||
):
|
||||
logger.debug("Closing async ccxt session.")
|
||||
self.loop.run_until_complete(self._api_async.close())
|
||||
if (
|
||||
self._ws_async
|
||||
and inspect.iscoroutinefunction(self._ws_async.close)
|
||||
and self._ws_async.session
|
||||
):
|
||||
logger.debug("Closing ws ccxt session.")
|
||||
self.loop.run_until_complete(self._ws_async.close())
|
||||
|
||||
if self.loop and not self.loop.is_closed():
|
||||
self.loop.close()
|
||||
|
||||
|
@ -301,18 +330,22 @@ class Exchange:
|
|||
self.validate_pricing(config["entry_pricing"])
|
||||
|
||||
def _init_ccxt(
|
||||
self,
|
||||
exchange_config: Dict[str, Any],
|
||||
ccxt_module: CcxtModuleType = ccxt,
|
||||
*,
|
||||
ccxt_kwargs: Dict,
|
||||
self, exchange_config: Dict[str, Any], sync: bool, ccxt_kwargs: Dict[str, Any]
|
||||
) -> ccxt.Exchange:
|
||||
"""
|
||||
Initialize ccxt with given config and return valid
|
||||
ccxt instance.
|
||||
Initialize ccxt with given config and return valid ccxt instance.
|
||||
"""
|
||||
# Find matching class for the given exchange name
|
||||
name = exchange_config["name"]
|
||||
if sync:
|
||||
ccxt_module = ccxt
|
||||
else:
|
||||
ccxt_module = ccxt_pro
|
||||
if not is_exchange_known_ccxt(name, ccxt_module):
|
||||
# Fall back to async if pro doesn't support this exchange
|
||||
import ccxt.async_support as ccxt_async
|
||||
|
||||
ccxt_module = ccxt_async
|
||||
|
||||
if not is_exchange_known_ccxt(name, ccxt_module):
|
||||
raise OperationalException(f"Exchange {name} is not supported by ccxt")
|
||||
|
@ -553,6 +586,13 @@ class Exchange:
|
|||
amount, self.get_precision_amount(pair), self.precisionMode, contract_size
|
||||
)
|
||||
|
||||
def ws_connection_reset(self):
|
||||
"""
|
||||
called at regular intervals to reset the websocket connection
|
||||
"""
|
||||
if self._exchange_ws:
|
||||
self._exchange_ws.reset_connections()
|
||||
|
||||
def _load_async_markets(self, reload: bool = False) -> Dict[str, Any]:
|
||||
try:
|
||||
markets = self.loop.run_until_complete(
|
||||
|
@ -584,6 +624,9 @@ class Exchange:
|
|||
# Reload async markets, then assign them to sync api
|
||||
self._markets = self._load_async_markets(reload=True)
|
||||
self._api.set_markets(self._api_async.markets, self._api_async.currencies)
|
||||
if self._exchange_ws:
|
||||
# Set markets to avoid reloading on websocket api
|
||||
self._ws_async.set_markets(self._api.markets, self._api.currencies)
|
||||
self._last_markets_refresh = dt_ts()
|
||||
|
||||
if is_initial and self._ft_has["needs_trading_fees"]:
|
||||
|
@ -817,7 +860,7 @@ class Exchange:
|
|||
"""
|
||||
if endpoint in self._ft_has.get("exchange_has_overrides", {}):
|
||||
return self._ft_has["exchange_has_overrides"][endpoint]
|
||||
return endpoint in self._api.has and self._api.has[endpoint]
|
||||
return endpoint in self._api_async.has and self._api_async.has[endpoint]
|
||||
|
||||
def get_precision_amount(self, pair: str) -> Optional[float]:
|
||||
"""
|
||||
|
@ -2250,9 +2293,40 @@ class Exchange:
|
|||
cache: bool,
|
||||
) -> Coroutine[Any, Any, OHLCVResponse]:
|
||||
not_all_data = cache and self.required_candle_call_count > 1
|
||||
if cache and candle_type in (CandleType.SPOT, CandleType.FUTURES):
|
||||
if self._has_watch_ohlcv and self._exchange_ws:
|
||||
# Subscribe to websocket
|
||||
self._exchange_ws.schedule_ohlcv(pair, timeframe, candle_type)
|
||||
|
||||
if cache and (pair, timeframe, candle_type) in self._klines:
|
||||
candle_limit = self.ohlcv_candle_limit(timeframe, candle_type)
|
||||
min_date = date_minus_candles(timeframe, candle_limit - 5).timestamp()
|
||||
min_date = int(date_minus_candles(timeframe, candle_limit - 5).timestamp())
|
||||
|
||||
if self._exchange_ws:
|
||||
candle_date = int(timeframe_to_prev_date(timeframe).timestamp() * 1000)
|
||||
prev_candle_date = int(date_minus_candles(timeframe, 1).timestamp() * 1000)
|
||||
candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
|
||||
half_candle = int(candle_date - (candle_date - prev_candle_date) * 0.5)
|
||||
last_refresh_time = int(
|
||||
self._exchange_ws.klines_last_refresh.get((pair, timeframe, candle_type), 0)
|
||||
)
|
||||
|
||||
if (
|
||||
candles
|
||||
and candles[-1][0] >= prev_candle_date
|
||||
and last_refresh_time >= half_candle
|
||||
):
|
||||
# Usable result, candle contains the previous candle.
|
||||
# Also, we check if the last refresh time is no more than half the candle ago.
|
||||
logger.debug(f"reuse watch result for {pair}, {timeframe}, {last_refresh_time}")
|
||||
|
||||
return self._exchange_ws.get_ohlcv(pair, timeframe, candle_type, candle_date)
|
||||
logger.info(
|
||||
f"Failed to reuse watch {pair}, {timeframe}, {candle_date < last_refresh_time},"
|
||||
f" {candle_date}, {last_refresh_time}, "
|
||||
f"{format_ms_time(candle_date)}, {format_ms_time(last_refresh_time)} "
|
||||
)
|
||||
|
||||
# Check if 1 call can get us updated candles without hole in the data.
|
||||
if min_date < self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0):
|
||||
# Cache can be used - do one-off call.
|
||||
|
@ -2285,7 +2359,7 @@ class Exchange:
|
|||
|
||||
def _build_ohlcv_dl_jobs(
|
||||
self, pair_list: ListPairsWithTimeframes, since_ms: Optional[int], cache: bool
|
||||
) -> Tuple[List[Coroutine], List[Tuple[str, str, CandleType]]]:
|
||||
) -> Tuple[List[Coroutine], List[PairWithTimeframe]]:
|
||||
"""
|
||||
Build Coroutines to execute as part of refresh_latest_ohlcv
|
||||
"""
|
||||
|
|
|
@ -58,7 +58,10 @@ def validate_exchange(exchange: str) -> Tuple[bool, str, bool]:
|
|||
returns: can_use, reason
|
||||
with Reason including both missing and missing_opt
|
||||
"""
|
||||
ex_mod = getattr(ccxt, exchange.lower())()
|
||||
try:
|
||||
ex_mod = getattr(ccxt.pro, exchange.lower())()
|
||||
except AttributeError:
|
||||
ex_mod = getattr(ccxt.async_support, exchange.lower())()
|
||||
|
||||
if not ex_mod or not ex_mod.has:
|
||||
return False, "", False
|
||||
|
|
186
freqtrade/exchange/exchange_ws.py
Normal file
186
freqtrade/exchange/exchange_ws.py
Normal file
|
@ -0,0 +1,186 @@
|
|||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from copy import deepcopy
|
||||
from functools import partial
|
||||
from threading import Thread
|
||||
from typing import Dict, Set
|
||||
|
||||
import ccxt
|
||||
|
||||
from freqtrade.constants import Config, PairWithTimeframe
|
||||
from freqtrade.enums.candletype import CandleType
|
||||
from freqtrade.exchange.exchange import timeframe_to_seconds
|
||||
from freqtrade.exchange.types import OHLCVResponse
|
||||
from freqtrade.util import dt_ts, format_ms_time
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ExchangeWS:
|
||||
def __init__(self, config: Config, ccxt_object: ccxt.Exchange) -> None:
|
||||
self.config = config
|
||||
self.ccxt_object = ccxt_object
|
||||
self._background_tasks: Set[asyncio.Task] = set()
|
||||
|
||||
self._klines_watching: Set[PairWithTimeframe] = set()
|
||||
self._klines_scheduled: Set[PairWithTimeframe] = set()
|
||||
self.klines_last_refresh: Dict[PairWithTimeframe, float] = {}
|
||||
self.klines_last_request: Dict[PairWithTimeframe, float] = {}
|
||||
self._thread = Thread(name="ccxt_ws", target=self._start_forever)
|
||||
self._thread.start()
|
||||
self.__cleanup_called = False
|
||||
|
||||
def _start_forever(self) -> None:
|
||||
self._loop = asyncio.new_event_loop()
|
||||
try:
|
||||
self._loop.run_forever()
|
||||
finally:
|
||||
if self._loop.is_running():
|
||||
self._loop.stop()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
logger.debug("Cleanup called - stopping")
|
||||
self._klines_watching.clear()
|
||||
for task in self._background_tasks:
|
||||
task.cancel()
|
||||
if hasattr(self, "_loop") and not self._loop.is_closed():
|
||||
self.reset_connections()
|
||||
|
||||
self._loop.call_soon_threadsafe(self._loop.stop)
|
||||
time.sleep(0.1)
|
||||
if not self._loop.is_closed():
|
||||
self._loop.close()
|
||||
|
||||
self._thread.join()
|
||||
logger.debug("Stopped")
|
||||
|
||||
def reset_connections(self) -> None:
|
||||
"""
|
||||
Reset all connections - avoids "connection-reset" errors that happen after ~9 days
|
||||
"""
|
||||
if hasattr(self, "_loop") and not self._loop.is_closed():
|
||||
logger.info("Resetting WS connections.")
|
||||
asyncio.run_coroutine_threadsafe(self._cleanup_async(), loop=self._loop)
|
||||
while not self.__cleanup_called:
|
||||
time.sleep(0.1)
|
||||
self.__cleanup_called = False
|
||||
|
||||
async def _cleanup_async(self) -> None:
|
||||
try:
|
||||
await self.ccxt_object.close()
|
||||
# Clear the cache.
|
||||
# Not doing this will cause problems on startup with dynamic pairlists
|
||||
self.ccxt_object.ohlcvs.clear()
|
||||
except Exception:
|
||||
logger.exception("Exception in _cleanup_async")
|
||||
finally:
|
||||
self.__cleanup_called = True
|
||||
|
||||
def cleanup_expired(self) -> None:
|
||||
"""
|
||||
Remove pairs from watchlist if they've not been requested within
|
||||
the last timeframe (+ offset)
|
||||
"""
|
||||
changed = False
|
||||
for p in list(self._klines_watching):
|
||||
_, timeframe, _ = p
|
||||
timeframe_s = timeframe_to_seconds(timeframe)
|
||||
last_refresh = self.klines_last_request.get(p, 0)
|
||||
if last_refresh > 0 and (dt_ts() - last_refresh) > ((timeframe_s + 20) * 1000):
|
||||
logger.info(f"Removing {p} from watchlist")
|
||||
self._klines_watching.discard(p)
|
||||
changed = True
|
||||
if changed:
|
||||
logger.info(f"Removal done: new watch list ({len(self._klines_watching)})")
|
||||
|
||||
async def _schedule_while_true(self) -> None:
|
||||
# For the ones we should be watching
|
||||
for p in self._klines_watching:
|
||||
# Check if they're already scheduled
|
||||
if p not in self._klines_scheduled:
|
||||
self._klines_scheduled.add(p)
|
||||
pair, timeframe, candle_type = p
|
||||
task = asyncio.create_task(
|
||||
self._continuously_async_watch_ohlcv(pair, timeframe, candle_type)
|
||||
)
|
||||
self._background_tasks.add(task)
|
||||
task.add_done_callback(
|
||||
partial(
|
||||
self._continuous_stopped,
|
||||
pair=pair,
|
||||
timeframe=timeframe,
|
||||
candle_type=candle_type,
|
||||
)
|
||||
)
|
||||
|
||||
def _continuous_stopped(
|
||||
self, task: asyncio.Task, pair: str, timeframe: str, candle_type: CandleType
|
||||
):
|
||||
self._background_tasks.discard(task)
|
||||
result = "done"
|
||||
if task.cancelled():
|
||||
result = "cancelled"
|
||||
else:
|
||||
if (result1 := task.result()) is not None:
|
||||
result = str(result1)
|
||||
|
||||
logger.info(f"{pair}, {timeframe}, {candle_type} - Task finished - {result}")
|
||||
self._klines_scheduled.discard((pair, timeframe, candle_type))
|
||||
|
||||
async def _continuously_async_watch_ohlcv(
|
||||
self, pair: str, timeframe: str, candle_type: CandleType
|
||||
) -> None:
|
||||
try:
|
||||
while (pair, timeframe, candle_type) in self._klines_watching:
|
||||
start = dt_ts()
|
||||
data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
|
||||
self.klines_last_refresh[(pair, timeframe, candle_type)] = dt_ts()
|
||||
logger.debug(
|
||||
f"watch done {pair}, {timeframe}, data {len(data)} "
|
||||
f"in {dt_ts() - start:.2f}s"
|
||||
)
|
||||
except ccxt.ExchangeClosedByUser:
|
||||
logger.debug("Exchange connection closed by user")
|
||||
except ccxt.BaseError:
|
||||
logger.exception(f"Exception in continuously_async_watch_ohlcv for {pair}, {timeframe}")
|
||||
finally:
|
||||
self._klines_watching.discard((pair, timeframe, candle_type))
|
||||
|
||||
def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||
"""
|
||||
Schedule a pair/timeframe combination to be watched
|
||||
"""
|
||||
self._klines_watching.add((pair, timeframe, candle_type))
|
||||
self.klines_last_request[(pair, timeframe, candle_type)] = dt_ts()
|
||||
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
|
||||
asyncio.run_coroutine_threadsafe(self._schedule_while_true(), loop=self._loop)
|
||||
self.cleanup_expired()
|
||||
|
||||
async def get_ohlcv(
|
||||
self,
|
||||
pair: str,
|
||||
timeframe: str,
|
||||
candle_type: CandleType,
|
||||
candle_date: int,
|
||||
) -> OHLCVResponse:
|
||||
"""
|
||||
Returns cached klines from ccxt's "watch" cache.
|
||||
:param candle_date: timestamp of the end-time of the candle.
|
||||
"""
|
||||
# Deepcopy the response - as it might be modified in the background as new messages arrive
|
||||
candles = deepcopy(self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe))
|
||||
refresh_date = self.klines_last_refresh[(pair, timeframe, candle_type)]
|
||||
drop_hint = False
|
||||
if refresh_date > candle_date:
|
||||
# Refreshed after candle was complete.
|
||||
# logger.info(f"{candles[-1][0]} >= {candle_date}")
|
||||
drop_hint = candles[-1][0] >= candle_date
|
||||
logger.debug(
|
||||
f"watch result for {pair}, {timeframe} with length {len(candles)}, "
|
||||
f"{format_ms_time(candles[-1][0])}, "
|
||||
f"lref={format_ms_time(refresh_date)}, "
|
||||
f"candle_date={format_ms_time(candle_date)}, {drop_hint=}"
|
||||
)
|
||||
return pair, timeframe, candle_type, candles, drop_hint
|
|
@ -168,6 +168,8 @@ class FreqtradeBot(LoggingMixin):
|
|||
t = str(time(time_slot, minutes, 2))
|
||||
self._schedule.every().day.at(t).do(update)
|
||||
|
||||
self._schedule.every().day.at("00:02").do(self.exchange.ws_connection_reset)
|
||||
|
||||
self.strategy.ft_bot_start()
|
||||
# Initialize protections AFTER bot start - otherwise parameters are not loaded.
|
||||
self.protections = ProtectionManager(self.config, self.strategy.protections)
|
||||
|
@ -289,8 +291,7 @@ class FreqtradeBot(LoggingMixin):
|
|||
# Then looking for entry opportunities
|
||||
if self.get_free_open_trades():
|
||||
self.enter_positions()
|
||||
if self.trading_mode == TradingMode.FUTURES:
|
||||
self._schedule.run_pending()
|
||||
self._schedule.run_pending()
|
||||
Trade.commit()
|
||||
self.rpc.process_msg_queue(self.dataprovider._msg_queue)
|
||||
self.last_process = datetime.now(timezone.utc)
|
||||
|
|
|
@ -15,6 +15,7 @@ pytest-asyncio==0.23.7
|
|||
pytest-cov==5.0.0
|
||||
pytest-mock==3.14.0
|
||||
pytest-random-order==1.1.1
|
||||
pytest-timeout==2.3.1
|
||||
pytest-xdist==3.6.1
|
||||
isort==5.13.2
|
||||
# For datetime mocking
|
||||
|
|
|
@ -587,6 +587,7 @@ def get_default_conf(testdatadir):
|
|||
"exchange": {
|
||||
"name": "binance",
|
||||
"key": "key",
|
||||
"enable_ws": False,
|
||||
"secret": "secret",
|
||||
"pair_whitelist": ["ETH/BTC", "LTC/BTC", "XRP/BTC", "NEO/BTC"],
|
||||
"pair_blacklist": [
|
||||
|
@ -629,6 +630,7 @@ def get_default_conf_usdt(testdatadir):
|
|||
"name": "binance",
|
||||
"enabled": True,
|
||||
"key": "key",
|
||||
"enable_ws": False,
|
||||
"secret": "secret",
|
||||
"pair_whitelist": [
|
||||
"ETH/USDT",
|
||||
|
|
69
tests/exchange/test_exchange_ws.py
Normal file
69
tests/exchange/test_exchange_ws.py
Normal file
|
@ -0,0 +1,69 @@
|
|||
import asyncio
|
||||
import threading
|
||||
from time import sleep
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
from freqtrade.enums import CandleType
|
||||
from freqtrade.exchange.exchange_ws import ExchangeWS
|
||||
|
||||
|
||||
def test_exchangews_init(mocker):
|
||||
config = MagicMock()
|
||||
ccxt_object = MagicMock()
|
||||
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
|
||||
|
||||
exchange_ws = ExchangeWS(config, ccxt_object)
|
||||
sleep(0.1)
|
||||
|
||||
assert exchange_ws.config == config
|
||||
assert exchange_ws.ccxt_object == ccxt_object
|
||||
assert exchange_ws._thread.name == "ccxt_ws"
|
||||
assert exchange_ws._background_tasks == set()
|
||||
assert exchange_ws._klines_watching == set()
|
||||
assert exchange_ws._klines_scheduled == set()
|
||||
assert exchange_ws.klines_last_refresh == {}
|
||||
assert exchange_ws.klines_last_request == {}
|
||||
# Cleanup
|
||||
exchange_ws.cleanup()
|
||||
|
||||
|
||||
def patch_eventloop_threading(exchange):
|
||||
is_init = False
|
||||
|
||||
def thread_fuck():
|
||||
nonlocal is_init
|
||||
exchange._loop = asyncio.new_event_loop()
|
||||
is_init = True
|
||||
exchange._loop.run_forever()
|
||||
|
||||
x = threading.Thread(target=thread_fuck, daemon=True)
|
||||
x.start()
|
||||
while not is_init:
|
||||
pass
|
||||
|
||||
|
||||
async def test_exchangews_ohlcv(mocker):
|
||||
config = MagicMock()
|
||||
ccxt_object = MagicMock()
|
||||
ccxt_object.watch_ohlcv = AsyncMock()
|
||||
ccxt_object.close = AsyncMock()
|
||||
mocker.patch("freqtrade.exchange.exchange_ws.ExchangeWS._start_forever", MagicMock())
|
||||
|
||||
exchange_ws = ExchangeWS(config, ccxt_object)
|
||||
patch_eventloop_threading(exchange_ws)
|
||||
try:
|
||||
assert exchange_ws._klines_watching == set()
|
||||
assert exchange_ws._klines_scheduled == set()
|
||||
|
||||
exchange_ws.schedule_ohlcv("ETH/BTC", "1m", CandleType.SPOT)
|
||||
asyncio.sleep(0.5)
|
||||
|
||||
assert exchange_ws._klines_watching == {("ETH/BTC", "1m", CandleType.SPOT)}
|
||||
assert exchange_ws._klines_scheduled == {("ETH/BTC", "1m", CandleType.SPOT)}
|
||||
asyncio.sleep(0.1)
|
||||
assert ccxt_object.watch_ohlcv.call_count == 1
|
||||
except Exception as e:
|
||||
print(e)
|
||||
finally:
|
||||
# Cleanup
|
||||
exchange_ws.cleanup()
|
|
@ -11,6 +11,8 @@ from tests.conftest import EXMS, get_default_conf_usdt
|
|||
|
||||
|
||||
EXCHANGE_FIXTURE_TYPE = Tuple[Exchange, str]
|
||||
EXCHANGE_WS_FIXTURE_TYPE = Tuple[Exchange, str, str]
|
||||
|
||||
|
||||
# Exchanges that should be tested online
|
||||
EXCHANGES = {
|
||||
|
@ -360,6 +362,7 @@ def set_test_proxy(config: Config, use_proxy: bool) -> Config:
|
|||
config1 = deepcopy(config)
|
||||
config1["exchange"]["ccxt_config"] = {
|
||||
"httpsProxy": proxy,
|
||||
"wsProxy": proxy,
|
||||
}
|
||||
return config1
|
||||
|
||||
|
@ -376,7 +379,7 @@ def get_exchange(exchange_name, exchange_conf):
|
|||
exchange_conf, validate=True, load_leverage_tiers=True
|
||||
)
|
||||
|
||||
yield exchange, exchange_name
|
||||
return exchange, exchange_name
|
||||
|
||||
|
||||
def get_futures_exchange(exchange_name, exchange_conf, class_mocker):
|
||||
|
@ -398,15 +401,41 @@ def get_futures_exchange(exchange_name, exchange_conf, class_mocker):
|
|||
class_mocker.patch(f"{EXMS}.load_cached_leverage_tiers", return_value=None)
|
||||
class_mocker.patch(f"{EXMS}.cache_leverage_tiers")
|
||||
|
||||
yield from get_exchange(exchange_name, exchange_conf)
|
||||
return get_exchange(exchange_name, exchange_conf)
|
||||
|
||||
|
||||
@pytest.fixture(params=EXCHANGES, scope="class")
|
||||
def exchange(request, exchange_conf, class_mocker):
|
||||
class_mocker.patch("freqtrade.exchange.bybit.Bybit.additional_exchange_init")
|
||||
yield from get_exchange(request.param, exchange_conf)
|
||||
return get_exchange(request.param, exchange_conf)
|
||||
|
||||
|
||||
@pytest.fixture(params=EXCHANGES, scope="class")
|
||||
def exchange_futures(request, exchange_conf, class_mocker):
|
||||
yield from get_futures_exchange(request.param, exchange_conf, class_mocker)
|
||||
return get_futures_exchange(request.param, exchange_conf, class_mocker)
|
||||
|
||||
|
||||
@pytest.fixture(params=["spot", "futures"], scope="class")
|
||||
def exchange_mode(request):
|
||||
return request.param
|
||||
|
||||
|
||||
@pytest.fixture(params=EXCHANGES, scope="class")
|
||||
def exchange_ws(request, exchange_conf, exchange_mode, class_mocker):
|
||||
class_mocker.patch("freqtrade.exchange.bybit.Bybit.additional_exchange_init")
|
||||
exchange_conf["exchange"]["enable_ws"] = True
|
||||
if exchange_mode == "spot":
|
||||
exchange, name = get_exchange(request.param, exchange_conf)
|
||||
pair = EXCHANGES[request.param]["pair"]
|
||||
elif EXCHANGES[request.param].get("futures"):
|
||||
exchange, name = get_futures_exchange(
|
||||
request.param, exchange_conf, class_mocker=class_mocker
|
||||
)
|
||||
pair = EXCHANGES[request.param]["futures_pair"]
|
||||
else:
|
||||
pytest.skip("Exchange does not support futures.")
|
||||
|
||||
if not exchange._has_watch_ohlcv:
|
||||
pytest.skip("Exchange does not support watch_ohlcv.")
|
||||
yield exchange, name, pair
|
||||
exchange.close()
|
||||
|
|
64
tests/exchange_online/test_ccxt_ws_compat.py
Normal file
64
tests/exchange_online/test_ccxt_ws_compat.py
Normal file
|
@ -0,0 +1,64 @@
|
|||
"""
|
||||
Tests in this file do NOT mock network calls, so they are expected to be fluky at times.
|
||||
|
||||
However, these tests aim to test ccxt compatibility, specifically regarding websockets.
|
||||
"""
|
||||
|
||||
import logging
|
||||
from datetime import timedelta
|
||||
from time import sleep
|
||||
|
||||
import pytest
|
||||
|
||||
from freqtrade.enums import CandleType
|
||||
from freqtrade.exchange.exchange_utils import timeframe_to_prev_date
|
||||
from freqtrade.loggers.set_log_levels import set_loggers
|
||||
from freqtrade.util.datetime_helpers import dt_now
|
||||
from tests.conftest import log_has_re
|
||||
from tests.exchange_online.conftest import EXCHANGE_WS_FIXTURE_TYPE
|
||||
|
||||
|
||||
@pytest.mark.longrun
|
||||
@pytest.mark.timeout(3 * 60)
|
||||
class TestCCXTExchangeWs:
|
||||
def test_ccxt_ohlcv(self, exchange_ws: EXCHANGE_WS_FIXTURE_TYPE, caplog, mocker):
|
||||
exch, exchangename, pair = exchange_ws
|
||||
|
||||
assert exch._ws_async is not None
|
||||
timeframe = "1m"
|
||||
pair_tf = (pair, timeframe, CandleType.SPOT)
|
||||
m_hist = mocker.spy(exch, "_async_get_historic_ohlcv")
|
||||
m_cand = mocker.spy(exch, "_async_get_candle_history")
|
||||
|
||||
res = exch.refresh_latest_ohlcv([pair_tf])
|
||||
assert m_cand.call_count == 1
|
||||
|
||||
# Currently open candle
|
||||
next_candle = timeframe_to_prev_date(timeframe, dt_now())
|
||||
now = next_candle - timedelta(seconds=1)
|
||||
# Currently closed candle
|
||||
curr_candle = timeframe_to_prev_date(timeframe, now)
|
||||
|
||||
assert pair_tf in exch._exchange_ws._klines_watching
|
||||
assert pair_tf in exch._exchange_ws._klines_scheduled
|
||||
assert res[pair_tf] is not None
|
||||
df1 = res[pair_tf]
|
||||
caplog.set_level(logging.DEBUG)
|
||||
set_loggers(1)
|
||||
assert df1.iloc[-1]["date"] == curr_candle
|
||||
|
||||
# Wait until the next candle (might be up to 1 minute).
|
||||
while True:
|
||||
caplog.clear()
|
||||
res = exch.refresh_latest_ohlcv([pair_tf])
|
||||
df2 = res[pair_tf]
|
||||
assert df2 is not None
|
||||
if df2.iloc[-1]["date"] == next_candle:
|
||||
break
|
||||
assert df2.iloc[-1]["date"] == curr_candle
|
||||
sleep(1)
|
||||
|
||||
assert m_hist.call_count == 0
|
||||
# shouldn't have tried fetch_ohlcv a second time.
|
||||
assert m_cand.call_count == 1
|
||||
assert log_has_re(r"watch result.*", caplog)
|
|
@ -2176,7 +2176,7 @@ def test_api_exchanges(botclient):
|
|||
"valid": True,
|
||||
"supported": False,
|
||||
"dex": True,
|
||||
"comment": "",
|
||||
"comment": ANY,
|
||||
"trade_modes": [{"trading_mode": "spot", "margin_mode": ""}],
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user