from threading import Thread, current_thread

import duckdb


class Database:
    """Read-only query helper around a DuckDB database file.

    Every query targets tables in the ``consultancy_d`` schema (``regions``,
    ``seeds``, ``properties``, ``exceptions``, ``extractions``).

    Unless stated otherwise, a method returns whatever ``connection.sql()``
    returns for its query. Methods ending in ``.show()`` return the result of
    that call instead (DuckDB's ``show()`` prints the relation).
    """

    def check_duckdb_extensions(self, extension):
        """Look up the ``installed`` flag of a DuckDB extension.

        Args:
            extension: Extension name matched against ``extension_name`` in
                ``duckdb_extensions()``.

        Returns:
            The first result row (a one-element row holding ``installed``),
            or ``None`` when no extension with that name is listed.
        """
        return self.connection.execute(
            """
            SELECT installed
            FROM duckdb_extensions()
            WHERE extension_name = $extension
            """,
            {"extension": extension},
        ).fetchone()

    def __init__(self, path):
        """Open ``path`` read-only and make sure the spatial extension is installed.

        Args:
            path: Database location passed to ``duckdb.connect``.
        """
        duckdb_connection = duckdb.connect(database=path, read_only=True)
        self.connection = duckdb_connection.cursor()

        # Install the spatial extension if it is known but not yet installed.
        # When the lookup returns no row at all, nothing is installed here.
        spatial_installed = self.check_duckdb_extensions(extension='spatial')
        if spatial_installed and not spatial_installed[0]:
            self.connection.sql("INSTALL spatial")

    def db_overview(self):
        """Run ``DESCRIBE;`` and show the result."""
        return self.connection.sql("DESCRIBE;").show()

    def seeds(self):
        """Show every region name with the URI of its seeds (LEFT JOIN)."""
        return self.connection.sql("""
            SELECT regions.name, seeds.uri
            FROM consultancy_d.regions
            LEFT JOIN consultancy_d.seeds ON regions.id = seeds.region_id;
            """).show()

    def properties_growth(self):
        """Cumulative number of properties per creation date.

        One running total is computed over all properties and one for each of
        the seed ids 1-4; the per-seed totals are exposed as
        ``total_heidiland``, ``total_davos``, ``total_engadin`` and
        ``total_stmoritz``. Dates on which a seed gained no property yield
        NULL in that seed's column because of the LEFT JOINs.
        """
        return self.connection.sql("""
            WITH PropertiesALL AS (
                SELECT
                    strftime(created_at, '%Y-%m-%d') AS date,
                    COUNT(*) as properties_count,
                    SUM(properties_count) OVER (ORDER BY date) AS total
                FROM consultancy_d.properties p
                GROUP BY date
                ORDER BY date
            ),
            PropertiesR1 AS (
                SELECT
                    strftime(created_at, '%Y-%m-%d') AS date,
                    COUNT(*) as properties_count,
                    SUM(properties_count) OVER (ORDER BY date) AS total
                FROM consultancy_d.properties p
                WHERE p.seed_id = 1
                GROUP BY date
                ORDER BY date
            ),
            PropertiesR2 AS (
                SELECT
                    strftime(created_at, '%Y-%m-%d') AS date,
                    COUNT(*) as properties_count,
                    SUM(properties_count) OVER (ORDER BY date) AS total
                FROM consultancy_d.properties p
                WHERE p.seed_id = 2
                GROUP BY date
                ORDER BY date
            ),
            PropertiesR3 AS (
                SELECT
                    strftime(created_at, '%Y-%m-%d') AS date,
                    COUNT(*) as properties_count,
                    SUM(properties_count) OVER (ORDER BY date) AS total
                FROM consultancy_d.properties p
                WHERE p.seed_id = 3
                GROUP BY date
                ORDER BY date
            ),
            PropertiesR4 AS (
                SELECT
                    strftime(created_at, '%Y-%m-%d') AS date,
                    COUNT(*) as properties_count,
                    SUM(properties_count) OVER (ORDER BY date) AS total
                FROM consultancy_d.properties p
                WHERE p.seed_id = 4
                GROUP BY date
                ORDER BY date
            )
            SELECT
                p.date,
                p.total AS total_all,
                pR1.total as total_heidiland,
                pR2.total AS total_davos,
                pR3.total AS total_engadin,
                pR4.total AS total_stmoritz
            FROM PropertiesAll p
            LEFT JOIN PropertiesR1 pR1 ON p.date = pR1.date
            LEFT JOIN PropertiesR2 pR2 ON p.date = pR2.date
            LEFT JOIN PropertiesR3 pR3 ON p.date = pR3.date
            LEFT JOIN PropertiesR4 pR4 ON p.date = pR4.date
            ORDER BY p.date
            """)

    def properties_per_region(self):
        """Count properties per seed/region name, smallest count first."""
        return self.connection.sql("""
            SELECT regions.name, COUNT(*) AS count_properties
            FROM consultancy_d.properties
            LEFT JOIN consultancy_d.seeds ON seeds.id = properties.seed_id
            LEFT JOIN consultancy_d.regions ON regions.id = seeds.region_id
            GROUP BY properties.seed_id, regions.name
            ORDER BY count_properties ASC
            """)

    def propIds_with_region(self):
        """List each property id with its seed id and region name."""
        return self.connection.sql("""
            SELECT properties.id, seed_id, regions.name
            FROM consultancy_d.properties
            LEFT JOIN consultancy_d.seeds ON seeds.id = properties.seed_id
            LEFT JOIN consultancy_d.regions ON regions.id = seeds.region_id
            """)

    def properties_unreachable(self):
        """Show entities whose exception JSON has status '404'.

        Each row carries the entity id plus the matching property's
        ``created_at`` and ``last_found`` formatted as ``YYYY-MM-DD``.
        Exceptions whose payload is not valid JSON are skipped.
        """
        return self.connection.sql("""
            SELECT
                entity_id,
                strftime(properties.created_at, '%Y-%m-%d') AS first_found,
                strftime(properties.last_found, '%Y-%m-%d') AS last_found
            FROM consultancy_d.exceptions
            LEFT JOIN consultancy_d.properties ON properties.id = exceptions.entity_id
            WHERE JSON_VALID(exception) = true
              AND JSON_EXTRACT(exception, '$.status') = '404'
            GROUP BY ALL
            ORDER BY last_found
            """).show()

    def properties_not_found(self):
        """Show, per exception date, how many exceptions have a status above 400.

        Only exceptions whose payload is valid JSON are considered.
        """
        return self.connection.sql("""
            SELECT
                COUNT(entity_id) as count_props,
                strftime(created_at, '%Y-%m-%d') as date
            FROM consultancy_d.exceptions
            WHERE JSON_VALID(exception) = true
              AND JSON_EXTRACT(exception, '$.status') > 400
            GROUP BY date
            """).show()

    def properties_distance(self):
        """Bucket properties by how far their geolocation moved.

        The statement batch loads the spatial extension, (re)creates the view
        ``geolocation_changes`` from exceptions whose text starts with
        'geoLocation was different', and finally counts the view's rows in
        four distance buckets (0-25, 25-50, 50-75, 75-100). Rows with a
        distance of 100 or more are not counted in any bucket. The buckets
        are combined with UNION without an ORDER BY, so their row order is
        not fixed by the query.

        The new location is taken from character 28 of the exception text
        onwards (presumably the part after a fixed message prefix - verify
        against the writer of these exceptions).

        NOTE(review): the connection is opened with ``read_only=True`` in
        ``__init__``; confirm that ``CREATE OR REPLACE VIEW`` is permitted on
        it (a TEMP view may be required).
        """
        return self.connection.sql("""
            LOAD spatial;

            CREATE OR REPLACE VIEW geolocation_changes AS
            SELECT
                exceptions.entity_id,
                properties.check_data AS geolocation_original,
                SUBSTRING(exceptions.exception, 28) AS geolocation_new,
                ST_Distance_Sphere(
                    ST_GeomFromText(
                        CONCAT(
                            'POINT(',
                            REPLACE(properties.check_data, ',', ' '),
                            ')'
                        )
                    ),
                    ST_GeomFromText(
                        CONCAT(
                            'POINT(',
                            REPLACE(SUBSTRING(exceptions.exception, 28), ',', ' '),
                            ')'
                        )
                    )
                ) AS distance
            FROM consultancy_d.exceptions
            LEFT JOIN consultancy_d.properties ON exceptions.entity_id = properties.id
            WHERE exception LIKE 'geoLocation was different%'
            GROUP BY entity_id, check_data, geolocation_new
            ORDER BY distance;

            SELECT * FROM geolocation_changes;

            SELECT '0 bis 25' AS category, COUNT(*) as count_properties
            FROM geolocation_changes
            WHERE distance >= (0) AND distance < (25)
            UNION
            SELECT '25 bis 50' AS category, COUNT(*) as count_properties
            FROM geolocation_changes
            WHERE distance >= (25) AND distance < (50)
            UNION
            SELECT '50 bis 75' AS category, COUNT(*) as count_properties
            FROM geolocation_changes
            WHERE distance >= (50) AND distance < (75)
            UNION
            SELECT '75 bis 100' AS category, COUNT(*) as count_properties
            FROM geolocation_changes
            WHERE distance >= (75) AND distance < (100);
            """)

    def properties_exceptions(self):
        """Count exceptions per JSON ``status`` for rows whose type is not 'property'."""
        return self.connection.sql("""
            SELECT
                JSON_EXTRACT(exception, '$.status') AS exception_status,
                COUNT(JSON_EXTRACT(exception, '$.status')) AS exception_count
            FROM consultancy_d.exceptions
            WHERE type != 'property'
            GROUP BY JSON_EXTRACT(exception, '$.status')
            """)

    def extractions(self):
        """List the ``content.days`` calendar of every 'calendar' extraction."""
        return self.connection.sql("""
            SELECT
                JSON_EXTRACT(body, '$.content.days') as calendar,
                property_id,
                created_at
            FROM consultancy_d.extractions
            WHERE type == 'calendar'
            ORDER BY property_id
            """)

    def extractions_with_region(self):
        """List ``content.days`` of all extractions with seed id and region name.

        Unlike ``extractions`` this query does not filter on the extraction
        type.
        """
        return self.connection.sql("""
            SELECT
                JSON_EXTRACT(body, '$.content.days') as calendar,
                extractions.property_id,
                extractions.created_at,
                properties.seed_id,
                regions.name
            FROM consultancy_d.extractions
            LEFT JOIN consultancy_d.properties ON properties.id = extractions.property_id
            LEFT JOIN consultancy_d.seeds ON seeds.id = properties.seed_id
            LEFT JOIN consultancy_d.regions ON regions.id = seeds.region_id
            """)

    def extractions_for(self, property_id):
        """List the non-NULL calendars extracted for one property, oldest first.

        Args:
            property_id: Value compared with ``extractions.property_id``. It
                is bound as a query parameter instead of being interpolated
                into the SQL text.
        """
        return self.connection.sql(
            """
            SELECT
                JSON_EXTRACT(body, '$.content.days') as calendar,
                created_at
            FROM consultancy_d.extractions
            WHERE type == 'calendar'
              AND property_id = $property_id
              AND calendar NOT NULL
            ORDER BY created_at
            """,
            params={"property_id": property_id},
        )

    # Number of extracted properties per extraction run
    def properties_per_extraction(self, property_id=None):
        """Count 'calendar' extractions per creation date, oldest date first.

        Args:
            property_id: Unused; the query is not filtered by property. Kept
                (now optional) so existing callers that pass it keep working.
        """
        return self.connection.sql("""
            SELECT
                COUNT(property_id),
                strftime(created_at, '%Y-%m-%d') AS date
            FROM consultancy_d.extractions
            WHERE type == 'calendar'
            GROUP BY date
            ORDER BY date ASC
            """)

    def price(self):
        """List weekly/nightly lowest price and currency of every 'price' extraction."""
        return self.connection.sql("""
            SELECT
                JSON_EXTRACT(body, '$.content.lowestPrice.valueWeekRaw') AS pricePerWeek,
                JSON_EXTRACT(body, '$.content.lowestPrice.valueNightRaw') AS pricePerNight,
                JSON_EXTRACT(body, '$.content.lowestPrice.currency') AS currency,
                property_id,
                created_at
            FROM consultancy_d.extractions
            WHERE type == 'price'
            ORDER BY property_id
            """)

    def price_developement_per_property(self):
        """List the nightly lowest price of every 'price' extraction by property."""
        return self.connection.sql("""
            SELECT
                JSON_EXTRACT(body, '$.content.lowestPrice.valueNightRaw') AS pricePerNight,
                property_id,
                created_at
            FROM consultancy_d.extractions
            WHERE type == 'price'
            ORDER BY property_id
            """)

    def property_base_data(self, id):
        """Fetch platform id, first/last found timestamps and region of a property.

        Args:
            id: Value compared with ``properties.id``. It is bound as a query
                parameter instead of being interpolated into the SQL text.
        """
        return self.connection.sql(
            """
            SELECT
                p.property_platform_id,
                p.created_at as first_found,
                p.last_found,
                r.name as region_name
            FROM consultancy_d.properties p
            INNER JOIN consultancy_d.seeds s ON s.id = p.seed_id
            INNER JOIN consultancy_d.regions r ON s.region_id = r.id
            WHERE p.id = $property_id
            """,
            params={"property_id": id},
        )

    def properties_geo(self):
        """List every property id with its ``check_data`` as ``coordinates``."""
        return self.connection.sql("""
            SELECT p.id, p.check_data as coordinates
            FROM consultancy_d.properties p
            """)

    def capacity_of_region(self, region_id):
        """List calendar extractions of the properties belonging to one seed.

        Args:
            region_id: Value compared with ``properties.seed_id``; bound as a
                query parameter instead of being interpolated into the SQL.
                NOTE(review): despite its name the value is matched against
                the seed id, not ``regions.id`` - confirm this is intended.
        """
        return self.connection.sql(
            """
            SELECT
                JSON_EXTRACT(body, '$.content.days') as calendarBody,
                strftime(extractions.created_at, '%Y-%m-%d') AS ScrapeDate,
                extractions.property_id,
            FROM consultancy_d.extractions
            LEFT JOIN consultancy_d.properties ON properties.id = extractions.property_id
            WHERE type == 'calendar'
              AND properties.seed_id = $region_id
            """,
            params={"region_id": region_id},
        )