optimize and fix issues with refresh_latest_trades

return types, timings and other issues
This commit is contained in:
Joe Schr 2023-05-02 11:35:41 +02:00
parent d96f314f16
commit 0796bfadd5

View File

@ -2283,7 +2283,7 @@ class Exchange:
# Return cached trades
for pair, timeframe, c_type in cached_pairs:
results_df[(pair, timeframe, c_type)] = self.trades(
results_df[(pair, timeframe, c_type)] = self.klines(
(pair, timeframe, c_type),
copy=False
)
@ -2307,14 +2307,14 @@ class Exchange:
use_public_trades = self._config.get(
'exchange', {}).get('use_public_trades', False)
if use_public_trades:
self._refresh_latest_trades(pair_list, data_handler, cache=cache)
return self._refresh_latest_trades(pair_list, data_handler, cache=cache)
return {}
def _refresh_latest_trades(self,
pair_list: ListPairsWithTimeframes ,
data_handler: Callable,# IDataHandler,
*,
cache: bool = True,
) -> Dict[PairWithTimeframe, DataFrame]:
"""
Refresh in-memory TRADES asynchronously and set `_trades` with the result
@ -2339,7 +2339,7 @@ class Exchange:
# b. no cache used
# c. need new data
is_in_cache = (pair, timeframe, candle_type) in self._trades
if ( not is_in_cache or not cache or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)):
if ( not is_in_cache or not cache or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type, 0)):
logger.debug(f"Refreshing TRADES data for {pair}")
# fetch trades since latest _trades and
# store together with existing trades
@ -2347,11 +2347,10 @@ class Exchange:
until = None
from_id = None
if is_in_cache:
trades = self._trades[(pair, timeframe, candle_type)]
from_id = trades.iloc[-1]['id']
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]['id']
last_candle_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), 0)
until = last_candle_refresh * 1000 if last_candle_refresh else arrow.now('UTC').int_timestamp * 1000
last_candle_refresh = self._pairs_last_refresh_time.get((pair, timeframe, candle_type), arrow.utcnow().int_timestamp)
until = last_candle_refresh * 1000
else:
next_closed_candle_time = timeframe_to_next_date(timeframe)
@ -2359,16 +2358,16 @@ class Exchange:
all_stored_ticks = data_handler.trades_load(f"{pair}-cached")
if all_stored_ticks:
if all_stored_ticks[0][0] <= first_candle_ms:
from_id = all_stored_ticks[-1][1]
# from_id overrides simce_ms
since_ms = all_stored_ticks[-1][0]
last_cached_ms = all_stored_ticks[-1][0]
# only use cached if it's closer than first_candle_ms
since_ms = last_cached_ms if last_cached_ms > first_candle_ms else first_candle_ms
# doesn't go far enough
else:
all_stored_ticks = []
# from_id overrules with exchange set to id paginate
# TODO: DEBUG:
# since_ms = 1681284338000
# since_ms = 1682609520000
# from_id = None
# TODO: /DEBUG
[ticks_pair, new_ticks]=self._download_trades_history(pair,
@ -2401,12 +2400,11 @@ class Exchange:
now = int(timeframe_to_prev_date(timeframe).timestamp())
return plr < now
def _now_is_time_to_refresh_trades(self, pair: str, timeframe: str, candle_type: CandleType) -> bool:
def _now_is_time_to_refresh_trades(self, pair: str, timeframe: str, candle_type: CandleType, refresh_earlier_seconds=5) -> bool:
# Timeframe in seconds
interval_in_sec = timeframe_to_seconds(timeframe)
plr = self._trades_last_refresh_time.get((pair, timeframe, candle_type), 0) + interval_in_sec
REFRESH_EARLIER_SECONDS = 5
return plr < arrow.utcnow().int_timestamp - REFRESH_EARLIER_SECONDS
return plr < arrow.utcnow().int_timestamp - refresh_earlier_seconds
@retrier_async
async def _async_get_candle_history(