fund_rfid_data/load_xbrl_rr.py
Florian Herzog 1993658fb2 Add SEC fund prospectus -> RDF triple dataset pipeline
Builds a relationship-rich finance dataset for text-to-RDF-triple extraction
from SEC fund disclosures, the dataset for the thesis 'Magical RDF Triples and
how to synthetize them'.

- build_rdf_dataset.py: gold (N-CEN graphs), fetch (EDGAR prospectus prose,
  all books per trust), samples (per-fund segmentation, marker + plain
  serializations), split (trust-level 80/10/10, no leakage)
- score_baseline.py: no-model string-match baseline + strong-model scorer
- dataset_description.{tex,pdf}: scientific description of the dataset
- data/rdf_poc/gold_graphs.jsonl: structured gold knowledge graph (2025Q3)
- Large prose/sample files and raw SEC downloads are gitignored (reproducible)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 10:31:35 +02:00

354 lines
13 KiB
Python

"""
Step 3: Load XBRL Risk/Return Summary data into the database.
Downloads quarterly ZIP files from SEC containing structured fee,
performance, and objective data extracted from mutual fund prospectuses.
Parses the flat files (SUB, TAG, NUM, TXT) and loads structured records
into the xbrl_fee, xbrl_performance, and xbrl_objective tables.
"""
import csv
import io
import logging
import os
import time
import zipfile
from collections import defaultdict
from pathlib import Path
import requests
from tqdm import tqdm
from fund_db import FundDatabase
# Module-level logger.
log = logging.getLogger(__name__)
# Identifying User-Agent sent with every SEC request; presumably set to satisfy
# SEC EDGAR fair-access guidance for automated clients (confirm contact address).
USER_AGENT = "FundDataResearch/1.0 research@university.edu"
# Headers applied to every request issued through _throttled_get.
HEADERS = {"User-Agent": USER_AGENT, "Accept-Encoding": "gzip, deflate"}
# Minimum gap in seconds between consecutive requests (at most ~8 requests/s);
# presumably chosen to stay below SEC's published request-rate limit.
REQUEST_INTERVAL = 0.12
# Timestamp of the most recent request; read and updated by _throttled_get.
_last_request_time = 0.0
# Base URL for the quarterly "<quarter>_rr1.zip" archives (see download_xbrl_rr_zip).
# NOTE(review): "risk/return" contains a "/", so it forms two URL path segments;
# confirm the exact directory slug on SEC's DERA data-set page.
XBRL_RR_BASE_URL = (
"https://www.sec.gov/files/dera/data/"
"mutual-fund-prospectus-risk/return-summary-data-sets"
)
# XBRL tag (numeric rows of num.tsv) -> column of the xbrl_fee table.
# NOTE(review): tag names should be verified against the SEC Risk/Return taxonomy.
FEE_TAGS = {
"MaximumSalesChargeImposedOnPurchasesOverOfferingPrice": "max_sales_charge_pct",
"MaximumDeferredSalesChargeOverOther": "max_deferred_charge_pct",
"RedemptionFeeOverRedemption": "redemption_fee_pct",
"ManagementFeesOverAssets": "management_fee_pct",
"Distribution12b1FeesOverAssets": "dist_12b1_fee_pct",
"OtherExpensesOverAssets": "other_expenses_pct",
"AcquiredFundFeesAndExpensesOverAssets": "acquired_fund_fees_pct",
"TotalAnnualFundOperatingExpensesOverAssets": "total_expenses_pct",
"FeeWaiverOrReimbursementOverAssets": "fee_waiver_pct",
"TotalAnnualFundOperatingExpensesAfterFeeWaiverOverAssets": "net_expenses_pct",
"ExpenseExampleYear01": "expense_example_1yr",
"ExpenseExampleYear03": "expense_example_3yr",
"ExpenseExampleYear05": "expense_example_5yr",
"ExpenseExampleYear10": "expense_example_10yr",
}
# XBRL tag (numeric rows of num.tsv) -> numeric column of xbrl_performance.
PERFORMANCE_TAGS = {
"AverageAnnualReturnYear01": "return_year_1",
"AverageAnnualReturnYear05": "return_year_5",
"AverageAnnualReturnYear10": "return_year_10",
"AverageAnnualReturnSinceInception": "return_since_incep",
"HighestQuarterlyReturnValue": "best_quarter_return",
"LowestQuarterlyReturnValue": "worst_quarter_return",
"AnnualTurnover": "portfolio_turnover",
}
# XBRL tag (text rows of txt.tsv) -> text column of xbrl_performance.
PERFORMANCE_TEXT_TAGS = {
"HighestQuarterlyReturnLabel": "best_quarter_label",
"LowestQuarterlyReturnLabel": "worst_quarter_label",
"ShareClassInceptionDate": "inception_date",
}
# XBRL tag (text rows of txt.tsv) -> column of the xbrl_objective table.
OBJECTIVE_TEXT_TAGS = {
"ObjectivePrimaryTextBlock": "objective_text",
"StrategyNarrativeTextBlock": "strategy_text",
"RiskNarrativeTextBlock": "risk_text",
}
def _throttled_get(url: str, **kwargs) -> requests.Response:
    """Issue a rate-limited GET request carrying the module's default headers.

    Consecutive calls are spaced at least ``REQUEST_INTERVAL`` seconds apart,
    measured from the end of the previous call (successful or not).

    Args:
        url: URL to fetch.
        **kwargs: Forwarded to ``requests.get``. A caller-supplied ``headers``
            mapping is copied rather than modified; the entries of ``HEADERS``
            are applied on top of it. ``timeout`` defaults to 120 seconds.

    Returns:
        The successful ``requests.Response``.

    Raises:
        requests.HTTPError: the server answered with a 4xx/5xx status.
        requests.RequestException: connection or timeout failure.
    """
    global _last_request_time
    # time.monotonic() cannot jump backwards/forwards with wall-clock changes.
    wait = REQUEST_INTERVAL - (time.monotonic() - _last_request_time)
    if wait > 0:
        time.sleep(wait)

    # Build a fresh dict so a headers mapping passed by the caller is never
    # mutated in place (the original code updated it directly).
    headers = dict(kwargs.get("headers") or {})
    headers.update(HEADERS)
    kwargs["headers"] = headers
    kwargs.setdefault("timeout", 120)

    try:
        resp = requests.get(url, **kwargs)
    finally:
        # Record the attempt even when it raised, so retries are throttled too.
        _last_request_time = time.monotonic()

    try:
        resp.raise_for_status()
    except requests.HTTPError:
        # With stream=True the connection stays open until the body is read
        # or the response is closed; release it before propagating.
        resp.close()
        raise
    return resp
