mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-09-20 09:31:12 +00:00
feat: move trades-refresh to async
This commit is contained in:
parent
abef8e376c
commit
a840969512
|
@ -2678,6 +2678,94 @@ class Exchange:
|
|||
self._trades[(pair, timeframe, c_type)] = trades_df
|
||||
return trades_df
|
||||
|
||||
async def _build_trades_dl_jobs(
|
||||
self, pairwt: PairWithTimeframe, data_handler, cache: bool
|
||||
) -> Tuple[PairWithTimeframe, Optional[DataFrame]]:
|
||||
"""
|
||||
Build coroutines to refresh trades for (they're then called through async.gather)
|
||||
"""
|
||||
pair, timeframe, candle_type = pairwt
|
||||
since_ms = None
|
||||
new_ticks: List = []
|
||||
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
|
||||
first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
|
||||
# refresh, if
|
||||
# a. not in _trades
|
||||
# b. no cache used
|
||||
# c. need new data
|
||||
is_in_cache = (pair, timeframe, candle_type) in self._trades
|
||||
if (
|
||||
not is_in_cache
|
||||
or not cache
|
||||
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
|
||||
):
|
||||
logger.debug(f"Refreshing TRADES data for {pair}")
|
||||
# fetch trades since latest _trades and
|
||||
# store together with existing trades
|
||||
try:
|
||||
until = None
|
||||
from_id = None
|
||||
if is_in_cache:
|
||||
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
|
||||
until = dt_ts() # now
|
||||
|
||||
else:
|
||||
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
|
||||
all_stored_ticks_df = data_handler.trades_load(
|
||||
f"{pair}-cached", self.trading_mode
|
||||
)
|
||||
|
||||
if not all_stored_ticks_df.empty:
|
||||
if (
|
||||
all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms
|
||||
and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
|
||||
):
|
||||
# Use cache and populate further
|
||||
last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
|
||||
from_id = all_stored_ticks_df.iloc[-1]["id"]
|
||||
# only use cached if it's closer than first_candle_ms
|
||||
since_ms = (
|
||||
last_cached_ms
|
||||
if last_cached_ms > first_candle_ms
|
||||
else first_candle_ms
|
||||
)
|
||||
else:
|
||||
# Skip cache, it's too old
|
||||
all_stored_ticks_df = DataFrame(
|
||||
columns=DEFAULT_TRADES_COLUMNS + ["date"]
|
||||
)
|
||||
|
||||
# from_id overrules with exchange set to id paginate
|
||||
[_, new_ticks] = await self._async_get_trade_history(
|
||||
pair,
|
||||
since=since_ms if since_ms else first_candle_ms,
|
||||
until=until,
|
||||
from_id=from_id,
|
||||
)
|
||||
|
||||
except Exception:
|
||||
logger.exception(f"Refreshing TRADES data for {pair} failed")
|
||||
return pairwt, None
|
||||
|
||||
if new_ticks:
|
||||
all_stored_ticks_list = all_stored_ticks_df[DEFAULT_TRADES_COLUMNS].values.tolist()
|
||||
all_stored_ticks_list.extend(new_ticks)
|
||||
trades_df = self._process_trades_df(
|
||||
pair,
|
||||
timeframe,
|
||||
candle_type,
|
||||
all_stored_ticks_list,
|
||||
cache,
|
||||
first_required_candle_date=first_candle_ms,
|
||||
)
|
||||
data_handler.trades_store(
|
||||
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
|
||||
)
|
||||
return pairwt, trades_df
|
||||
else:
|
||||
logger.error(f"No new ticks for {pair}")
|
||||
return pairwt, None
|
||||
|
||||
def refresh_latest_trades(
|
||||
self,
|
||||
pair_list: ListPairsWithTimeframes,
|
||||
|
@ -2698,90 +2786,25 @@ class Exchange:
|
|||
self._config["datadir"], data_format=self._config["dataformat_trades"]
|
||||
)
|
||||
logger.debug("Refreshing TRADES data for %d pairs", len(pair_list))
|
||||
since_ms = None
|
||||
results_df = {}
|
||||
for pair, timeframe, candle_type in set(pair_list):
|
||||
new_ticks: List = []
|
||||
all_stored_ticks_df = DataFrame(columns=DEFAULT_TRADES_COLUMNS + ["date"])
|
||||
first_candle_ms = self.needed_candle_for_trades_ms(timeframe, candle_type)
|
||||
# refresh, if
|
||||
# a. not in _trades
|
||||
# b. no cache used
|
||||
# c. need new data
|
||||
is_in_cache = (pair, timeframe, candle_type) in self._trades
|
||||
if (
|
||||
not is_in_cache
|
||||
or not cache
|
||||
or self._now_is_time_to_refresh_trades(pair, timeframe, candle_type)
|
||||
):
|
||||
logger.debug(f"Refreshing TRADES data for {pair}")
|
||||
# fetch trades since latest _trades and
|
||||
# store together with existing trades
|
||||
try:
|
||||
until = None
|
||||
from_id = None
|
||||
if is_in_cache:
|
||||
from_id = self._trades[(pair, timeframe, candle_type)].iloc[-1]["id"]
|
||||
until = dt_ts() # now
|
||||
coros = []
|
||||
for pairwt in set(pair_list):
|
||||
coros.append(self._build_trades_dl_jobs(pairwt, data_handler, cache))
|
||||
|
||||
else:
|
||||
until = int(timeframe_to_prev_date(timeframe).timestamp()) * 1000
|
||||
all_stored_ticks_df = data_handler.trades_load(
|
||||
f"{pair}-cached", self.trading_mode
|
||||
)
|
||||
async def gather_stuff(coro):
|
||||
return await asyncio.gather(*coro, return_exceptions=True)
|
||||
|
||||
if not all_stored_ticks_df.empty:
|
||||
if (
|
||||
all_stored_ticks_df.iloc[-1]["timestamp"] > first_candle_ms
|
||||
and all_stored_ticks_df.iloc[0]["timestamp"] <= first_candle_ms
|
||||
):
|
||||
# Use cache and populate further
|
||||
last_cached_ms = all_stored_ticks_df.iloc[-1]["timestamp"]
|
||||
from_id = all_stored_ticks_df.iloc[-1]["id"]
|
||||
# only use cached if it's closer than first_candle_ms
|
||||
since_ms = (
|
||||
last_cached_ms
|
||||
if last_cached_ms > first_candle_ms
|
||||
else first_candle_ms
|
||||
)
|
||||
else:
|
||||
# Skip cache, it's too old
|
||||
all_stored_ticks_df = DataFrame(
|
||||
columns=DEFAULT_TRADES_COLUMNS + ["date"]
|
||||
)
|
||||
for input_coro in chunks(coros, 100):
|
||||
with self._loop_lock:
|
||||
results = self.loop.run_until_complete(gather_stuff(input_coro))
|
||||
|
||||
# from_id overrules with exchange set to id paginate
|
||||
[_, new_ticks] = self.get_historic_trades(
|
||||
pair,
|
||||
since=since_ms if since_ms else first_candle_ms,
|
||||
until=until,
|
||||
from_id=from_id,
|
||||
)
|
||||
|
||||
except Exception:
|
||||
logger.exception(f"Refreshing TRADES data for {pair} failed")
|
||||
for res in results:
|
||||
if isinstance(res, Exception):
|
||||
logger.warning(f"Async code raised an exception: {repr(res)}")
|
||||
continue
|
||||
|
||||
if new_ticks:
|
||||
all_stored_ticks_list = all_stored_ticks_df[
|
||||
DEFAULT_TRADES_COLUMNS
|
||||
].values.tolist()
|
||||
all_stored_ticks_list.extend(new_ticks)
|
||||
trades_df = self._process_trades_df(
|
||||
pair,
|
||||
timeframe,
|
||||
candle_type,
|
||||
all_stored_ticks_list,
|
||||
cache,
|
||||
first_required_candle_date=first_candle_ms,
|
||||
)
|
||||
results_df[(pair, timeframe, candle_type)] = trades_df
|
||||
data_handler.trades_store(
|
||||
f"{pair}-cached", trades_df[DEFAULT_TRADES_COLUMNS], self.trading_mode
|
||||
)
|
||||
|
||||
else:
|
||||
logger.error(f"No new ticks for {pair}")
|
||||
pairwt, trades_df = res
|
||||
if trades_df is not None:
|
||||
results_df[pairwt] = trades_df
|
||||
|
||||
return results_df
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user