Builds a relationship-rich finance dataset for text-to-RDF-triple extraction
from SEC fund disclosures: the dataset for the thesis 'Magical RDF Triples and
how to synthesize them'.
- build_rdf_dataset.py: gold (N-CEN graphs), fetch (EDGAR prospectus prose,
all books per trust), samples (per-fund segmentation, marker + plain
serializations), split (trust-level 80/10/10, no leakage)
- score_baseline.py: no-model string-match baseline + strong-model scorer
- dataset_description.{tex,pdf}: scientific description of the dataset
- data/rdf_poc/gold_graphs.jsonl: structured gold knowledge graph (2025Q3)
- Large prose/sample files and raw SEC downloads are gitignored (reproducible)
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
354 lines
13 KiB
Python
"""
|
|
Step 3: Load XBRL Risk/Return Summary data into the database.

Downloads quarterly ZIP files from the SEC containing structured fee,
performance, and objective data extracted from mutual fund prospectuses.
Parses the SUB, NUM, and TXT flat files (the TAG file is not read) and
loads structured records into the xbrl_fee, xbrl_performance, and
xbrl_objective tables.
|
|
"""
|
|
|
|
import csv
import io
import logging
import os
import shutil
import time
import zipfile
from collections import defaultdict
from pathlib import Path

import requests
from tqdm import tqdm

from fund_db import FundDatabase
|
|
|
|
log = logging.getLogger(__name__)

# SEC EDGAR asks automated clients to identify themselves with a User-Agent
# that carries contact details.
# NOTE(review): placeholder contact address -- replace with a real one.
USER_AGENT = "FundDataResearch/1.0 research@university.edu"
HEADERS = {"User-Agent": USER_AGENT, "Accept-Encoding": "gzip, deflate"}
# Minimum spacing in seconds between HTTP requests made by _throttled_get
# (0.12 s is about 8 requests/second; presumably chosen to stay under the
# SEC's published 10 requests/second limit).
REQUEST_INTERVAL = 0.12
# Time of the most recent request, as recorded by _throttled_get.
_last_request_time = 0.0

XBRL_RR_BASE_URL = (
    "https://www.sec.gov/files/dera/data/"
    "mutual-fund-prospectus-risk/return-summary-data-sets"
)

# The maps below route a flat-file ``tag`` to a column of the target table.
# Where the name this loader originally used differs from the element name in
# the SEC "rr" (Risk/Return Summary) taxonomy, both spellings are listed and
# point at the same column, so either one is picked up.

# Numeric (NUM file) tags -> xbrl_fee columns.
FEE_TAGS = {
    "MaximumSalesChargeImposedOnPurchasesOverOfferingPrice": "max_sales_charge_pct",
    "MaximumDeferredSalesChargeOverOther": "max_deferred_charge_pct",
    "RedemptionFeeOverRedemption": "redemption_fee_pct",
    "ManagementFeesOverAssets": "management_fee_pct",
    "Distribution12b1FeesOverAssets": "dist_12b1_fee_pct",
    "DistributionAndService12b1FeesOverAssets": "dist_12b1_fee_pct",
    "OtherExpensesOverAssets": "other_expenses_pct",
    "AcquiredFundFeesAndExpensesOverAssets": "acquired_fund_fees_pct",
    "TotalAnnualFundOperatingExpensesOverAssets": "total_expenses_pct",
    "ExpensesOverAssets": "total_expenses_pct",
    "FeeWaiverOrReimbursementOverAssets": "fee_waiver_pct",
    "TotalAnnualFundOperatingExpensesAfterFeeWaiverOverAssets": "net_expenses_pct",
    "NetExpensesOverAssets": "net_expenses_pct",
    "ExpenseExampleYear01": "expense_example_1yr",
    "ExpenseExampleYear03": "expense_example_3yr",
    "ExpenseExampleYear05": "expense_example_5yr",
    "ExpenseExampleYear10": "expense_example_10yr",
}