def download_xbrl_rr_zip(quarter: str, output_dir: str = "data/xbrl_rr") -> Path:
    """Download and extract one quarterly XBRL Risk/Return ZIP from SEC.

    Args:
        quarter: Quarter identifier used in the archive name, e.g. ``"2025q1"``.
        output_dir: Directory that receives ``<quarter>_rr1.zip`` and the
            extracted ``<quarter>/`` directory; created if missing.

    Returns:
        Path of the directory holding the extracted flat files. If that
        directory already exists and is non-empty it is returned as-is,
        without downloading anything.

    Raises:
        requests.RequestException: the download failed.
        zipfile.BadZipFile: the downloaded file is not a valid ZIP. The bad
            archive is deleted so a later run downloads it again.
    """
    import shutil

    url = f"{XBRL_RR_BASE_URL}/{quarter}_rr1.zip"
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    zip_path = out / f"{quarter}_rr1.zip"
    extract_dir = out / quarter
    if extract_dir.exists() and any(extract_dir.iterdir()):
        log.info("Already extracted: %s", extract_dir)
        return extract_dir

    log.info("Downloading XBRL Risk/Return: %s", url)
    # Stream into a ".part" file and rename only once complete, so an
    # interrupted download never leaves a truncated ZIP under the final name.
    part_path = zip_path.with_name(zip_path.name + ".part")
    with _throttled_get(url, stream=True) as resp, open(part_path, "wb") as fp:
        for chunk in resp.iter_content(chunk_size=65536):
            fp.write(chunk)
    part_path.replace(zip_path)

    # Extract into a staging directory and rename it into place afterwards:
    # the check above treats any non-empty extract_dir as complete, so a
    # half-finished extraction must never appear under that name.
    staging_dir = out / f".{quarter}.extracting"
    shutil.rmtree(staging_dir, ignore_errors=True)
    staging_dir.mkdir(parents=True)
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(staging_dir)
    except zipfile.BadZipFile:
        zip_path.unlink(missing_ok=True)
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise
    if extract_dir.exists():
        # Only reachable when extract_dir exists but is empty (see check above).
        extract_dir.rmdir()
    staging_dir.replace(extract_dir)
    log.info("Extracted to %s", extract_dir)
    return extract_dir
