added simple caching for etl

main
Giò Diani 2025-01-14 19:56:15 +01:00
parent d436c5d892
commit 8bef4b9621
11 changed files with 99 additions and 9 deletions
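This commit introduces a small pickle-based cache module (etl/src/data/etl_cache.py) and wires the same memoization pattern into every ETL entry point: build a per-function cache key from the arguments, return the cached object on a hit, otherwise run the query and persist the result. A minimal sketch of the pattern, assuming the etl/src working directory so that data.etl_cache is importable (compute_result() is a hypothetical stand-in for the Polars pipeline each function actually runs):

from data import etl_cache

def property_capacities(id: int):
    file = f"etl_property_capacities_{id}.obj"  # per-function, per-id cache key
    obj = etl_cache.openObj(file)
    if obj:  # cache hit: skip the extraction query entirely
        return obj
    result = compute_result(id)  # hypothetical: the real Polars aggregation
    etl_cache.saveObj(file, result)
    return result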

.gitignore (vendored, 4 changes)

@@ -66,4 +66,8 @@ env3.*/
 
 # duckdb
 *.duckdb
+
+# cache
+*.obj
+
 /src/mauro/dok/

etl/src/data/etl_cache.py (new file, 18 additions)

@@ -0,0 +1,18 @@
+from pathlib import Path
+from pickle import dump, load
+
+Path('cache').mkdir(parents=True, exist_ok=True)
+
+# load pickle obj
+def openObj(file):
+    filepath = Path(f"cache/{file}")
+    if filepath.is_file():
+        with open(filepath, 'rb') as f:
+            return load(f)
+    return False
+
+# save pickle obj
+def saveObj(file, result):
+    filepath = Path(f"cache/{file}")
+    with open(filepath, 'wb') as f:
+        dump(result, f)
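openObj returns the unpickled object on a hit and False on a miss; saveObj writes unconditionally. A quick round-trip, as a sketch (the key name is invented, and the import assumes the same etl/src working directory as above):

from data import etl_cache

etl_cache.saveObj("demo_42.obj", {"capacities": [81.3, 77.0]})
cached = etl_cache.openObj("demo_42.obj")  # the dict on a hit, False on a miss
if cached:
    print(cached["capacities"])

Because every caller below guards with a plain "if obj:", a legitimately cached falsy value (an empty dict or list) would be treated as a miss and recomputed on every call.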

etl_property_capacities.py

@@ -3,11 +3,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def property_capacities(id: int):
+    file = f"etl_property_capacities_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     extractions = d.extractions_for(id).pl()
 
     df_dates = pl.DataFrame()
@@ -35,4 +41,6 @@ def property_capacities(id: int):
     max_capacity_perc = 100 / max_capacity
     result['capacities'].append(round(max_capacity_perc * row['sum'], 2))
     result['capacities'].reverse()
+
+    etl_cache.saveObj(file, result)
     return result

etl_property_capacities_monthly.py

@@ -3,10 +3,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def property_capacities_monthly(id: int, scrapeDate: str):
+    file = f"etl_property_capacities_monthly_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     extractions = d.extractions_propId_scrapeDate(id, scrapeDate).pl()
 
     df_calendar = pl.DataFrame()
@@ -24,4 +31,5 @@ def property_capacities_monthly(id: int, scrapeDate: str):
     df_calendar = df_calendar.sort('dates')
     df_calendar = df_calendar.drop('dates')
     result = {"scraping-date": scrapeDate, "months": df_calendar['date_short'].to_list(), 'capacities': df_calendar['column_0'].to_list()}
+    etl_cache.saveObj(file, result)
     return result
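Note that the cache key here is built from id alone even though the function is also parameterized by scrapeDate, so results for different scrape dates of the same property share one cache file. If both parameters were meant to vary independently, the key would need to include the date as well; a hypothetical variant (not what this commit does):

    file = f"etl_property_capacities_monthly_{id}_{scrapeDate}.obj"

The same consideration applies to the other scrapeDate-parameterized functions below.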

etl_property_capacities_weekdays.py

@@ -3,10 +3,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def property_capacities_weekdays(id: int, scrapeDate: str):
+    file = f"etl_property_capacities_weekdays_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     extractions = d.extractions_propId_scrapeDate(id, scrapeDate).pl()
     weekdays = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
     df_calendar = pl.DataFrame()
@@ -30,4 +37,5 @@ def property_capacities_weekdays(id: int, scrapeDate: str):
     df_calendar = df_calendar.drop('weekday_num')
     result = {"scraping-date": scrapeDate, "weekdays": df_calendar['weekday'].to_list(), 'capacities': df_calendar['column_0'].to_list()}
+    etl_cache.saveObj(file, result)
     return result

etl_property_neighbours.py

@@ -1,7 +1,9 @@
+from math import asin, atan2, cos, degrees, radians, sin, sqrt
+
 import polars as pl
-from math import radians, cos, sin, asin, sqrt, degrees, atan2
 
 import data
+from data import etl_cache
 
 d = data.load()
@@ -23,6 +25,12 @@ def calcHaversinDistance(latMain, lonMain, lat, lon):
     return d
 
 def property_neighbours(id: int):
+    file = f"etl_property_neighbours_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     extractions = d.properties_geo_seeds().pl()
 
     # Get lat, long and region from main property
@@ -61,6 +69,6 @@ def property_neighbours(id: int):
 
     #result = {"ids": extractions['id'].to_list(), "lat": extractions['lat'].to_list(), "lon": extractions['lon'].to_list()}
     result = extractions.to_dicts()
-
+    etl_cache.saveObj(file, result)
     return result

etl_region_capacities.py

@@ -4,11 +4,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def region_capacities(id: int):
+    file = f"etl_region_capacities_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     # Get Data
     if id == -1:
         extractions = d.capacity_global().pl()
@@ -47,4 +53,6 @@ def region_capacities(id: int):
     df = df.cast({"scrape_date": date}).sort('scrape_date')
 
     result = {"capacities": df['capacity'].to_list(), "dates": df['scrape_date'].to_list()}
+
+    etl_cache.saveObj(file, result)
     return result

etl_region_capacities_monthly.py

@@ -1,15 +1,21 @@
+from datetime import datetime, timedelta
 from io import StringIO
 
 import polars as pl
 
 import data
+from data import etl_cache
-from datetime import datetime, timedelta
 
 d = data.load()
 
 def region_capacities_monthly(id: int, scrapeDate_start: str):
+    file = f"etl_region_capacities_monthly_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     # String to Date
     scrapeDate_start = datetime.strptime(scrapeDate_start, '%Y-%m-%d')
@@ -50,4 +56,5 @@ def region_capacities_monthly(id: int, scrapeDate_start: str):
     outDf = outDf[['date_short', 'mean']]
 
     result = {"scraping-date": scrapeDate, "months": outDf['date_short'].to_list(),'capacities': outDf['mean'].to_list()}
+    etl_cache.saveObj(file, result)
     return result

etl_region_capacities_weekdays.py

@@ -1,15 +1,20 @@
+from datetime import datetime, timedelta
 from io import StringIO
 
 import polars as pl
 
 import data
+from data import etl_cache
-from datetime import datetime, timedelta
 
 d = data.load()
 
 def region_capacities_weekdays(id: int, scrapeDate_start: str):
+    file = f"etl_region_capacities_weekdays_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     # String to Date
     scrapeDate_start = datetime.strptime(scrapeDate_start, '%Y-%m-%d')
@@ -53,4 +58,5 @@ def region_capacities_weekdays(id: int, scrapeDate_start: str):
     outDf = outDf[['weekday', 'mean']]
 
     result = {"scraping-date": scrapeDate, "weekdays": outDf['weekday'].to_list(),'capacities': outDf['mean'].to_list()}
+    etl_cache.saveObj(file, result)
     return result

etl_region_movingAverage.py

@@ -4,10 +4,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def region_movingAverage(id: int, scrape_date_start_min: str):
+    file = f"etl_region_movingAverage_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     # Settings
     # Offset between actual and predict ScrapeDate
     timeOffset = 30
@@ -119,4 +126,5 @@ def region_movingAverage(id: int, scrape_date_start_min: str):
     # Add moving_averages to df
     outDF = outDF.with_columns(moving_averages=pl.Series(moving_averages))
     result = {'dates':outDF.get_column('dates').to_list(), 'cap_earlierTimeframe':outDF.get_column('sum_hor_predict').to_list(), 'cap_laterTimeframe':outDF.get_column('sum_hor_actual').to_list(), 'movAvg':outDF.get_column('moving_averages').to_list(),}
+    etl_cache.saveObj(file, result)
     return result

etl_region_properties_capacities.py

@@ -3,10 +3,17 @@ from io import StringIO
 import polars as pl
 
 import data
+from data import etl_cache
 
 d = data.load()
 
 def region_properties_capacities(id: int):
+    file = f"etl_region_properties_capacities_{id}.obj"
+    obj = etl_cache.openObj(file)
+    if obj:
+        return obj
+
     # Get Data
     if id == -1:
         df = d.capacity_global().pl()
@@ -53,5 +60,5 @@ def region_properties_capacities(id: int):
 
     # Create JSON
     outDict = {'scrapeDates': listOfDates, 'property_ids': listOfPropertyIDs, 'values': values}
-
+    etl_cache.saveObj(file, outDict)
     return outDict