from datetime import date, datetime, timedelta
from io import StringIO

import matplotlib.pyplot as plt
import polars as pl

import data
from data import etl_cache

d = data.load()


def region_movingAverage(id: int, scrape_date_start_min: str):
    """Compare the booking capacity of one region (or all regions, for
    id == -1) between two scrapes roughly `timeOffset` days apart and smooth
    the earlier curve with a moving average. Results are cached via
    etl_cache."""
    file = f"etl_region_movingAverage_{id}_{scrape_date_start_min}.obj"
    obj = etl_cache.openObj(file)
    if obj:
        return obj

    # Settings
    # Offset in days between the actual and the predict ScrapeDate
    timeOffset = 30
    # Calculation frame in days
    calcFrame = 180
    # Moving-average window size in days
    windowSize = 7

    # Get unique ScrapeDates
    uniqueScrapeDates = d.unique_scrapeDates().pl()
    uniqueScrapeDates = uniqueScrapeDates.get_column('ScrapeDate').str.to_date()
    uniqueScrapeDates = uniqueScrapeDates.sort().to_list()

    # String to date
    scrape_date_start_min = datetime.strptime(scrape_date_start_min, '%Y-%m-%d')

    # Get end date of the start search-window
    scrape_date_start_max = scrape_date_start_min + timedelta(days=1)

    # Get start and end date of the end search-window
    scrape_date_end_min = scrape_date_start_min + timedelta(days=timeOffset)
    # Snap to the closest actual ScrapeDate
    scrape_date_end_min = min(uniqueScrapeDates, key=lambda x: abs(x - scrape_date_end_min.date()))
    scrape_date_end_max = scrape_date_end_min + timedelta(days=1)
    final_end_date = scrape_date_end_min + timedelta(days=calcFrame)

    # Get data
    if id == -1:
        ex_start = d.singleScrape_of_global(scrape_date_start_min, scrape_date_start_max)
        ex_start_count = ex_start.shape[0]
        ex_end = d.singleScrape_of_global(scrape_date_end_min, scrape_date_end_max)
        ex_end_count = ex_end.shape[0]
    else:
        ex_start = d.singleScrape_of_region(id, scrape_date_start_min, scrape_date_start_max)
        ex_start_count = ex_start.shape[0]
        ex_end = d.singleScrape_of_region(id, scrape_date_end_min, scrape_date_end_max)
        ex_end_count = ex_end.shape[0]

    num_properties = [ex_start_count, ex_end_count]
    start_end = [ex_start, ex_end]

    outDFList = []
    for df in start_end:
        df = df.pl()
        firstExe = True
        counter = 1
        outDF = pl.DataFrame(schema={"0": int, "dates": date})
        for row in df.rows(named=True):
            if row['calendarBody']:
                # Each calendarBody is a JSON object mapping dates to values;
                # transpose it into one value column plus a "dates" column
                calDF = pl.read_json(StringIO(row['calendarBody']))
                columnTitles = calDF.columns
                calDF = calDF.transpose()
                calDF = calDF.with_columns(pl.Series(name="dates", values=columnTitles))
                calDF = calDF.with_columns(pl.col("dates").str.to_date())
                # Keep only the dates that fall inside the calculation frame
                calDF = calDF.filter(pl.col("dates") >= (scrape_date_start_min + timedelta(days=1)))
                calDF = calDF.filter(pl.col("dates") < final_end_date)
                # Join all properties into one DataFrame, one column per property
                if firstExe:
                    outDF = calDF
                    firstExe = False
                else:
                    outDF = outDF.join(calDF, on='dates')
                outDF = outDF.rename({'column_0': str(counter)})
                counter += 1
        outDF = outDF.sort('dates')
        outDFList.append(outDF)

    # Calculate the horizontal sum for all dates and convert it to a
    # percentage of the properties in the scrape
    arrayCounter = 0
    tempDFList = []
    for df in outDFList:
        dates = df.select(pl.col("dates"))
        values = df.select(pl.exclude("dates"))
        sum_hor = values.sum_horizontal()
        sum_hor = sum_hor / num_properties[arrayCounter] / 2 * 100
        arrayCounter += 1
        newDF = dates.with_columns(sum_hor=pl.Series(sum_hor))
        tempDFList.append(newDF)

    # Join actual and predict values
    outDF = tempDFList[0].join(tempDFList[1], on='dates', how='outer')
    # Rename columns for clarity; sum_hor_predict is the data from the
    # earlier ScrapeDate
    outDF = outDF.drop('dates_right')
    outDF = outDF.rename({'sum_hor_right': 'sum_hor_actual', 'sum_hor': 'sum_hor_predict'})

    # Calculate the moving average over the predict values
    baseValues = outDF.get_column('sum_hor_predict').to_list()
    moving_averages = []
    for i in range(len(baseValues) - windowSize + 1):
        window = baseValues[i:i + windowSize]
        window_average = sum(window) / windowSize
        moving_averages.append(window_average)

    # Pad the front and end of moving_averages with None so its length
    # matches the calculation frame again
    num_empty = windowSize // 2
    moving_averages = [None] * num_empty + moving_averages + [None] * num_empty

    # Add moving_averages to the DataFrame
    outDF = outDF.with_columns(moving_averages=pl.Series(moving_averages))

    result = {
        'dates': outDF.get_column('dates').to_list(),
        'cap_earlierTimeframe': outDF.get_column('sum_hor_predict').to_list(),
        'cap_laterTimeframe': outDF.get_column('sum_hor_actual').to_list(),
        'movAvg': outDF.get_column('moving_averages').to_list(),
    }
    etl_cache.saveObj(file, result)
    return result
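

# --- Usage sketch ---
# A minimal, illustrative example of how this function might be called and
# its result plotted with the matplotlib import above. The region id (42) and
# start date used here are placeholder values, not taken from the real
# dataset; any id other than -1 selects a single region.
if __name__ == "__main__":
    res = region_movingAverage(42, '2023-01-01')  # hypothetical id and date

    plt.plot(res['dates'], res['cap_earlierTimeframe'], label='earlier scrape (predict)')
    plt.plot(res['dates'], res['cap_laterTimeframe'], label='later scrape (actual)')
    plt.plot(res['dates'], res['movAvg'], label='7-day moving average')
    plt.xlabel('date')
    plt.ylabel('capacity [%]')
    plt.legend()
    plt.show()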