def _read_tsv(filepath: Path) -> list[dict]:
"""Read a TSV file into a list of dicts."""
if not filepath.exists():
log.warning("File not found: %s", filepath)
return []
with open(filepath, "r", encoding="utf-8", errors="replace") as f:
reader = csv.DictReader(f, delimiter="\t")
return list(reader)
# Value columns written after the four common identification columns
# (accession_number, cik, filing_date, fund_name), in INSERT order.
_XBRL_FEE_COLUMNS = (
    "max_sales_charge_pct",
    "max_deferred_charge_pct",
    "redemption_fee_pct",
    "management_fee_pct",
    "dist_12b1_fee_pct",
    "other_expenses_pct",
    "acquired_fund_fees_pct",
    "total_expenses_pct",
    "fee_waiver_pct",
    "net_expenses_pct",
    "expense_example_1yr",
    "expense_example_3yr",
    "expense_example_5yr",
    "expense_example_10yr",
)
_XBRL_PERFORMANCE_COLUMNS = (
    "inception_date",
    "return_year_1",
    "return_year_5",
    "return_year_10",
    "return_since_incep",
    "best_quarter_return",
    "best_quarter_label",
    "worst_quarter_return",
    "worst_quarter_label",
    "portfolio_turnover",
)
_XBRL_OBJECTIVE_COLUMNS = ("objective_text", "strategy_text", "risk_text")


def _field(row: dict, name: str) -> str:
    """Return ``row[name]`` stripped; missing keys and ``None`` give ``""``.

    csv.DictReader fills absent trailing fields of short rows with ``None``,
    which would otherwise crash ``.strip()``.
    """
    return (row.get(name) or "").strip()


def _locate_rr_file(extract_dir: Path, stem: str):
    """Return the path of the ``stem`` flat file in *extract_dir*, or None.

    Prefers the canonical ``<stem>.tsv``. Otherwise falls back to the first
    (alphabetical) regular file whose name contains *stem*, trying ``.tsv``
    names before any other extension.
    """
    canonical = extract_dir / f"{stem}.tsv"
    if canonical.is_file():
        return canonical
    for pattern in (f"*{stem}*.tsv", f"*{stem}*"):
        matches = sorted(p for p in extract_dir.glob(pattern) if p.is_file())
        if matches:
            return matches[0]
    return None


def _load_sub_map(sub_file: Path) -> dict:
    """Map accession number (``adsh``) -> submission metadata from the sub file."""
    sub_map = {}
    for row in _read_tsv(sub_file):
        adsh = _field(row, "adsh")
        if not adsh:
            continue
        sub_map[adsh] = {
            "cik": _field(row, "cik").zfill(10),
            "filing_date": _field(row, "filed"),
            "form": _field(row, "form"),
            "name": _field(row, "name"),
        }
    return sub_map