# Numeric (NUM file) tags -> xbrl_performance columns.
PERFORMANCE_TAGS = {
    "AverageAnnualReturnYear01": "return_year_1",
    "AverageAnnualReturnYear05": "return_year_5",
    "AverageAnnualReturnYear10": "return_year_10",
    "AverageAnnualReturnSinceInception": "return_since_incep",
    "HighestQuarterlyReturnValue": "best_quarter_return",
    "BarChartHighestQuarterlyReturn": "best_quarter_return",
    "LowestQuarterlyReturnValue": "worst_quarter_return",
    "BarChartLowestQuarterlyReturn": "worst_quarter_return",
    "AnnualTurnover": "portfolio_turnover",
    "PortfolioTurnoverRate": "portfolio_turnover",
}

# Text (TXT file) tags -> xbrl_performance columns.
PERFORMANCE_TEXT_TAGS = {
    "HighestQuarterlyReturnLabel": "best_quarter_label",
    "BarChartHighestQuarterlyReturnDate": "best_quarter_label",
    "LowestQuarterlyReturnLabel": "worst_quarter_label",
    "BarChartLowestQuarterlyReturnDate": "worst_quarter_label",
    "ShareClassInceptionDate": "inception_date",
    "AverageAnnualReturnInceptionDate": "inception_date",
}

# Text (TXT file) tags -> xbrl_objective columns.
OBJECTIVE_TEXT_TAGS = {
    "ObjectivePrimaryTextBlock": "objective_text",
    "StrategyNarrativeTextBlock": "strategy_text",
    "RiskNarrativeTextBlock": "risk_text",
}
|
|
|
|
|
|
def _throttled_get(url: str, **kwargs) -> requests.Response:
    """GET *url* with the module's HEADERS, spacing calls by REQUEST_INTERVAL.

    Keyword arguments are forwarded to ``requests.get``. HEADERS is merged on
    top of any caller-supplied ``headers`` (so the configured User-Agent always
    wins), and ``timeout`` defaults to 120 seconds.

    Args:
        url: URL to fetch.
        **kwargs: Extra arguments for ``requests.get`` (e.g. ``stream=True``).

    Returns:
        The successful response.

    Raises:
        requests.HTTPError: for 4xx/5xx statuses; the response is closed first
            so a streamed connection is not leaked.
    """
    global _last_request_time
    # Monotonic clock: wall-clock adjustments cannot yield a negative elapsed
    # time (and therefore an arbitrarily long sleep).
    elapsed = time.monotonic() - _last_request_time
    if elapsed < REQUEST_INTERVAL:
        time.sleep(REQUEST_INTERVAL - elapsed)
    # Build a fresh dict rather than updating the caller's mapping in place;
    # a headers=None argument is treated as "no extra headers".
    kwargs["headers"] = {**(kwargs.get("headers") or {}), **HEADERS}
    kwargs.setdefault("timeout", 120)
    resp = requests.get(url, **kwargs)
    _last_request_time = time.monotonic()
    try:
        resp.raise_for_status()
    except requests.HTTPError:
        resp.close()
        raise
    return resp
|
|
|
|
|
|
def download_xbrl_rr_zip(quarter: str, output_dir: str = "data/xbrl_rr") -> Path:
    """Download and extract one quarterly XBRL Risk/Return ZIP from the SEC.

    Args:
        quarter: Quarter label used in the SEC file name, e.g. ``"2025q1"``.
        output_dir: Directory that receives ``<quarter>_rr1.zip`` and the
            extracted ``<quarter>/`` folder.

    Returns:
        Path to the directory holding the extracted flat files. If that
        directory already exists and is non-empty, it is returned without
        downloading anything.

    Raises:
        requests.HTTPError: if the SEC returns an error status.
        zipfile.BadZipFile: if the downloaded archive is corrupt.
    """
    url = f"{XBRL_RR_BASE_URL}/{quarter}_rr1.zip"
    out = Path(output_dir)
    out.mkdir(parents=True, exist_ok=True)
    zip_path = out / f"{quarter}_rr1.zip"
    extract_dir = out / quarter

    # A non-empty extract_dir counts as a complete cache hit. The staging
    # steps below ensure it only ever appears after a successful extraction.
    if extract_dir.exists() and any(extract_dir.iterdir()):
        log.info("Already extracted: %s", extract_dir)
        return extract_dir

    log.info("Downloading XBRL Risk/Return: %s", url)
    part_path = zip_path.with_name(zip_path.name + ".part")
    # The with-statement closes the streamed response (releasing its
    # connection) even if writing the file fails.
    with _throttled_get(url, stream=True) as resp, open(part_path, "wb") as fp:
        for chunk in resp.iter_content(chunk_size=65536):
            fp.write(chunk)
    # Only a fully written download replaces the cached ZIP.
    part_path.replace(zip_path)

    # Extract into a staging directory first, so a failed extraction never
    # leaves a half-populated extract_dir behind.
    staging_dir = out / f"{quarter}.partial"
    if staging_dir.exists():
        shutil.rmtree(staging_dir)
    try:
        with zipfile.ZipFile(zip_path, "r") as zf:
            zf.extractall(staging_dir)
    except BaseException:
        shutil.rmtree(staging_dir, ignore_errors=True)
        raise
    if extract_dir.exists():
        # Only reachable when extract_dir is an empty directory (see the
        # cache check above); remove it so the rename below can succeed.
        extract_dir.rmdir()
    staging_dir.replace(extract_dir)
    log.info("Extracted to %s", extract_dir)
    return extract_dir
