mirror of
https://github.com/freqtrade/freqtrade.git
synced 2024-11-10 10:21:59 +00:00
Improve exchange_ws terminology
This commit is contained in:
parent
60cfda5d52
commit
f4f8b910fe
|
@ -2249,7 +2249,7 @@ class Exchange:
|
|||
if self._exchange_ws:
|
||||
candle_date = int(timeframe_to_prev_date(timeframe).timestamp())
|
||||
candles = self._exchange_ws.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
|
||||
x = self._exchange_ws.pairs_last_refresh[(pair, timeframe, candle_type)]
|
||||
x = self._exchange_ws.klines_last_refresh.get((pair, timeframe, candle_type))
|
||||
logger.info(f"{candle_date < x}, {candle_date}, {x}")
|
||||
if candles and candles[-1][0] > min_date and candle_date < x:
|
||||
# Usable result ...
|
||||
|
|
|
@ -20,22 +20,22 @@ class ExchangeWS():
|
|||
def __init__(self, config: Config, ccxt_object) -> None:
|
||||
self.config = config
|
||||
self.ccxt_object = ccxt_object
|
||||
self._thread = Thread(name="ccxt_ws", target=self.start)
|
||||
self._thread = Thread(name="ccxt_ws", target=self.__start_forever)
|
||||
self._background_tasks: Set[asyncio.Task] = set()
|
||||
|
||||
self._pairs_watching: Set[Tuple[str, str, CandleType]] = set()
|
||||
self._pairs_scheduled: Set[Tuple[str, str, CandleType]] = set()
|
||||
self.pairs_last_refresh: Dict[Tuple[str, str, CandleType], float] = {}
|
||||
self.pairs_last_request: Dict[Tuple[str, str, CandleType], float] = {}
|
||||
self._klines_watching: Set[Tuple[str, str, CandleType]] = set()
|
||||
self._klines_scheduled: Set[Tuple[str, str, CandleType]] = set()
|
||||
self.klines_last_refresh: Dict[Tuple[str, str, CandleType], float] = {}
|
||||
self.klines_last_request: Dict[Tuple[str, str, CandleType], float] = {}
|
||||
self._thread.start()
|
||||
|
||||
def start(self) -> None:
|
||||
def __start_forever(self) -> None:
|
||||
self._loop = asyncio.new_event_loop()
|
||||
self._loop.run_forever()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
logger.debug("Cleanup called - stopping")
|
||||
self._pairs_watching.clear()
|
||||
self._klines_watching.clear()
|
||||
self._loop.stop()
|
||||
self._thread.join()
|
||||
logger.debug("Stopped")
|
||||
|
@ -45,19 +45,19 @@ class ExchangeWS():
|
|||
Remove pairs from watchlist if they've not been requested within
|
||||
the last timeframe (+ offset)
|
||||
"""
|
||||
for p in list(self._pairs_watching):
|
||||
for p in list(self._klines_watching):
|
||||
_, timeframe, _ = p
|
||||
timeframe_s = timeframe_to_seconds(timeframe)
|
||||
last_refresh = self.pairs_last_request.get(p, 0)
|
||||
last_refresh = self.klines_last_request.get(p, 0)
|
||||
if last_refresh > 0 and time.time() - last_refresh > timeframe_s + 20:
|
||||
logger.info(f"Removing {p} from watchlist")
|
||||
self._pairs_watching.discard(p)
|
||||
self._klines_watching.discard(p)
|
||||
|
||||
async def schedule_while_true(self) -> None:
|
||||
|
||||
for p in self._pairs_watching:
|
||||
if p not in self._pairs_scheduled:
|
||||
self._pairs_scheduled.add(p)
|
||||
for p in self._klines_watching:
|
||||
if p not in self._klines_scheduled:
|
||||
self._klines_scheduled.add(p)
|
||||
pair, timeframe, candle_type = p
|
||||
task = asyncio.create_task(
|
||||
self.continuously_async_watch_ohlcv(pair, timeframe, candle_type))
|
||||
|
@ -73,21 +73,24 @@ class ExchangeWS():
|
|||
async def continuously_async_watch_ohlcv(
|
||||
self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||
try:
|
||||
while (pair, timeframe, candle_type) in self._pairs_watching:
|
||||
while (pair, timeframe, candle_type) in self._klines_watching:
|
||||
start = time.time()
|
||||
data = await self.ccxt_object.watch_ohlcv(pair, timeframe)
|
||||
self.pairs_last_refresh[(pair, timeframe, candle_type)] = time.time()
|
||||
self.klines_last_refresh[(pair, timeframe, candle_type)] = time.time()
|
||||
# logger.info(
|
||||
# f"watch done {pair}, {timeframe}, data {len(data)} "
|
||||
# f"in {time.time() - start:.2f}s")
|
||||
except ccxt.BaseError:
|
||||
logger.exception("Exception in continuously_async_watch_ohlcv")
|
||||
finally:
|
||||
self._pairs_watching.discard((pair, timeframe, candle_type))
|
||||
self._klines_watching.discard((pair, timeframe, candle_type))
|
||||
|
||||
def schedule_ohlcv(self, pair: str, timeframe: str, candle_type: CandleType) -> None:
|
||||
self._pairs_watching.add((pair, timeframe, candle_type))
|
||||
self.pairs_last_request[(pair, timeframe, candle_type)] = time.time()
|
||||
"""
|
||||
Schedule a pair/timeframe combination to be watched
|
||||
"""
|
||||
self._klines_watching.add((pair, timeframe, candle_type))
|
||||
self.klines_last_request[(pair, timeframe, candle_type)] = time.time()
|
||||
# asyncio.run_coroutine_threadsafe(self.schedule_schedule(), loop=self._loop)
|
||||
asyncio.run_coroutine_threadsafe(self.schedule_while_true(), loop=self._loop)
|
||||
self.cleanup_expired()
|
||||
|
@ -100,10 +103,10 @@ class ExchangeWS():
|
|||
candles = self.ccxt_object.ohlcvs.get(pair, {}).get(timeframe)
|
||||
# Fake 1 candle - which is then removed again
|
||||
# TODO: is this really a good idea??
|
||||
refresh_time = int(self.pairs_last_refresh[(pair, timeframe, candle_type)] * 1000)
|
||||
refresh_time = int(self.klines_last_refresh[(pair, timeframe, candle_type)] * 1000)
|
||||
candles.append([refresh_time, 0, 0, 0, 0, 0])
|
||||
logger.info(
|
||||
f"watch result for {pair}, {timeframe} with length {len(candles)}, "
|
||||
f"{datetime.fromtimestamp(candles[-1][0] // 1000)}, "
|
||||
f"lref={datetime.fromtimestamp(self.pairs_last_refresh[(pair, timeframe, candle_type)])}")
|
||||
f"lref={datetime.fromtimestamp(self.klines_last_refresh[(pair, timeframe, candle_type)])}")
|
||||
return pair, timeframe, candle_type, candles
|
||||
|
|
Loading…
Reference in New Issue
Block a user