def _collect_numeric(rows: list, sub_map: dict):
    """Group numeric fee/performance values by ``(adsh, cik)``.

    Rows with an empty adsh/tag/value, an unknown tag, or a value that does
    not parse as a float are skipped. Returns ``(fee_data, perf_data)``.
    """
    fee_data = defaultdict(dict)
    perf_data = defaultdict(dict)
    for row in tqdm(rows, desc="Parsing numeric data"):
        adsh = _field(row, "adsh")
        tag = _field(row, "tag")
        value_str = _field(row, "value")
        if not (adsh and tag and value_str):
            continue
        if tag in FEE_TAGS:
            target, column = fee_data, FEE_TAGS[tag]
        elif tag in PERFORMANCE_TAGS:
            target, column = perf_data, PERFORMANCE_TAGS[tag]
        else:
            continue
        try:
            value = float(value_str)
        except ValueError:
            continue
        key = (adsh, sub_map.get(adsh, {}).get("cik", ""))
        target[key][column] = value
    return fee_data, perf_data


def _collect_text(rows: list, sub_map: dict, perf_data: dict):
    """Group text values by ``(adsh, cik)``.

    Performance labels/dates are added into *perf_data* in place; objective,
    strategy and risk text blocks are returned as a new mapping.
    """
    obj_data = defaultdict(dict)
    for row in tqdm(rows, desc="Parsing text data"):
        adsh = _field(row, "adsh")
        tag = _field(row, "tag")
        value = _field(row, "value")
        if not (adsh and tag and value):
            continue
        key = (adsh, sub_map.get(adsh, {}).get("cik", ""))
        if tag in PERFORMANCE_TEXT_TAGS:
            perf_data[key][PERFORMANCE_TEXT_TAGS[tag]] = value
        elif tag in OBJECTIVE_TEXT_TAGS:
            obj_data[key][OBJECTIVE_TEXT_TAGS[tag]] = value
    return obj_data


def _insert_records(conn, table: str, label: str, columns: tuple,
                    records: dict, sub_map: dict) -> int:
    """INSERT OR REPLACE one row per ``(adsh, cik)`` key into *table*.

    Insertion is best-effort: a failing row is logged at DEBUG and skipped,
    and a WARNING summarises how many rows failed. Returns the number of rows
    inserted successfully. *table* and *columns* are module constants, never
    external input, so building the SQL text from them is safe.
    """
    all_columns = ("accession_number", "cik", "filing_date", "fund_name") + columns
    sql = (
        f"INSERT OR REPLACE INTO {table} ({', '.join(all_columns)}) "
        f"VALUES ({', '.join('?' * len(all_columns))})"
    )
    inserted = failed = 0
    for (adsh, cik), values in records.items():
        sub_info = sub_map.get(adsh, {})
        params = (
            adsh,
            cik,
            sub_info.get("filing_date", ""),
            sub_info.get("name", ""),
            *(values.get(column) for column in columns),
        )
        try:
            conn.execute(sql, params)
        except Exception as exc:  # best-effort: one bad row must not abort the load
            failed += 1
            log.debug("%s insert error for %s: %s", label, adsh, exc)
        else:
            inserted += 1
    if failed:
        log.warning("%d %s rows failed to insert (details at DEBUG level)",
                    failed, table)
    return inserted