|
|
|
|
|
|
def _read_tsv(filepath: Path) -> list[dict]:
    """Read a tab-separated file with a header row into a list of row dicts.

    Args:
        filepath: Path to the tab-separated file.

    Returns:
        One dict per data row, keyed by the header names (``csv.DictReader``
        semantics: cells missing from a short row are ``None``). Returns an
        empty list, with a warning logged, when *filepath* does not exist.
    """
    if not filepath.exists():
        log.warning("File not found: %s", filepath)
        return []
    # Text blocks (e.g. risk narratives) can exceed csv's default
    # 131072-character field limit, which would abort the whole read with
    # csv.Error. 2**31 - 1 is accepted on all platforms (sys.maxsize
    # overflows on Windows). This raises a process-wide csv setting.
    max_field = 2**31 - 1
    if csv.field_size_limit() < max_field:
        csv.field_size_limit(max_field)
    # newline="" as the csv docs require; undecodable bytes become U+FFFD
    # instead of raising.
    # NOTE(review): default csv quoting still treats a cell that starts with
    # '"' as quoted -- confirm whether the SEC files need csv.QUOTE_NONE.
    with open(filepath, "r", encoding="utf-8", errors="replace", newline="") as f:
        return list(csv.DictReader(f, delimiter="\t"))
|
|
|
|
|
|
# Value columns of each target table, in insert order (after the shared
# accession_number, cik, filing_date, fund_name columns). The names are the
# values produced by the *_TAGS maps.
_FEE_COLUMNS = (
    "max_sales_charge_pct", "max_deferred_charge_pct",
    "redemption_fee_pct", "management_fee_pct",
    "dist_12b1_fee_pct", "other_expenses_pct",
    "acquired_fund_fees_pct", "total_expenses_pct",
    "fee_waiver_pct", "net_expenses_pct",
    "expense_example_1yr", "expense_example_3yr",
    "expense_example_5yr", "expense_example_10yr",
)
_PERFORMANCE_COLUMNS = (
    "inception_date",
    "return_year_1", "return_year_5", "return_year_10",
    "return_since_incep",
    "best_quarter_return", "best_quarter_label",
    "worst_quarter_return", "worst_quarter_label",
    "portfolio_turnover",
)
_OBJECTIVE_COLUMNS = ("objective_text", "strategy_text", "risk_text")


def _field(row: dict, name: str) -> str:
    """Return ``row[name]`` stripped, treating a missing or None cell as ""."""
    # csv.DictReader fills cells missing from short rows with None, so a
    # plain row.get(name, "") can still return None.
    return (row.get(name) or "").strip()


def _find_flat_file(extract_dir: Path, stem: str) -> "Path | None":
    """Locate the *stem* flat file (e.g. ``"sub"``) inside *extract_dir*.

    ``<stem>.tsv`` at the top level is preferred. Otherwise any file below
    *extract_dir* whose lower-cased stem equals *stem* or ends in ``_<stem>`` /
    ``-<stem>`` is accepted, shallowest path first. Matching on the stem
    rather than a substring keeps e.g. ``sub.txt`` from being taken as the
    TXT file.
    """
    preferred = extract_dir / f"{stem}.tsv"
    if preferred.is_file():
        return preferred
    suffixes = (f"_{stem}", f"-{stem}")
    matches = [
        p for p in extract_dir.rglob("*")
        if p.is_file()
        and (p.stem.lower() == stem or p.stem.lower().endswith(suffixes))
    ]
    if not matches:
        return None
    return min(matches, key=lambda p: (len(p.parts), str(p)))


