# moodle-scraper/moodle_downloader.py
#
# Web file-viewer metadata (not part of the program): 347 lines, 14 KiB, Python.
# Last change: 2025-01-25 16:20:03 +01:00
# moodle_downloader.py
import os
import re
import logging
import requests
import unicodedata
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
class MoodleDownloader:
    """
    Log in to Moodle with a Selenium-driven Chrome browser, collect the
    courses listed on the "My Courses" page and download each course's
    content as a ZIP file.

    The ZIP files themselves are fetched with ``requests``, re-using the
    cookies of the browser session. The instance can be used as a context
    manager; leaving the ``with`` block closes the browser.
    """

    # Maximum length of a sanitized file name (extension not included).
    MAX_FILENAME_LENGTH = 200
    # Returned by sanitize_filename() when nothing usable is left of the input.
    FALLBACK_FILENAME = 'untitled'
    # (connect, read) timeouts in seconds for the ZIP download request.
    # Without a timeout ``requests`` waits indefinitely on a stalled server.
    REQUEST_TIMEOUT = (15, 600)

    # '<name> (<code>) <suffix>', e.g. 'Algorithmen und Datenstrukturen (cds-203) HS24'.
    _COURSE_TITLE_RE = re.compile(r'^(.*?)\s*\(([^)]+)\)\s*\w+\d*$')
    # Characters removed from file names (includes newlines).
    _INVALID_FILENAME_CHARS_RE = re.compile(r'[<>:"/\\|?*\n\r]+')
    _WHITESPACE_RE = re.compile(r'\s+')

    def __init__(self, username: str, password: str, download_dir: str, headless: bool = False) -> None:
        """
        Initialize the MoodleDownloader.

        :param username: Moodle username
        :param password: Moodle password
        :param download_dir: Directory to download ZIP files
        :param headless: Run browser in headless mode
        """
        self.username = username
        self.password = password
        self.download_dir = download_dir  # Set externally to use system temp
        self.headless = headless
        self.driver = None  # Created by setup_driver()
        self.courses = []  # Filled by get_courses(): dicts with 'Semester', 'CourseName', 'URL'
        self.LOGIN_URL = 'https://moodle.fhgr.ch/login/index.php'
        self.MY_COURSES_URL = 'https://moodle.fhgr.ch/my/courses.php'

    def __enter__(self):
        """Return the instance itself so it can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the browser when the ``with`` block is left."""
        self.close()

    def setup_driver(self) -> None:
        """
        Set up the Selenium WebDriver with Chrome options.

        The created driver is stored in ``self.driver``.
        """
        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')  # Headless mode
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')  # Optional
            chrome_options.add_argument('--window-size=1920,1080')

        # Preferences for downloads
        prefs = {
            "download.default_directory": self.download_dir,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True,
            "profile.default_content_setting_values.automatic_downloads": 1,
        }
        chrome_options.add_experimental_option("prefs", prefs)

        # Initialize WebDriver
        logging.info("Initializing the WebDriver.")
        self.driver = webdriver.Chrome(
            service=ChromeService(ChromeDriverManager().install()),
            options=chrome_options,
        )

    def login(self) -> None:
        """
        Log in to Moodle.

        A browser is started via setup_driver() unless one already exists, so
        calling this method again does not leave an orphaned browser behind.

        :raises Exception: any error during login is logged and re-raised;
            a login that does not leave the form page within 30 seconds
            surfaces as ``TimeoutException``.
        """
        if self.driver is None:
            self.setup_driver()
        driver = self.driver
        try:
            # Open Moodle login page
            logging.info("Opening Moodle login page: %s", self.LOGIN_URL)
            driver.get(self.LOGIN_URL)

            # Wait until the page is loaded
            WebDriverWait(driver, 15).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )
            logging.info("Moodle login page loaded.")

            # Check for 'wayf_submit_button' and click if present.
            # NOTE(review): presumably an institution-selection step that
            # precedes the actual login form - confirm against the live site.
            try:
                logging.info("Checking for 'wayf_submit_button'.")
                wayf_button = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.ID, 'wayf_submit_button'))
                )
                wayf_button.click()
                logging.info("'wayf_submit_button' found and clicked.")
                # Wait for redirection to login page
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, 'username'))
                )
            except TimeoutException:
                logging.info("'wayf_submit_button' not found. Continuing with login.")

            # Wait for username and password fields
            logging.info("Waiting for the username field.")
            username_field = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.ID, 'username'))
            )
            logging.info("Waiting for the password field.")
            password_field = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.ID, 'password'))
            )

            # Enter login credentials
            logging.info("Entering login credentials.")
            username_field.send_keys(self.username)
            password_field.send_keys(self.password)

            # Remember where the form lives. After a redirect the browser is
            # no longer on LOGIN_URL, so comparing against LOGIN_URL would
            # report success immediately, even for rejected credentials.
            form_url = driver.current_url

            # Submit the form
            logging.info("Submitting the login form.")
            password_field.send_keys(Keys.RETURN)

            # Wait for login to complete
            logging.info("Waiting for login to complete.")
            WebDriverWait(driver, 30).until(EC.url_changes(form_url))
            logging.info("Login successful.")
        except Exception:
            logging.error("An error occurred during login.", exc_info=True)
            raise

    def get_courses(self) -> None:
        """
        Retrieve the list of courses from Moodle.

        Every course found on the "My Courses" page is appended to
        ``self.courses`` as a dict with the keys 'Semester', 'CourseName' and
        'URL'. Courses whose URL is already present (on the page or from an
        earlier call) are skipped; entries that cannot be parsed are logged
        and skipped.

        :raises Exception: errors while loading the page are logged and re-raised.
        """
        driver = self.driver
        try:
            # Navigate to "My Courses" page
            logging.info("Navigating to 'My Courses' page: %s", self.MY_COURSES_URL)
            driver.get(self.MY_COURSES_URL)

            # Wait until the page is loaded
            logging.info("Waiting for the 'My Courses' page to load.")
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'a.aalink.coursename'))
            )
            logging.info("'My Courses' page loaded.")

            # Collect all courses from the page
            logging.info("Collecting all courses from 'My Courses' page.")
            course_elements = driver.find_elements(By.CSS_SELECTOR, 'a.aalink.coursename')
            logging.info("%d courses found.", len(course_elements))

            # Seed with the URLs collected earlier so that a repeated call
            # does not add every course a second time.
            existing_urls = {course['URL'] for course in self.courses}

            for coursename_element in course_elements:
                try:
                    course = self._parse_course_element(coursename_element)
                except Exception as e:
                    logging.warning("Error extracting course: %s", e)
                    continue

                # Check for duplicates
                if course['URL'] in existing_urls:
                    logging.info("Duplicate course found: %s - %s", course['CourseName'], course['URL'])
                    continue

                existing_urls.add(course['URL'])
                self.courses.append(course)
                logging.info("Course found: %s - %s", course['CourseName'], course['URL'])

            if not self.courses:
                logging.warning("No courses found. Check the HTML structure of the 'My Courses' page.")
                print("No courses found. Check the HTML structure of the 'My Courses' page.")
        except Exception:
            logging.error("An error occurred while retrieving courses.", exc_info=True)
            raise

    def _parse_course_element(self, coursename_element) -> dict:
        """Build the course dict ('Semester', 'CourseName', 'URL') for one course link element."""
        # Extract course name from the nested span
        course_name_element = coursename_element.find_element(
            By.CSS_SELECTOR, 'span.multiline span[aria-hidden="true"]'
        )
        course_title = course_name_element.text.strip()
        logging.debug("Course title extracted: '%s'", course_title)

        # The semester text is looked up inside the link's parent element
        parent_div = coursename_element.find_element(By.XPATH, '..')
        category_span = parent_div.find_element(By.CSS_SELECTOR, 'span.categoryname.text-truncate')
        semester = category_span.text.strip()
        logging.debug("Semester extracted: '%s'", semester)

        course_info = self.extract_course_info(course_title)
        return {
            'Semester': self.sanitize_semester(semester),
            'CourseName': course_info['course_name'],
            'URL': coursename_element.get_attribute('href'),
        }

    def sanitize_semester(self, semester: str) -> str:
        """
        Sanitize the semester name by replacing whitespace runs with underscores
        and removing leading and trailing underscores.

        :param semester: Original semester string
        :return: Sanitized semester string
        """
        sanitized = self._WHITESPACE_RE.sub('_', semester).strip('_')
        logging.debug("Sanitized semester: '%s'", sanitized)
        return sanitized

    def extract_course_info(self, course_title: str) -> dict:
        """
        Extract course information from the course title.

        Titles of the form '<name> (<code>) <suffix>' are reduced to
        '<name> (<code>)', e.g. 'Algorithmen und Datenstrukturen (cds-203) HS24'
        -> 'Algorithmen und Datenstrukturen (cds-203)'. Any other title is
        passed through sanitize_filename().

        :param course_title: Full course title string
        :return: Dictionary with 'course_name'
        """
        match = self._COURSE_TITLE_RE.search(course_title)
        if not match:
            # Handle cases where the pattern doesn't match
            return {'course_name': self.sanitize_filename(course_title)}

        course_full_name = match.group(1).strip()
        course_code = match.group(2).strip()
        return {'course_name': f"{course_full_name} ({course_code})"}

    def download_all_courses(self) -> None:
        """
        Download all courses as ZIP files.

        Each entry of ``self.courses`` is handled independently: any error
        while opening or downloading one course is logged and the next course
        is processed. Files are saved as '<sanitized course name>.zip' inside
        ``self.download_dir``.
        """
        if not self.courses:
            logging.warning("No courses to download.")
            return

        # Ensure the download directory exists
        if not os.path.exists(self.download_dir):
            os.makedirs(self.download_dir, exist_ok=True)
            logging.info("Created download directory: %s", self.download_dir)

        for course in self.courses:
            try:
                self._download_course(course)
            except Exception as e:
                # Page navigation is covered too, so a single slow or broken
                # course page no longer aborts the remaining downloads.
                logging.error(f"Error downloading course '{course['CourseName']}': {e}", exc_info=True)
                continue

    def _download_course(self, course: dict) -> None:
        """Open one course in the browser, read the download form values and save its ZIP."""
        driver = self.driver
        course_name = course['CourseName']
        course_url = course['URL']
        logging.info("Processing course: %s - %s", course_name, course_url)
        driver.get(course_url)

        # Wait until the course page is loaded
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.TAG_NAME, 'body'))
        )

        logging.info("Searching for 'Download course content' link.")
        download_link = driver.find_element(By.CSS_SELECTOR, 'a[data-downloadcourse="1"]')
        download_page_url = download_link.get_attribute('href')
        logging.info("Download page URL: %s", download_page_url)

        # Open the download page
        driver.get(download_page_url)

        # Wait until the page is loaded
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.NAME, 'sesskey'))
        )

        # Extract 'sesskey' and 'contextid'
        sesskey = driver.find_element(By.NAME, 'sesskey').get_attribute('value')
        contextid = driver.find_element(By.NAME, 'contextid').get_attribute('value')
        # The session key is a secret of the logged-in session; keep it out of the log.
        logging.info("sesskey: %s, contextid: %s", '<redacted>', contextid)

        # Extract cookies from the Selenium session
        logging.info("Extracting cookies from the Selenium session.")
        cookies = {cookie['name']: cookie['value'] for cookie in driver.get_cookies()}

        # Determine filename
        filename = f"{self.sanitize_filename(course_name)}.zip"
        filepath = os.path.join(self.download_dir, filename)

        logging.info("Sending HTTP POST request for course '%s'.", course_name)
        self._save_course_archive(download_page_url, sesskey, contextid, cookies, filepath)

        logging.info("Course '%s' downloaded and saved as '%s'.", course_name, filename)
        print(f"Course '{course_name}' downloaded.")

    def _save_course_archive(self, download_page_url, sesskey, contextid, cookies, filepath) -> None:
        """POST the download form with the browser's cookies and store the response body at filepath."""
        download_url = 'https://moodle.fhgr.ch/course/downloadcontent.php'
        post_data = {
            'sesskey': sesskey,
            'contextid': contextid,
            'download': 1,
        }
        headers = {
            'User-Agent': 'Mozilla/5.0',
            'Referer': download_page_url,
        }

        # Stream into a temporary file first: an interrupted transfer must not
        # leave a truncated ZIP (or destroy a previous good one) at filepath.
        partial_path = f"{filepath}.part"
        try:
            with requests.Session() as session:
                session.cookies.update(cookies)
                with session.post(
                    download_url,
                    data=post_data,
                    headers=headers,
                    stream=True,
                    timeout=self.REQUEST_TIMEOUT,
                ) as response:
                    response.raise_for_status()
                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)

            # Overwrite existing files
            if os.path.exists(filepath):
                logging.info("Overwriting existing file: %s", filepath)
            os.replace(partial_path, filepath)
        finally:
            # Only still present when the download or the rename failed.
            if os.path.exists(partial_path):
                os.remove(partial_path)

    def sanitize_filename(self, name: str) -> str:
        """
        Sanitize the filename by removing invalid characters, replacing spaces with underscores,
        and truncating to a maximum length to prevent path issues.

        Non-ASCII characters are reduced to their ASCII base form where the
        Unicode decomposition provides one and dropped otherwise. If nothing
        remains, ``FALLBACK_FILENAME`` is returned so that callers never build
        a file name consisting of an extension only.

        :param name: Original filename
        :return: Sanitized, non-empty filename
        """
        # Normalize unicode characters
        name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
        # Remove invalid characters for filenames, including newlines
        sanitized = self._INVALID_FILENAME_CHARS_RE.sub('', name)
        # Replace spaces with underscores
        sanitized = self._WHITESPACE_RE.sub('_', sanitized)
        # Remove trailing underscores
        sanitized = sanitized.rstrip('_')

        # Truncate to a reasonable length
        if len(sanitized) > self.MAX_FILENAME_LENGTH:
            # The cut may land right after a separator; strip again.
            sanitized = sanitized[:self.MAX_FILENAME_LENGTH].rstrip('_')
            logging.warning(f"Filename truncated to {self.MAX_FILENAME_LENGTH} characters: '{sanitized}'")

        if not sanitized:
            logging.warning("Filename is empty after sanitizing; using '%s'.", self.FALLBACK_FILENAME)
            return self.FALLBACK_FILENAME
        return sanitized

    def close(self) -> None:
        """
        Close the Selenium WebDriver.

        Safe to call more than once: the driver reference is cleared, so later
        calls do nothing.
        """
        if self.driver:
            logging.info("Closing the browser.")
            try:
                self.driver.quit()
            finally:
                self.driver = None