def parse_and_load_xbrl_rr(db: FundDatabase, extract_dir: Path, quarter: str):
    """Parse the XBRL Risk/Return flat files and load them into the database.

    Files read from *extract_dir*:
      - sub.tsv: submission metadata (adsh, cik, filed, form, name)
      - num.tsv: numeric values (adsh, tag, value)
      - txt.tsv: text values (adsh, tag, value)
    If a canonical name is missing, a file whose name contains the stem is
    used instead.

    NOTE(review): values are keyed by (accession, CIK) only. If num/txt carry
    one row per series or share class, later rows overwrite earlier ones for
    the same tag; confirm against the data set's field list and the tables'
    primary keys.

    Args:
        db: Target database; ``db.conn()`` is used as a context manager
            yielding an object with ``execute``.
        extract_dir: Directory containing the extracted flat files.
        quarter: Quarter label recorded via ``db.record_bulk_download``.

    Returns:
        ``(fee_count, perf_count, obj_count)`` of inserted rows;
        ``(0, 0, 0)`` if no submission file is found (previously ``None``,
        which broke callers that unpack three values).
    """
    sub_file = _locate_rr_file(extract_dir, "sub")
    num_file = _locate_rr_file(extract_dir, "num")
    txt_file = _locate_rr_file(extract_dir, "txt")
    if sub_file is None:
        log.error("Cannot find sub file in %s", extract_dir)
        return 0, 0, 0

    log.info("Reading submission metadata from %s", sub_file)
    sub_map = _load_sub_map(sub_file)

    log.info("Reading numeric data from %s", num_file)
    nums = _read_tsv(num_file) if num_file is not None else []
    fee_data, perf_data = _collect_numeric(nums, sub_map)

    log.info("Reading text data from %s", txt_file)
    txts = _read_tsv(txt_file) if txt_file is not None else []
    obj_data = _collect_text(txts, sub_map, perf_data)

    with db.conn() as c:
        fee_count = _insert_records(c, "xbrl_fee", "Fee", _XBRL_FEE_COLUMNS,
                                    fee_data, sub_map)
        perf_count = _insert_records(c, "xbrl_performance", "Performance",
                                     _XBRL_PERFORMANCE_COLUMNS, perf_data, sub_map)
        obj_count = _insert_records(c, "xbrl_objective", "Objective",
                                    _XBRL_OBJECTIVE_COLUMNS, obj_data, sub_map)

    log.info("Loaded %d fee records, %d performance records, %d objective records",
             fee_count, perf_count, obj_count)
    db.record_bulk_download("xbrl_rr", quarter, str(extract_dir),
                            fee_count + perf_count + obj_count)
    return fee_count, perf_count, obj_count
def main():
    """Command-line entry point: download and load each requested quarter.

    Every quarter is attempted even if an earlier one fails.

    Returns:
        0 when every quarter was processed, 1 if any quarter failed (used as
        the process exit status so pipelines can detect failures).
    """
    import argparse

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s [%(levelname)s] %(message)s")
    parser = argparse.ArgumentParser(description="Load XBRL Risk/Return data")
    parser.add_argument("--db", default="fund_data.db", help="Database path")
    parser.add_argument("--quarters", nargs="+",
                        default=["2025q1", "2025q2", "2025q3", "2025q4"],
                        help="Quarters to download (e.g. 2025q1 2025q2)")
    parser.add_argument("--data-dir", default="data/xbrl_rr",
                        help="Directory for downloaded files")
    args = parser.parse_args()

    db = FundDatabase(args.db)
    total_fees = total_perf = total_obj = 0
    failed_quarters = []
    for quarter in args.quarters:
        print(f"\n{'='*60}")
        print(f"Processing {quarter}")
        print(f"{'='*60}")
        try:
            extract_dir = download_xbrl_rr_zip(quarter, args.data_dir)
            counts = parse_and_load_xbrl_rr(db, extract_dir, quarter)
        except Exception as e:
            # log.exception keeps the traceback; carry on with other quarters.
            log.exception("Failed to process %s: %s", quarter, e)
            failed_quarters.append(quarter)
            continue
        if counts is None:
            # Guard against a None result instead of failing on unpacking.
            log.error("Failed to process %s: no data loaded", quarter)
            failed_quarters.append(quarter)
            continue
        f, p, o = counts
        total_fees += f
        total_perf += p
        total_obj += o

    print(f"\nTotal loaded: {total_fees} fee records, {total_perf} performance, {total_obj} objectives")
    if failed_quarters:
        print(f"Failed quarters: {', '.join(failed_quarters)}")
    stats = db.get_stats()
    print("\nDatabase stats:")
    for table, count in stats.items():
        if count > 0:
            print(f" {table:30s} {count:>10,}")
    return 1 if failed_quarters else 0


if __name__ == "__main__":
    raise SystemExit(main())