def _load_submissions(sub_file: Path) -> dict:
    """Map accession number -> {cik (zero-padded to 10), filing_date, form, name}."""
    sub_map = {}
    for s in _read_tsv(sub_file):
        adsh = _field(s, "adsh")
        if adsh:
            sub_map[adsh] = {
                "cik": _field(s, "cik").zfill(10),
                "filing_date": _field(s, "filed"),
                "form": _field(s, "form"),
                "name": _field(s, "name"),
            }
    return sub_map


def _route_rows(rows, sub_map, routes, desc, convert=None):
    """Group flat-file rows into per-filing dicts of column values.

    Args:
        rows: Parsed NUM or TXT rows with ``adsh``, ``tag`` and ``value``.
        sub_map: Output of ``_load_submissions``.
        routes: ``(tag_map, target)`` pairs. A row whose tag is in ``tag_map``
            stores its value at ``target[(adsh, cik)][tag_map[tag]]``, plus
            the submission info under ``"_sub"``. The first matching pair wins.
        desc: Progress-bar label.
        convert: Optional parser for the raw value. Rows it rejects with
            ValueError/TypeError are skipped.
    """
    for row in tqdm(rows, desc=desc):
        adsh = _field(row, "adsh")
        tag = _field(row, "tag")
        raw = _field(row, "value")
        if not adsh or not tag or not raw:
            continue
        try:
            value = convert(raw) if convert else raw
        except (ValueError, TypeError):
            continue
        sub_info = sub_map.get(adsh, {})
        # NOTE(review): rows are keyed by (accession, CIK) only. The flat
        # files hold values per accession + dimension, so values for different
        # series/classes in one filing overwrite each other (last row wins).
        # Confirm whether the tables should be keyed by series/class too.
        key = (adsh, sub_info.get("cik", ""))
        for tag_map, target in routes:
            if tag in tag_map:
                target[key][tag_map[tag]] = value
                target[key]["_sub"] = sub_info
                break


def _upsert_records(c, table, columns, records, label) -> int:
    """INSERT OR REPLACE one row per (accession, CIK) key into *table*.

    Per-row failures are skipped (best effort) but counted and reported with
    a warning instead of disappearing at DEBUG level.

    Returns:
        Number of rows successfully written.
    """
    all_columns = ("accession_number", "cik", "filing_date", "fund_name", *columns)
    # Table and column names come from the module constants, never from input.
    sql = (
        f"INSERT OR REPLACE INTO {table} ({', '.join(all_columns)}) "
        f"VALUES ({', '.join(['?'] * len(all_columns))})"
    )
    written = 0
    failures = 0
    for (adsh, cik), vals in records.items():
        sub_info = vals.get("_sub", {})
        params = (
            adsh, cik,
            sub_info.get("filing_date", ""),
            sub_info.get("name", ""),
            *(vals.get(col) for col in columns),
        )
        try:
            c.execute(sql, params)
        except Exception as e:
            failures += 1
            log.debug("%s insert error for %s: %s", label, adsh, e)
        else:
            written += 1
    if failures:
        log.warning("%d of %d %s records failed to insert into %s "
                    "(enable DEBUG logging for details)",
                    failures, len(records), label.lower(), table)
    return written


