Group things logically in exchange class

Matthias 2024-03-16 17:04:48 +01:00
parent 86fe765180
commit 21bca95b6a


@@ -2041,13 +2041,6 @@ class Exchange:
data = sorted(data, key=lambda x: x[0])
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type)
move_to = one_call * self.required_candle_call_count
now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
def _build_coroutine(
self, pair: str, timeframe: str, candle_type: CandleType,
since_ms: Optional[int], cache: bool) -> Coroutine[Any, Any, OHLCVResponse]:
@@ -2136,37 +2129,6 @@ class Exchange:
self._klines[(pair, timeframe, c_type)] = ohlcv_df
return ohlcv_df
def _process_trades_df(self,
pair: str,
timeframe: str,
c_type: CandleType,
ticks: List[List],
cache: bool,
first_required_candle_date: Optional[int]) -> DataFrame:
# keeping parsed dataframe in cache
trades_df = trades_list_to_df(ticks, True)
# keeping last candle time as last refreshed time of the pair
if ticks and cache:
idx = -1
# NOTE: // is floor division: divides and rounds down to the nearest int
self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # noqa
if cache:
if (pair, timeframe, c_type) in self._trades:
old = self._trades[(pair, timeframe, c_type)]
# Reassign so we return the updated, combined df
combined_df = concat([old, trades_df], axis=0)
logger.debug(f"Clean duplicated ticks from Trades data {pair}")
trades_df = DataFrame(trades_df_remove_duplicates(combined_df),
columns=combined_df.columns)
# Age out trades older than the first required candle
if first_required_candle_date:
# slice off older dates
trades_df = trades_df[
first_required_candle_date < trades_df['timestamp']]
trades_df = trades_df.reset_index(drop=True)
self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df
def refresh_latest_ohlcv(self, pair_list: ListPairsWithTimeframes, *,
since_ms: Optional[int] = None, cache: bool = True,
drop_incomplete: Optional[bool] = None
@@ -2217,6 +2179,170 @@ class Exchange:
return results_df
def refresh_ohlcv_with_cache(
self,
pairs: List[PairWithTimeframe],
since_ms: int
) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh ohlcv data for all given pairs if necessary.
Caches data in a per-timeframe expiring cache.
Should only be used for pairlists which need "on time" expiration and no long-term caching.
"""
timeframes = {p[1] for p in pairs}
for timeframe in timeframes:
if (timeframe, since_ms) not in self._expiring_candle_cache:
timeframe_in_sec = timeframe_to_seconds(timeframe)
# Initialise cache
self._expiring_candle_cache[(timeframe, since_ms)] = PeriodicCache(
ttl=timeframe_in_sec, maxsize=1000)
# Get candles from cache
candles = {
c: self._expiring_candle_cache[(c[1], since_ms)].get(c, None) for c in pairs
if c in self._expiring_candle_cache[(c[1], since_ms)]
}
pairs_to_download = [p for p in pairs if p not in candles]
if pairs_to_download:
candles = self.refresh_latest_ohlcv(
pairs_to_download, since_ms=since_ms, cache=False
)
for c, val in candles.items():
self._expiring_candle_cache[(c[1], since_ms)][c] = val
return candles
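# A minimal usage sketch, not part of the commit: `exchange` is assumed to be an
# initialised Exchange instance, and the pair list below is invented for illustration.
import time
pairs = [("BTC/USDT", "1h", CandleType.SPOT), ("ETH/USDT", "1h", CandleType.SPOT)]
since_ms = int(time.time() * 1000) - 24 * 60 * 60 * 1000  # roughly the last 24 hours
candles = exchange.refresh_ohlcv_with_cache(pairs, since_ms)
# A second call within the same 1h candle is answered from the expiring cache,
# so no additional exchange requests are made until the candle rolls over.
candles = exchange.refresh_ohlcv_with_cache(pairs, since_ms)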
def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec
# current, active candle open date
now = int(timeframe_to_prev_date(timeframe).timestamp())
return plr < now
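# A minimal sketch, not part of the commit, of the refresh decision above. The import
# path and the "never refreshed" starting value are assumptions for illustration.
from freqtrade.exchange import timeframe_to_prev_date, timeframe_to_seconds
last_refresh = 0  # assumed: this pair/timeframe has never been refreshed
plr = last_refresh + timeframe_to_seconds("5m")
now = int(timeframe_to_prev_date("5m").timestamp())
print(plr < now)  # True -> refresh_latest_ohlcv() would fetch fresh candles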
@retrier_async
async def _async_get_candle_history(
self,
pair: str,
timeframe: str,
candle_type: CandleType,
since_ms: Optional[int] = None,
) -> OHLCVResponse:
"""
Asynchronously get candle history data using fetch_ohlcv
:param candle_type: '', mark, index, premiumIndex, or funding_rate
returns tuple: (pair, timeframe, ohlcv_list)
"""
try:
# Fetch OHLCV asynchronously
s = '(' + dt_from_ts(since_ms).isoformat() + ') ' if since_ms is not None else ''
logger.debug(
"Fetching pair %s, %s, interval %s, since %s %s...",
pair, candle_type, timeframe, since_ms, s
)
params = deepcopy(self._ft_has.get('ohlcv_params', {}))
candle_limit = self.ohlcv_candle_limit(
timeframe, candle_type=candle_type, since_ms=since_ms)
if candle_type and candle_type != CandleType.SPOT:
params.update({'price': candle_type.value})
if candle_type != CandleType.FUNDING_RATE:
data = await self._api_async.fetch_ohlcv(
pair, timeframe=timeframe, since=since_ms,
limit=candle_limit, params=params)
else:
# Funding rate
data = await self._fetch_funding_rate_history(
pair=pair,
timeframe=timeframe,
limit=candle_limit,
since_ms=since_ms,
)
# Some exchanges sort OHLCV in ASC order and others in DESC.
# Ex: Bittrex returns the list of OHLCV in ASC order (oldest first, newest last)
# while GDAX returns the list of OHLCV in DESC order (newest first, oldest last)
# Only sort if necessary to save computing time
try:
if data and data[0][0] > data[-1][0]:
data = sorted(data, key=lambda x: x[0])
except IndexError:
logger.exception("Error loading %s. Result was %s.", pair, data)
return pair, timeframe, candle_type, [], self._ohlcv_partial_candle
logger.debug("Done fetching pair %s, %s interval %s...", pair, candle_type, timeframe)
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical '
f'candle (OHLCV) data. Message: {e}') from e
except ccxt.DDoSProtection as e:
raise DDosProtection(e) from e
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(f'Could not fetch historical candle (OHLCV) data '
f'for pair {pair} due to {e.__class__.__name__}. '
f'Message: {e}') from e
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
f'for pair {pair}. Message: {e}') from e
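# A minimal sketch, not part of the commit: freqtrade normally gathers this coroutine
# from refresh_latest_ohlcv() on the exchange's internal event loop; driving it with
# asyncio.run() here, and the `exchange` instance itself, are assumptions for illustration.
import asyncio
pair, tf, c_type, ohlcv, drop_partial = asyncio.run(
    exchange._async_get_candle_history("BTC/USDT", "5m", CandleType.SPOT, since_ms=None)
)
# ohlcv is a list of [timestamp_ms, open, high, low, close, volume] rows, oldest first.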
async def _fetch_funding_rate_history(
self,
pair: str,
timeframe: str,
limit: int,
since_ms: Optional[int] = None,
) -> List[List]:
"""
Fetch funding rate history - intended to be selectively overridden by subclasses.
"""
# Funding rate
data = await self._api_async.fetch_funding_rate_history(
pair, since=since_ms,
limit=limit)
# Convert funding rate to candle pattern
data = [[x['timestamp'], x['fundingRate'], 0, 0, 0, 0] for x in data]
return data
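# A minimal sketch, not part of the commit, of the funding-rate-to-candle-pattern
# conversion above; the two raw entries are invented sample data.
raw = [
    {"timestamp": 1710576000000, "fundingRate": 0.0001},
    {"timestamp": 1710604800000, "fundingRate": -0.00005},
]
as_candles = [[x["timestamp"], x["fundingRate"], 0, 0, 0, 0] for x in raw]
# -> [[1710576000000, 0.0001, 0, 0, 0, 0], [1710604800000, -5e-05, 0, 0, 0, 0]]
# Only the "open" slot carries the funding rate; the remaining slots are zero-filled.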
# Fetch trade data
def needed_candle_ms(self, timeframe: str, candle_type: CandleType):
one_call = timeframe_to_msecs(timeframe) * self.ohlcv_candle_limit(
timeframe, candle_type)
move_to = one_call * self.required_candle_call_count
now = timeframe_to_next_date(timeframe)
return int((now - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
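# A minimal sketch, not part of the commit, checking the needed_candle_ms() arithmetic
# by hand for a 5m timeframe; the candle limit of 500, a required_candle_call_count of 1
# and the "next candle" boundary below are assumptions for illustration.
from datetime import datetime, timedelta, timezone
one_call = 5 * 60 * 1000 * 500   # timeframe_to_msecs('5m') * candle limit = 150_000_000 ms
move_to = one_call * 1           # * required_candle_call_count
next_open = datetime(2024, 3, 16, 17, 5, tzinfo=timezone.utc)
since_ms = int((next_open - timedelta(seconds=move_to // 1000)).timestamp() * 1000)
# since_ms points roughly 1.74 days (500 five-minute candles) before next_open.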
def _process_trades_df(self,
pair: str,
timeframe: str,
c_type: CandleType,
ticks: List[List],
cache: bool,
first_required_candle_date: Optional[int]) -> DataFrame:
# keeping parsed dataframe in cache
trades_df = trades_list_to_df(ticks, True)
# keeping last candle time as last refreshed time of the pair
if ticks and cache:
idx = -1
# NOTE: // is floor division: divides and rounds down to the nearest int
self._trades_last_refresh_time[(pair, timeframe, c_type)] = trades_df['timestamp'].iat[idx] // 1000 # noqa
if cache:
if (pair, timeframe, c_type) in self._trades:
old = self._trades[(pair, timeframe, c_type)]
# Reassign so we return the updated, combined df
combined_df = concat([old, trades_df], axis=0)
logger.debug(f"Clean duplicated ticks from Trades data {pair}")
trades_df = DataFrame(trades_df_remove_duplicates(combined_df),
columns=combined_df.columns)
# Age out trades older than the first required candle
if first_required_candle_date:
# slice off older dates
trades_df = trades_df[
first_required_candle_date < trades_df['timestamp']]
trades_df = trades_df.reset_index(drop=True)
self._trades[(pair, timeframe, c_type)] = trades_df
return trades_df
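# A minimal pandas-only sketch, not part of the commit, of the cache-merge behaviour
# above: concatenate old and new trades, drop duplicates, then drop everything older
# than the first required candle. Column names and values are invented for illustration;
# the real code uses trades_df_remove_duplicates() instead of drop_duplicates().
from pandas import DataFrame, concat
old = DataFrame({"timestamp": [1000, 2000], "price": [1.0, 1.1]})
new = DataFrame({"timestamp": [2000, 3000], "price": [1.1, 1.2]})
combined = concat([old, new], axis=0).drop_duplicates(subset="timestamp", keep="last")
first_required_candle_date = 1500
combined = combined[first_required_candle_date < combined["timestamp"]].reset_index(drop=True)
# combined now holds only the rows with timestamps 2000 and 3000.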
def refresh_latest_trades(self,
pair_list: ListPairsWithTimeframes,
data_handler: Any,  # using IDataHandler causes a circular import
@@ -2305,47 +2431,6 @@ class Exchange:
return results_df
def refresh_ohlcv_with_cache(
self,
pairs: List[PairWithTimeframe],
since_ms: int
) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh ohlcv data for all given pairs if necessary.
Caches data in a per-timeframe expiring cache.
Should only be used for pairlists which need "on time" expiration and no long-term caching.
"""
timeframes = {p[1] for p in pairs}
for timeframe in timeframes:
if (timeframe, since_ms) not in self._expiring_candle_cache:
timeframe_in_sec = timeframe_to_seconds(timeframe)
# Initialise cache
self._expiring_candle_cache[(timeframe, since_ms)] = PeriodicCache(
ttl=timeframe_in_sec, maxsize=1000)
# Get candles from cache
candles = {
c: self._expiring_candle_cache[(c[1], since_ms)].get(c, None) for c in pairs
if c in self._expiring_candle_cache[(c[1], since_ms)]
}
pairs_to_download = [p for p in pairs if p not in candles]
if pairs_to_download:
candles = self.refresh_latest_ohlcv(
pairs_to_download, since_ms=since_ms, cache=False
)
for c, val in candles.items():
self._expiring_candle_cache[(c[1], since_ms)][c] = val
return candles
def _now_is_time_to_refresh(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
plr = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec
# current, active candle open date
now = int(timeframe_to_prev_date(timeframe).timestamp())
return plr < now
def _now_is_time_to_refresh_trades(self,
pair: str,
timeframe: str,
@@ -2358,89 +2443,6 @@ class Exchange:
now = int(timeframe_to_prev_date(timeframe).timestamp())
return plr < now
@retrier_async
async def _async_get_candle_history(
self,
pair: str,
timeframe: str,
candle_type: CandleType,
since_ms: Optional[int] = None,
) -> OHLCVResponse:
"""
Asynchronously get candle history data using fetch_ohlcv
:param candle_type: '', mark, index, premiumIndex, or funding_rate
returns tuple: (pair, timeframe, ohlcv_list)
"""
try:
# Fetch OHLCV asynchronously
s = '(' + dt_from_ts(since_ms).isoformat() + ') ' if since_ms is not None else ''
logger.debug(
"Fetching pair %s, %s, interval %s, since %s %s...",
pair, candle_type, timeframe, since_ms, s
)
params = deepcopy(self._ft_has.get('ohlcv_params', {}))
candle_limit = self.ohlcv_candle_limit(
timeframe, candle_type=candle_type, since_ms=since_ms)
if candle_type and candle_type != CandleType.SPOT:
params.update({'price': candle_type.value})
if candle_type != CandleType.FUNDING_RATE:
data = await self._api_async.fetch_ohlcv(
pair, timeframe=timeframe, since=since_ms,
limit=candle_limit, params=params)
else:
# Funding rate
data = await self._fetch_funding_rate_history(
pair=pair,
timeframe=timeframe,
limit=candle_limit,
since_ms=since_ms,
)
# Some exchanges sort OHLCV in ASC order and others in DESC.
# Ex: Bittrex returns the list of OHLCV in ASC order (oldest first, newest last)
# while GDAX returns the list of OHLCV in DESC order (newest first, oldest last)
# Only sort if necessary to save computing time
try:
if data and data[0][0] > data[-1][0]:
data = sorted(data, key=lambda x: x[0])
except IndexError:
logger.exception("Error loading %s. Result was %s.", pair, data)
return pair, timeframe, candle_type, [], self._ohlcv_partial_candle
logger.debug("Done fetching pair %s, %s interval %s...", pair, candle_type, timeframe)
return pair, timeframe, candle_type, data, self._ohlcv_partial_candle
except ccxt.NotSupported as e:
raise OperationalException(
f'Exchange {self._api.name} does not support fetching historical '
f'candle (OHLCV) data. Message: {e}') from e
except ccxt.DDoSProtection as e:
raise DDosProtection(e) from e
except (ccxt.NetworkError, ccxt.ExchangeError) as e:
raise TemporaryError(f'Could not fetch historical candle (OHLCV) data '
f'for pair {pair} due to {e.__class__.__name__}. '
f'Message: {e}') from e
except ccxt.BaseError as e:
raise OperationalException(f'Could not fetch historical candle (OHLCV) data '
f'for pair {pair}. Message: {e}') from e
async def _fetch_funding_rate_history(
self,
pair: str,
timeframe: str,
limit: int,
since_ms: Optional[int] = None,
) -> List[List]:
"""
Fetch funding rate history - intended to be selectively overridden by subclasses.
"""
# Funding rate
data = await self._api_async.fetch_funding_rate_history(
pair, since=since_ms,
limit=limit)
# Convert funding rate to candle pattern
data = [[x['timestamp'], x['fundingRate'], 0, 0, 0, 0] for x in data]
return data
# Fetch historic trades
@retrier_async