minor changes, update candle appending to support overlaps

This commit is contained in:
Timothy Pogue 2022-12-05 13:11:07 -07:00
parent f1ebaf4730
commit 0602479f7d
3 changed files with 35 additions and 22 deletions

View File

@ -167,7 +167,8 @@ class DataProvider:
producer_name: str = "default" producer_name: str = "default"
) -> Tuple[bool, int]: ) -> Tuple[bool, int]:
""" """
Append a candle to the existing external dataframe Append a candle to the existing external dataframe. The incoming dataframe
must have at least 1 candle.
:param pair: pair to get the data for :param pair: pair to get the data for
:param timeframe: Timeframe to get data for :param timeframe: Timeframe to get data for
@ -176,29 +177,32 @@ class DataProvider:
""" """
pair_key = (pair, timeframe, candle_type) pair_key = (pair, timeframe, candle_type)
if (producer_name not in self.__producer_pairs_df) \ if dataframe.empty:
or (pair_key not in self.__producer_pairs_df[producer_name]): # The incoming dataframe must have at least 1 candle
return (False, 0)
if (producer_name not in self.__producer_pairs_df
or pair_key not in self.__producer_pairs_df[producer_name]):
# We don't have data from this producer yet, # We don't have data from this producer yet,
# or we don't have data for this pair_key # or we don't have data for this pair_key
# return False and 1000 for the full df # return False and 1000 for the full df
return (False, 1000) return (False, 1000)
existing_df, _ = self.__producer_pairs_df[producer_name][pair_key] existing_df, la = self.__producer_pairs_df[producer_name][pair_key]
# Iterate over any overlapping candles and update the values
for idx, candle in dataframe.iterrows():
existing_df.iloc[
existing_df['date'] == candle['date']
] = candle
existing_df.reset_index(drop=True, inplace=True)
# CHECK FOR MISSING CANDLES # CHECK FOR MISSING CANDLES
timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas timeframe_delta = to_timedelta(timeframe) # Convert the timeframe to a timedelta for pandas
local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data local_last = existing_df.iloc[-1]['date'] # We want the last date from our copy of data
incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data incoming_first = dataframe.iloc[0]['date'] # We want the first date from the incoming data
# We have received this candle before, update our copy
# and return True, 0
if local_last == incoming_first:
existing_df.iloc[-1] = dataframe.iloc[0]
existing_data = (existing_df.reset_index(drop=True), _)
self.__producer_pairs_df[producer_name][pair_key] = existing_data
return (True, 0)
candle_difference = (incoming_first - local_last) / timeframe_delta candle_difference = (incoming_first - local_last) / timeframe_delta
# If the difference divided by the timeframe is 1, then this # If the difference divided by the timeframe is 1, then this
@ -228,6 +232,7 @@ class DataProvider:
# Only keep the last 1500 candles in memory # Only keep the last 1500 candles in memory
existing = existing[-1500:] if len(existing) > 1500 else existing existing = existing[-1500:] if len(existing) > 1500 else existing
existing.reset_index(drop=True, inplace=True)
return existing return existing

View File

@ -36,6 +36,9 @@ class Producer(TypedDict):
ws_token: str ws_token: str
FULL_DATAFRAME_THRESHOLD = 100
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -376,8 +379,8 @@ class ExternalMessageConsumer:
logger.debug(f"Received {len(df)} candle(s) for {key}") logger.debug(f"Received {len(df)} candle(s) for {key}")
if len(df) >= 999: if len(df) >= FULL_DATAFRAME_THRESHOLD:
# This is a full dataframe # This is likely a full dataframe
# Add the dataframe to the dataprovider # Add the dataframe to the dataprovider
self._dp._add_external_df( self._dp._add_external_df(
pair, pair,
@ -388,8 +391,8 @@ class ExternalMessageConsumer:
producer_name=producer_name producer_name=producer_name
) )
elif len(df) < 999: elif len(df) < FULL_DATAFRAME_THRESHOLD:
# This is n single candles # This is likely n single candles
# Have dataprovider append it to # Have dataprovider append it to
# the full datafame. If it can't, # the full datafame. If it can't,
# request the missing candles # request the missing candles
@ -403,9 +406,14 @@ class ExternalMessageConsumer:
) )
if not did_append: if not did_append:
logger.debug("Holes in data or no existing df, " # We want an overlap in candles incase some data has changed
f"requesting {n_missing} candles " n_missing += 1
f"for {key} from `{producer_name}`") # Set to None for all candles if we missed a full df's worth of candles
n_missing = n_missing if n_missing < FULL_DATAFRAME_THRESHOLD else 1500
logger.warning("Holes in data or no existing df, "
f"requesting {n_missing} candles "
f"for {key} from `{producer_name}`")
self.send_producer_request( self.send_producer_request(
producer_name, producer_name,

View File

@ -1062,7 +1062,7 @@ class RPC:
self, self,
pair: str, pair: str,
timeframe: str, timeframe: str,
limit: Optional[int] = None limit: Optional[int]
) -> Tuple[DataFrame, datetime]: ) -> Tuple[DataFrame, datetime]:
""" """
Get the dataframe and last analyze from the dataprovider Get the dataframe and last analyze from the dataprovider
@ -1083,7 +1083,7 @@ class RPC:
def _ws_all_analysed_dataframes( def _ws_all_analysed_dataframes(
self, self,
pairlist: List[str], pairlist: List[str],
limit: Optional[int] = None limit: Optional[int]
) -> Generator[Dict[str, Any], None, None]: ) -> Generator[Dict[str, Any], None, None]:
""" """
Get the analysed dataframes of each pair in the pairlist. Get the analysed dataframes of each pair in the pairlist.