"""
librarian_scraper.crawler.crawler
---------------------------------
Scrapes Moodle degree programmes into CrawlData.

• Hero images
• Polite throttling / batching
• Term-filter: only the latest two terms (dev)
• USER_SPECIFIC flag to keep / drop inaccessible courses
"""

from __future__ import annotations

import asyncio
import json
import os
import re
import sys
import time
from datetime import timedelta
from typing import List, Tuple

if sys.platform == "win32":
    # Switch from Selector to Proactor so asyncio.subprocess works
    asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

import httpx
import parsel
from librarian_core.utils.path_utils import get_cache_root
from librarian_core.workers.base import Worker
from prefect import get_run_logger, task
from prefect.futures import wait

from librarian_scraper.constants import CRAWLER, PRIVATE_URLS, PUBLIC_URLS
from librarian_scraper.crawler.cookie_crawler import CookieCrawler
from librarian_scraper.models.crawl_data import (
    CrawlCourse,
    CrawlData,
    CrawlProgram,
    CrawlTerm,
)

# Module-level state read by ``Crawler._crawl_course_task``. That task is a
# staticmethod and has no access to the instance, so ``Crawler.__run__`` stores
# the session cookies and the throttle delay here before submitting any task.
_COOKIE_JAR: httpx.Cookies | None = None
_DELAY: float = 0.0

# JSON file holding the ids of courses for which no content id was found.
CACHE_FILE = get_cache_root() / "librarian_no_access_cache.json"

# Term names are a season prefix followed by a two-digit year, e.g. "FS24".
_TERM_NAME_RE = re.compile(r"^(FS|HS)\d{2}$")
# Term tag (plus surrounding whitespace) embedded in a course title.
_TERM_TAG_RE = re.compile(r"\s*(FS|HS)\d{2}\s*")
# Parenthesised suffix (plus surrounding whitespace) in a course title.
_PAREN_RE = re.compile(r"\s*\(.*?\)\s*")
# Secondary sort key for terms sharing the same year.
_TERM_ORDER = {"FS": 0, "HS": 1}


def _env_flag(name: str, default: str) -> bool:
    """Return True when env var *name* (or *default*) equals "true", ignoring case."""
    return os.getenv(name, default).lower() == "true"


def looks_like_enrol(resp: httpx.Response) -> bool:
    """Return True when *resp* looks like a login or enrolment page.

    The check is textual: the final URL contains "login", or the lower-cased
    body contains one of the enrolment markers below.
    """
    if "login" in str(resp.url).lower():
        return True
    txt = resp.text.lower()
    return (
        # Original marker, kept for backward compatibility.
        "#page-enrol" in txt
        # NOTE(review): "#page-enrol" is the CSS-selector spelling; in the
        # markup itself Moodle presumably emits the id as an attribute
        # (id="page-enrol-..."), so match that form too — confirm on a live page.
        or 'id="page-enrol' in txt
        or "you need to enrol" in txt
    )