def parse_and_load_xbrl_rr(db: FundDatabase, extract_dir: Path, quarter: str):
    """Parse the XBRL Risk/Return flat files and load them into the database.

    The data set has:
      - sub: submission metadata (accession, CIK, filing date, etc.)
      - num: numeric values (tag, value, per accession + dimension)
      - txt: text values (tag, value, per accession + dimension)

    Each file is located by name stem, so ``sub.tsv``, ``sub.txt`` or a copy
    nested in a sub-folder are all accepted.

    Args:
        db: Target database. ``db.conn()`` is used as a context manager
            yielding an object whose ``execute`` takes ``?``-style parameters.
        extract_dir: Directory containing the extracted flat files.
        quarter: Quarter label passed to ``db.record_bulk_download``.

    Returns:
        ``(fee_count, perf_count, obj_count)``: rows written per table.
        ``(0, 0, 0)`` when no SUB file is found.
    """
    extract_dir = Path(extract_dir)
    sub_file = _find_flat_file(extract_dir, "sub")
    num_file = _find_flat_file(extract_dir, "num")
    txt_file = _find_flat_file(extract_dir, "txt")

    if sub_file is None:
        log.error("Cannot find sub file in %s", extract_dir)
        return 0, 0, 0

    log.info("Reading submission metadata from %s", sub_file)
    sub_map = _load_submissions(sub_file)

    fee_data = defaultdict(dict)
    perf_data = defaultdict(dict)
    obj_data = defaultdict(dict)

    log.info("Reading numeric data from %s", num_file)
    nums = _read_tsv(num_file) if num_file else []
    _route_rows(nums, sub_map,
                [(FEE_TAGS, fee_data), (PERFORMANCE_TAGS, perf_data)],
                "Parsing numeric data", convert=float)

    log.info("Reading text data from %s", txt_file)
    txts = _read_tsv(txt_file) if txt_file else []
    _route_rows(txts, sub_map,
                [(PERFORMANCE_TEXT_TAGS, perf_data),
                 (OBJECTIVE_TEXT_TAGS, obj_data)],
                "Parsing text data")

    with db.conn() as c:
        fee_count = _upsert_records(c, "xbrl_fee", _FEE_COLUMNS,
                                    fee_data, "Fee")
        perf_count = _upsert_records(c, "xbrl_performance", _PERFORMANCE_COLUMNS,
                                     perf_data, "Performance")
        obj_count = _upsert_records(c, "xbrl_objective", _OBJECTIVE_COLUMNS,
                                    obj_data, "Objective")

    log.info("Loaded %d fee records, %d performance records, %d objective records",
             fee_count, perf_count, obj_count)
    db.record_bulk_download("xbrl_rr", quarter, str(extract_dir),
                            fee_count + perf_count + obj_count)
    return fee_count, perf_count, obj_count
|
|
|
|
|
|
def main():
    """Command-line entry point: download, parse and load each requested quarter.

    A quarter that raises is logged with its traceback and skipped, and the
    run continues with the next one. At the end, totals, any failed quarters,
    and the non-zero row counts from ``db.get_stats()`` are reported.
    """
    import argparse

    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s [%(levelname)s] %(message)s")

    parser = argparse.ArgumentParser(description="Load XBRL Risk/Return data")
    parser.add_argument("--db", default="fund_data.db", help="Database path")
    parser.add_argument("--quarters", nargs="+",
                        default=["2025q1", "2025q2", "2025q3", "2025q4"],
                        help="Quarters to download (e.g. 2025q1 2025q2)")
    parser.add_argument("--data-dir", default="data/xbrl_rr",
                        help="Directory for downloaded files")
    args = parser.parse_args()

    db = FundDatabase(args.db)

    total_fees = total_perf = total_obj = 0
    failed = []

    for quarter in args.quarters:
        print(f"\n{'='*60}")
        print(f"Processing {quarter}")
        print(f"{'='*60}")
        try:
            extract_dir = download_xbrl_rr_zip(quarter, args.data_dir)
            counts = parse_and_load_xbrl_rr(db, extract_dir, quarter)
        except Exception as e:
            # Best effort per quarter (e.g. a quarter not yet published), but
            # keep the traceback so real bugs remain diagnosable.
            log.exception("Failed to process %s: %s", quarter, e)
            failed.append(quarter)
            continue
        # Tolerate a loader that returns None when its input files are missing.
        f, p, o = counts or (0, 0, 0)
        total_fees += f
        total_perf += p
        total_obj += o

    print(f"\nTotal loaded: {total_fees} fee records, {total_perf} performance, {total_obj} objectives")
    if failed:
        log.warning("Failed quarters: %s", ", ".join(failed))

    stats = db.get_stats()
    print("\nDatabase stats:")
    for table, count in stats.items():
        if count > 0:
            print(f" {table:30s} {count:>10,}")


if __name__ == "__main__":
    main()
|