347 lines
14 KiB
Python
347 lines
14 KiB
Python
![]() |
# moodle_downloader.py
|
||
|
|
||
|
import os
|
||
|
import re
|
||
|
import logging
|
||
|
import requests
|
||
|
import unicodedata
|
||
|
from selenium import webdriver
|
||
|
from selenium.webdriver.chrome.options import Options
|
||
|
from selenium.webdriver.common.by import By
|
||
|
from selenium.webdriver.common.keys import Keys
|
||
|
from selenium.webdriver.support.ui import WebDriverWait
|
||
|
from selenium.webdriver.support import expected_conditions as EC
|
||
|
from selenium.common.exceptions import TimeoutException
|
||
|
from selenium.webdriver.chrome.service import Service as ChromeService
|
||
|
from webdriver_manager.chrome import ChromeDriverManager
|
||
|
|
||
|
|
||
|
class MoodleDownloader:
    """
    Log in to Moodle (moodle.fhgr.ch) through a Selenium-driven Chrome browser,
    collect the courses listed on the "My Courses" page and download each
    course's content as a ZIP file.

    The methods are meant to be called in this order:
    ``login()`` -> ``get_courses()`` -> ``download_all_courses()`` -> ``close()``.
    """

    # (connect, read) timeouts in seconds for the ZIP download request, so a
    # stalled connection cannot block the run indefinitely.
    REQUEST_TIMEOUT = (15, 600)

    # Upper bound for the length of a sanitized file name (without extension).
    MAX_FILENAME_LENGTH = 200

    def __init__(self, username, password, download_dir, headless=False):
        """
        Initialize the MoodleDownloader.

        No browser is started here; that happens in :meth:`login`.

        :param username: Moodle username
        :param password: Moodle password
        :param download_dir: Directory to download ZIP files
        :param headless: Run browser in headless mode
        """
        self.username = username
        self.password = password
        self.download_dir = download_dir  # Set externally to use system temp
        self.headless = headless
        self.driver = None  # Created by setup_driver()
        self.courses = []  # Filled by get_courses() with Semester/CourseName/URL dicts
        self.LOGIN_URL = 'https://moodle.fhgr.ch/login/index.php'
        self.MY_COURSES_URL = 'https://moodle.fhgr.ch/my/courses.php'
        self.DOWNLOAD_URL = 'https://moodle.fhgr.ch/course/downloadcontent.php'

    def setup_driver(self):
        """
        Set up the Selenium WebDriver with Chrome options.

        If a browser from an earlier call is still attached to this instance it
        is closed first, so repeated logins do not leak Chrome processes.
        """
        if self.driver is not None:
            self.close()

        chrome_options = Options()
        if self.headless:
            chrome_options.add_argument('--headless')  # Headless mode
            chrome_options.add_argument('--no-sandbox')
            chrome_options.add_argument('--disable-dev-shm-usage')
            chrome_options.add_argument('--disable-gpu')  # Optional
            chrome_options.add_argument('--window-size=1920,1080')

        # Preferences for downloads
        prefs = {
            "download.default_directory": self.download_dir,
            "download.prompt_for_download": False,
            "download.directory_upgrade": True,
            "safebrowsing.enabled": True,
            "profile.default_content_setting_values.automatic_downloads": 1,
        }
        chrome_options.add_experimental_option("prefs", prefs)

        # Initialize WebDriver
        logging.info("Initializing the WebDriver.")
        self.driver = webdriver.Chrome(
            service=ChromeService(ChromeDriverManager().install()),
            options=chrome_options,
        )

    def login(self):
        """
        Log in to Moodle.

        Starts the browser, opens the login page, clicks the
        'wayf_submit_button' if it shows up, fills in the credentials and waits
        until the browser leaves the page the form was submitted on.

        :raises Exception: Any error raised while logging in is logged and re-raised.
        """
        self.setup_driver()
        driver = self.driver
        try:
            # Open Moodle login page
            logging.info(f"Opening Moodle login page: {self.LOGIN_URL}")
            driver.get(self.LOGIN_URL)

            # Wait until the page is loaded
            WebDriverWait(driver, 15).until(
                EC.presence_of_element_located((By.TAG_NAME, 'body'))
            )
            logging.info("Moodle login page loaded.")

            # Check for 'wayf_submit_button' and click if present
            try:
                logging.info("Checking for 'wayf_submit_button'.")
                wayf_button = WebDriverWait(driver, 5).until(
                    EC.element_to_be_clickable((By.ID, 'wayf_submit_button'))
                )
                wayf_button.click()
                logging.info("'wayf_submit_button' found and clicked.")

                # Wait for redirection to login page
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.ID, 'username'))
                )
            except TimeoutException:
                logging.info("'wayf_submit_button' not found. Continuing with login.")

            # Wait for username and password fields
            logging.info("Waiting for the username field.")
            username_field = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.ID, 'username'))
            )

            logging.info("Waiting for the password field.")
            password_field = WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.ID, 'password'))
            )

            # Enter login credentials
            logging.info("Entering login credentials.")
            username_field.send_keys(self.username)
            password_field.send_keys(self.password)

            # Remember where the form lives. After the 'wayf' redirect this is
            # no longer LOGIN_URL, so comparing against LOGIN_URL would let the
            # wait below succeed before the form has even been processed.
            login_form_url = driver.current_url

            # Submit the form
            logging.info("Submitting the login form.")
            password_field.send_keys(Keys.RETURN)

            # Wait for login to complete
            logging.info("Waiting for login to complete.")
            WebDriverWait(driver, 30).until(
                EC.url_changes(login_form_url)
            )
            logging.info("Login successful.")
        except Exception:
            logging.error("An error occurred during login.", exc_info=True)
            raise

    def get_courses(self):
        """
        Retrieve the list of courses from Moodle.

        Every course link found on the "My Courses" page is appended to
        ``self.courses`` as a dict with the keys ``'Semester'``,
        ``'CourseName'`` and ``'URL'``. URLs that are already present in
        ``self.courses`` are skipped, so calling this method again does not
        create duplicate entries.

        :raises Exception: Errors while loading the page are logged and
            re-raised; errors for a single course are logged and skipped.
        """
        driver = self.driver
        try:
            # Navigate to "My Courses" page
            logging.info(f"Navigating to 'My Courses' page: {self.MY_COURSES_URL}")
            driver.get(self.MY_COURSES_URL)

            # Wait until the page is loaded
            logging.info("Waiting for the 'My Courses' page to load.")
            WebDriverWait(driver, 20).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, 'a.aalink.coursename'))
            )
            logging.info("'My Courses' page loaded.")

            # Collect all courses from the page
            logging.info("Collecting all courses from 'My Courses' page.")
            course_elements = driver.find_elements(By.CSS_SELECTOR, 'a.aalink.coursename')

            logging.info(f"{len(course_elements)} courses found.")

            # Seed with what is already known so a repeated call stays duplicate-free.
            existing_urls = {course['URL'] for course in self.courses}
            for coursename_element in course_elements:
                try:
                    # Extract course name from the nested span
                    course_name_element = coursename_element.find_element(
                        By.CSS_SELECTOR, 'span.multiline span[aria-hidden="true"]'
                    )
                    course_title = course_name_element.text.strip()
                    logging.debug(f"Course title extracted: '{course_title}'")

                    # Extract semester from the sibling div
                    parent_div = coursename_element.find_element(By.XPATH, '..')  # Navigate to parent div
                    category_span = parent_div.find_element(
                        By.CSS_SELECTOR, 'span.categoryname.text-truncate'
                    )
                    semester = category_span.text.strip()
                    logging.debug(f"Semester extracted: '{semester}'")

                    # Extract course info
                    course_info = self.extract_course_info(course_title)

                    course_url = coursename_element.get_attribute('href')

                    # Check for duplicates
                    if course_url in existing_urls:
                        logging.info(f"Duplicate course found: {course_info['course_name']} - {course_url}")
                        continue
                    existing_urls.add(course_url)

                    self.courses.append({
                        'Semester': self.sanitize_semester(semester),
                        'CourseName': course_info['course_name'],
                        'URL': course_url
                    })
                    logging.info(f"Course found: {course_info['course_name']} - {course_url}")
                except Exception as e:
                    # One unreadable course entry must not stop the whole scan.
                    logging.warning(f"Error extracting course: {e}")
                    continue

            if not self.courses:
                logging.warning("No courses found. Check the HTML structure of the 'My Courses' page.")
                print("No courses found. Check the HTML structure of the 'My Courses' page.")
        except Exception:
            logging.error("An error occurred while retrieving courses.", exc_info=True)
            raise

    def sanitize_semester(self, semester):
        """
        Sanitize the semester name by replacing whitespace runs with underscores
        and removing leading/trailing underscores.

        :param semester: Original semester string
        :return: Sanitized semester string
        """
        sanitized = re.sub(r'\s+', '_', semester).strip('_')
        logging.debug(f"Sanitized semester: '{sanitized}'")
        return sanitized

    def extract_course_info(self, course_title):
        """
        Extract course information from the course title.

        :param course_title: Full course title string (e.g., 'Algorithmen und Datenstrukturen (cds-203) HS24')
        :return: Dictionary with 'course_name'. When the title has the form
            '<name> (<code>) <suffix>', the value is '<name> (<code>)';
            otherwise it is the title passed through :meth:`sanitize_filename`.
        """
        # Remove the semester from the course title
        # Example: 'Algorithmen und Datenstrukturen (cds-203) HS24' -> 'Algorithmen und Datenstrukturen (cds-203)'
        pattern = r'^(.*?)\s*\(([^)]+)\)\s*\w+\d*$'
        match = re.search(pattern, course_title)
        if match:
            course_full_name = match.group(1).strip()
            course_code = match.group(2).strip()
            return {'course_name': f"{course_full_name} ({course_code})"}

        # Handle cases where the pattern doesn't match
        return {'course_name': self.sanitize_filename(course_title)}

    def download_all_courses(self):
        """
        Download all courses as ZIP files.

        Iterates over ``self.courses`` and stores one ZIP per course in
        ``self.download_dir``. A failure for one course (including a page that
        does not load) is logged and the remaining courses are still processed.
        """
        if not self.courses:
            logging.warning("No courses to download.")
            return

        # Ensure the download directory exists
        if not os.path.exists(self.download_dir):
            os.makedirs(self.download_dir, exist_ok=True)
            logging.info(f"Created download directory: {self.download_dir}")

        for course in self.courses:
            course_name = course['CourseName']
            course_url = course['URL']
            logging.info(f"Processing course: {course_name} - {course_url}")
            try:
                self._download_course(course_name, course_url)
            except Exception as e:
                logging.error(f"Error downloading course '{course_name}': {e}", exc_info=True)
                continue

    def _download_course(self, course_name, course_url):
        """
        Download the content of a single course and save it as '<sanitized name>.zip'.

        :param course_name: Course name used for logging and for the file name
        :param course_url: URL of the course page
        :raises Exception: Any Selenium, HTTP or file-system error; the caller logs it.
        """
        driver = self.driver
        driver.get(course_url)

        # Wait until the course page is loaded
        WebDriverWait(driver, 20).until(
            EC.presence_of_element_located((By.TAG_NAME, 'body'))
        )

        logging.info("Searching for 'Download course content' link.")
        download_link = driver.find_element(By.CSS_SELECTOR, 'a[data-downloadcourse="1"]')
        download_page_url = download_link.get_attribute('href')
        logging.info(f"Download page URL: {download_page_url}")

        # Open the download page
        driver.get(download_page_url)

        # Wait until the page is loaded
        WebDriverWait(driver, 10).until(
            EC.presence_of_element_located((By.NAME, 'sesskey'))
        )

        # Extract 'sesskey' and 'contextid'
        sesskey = driver.find_element(By.NAME, 'sesskey').get_attribute('value')
        contextid = driver.find_element(By.NAME, 'contextid').get_attribute('value')

        # The sesskey is a session-bound token, so it is deliberately kept out of the log.
        logging.info(f"contextid: {contextid}")

        # Extract cookies from the Selenium session
        logging.info("Extracting cookies from the Selenium session.")
        cookies = {cookie['name']: cookie['value'] for cookie in driver.get_cookies()}

        # Prepare the HTTP POST request
        post_data = {
            'sesskey': sesskey,
            'contextid': contextid,
            'download': 1
        }
        headers = {
            'User-Agent': 'Mozilla/5.0',
            'Referer': download_page_url
        }

        # Determine filename
        filename = f"{self.sanitize_filename(course_name)}.zip"
        filepath = os.path.join(self.download_dir, filename)
        # Stream into a temporary sibling first so an interrupted transfer
        # never leaves a truncated file under the final name.
        partial_path = f"{filepath}.part"

        # Send the POST request with cookies
        logging.info(f"Sending HTTP POST request for course '{course_name}'.")
        with requests.Session() as session:
            session.cookies.update(cookies)
            with session.post(
                self.DOWNLOAD_URL,
                data=post_data,
                headers=headers,
                stream=True,
                timeout=self.REQUEST_TIMEOUT,
            ) as response:
                response.raise_for_status()

                # An HTML body here is most likely a login or error page, not the archive.
                content_type = response.headers.get('Content-Type', '')
                if 'text/html' in content_type.lower():
                    raise ValueError(
                        f"Expected a ZIP download but received '{content_type}'."
                    )

                try:
                    with open(partial_path, 'wb') as f:
                        for chunk in response.iter_content(chunk_size=8192):
                            if chunk:
                                f.write(chunk)
                except Exception:
                    # Best-effort cleanup of the incomplete file, then report the real error.
                    try:
                        os.remove(partial_path)
                    except OSError:
                        pass
                    raise

        # Overwrite existing files
        if os.path.exists(filepath):
            logging.info(f"Overwriting existing file: {filepath}")
        os.replace(partial_path, filepath)

        logging.info(f"Course '{course_name}' downloaded and saved as '{filename}'.")
        print(f"Course '{course_name}' downloaded.")

    def sanitize_filename(self, name):
        """
        Sanitize the filename by removing invalid characters, replacing spaces with underscores,
        and truncating to a maximum length to prevent path issues.

        Characters without an ASCII form after NFKD normalization are dropped.

        :param name: Original filename
        :return: Sanitized filename without trailing underscores
        """
        # Normalize unicode characters
        name = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore').decode('ASCII')
        # Remove invalid characters for filenames, including newlines
        sanitized = re.sub(r'[<>:"/\\|?*\n\r]+', '', name)
        # Replace spaces with underscores
        sanitized = re.sub(r'\s+', '_', sanitized)
        # Remove trailing underscores
        sanitized = sanitized.rstrip('_')
        # Truncate to a reasonable length (e.g., 200 characters)
        if len(sanitized) > self.MAX_FILENAME_LENGTH:
            # The cut can land right after an underscore, so strip again.
            sanitized = sanitized[:self.MAX_FILENAME_LENGTH].rstrip('_')
            logging.warning(
                f"Filename truncated to {self.MAX_FILENAME_LENGTH} characters: '{sanitized}'"
            )
        return sanitized

    def close(self):
        """
        Close the Selenium WebDriver.

        Safe to call more than once: the driver reference is cleared after the
        first call, so later calls do nothing.
        """
        if self.driver:
            logging.info("Closing the browser.")
            try:
                self.driver.quit()
            finally:
                self.driver = None
|