class Crawler(Worker[CrawlProgram, CrawlData]):
    """Worker that crawls one degree programme and returns its terms and courses.

    Behaviour is configured through environment variables read in ``__run__``:

    * ``SCRAPER_RELAXED`` (default "true") selects the slow delay / batch size
      from ``CRAWLER`` instead of the fast ones.
    * ``SCRAPER_USER_SPECIFIC`` (default "true") drops courses for which no
      content resource id was found.
    * ``SCRAPER_CLEAR_CACHE`` (default "false") starts with an empty no-access
      set instead of loading ``CACHE_FILE``.
    """

    input_model = CrawlProgram
    output_model = CrawlData

    RELAXED: bool
    USER_SPECIFIC: bool
    CLEAR_CACHE: bool

    async def __run__(self, program: CrawlProgram) -> CrawlData:
        """Crawl *program* and return the collected ``CrawlData``.

        Only the first two terms of the newest-first term list are crawled, and
        terms that end up without courses are omitted from the result.

        Raises:
            RuntimeError: if the session is not logged in (this includes the
                case where the index page could not be fetched at all).
        """
        global _COOKIE_JAR, _DELAY
        lg = get_run_logger()

        self.RELAXED = _env_flag("SCRAPER_RELAXED", "true")
        self.USER_SPECIFIC = _env_flag("SCRAPER_USER_SPECIFIC", "true")
        self.CLEAR_CACHE = _env_flag("SCRAPER_CLEAR_CACHE", "false")

        _DELAY = CRAWLER["DELAY_SLOW"] if self.RELAXED else CRAWLER["DELAY_FAST"]
        batch = CRAWLER["BATCH_SLOW"] if self.RELAXED else CRAWLER["BATCH_FAST"]

        lg.info(
            "Mode=%s user_specific=%s delay=%.1fs batch=%s",
            "RELAXED" if self.RELAXED else "FAST",
            self.USER_SPECIFIC,
            _DELAY,
            batch,
        )

        cookies, _ = await CookieCrawler().crawl()
        _COOKIE_JAR = cookies
        self._client = httpx.Client(cookies=cookies, follow_redirects=True)

        # The client owns a connection pool; release it on every exit path,
        # including the "Login failed" abort below.
        try:
            if not self._logged_in():
                lg.error("Guest session detected – aborting crawl.")
                raise RuntimeError("Login failed")

            # The no-access set is only accumulated and persisted here; it is
            # not consulted to skip requests.
            no_access: set[str] = set() if self.CLEAR_CACHE else self._load_cache()

            terms = self._crawl_terms(program.id)[:2]
            lg.info("Terms discovered: %d", len(terms))

            for term in terms:
                courses = self._crawl_courses(term.id)
                lg.info("[%s] raw courses: %d", term.name, len(courses))

                self._resolve_courses(courses, batch, no_access)

                term.courses = (
                    [c for c in courses if c.content_ressource_id]
                    if self.USER_SPECIFIC
                    else courses
                )
                lg.info("[%s] kept: %d", term.name, len(term.courses))

            self._save_cache(no_access)
        finally:
            self._client.close()

        return CrawlData(
            degree_program=CrawlProgram(
                id=program.id,
                name=program.name,
                terms=[t for t in terms if t.courses],
            )
        )

    def _resolve_courses(
        self, courses: List[CrawlCourse], batch: int, no_access: set[str]
    ) -> None:
        """Fill in ``content_ressource_id`` for *courses*, *batch* tasks at a time.

        Ids of courses for which the task returned no resource id are added to
        *no_access* (mutated in place).
        """
        # First occurrence wins, so a duplicated id resolves to the same
        # course object a front-to-back linear search would find.
        by_id: dict[str, CrawlCourse] = {}
        for course in courses:
            by_id.setdefault(course.id, course)

        for start in range(0, len(courses), batch):
            futs = [
                self._crawl_course_task.submit(course.id)
                for course in courses[start : start + batch]
            ]
            # Wait for the whole batch before submitting the next one.
            done, _ = wait(futs)

            for fut in done:
                cid, res_id = fut.result()
                if res_id:
                    by_id[cid].content_ressource_id = res_id
                else:
                    no_access.add(cid)

    @staticmethod
    @task(
        name="crawl_course",
        retries=2,
        retry_delay_seconds=5,
        log_prints=True,
        cache_expiration=timedelta(days=1),
    )
    def _crawl_course_task(course_id: str) -> Tuple[str, str]:
        """Fetch one course page and extract its content resource id.

        Returns:
            ``(course_id, content_resource_id)``; the second element is ``""``
            when the page is unreachable after two attempts, looks like an
            enrol / login page, or has no download link. Request failures are
            logged instead of raised.
        """
        lg = get_run_logger()
        # Set by Crawler.__run__ before any task is submitted.
        assert _COOKIE_JAR is not None
        url = PRIVATE_URLS.course(course_id)

        for attempt in (1, 2):
            try:
                r = httpx.get(
                    url, cookies=_COOKIE_JAR, follow_redirects=True, timeout=30
                )
                r.raise_for_status()
                time.sleep(_DELAY)
                break
            except Exception as exc:
                # Deliberately broad: any failure is logged and retried once.
                lg.warning("GET %s failed (%s) attempt %d/2", url, exc, attempt)
                time.sleep(_DELAY)
        else:
            # Reached only when neither attempt hit ``break``.
            lg.warning("Course %s unreachable.", course_id)
            return course_id, ""

        if looks_like_enrol(r):
            lg.info("No access to course %s (enrol / login page).", course_id)
            return course_id, ""

        href = (
            parsel.Selector(text=r.text)
            .css('a[data-downloadcourse="1"]::attr(href)')
            .get("")
        )
        if not href:
            lg.info("Course %s has no downloadable content.", course_id)
            return course_id, ""

        # The id is whatever follows the last "=" in the link.
        return course_id, href.split("=")[-1]

    def _logged_in(self) -> bool:
        """Return True when the index page shows no login link in the user menu.

        An index page that could not be fetched counts as *not* logged in:
        ``_get_html`` returns ``""`` on failure, and an empty document would
        otherwise pass the "no login link" test.
        """
        html = self._get_html(PUBLIC_URLS.index)
        if not html:
            return False
        return not parsel.Selector(text=html).css("div.usermenu span.login a")

    def _crawl_terms(self, dp_id: str) -> List[CrawlTerm]:
        """Return the terms listed on the degree-programme page, newest first.

        Only category links named ``FSyy`` / ``HSyy`` are kept. Sorting is by
        year descending; within one year ``HS`` comes before ``FS``.
        """
        html = self._get_html(PUBLIC_URLS.degree_program(dp_id))
        sel = parsel.Selector(text=html)

        out = []
        for a in sel.css("div.category h3.categoryname a"):
            name = a.xpath("text()").get("").strip()
            if _TERM_NAME_RE.match(name):
                term_id = a.xpath("@href").get("").split("=")[-1]
                out.append(CrawlTerm(name=name, id=term_id))

        return sorted(
            out,
            key=lambda t: (2000 + int(t.name[2:]), _TERM_ORDER[t.name[:2]]),
            reverse=True,
        )

    def _crawl_courses(self, term_id: str) -> List[CrawlCourse]:
        """Return the courses listed on a term page.

        Course names have the term tag (``FSyy`` / ``HSyy``) and any
        parenthesised part removed; ``hero_image`` is the course image URL or
        ``""`` when the box has none.
        """
        html = self._get_html(PUBLIC_URLS.term(term_id))
        sel = parsel.Selector(text=html)

        courses = []
        for box in sel.css("div.coursebox"):
            anchor = box.css("h3.coursename a")
            if not anchor:
                continue
            cid = anchor.attrib.get("href", "").split("=")[-1]
            raw = anchor.xpath("text()").get("").strip()
            name = _TERM_TAG_RE.sub("", raw)
            name = _PAREN_RE.sub("", name).strip()
            hero = box.css("div.courseimage img::attr(src)").get("") or ""
            courses.append(CrawlCourse(id=cid, name=name, hero_image=hero))
        return courses

    def _get_html(self, url: str) -> str:
        """GET *url* with the session client and return the body, or ``""`` on failure.

        Sleeps ``_DELAY`` seconds after a successful request. Failures are
        logged as warnings and never raised.
        """
        try:
            r = self._client.get(url, timeout=30)
            r.raise_for_status()
            time.sleep(_DELAY)
            return r.text
        except Exception as exc:
            # Best-effort by design: callers treat "" as "nothing found".
            get_run_logger().warning("GET %s failed (%s)", url, exc)
            return ""

    @staticmethod
    def _load_cache() -> set[str]:
        """Return the ids stored in ``CACHE_FILE``, or an empty set if unusable."""
        try:
            return set(json.loads(CACHE_FILE.read_text(encoding="utf-8")))
        except (OSError, ValueError, TypeError):
            # Missing / unreadable file, invalid JSON, or JSON of the wrong shape.
            return set()

    @staticmethod
    def _save_cache(cache: set[str]) -> None:
        """Write *cache* to ``CACHE_FILE`` as a sorted JSON list; log on failure."""
        try:
            # Make sure the target directory exists before writing.
            CACHE_FILE.parent.mkdir(parents=True, exist_ok=True)
            CACHE_FILE.write_text(
                json.dumps(sorted(cache), indent=2), encoding="utf-8"
            )
        except Exception as exc:
            # Persisting the cache is optional; never fail the crawl over it.
            get_run_logger().warning("Could not save cache: %s", exc)