# course_content_extractor.py
import logging
import os
import re
import shutil
import subprocess
import sys
import tempfile
import unicodedata
import zipfile


class CourseContentExtractor:
    """
    Unpack downloaded course ZIP archives into a per-course folder tree.

    For every ``*.zip`` in the download directory whose sanitized base name
    matches the sanitized ``CourseName`` of one of the supplied courses, the
    folders ``<root_dir>/<Semester>/<CourseName>/{Lectures,Notes,Summary,Tasks}``
    are created. PDFs found anywhere in the archive are copied into
    ``Lectures``; PPT/PPTX files are converted to PDF there via LibreOffice.
    """

    # Subfolders created inside every course directory.
    SUBFOLDERS = ('Lectures', 'Notes', 'Summary', 'Tasks')

    def __init__(self, download_dir, root_dir):
        """
        Initialize the CourseContentExtractor.

        :param download_dir: Directory where ZIP files are downloaded
        :param root_dir: Root directory for organizing study materials
        """
        self.download_dir = download_dir
        self.root_dir = root_dir

    def extract_contents(self, courses):
        """
        Extract and organize course contents based on the provided folder structure.

        Archives without a matching course are skipped and left in place.
        An archive is deleted only after it was extracted successfully.

        :param courses: List of course dictionaries containing 'Semester' and 'CourseName'
        """
        # Ensure root_dir exists
        if not os.path.isdir(self.root_dir):
            try:
                os.makedirs(self.root_dir, exist_ok=True)
                logging.info(f"Created root directory: {self.root_dir}")
            except OSError as e:
                logging.error(f"Failed to create root directory '{self.root_dir}': {e}")
                return

        # A missing or unreadable download directory is reported, not raised.
        try:
            entries = os.listdir(self.download_dir)
        except OSError as e:
            logging.error(f"Failed to list download directory '{self.download_dir}': {e}")
            return

        # Match the extension case-insensitively, like the PDF/PPT checks do.
        zip_files = [f for f in entries if f.lower().endswith('.zip')]

        for filename in zip_files:
            zip_path = os.path.join(self.download_dir, filename)
            base_name = os.path.splitext(filename)[0]

            course_info = self._find_course(courses, base_name)
            if not course_info:
                print(f"No matching course found for {base_name}. Skipping.")
                logging.warning(f"No matching course found for {base_name}. Skipping.")
                continue

            # Build the folder structure
            course_output_dir = os.path.join(
                self.root_dir,
                course_info['Semester'],
                course_info['CourseName'],
            )
            lectures_dir = os.path.join(course_output_dir, 'Lectures')

            if not self._create_subfolders(course_output_dir):
                # Without a real Lectures directory shutil.copy2 would either
                # fail or write a plain file named "Lectures"; keep the ZIP so
                # nothing is lost and it can be retried later.
                logging.error(
                    f"Lectures folder unavailable for '{course_output_dir}'; "
                    f"keeping ZIP file '{zip_path}'"
                )
                continue

            if not self._process_archive(zip_path, lectures_dir):
                continue

            # Delete the ZIP file after processing
            try:
                os.remove(zip_path)
                logging.info(f"Deleted ZIP file after extraction: {zip_path}")
            except Exception as e:
                logging.error(f"Failed to delete ZIP file '{zip_path}': {e}")

        print(f"All files have been extracted to {self.root_dir}")
        logging.info(f"All files have been extracted to {self.root_dir}")

    def _find_course(self, courses, base_name):
        """Return the first course whose sanitized name equals the sanitized ``base_name``, else None."""
        # Sanitize both sides so the comparison ignores characters that cannot
        # appear in a file name; the archive side is computed only once.
        wanted = self.sanitize_filename(base_name)
        return next(
            (
                course for course in courses
                if self.sanitize_filename(course['CourseName']) == wanted
            ),
            None,
        )

    def _create_subfolders(self, course_output_dir):
        """Create the standard subfolders; return True when the Lectures folder is usable."""
        for subfolder in self.SUBFOLDERS:
            subfolder_path = os.path.join(course_output_dir, subfolder)
            try:
                os.makedirs(subfolder_path, exist_ok=True)
                logging.info(f"Created subfolder: {subfolder_path}")
            except Exception as e:
                logging.error(f"Failed to create subfolder '{subfolder_path}': {e}")
        return os.path.isdir(os.path.join(course_output_dir, 'Lectures'))

    def _process_archive(self, zip_path, lectures_dir):
        """Extract ``zip_path`` to a temporary directory and import its files; return False if extraction failed."""
        with tempfile.TemporaryDirectory() as temp_extract_dir:
            try:
                with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                    zip_ref.extractall(temp_extract_dir)
                logging.info(f"Extracted ZIP file to temporary directory: {temp_extract_dir}")
            except zipfile.BadZipFile as e:
                logging.error(f"Bad ZIP file '{zip_path}': {e}")
                return False
            except Exception as e:
                logging.error(f"Failed to extract ZIP file '{zip_path}': {e}")
                return False

            for root, _dirs, files in os.walk(temp_extract_dir):
                for file in files:
                    self._import_file(os.path.join(root, file), lectures_dir)
        return True

    def _import_file(self, file_path, lectures_dir):
        """Copy a PDF or convert a PPT/PPTX into ``lectures_dir``; other file types are skipped."""
        file = os.path.basename(file_path)
        lowered = file.lower()
        if lowered.endswith('.pdf'):
            try:
                shutil.copy2(file_path, lectures_dir)
                logging.info(f"Copied PDF file to Lectures: {file}")
            except Exception as e:
                logging.error(f"Failed to copy PDF file '{file}' to '{lectures_dir}': {e}")
        elif lowered.endswith(('.ppt', '.pptx')):
            try:
                self.convert_ppt_to_pdf(file_path, lectures_dir)
            except Exception as e:
                logging.error(f"Failed to convert PPT file '{file}': {e}")
        else:
            # Skip unwanted file types
            logging.info(f"Skipped unsupported file type: {file}")

    def convert_ppt_to_pdf(self, ppt_path, output_dir):
        """
        Convert PowerPoint files to PDF using LibreOffice.

        If LibreOffice exits with an error or its executable cannot be found,
        the original presentation is copied to ``output_dir`` instead.

        :param ppt_path: Path to the PPT/PPTX file
        :param output_dir: Directory to save the converted PDF
        """
        # Determine the command based on the operating system; the executable
        # must be installed and reachable through PATH.
        if sys.platform.startswith('win'):
            office_executable = 'soffice'
        else:
            office_executable = 'libreoffice'

        # Argument list (no shell), so paths with spaces need no quoting.
        command = [
            office_executable,
            '--headless',
            '--convert-to', 'pdf',
            '--outdir', output_dir,
            ppt_path,
        ]
        ppt_name = os.path.basename(ppt_path)

        try:
            subprocess.run(command, check=True)
        except subprocess.CalledProcessError as e:
            logging.error(f"Failed to convert {ppt_name} to PDF. Error: {e}")
            print(f"Failed to convert {ppt_name} to PDF. Error: {e}")
        except FileNotFoundError:
            logging.error(f"{office_executable} is not installed or not found in the system path.")
            print(f"{office_executable} is not installed or not found in the system path.")
        else:
            logging.info(f"Converted {ppt_name} to PDF.")
            print(f"Converted {ppt_name} to PDF.")
            return

        # Conversion failed: keep the original so the material is not lost.
        self._copy_original_presentation(ppt_path, output_dir)

    def _copy_original_presentation(self, ppt_path, output_dir):
        """Copy the unconverted PPT/PPTX into ``output_dir``, logging any failure."""
        try:
            shutil.copy2(ppt_path, output_dir)
            logging.info(f"Copied original PPT/PPTX to {output_dir}")
        except Exception as ex:
            logging.error(f"Failed to copy PPT/PPTX file '{ppt_path}' to '{output_dir}': {ex}")

    def sanitize_filename(self, name):
        """
        Sanitize the filename by removing invalid characters, replacing spaces
        with underscores, and truncating to a maximum length to prevent path issues.

        The result never ends with an underscore, also after truncation, so
        sanitizing an already sanitized name returns it unchanged.

        :param name: Original filename
        :return: Sanitized filename
        """
        MAX_LENGTH = 200

        # Normalize unicode characters; anything without an ASCII form is dropped
        name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')

        # Remove invalid characters for filenames, including newlines
        sanitized = re.sub(r'[<>:"/\\|?*\n\r]+', '', name)

        # Replace whitespace runs with a single underscore
        sanitized = re.sub(r'\s+', '_', sanitized)

        # Remove trailing underscores
        sanitized = sanitized.rstrip('_')

        # Truncate to a reasonable length. The cut may land right after an
        # underscore, so strip again to keep the result stable under repeated
        # sanitizing.
        if len(sanitized) > MAX_LENGTH:
            sanitized = sanitized[:MAX_LENGTH].rstrip('_')
            logging.warning(f"Filename truncated to {MAX_LENGTH} characters: '{sanitized}'")

        return sanitized