#!/usr/bin/env python3
"""
SEC Fund Data Pipeline — Main Orchestrator

Builds a database of 10,000+ SEC-registered funds with:
  1. Fund universe (trusts, series, share classes) from Series/Class CSV
  2. Prospectus filings and text from EDGAR
  3. Structured fee/performance data from XBRL Risk/Return
  4. Portfolio holdings from N-PORT
  5. Service provider and classification data from N-CEN

Usage:
    python pipeline.py status                  # Show database stats
    python pipeline.py universe                # Load fund universe (step 1)
    python pipeline.py filings --limit 100     # Fetch filings (step 2)
    python pipeline.py xbrl-rr                 # Load XBRL fee data (step 3)
    python pipeline.py nport                   # Load N-PORT data (step 4)
    python pipeline.py ncen                    # Load N-CEN data (step 5)
    python pipeline.py all                     # Run all steps
    python pipeline.py search "Vanguard"       # Search the database
    python pipeline.py export                  # Export summary CSV
"""

import argparse
import csv
import logging
import sys
from pathlib import Path

from fund_db import FundDatabase

log = logging.getLogger(__name__)

# Width of the "=====" rules printed around section titles.
_BANNER_WIDTH = 65

# Layout of the status report: (section title, [(table name, row label), ...]).
# The table names are looked up as keys in the mapping returned by
# ``FundDatabase.get_stats()``.
_STATUS_SECTIONS = [
    ("Fund Universe", [
        ("trust", "Trusts (investment companies)"),
        ("series", "Series (funds)"),
        ("share_class", "Share Classes"),
    ]),
    ("Filings", [
        ("filing", "Filing records"),
        ("filing_text", "Filing texts (plain)"),
        ("filing_html", "Filing HTML (raw)"),
    ]),
    ("XBRL Risk/Return", [
        ("xbrl_fee", "Fee records"),
        ("xbrl_performance", "Performance records"),
        ("xbrl_objective", "Objective/strategy texts"),
    ]),
    ("N-PORT", [
        ("nport_fund_info", "Fund snapshots"),
        ("nport_holding", "Portfolio holdings"),
        ("nport_monthly_return", "Monthly returns"),
    ]),
    ("N-CEN", [
        ("ncen_fund_info", "Fund classification records"),
        ("ncen_service_provider", "Service provider records"),
    ]),
]


def _print_banner(title: str) -> None:
    """Print *title* between two horizontal rules, preceded by a blank line."""
    rule = "=" * _BANNER_WIDTH
    print("\n" + rule)
    print(title)
    print(rule)


def _run_per_quarter(label: str, quarters, process) -> None:
    """Call ``process(quarter)`` for every quarter, isolating failures.

    A failure in one quarter is logged (with traceback) and reported on
    stdout, then processing continues with the next quarter; this keeps the
    best-effort behaviour of a long batch run.

    Args:
        label: Data-set name shown in the progress line (e.g. ``"N-PORT"``).
        quarters: Iterable of quarter identifiers (e.g. ``"2025q3"``).
        process: Callable taking a single quarter identifier.
    """
    for quarter in quarters:
        print(f"\nProcessing {label} {quarter}...")
        try:
            process(quarter)
        except Exception as e:  # top-level boundary: report and keep going
            log.exception("Failed %s: %s", quarter, e)
            print(f" ERROR: {e}")


def cmd_status(db: FundDatabase, args: argparse.Namespace) -> None:
    """Print row counts per table plus a few coverage figures.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments (unused; present for the common handler
            signature).
    """
    stats = db.get_stats()

    _print_banner(" SEC Fund Data Pipeline — Database Status")

    for section_name, tables in _STATUS_SECTIONS:
        print(f"\n {section_name}:")
        for table, label in tables:
            count = stats.get(table, 0)
            # One block per 1,000 rows, capped at 40 characters.
            bar = "█" * min(count // 1000, 40)
            print(f" {label:40s} {count:>10,} {bar}")

    with db.conn() as c:
        # Rows are accessed by column name, so the connection is expected to
        # return mapping-like rows.
        row = c.execute("""
            SELECT COUNT(DISTINCT cik) as n FROM filing
            WHERE form_type IN ('485BPOS','485APOS','N-1A')
        """).fetchone()
        print(f"\n Trusts with prospectus filings: {row['n']:>10,}")

        row = c.execute("""
            SELECT COUNT(DISTINCT cik) as n FROM xbrl_fee
        """).fetchone()
        print(f" Trusts with XBRL fee data: {row['n']:>10,}")

        row = c.execute("""
            SELECT COUNT(DISTINCT cik) as n FROM nport_fund_info
        """).fetchone()
        print(f" Trusts with N-PORT data: {row['n']:>10,}")

        row = c.execute(
            "SELECT COUNT(*) as n FROM pipeline_status WHERE status='error'"
        ).fetchone()
        # The error line is only shown when there is something to report.
        if row["n"] > 0:
            print(f"\n Pipeline errors: {row['n']:>10,}")

    print()


def cmd_universe(db: FundDatabase, args: argparse.Namespace) -> None:
    """Load the fund universe from the SEC Series/Class CSV.

    When ``args.enrich`` is set, trusts are additionally enriched through the
    Submissions API, bounded by ``args.enrich_limit``.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``enrich`` and ``enrich_limit``.
    """
    from fetch_universe import enrich_from_submissions_api, load_series_class_csv

    trusts, series, classes = load_series_class_csv(db)
    print(f"\nUniverse: {trusts:,} trusts, {series:,} series, {classes:,} share classes")

    if args.enrich:
        # NOTE(review): the default limit is 0 — presumably "no limit";
        # confirm against enrich_from_submissions_api.
        enriched = enrich_from_submissions_api(db, limit=args.enrich_limit)
        print(f"Enriched {enriched:,} trusts from Submissions API")


def cmd_filings(db: FundDatabase, args: argparse.Namespace) -> None:
    """Fetch filings for the fund universe.

    Exactly one of three modes runs, in this order of precedence:
    ``--backfill-html``, then ``--text-only``, otherwise a regular fetch.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``limit``, ``ciks``, ``text_only``,
            ``no_text``, ``max_prospectus``, ``max_supplements`` and, when
            present, ``backfill_html``.
    """
    from fetch_filings import (
        backfill_html,
        download_pending_texts,
        fetch_filings_for_universe,
    )

    # The "all" sub-command does not define --backfill-html, hence getattr.
    if getattr(args, "backfill_html", False):
        backfill_html(db, limit=args.limit)
        return

    if args.text_only:
        download_pending_texts(db, limit=args.limit)
        return

    # CIKs are left-padded with zeros to 10 characters; an absent or empty
    # --ciks list is passed on as None.
    ciks = [c.zfill(10) for c in args.ciks] if args.ciks else None
    fetch_filings_for_universe(
        db,
        ciks=ciks,
        limit=args.limit,
        download_text=not args.no_text,
        max_prospectus=args.max_prospectus,
        max_supplements=args.max_supplements,
    )


def cmd_xbrl_rr(db: FundDatabase, args: argparse.Namespace) -> None:
    """Download and load XBRL Risk/Return data for each requested quarter.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``quarters`` and ``data_dir``.
    """
    from load_xbrl_rr import download_xbrl_rr_zip, parse_and_load_xbrl_rr

    def process(quarter: str) -> None:
        extract_dir = download_xbrl_rr_zip(quarter, args.data_dir)
        parse_and_load_xbrl_rr(db, extract_dir, quarter)

    _run_per_quarter("XBRL Risk/Return", args.quarters, process)


def cmd_nport(db: FundDatabase, args: argparse.Namespace) -> None:
    """Download and load N-PORT data for each requested quarter.

    Fund info and monthly returns are always loaded; holdings are skipped
    with ``--skip-holdings`` and, with ``--filter-ciks``, restricted to the
    CIKs already present in the database.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``quarters``, ``data_dir``,
            ``skip_holdings`` and ``filter_ciks``.
    """
    from load_nport import (
        download_nport_zip,
        load_fund_reported_info,
        load_holdings,
        load_monthly_returns,
    )

    # Built once, before the loop, so every quarter uses the same filter.
    cik_filter = set(db.get_all_ciks()) if args.filter_ciks else None

    def process(quarter: str) -> None:
        extract_dir = download_nport_zip(quarter, args.data_dir)
        load_fund_reported_info(db, extract_dir, quarter)
        load_monthly_returns(db, extract_dir, quarter)
        if not args.skip_holdings:
            load_holdings(db, extract_dir, quarter, cik_filter=cik_filter)

    _run_per_quarter("N-PORT", args.quarters, process)


def cmd_ncen(db: FundDatabase, args: argparse.Namespace) -> None:
    """Download and load N-CEN data for each requested quarter.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``quarters`` and ``data_dir``.
    """
    from load_ncen import (
        download_ncen_zip,
        load_etf_data,
        load_index_data,
        load_service_providers,
    )

    def process(quarter: str) -> None:
        extract_dir = download_ncen_zip(quarter, args.data_dir)
        load_etf_data(db, extract_dir, quarter)
        load_index_data(db, extract_dir, quarter)
        load_service_providers(db, extract_dir, quarter)

    _run_per_quarter("N-CEN", args.quarters, process)


def cmd_all(db: FundDatabase, args: argparse.Namespace) -> None:
    """Run every pipeline step in order, then print the status report.

    All steps receive the same ``args`` namespace, so the three loaders share
    one ``quarters`` list and one ``data_dir``.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments of the ``all`` sub-command.
    """
    # NOTE(review): unlike the individual sub-commands (data/xbrl_rr,
    # data/nport, data/ncen), all three loaders are handed the same data_dir
    # here — confirm the download helpers keep their files apart.
    steps = [
        (" Step 1: Loading fund universe", cmd_universe),
        (" Step 2: Fetching filings", cmd_filings),
        (" Step 3: Loading XBRL Risk/Return data", cmd_xbrl_rr),
        (" Step 4: Loading N-PORT data", cmd_nport),
        (" Step 5: Loading N-CEN data", cmd_ncen),
    ]
    for title, step in steps:
        _print_banner(title)
        step(db, args)

    _print_banner(" Pipeline complete!")
    cmd_status(db, args)


def cmd_search(db: FundDatabase, args: argparse.Namespace) -> None:
    """Search the database for funds and print the matches as a table.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``query`` (list of words, joined
            with single spaces) and ``limit``.
    """
    query = " ".join(args.query) if args.query else ""
    if not query:
        print("Usage: pipeline.py search <query>")
        return

    results = db.search_funds(query, limit=args.limit)
    if not results:
        print(f"No results for '{query}'")
        return

    print(f"\nSearch results for '{query}' ({len(results)} matches):\n")
    print(f"{'CIK':>12s} {'Ticker':>8s} {'Series Name':50s} {'Class':20s}")
    print("-" * 95)
    for r in results:
        # Missing values print as empty cells; long names are truncated to
        # the column width.
        print(f"{r['cik']:>12s} {(r['ticker'] or ''):>8s} "
              f"{(r['series_name'] or '')[:50]:50s} {(r['class_name'] or '')[:20]:20s}")


def cmd_export(db: FundDatabase, args: argparse.Namespace) -> None:
    """Export one CSV row per share class with summary data.

    The aggregate columns (filing counts, latest fees, net assets, service
    providers) are correlated on the trust CIK only, so they repeat for every
    share class of the same trust.

    Args:
        db: Open fund database.
        args: Parsed CLI arguments; reads ``output_csv`` (falls back to
            ``fund_summary.csv`` when empty).
    """
    output_path = args.output_csv or "fund_summary.csv"

    with db.conn() as c:
        rows = c.execute("""
            SELECT
                t.cik, t.trust_name, t.file_number, t.state_of_inc, t.fiscal_year_end,
                s.series_id, s.series_name,
                sc.class_id, sc.class_name, sc.ticker,
                (SELECT COUNT(*) FROM filing f
                 WHERE f.cik = t.cik
                   AND f.form_type IN ('485BPOS','485APOS','N-1A')) as prospectus_count,
                (SELECT COUNT(*) FROM filing_text ft
                 JOIN filing f2 ON ft.accession_number = f2.accession_number
                 WHERE f2.cik = t.cik) as texts_downloaded,
                (SELECT xf.net_expenses_pct FROM xbrl_fee xf
                 WHERE xf.cik = t.cik
                 ORDER BY xf.filing_date DESC LIMIT 1) as latest_expense_ratio,
                (SELECT xf.management_fee_pct FROM xbrl_fee xf
                 WHERE xf.cik = t.cik
                 ORDER BY xf.filing_date DESC LIMIT 1) as latest_mgmt_fee,
                (SELECT ni.net_assets FROM nport_fund_info ni
                 WHERE ni.cik = t.cik
                 ORDER BY ni.report_date DESC LIMIT 1) as latest_net_assets,
                (SELECT GROUP_CONCAT(DISTINCT nsp.provider_name)
                 FROM ncen_service_provider nsp
                 WHERE nsp.cik = t.cik
                   AND nsp.provider_role = 'custodian') as custodians,
                (SELECT GROUP_CONCAT(DISTINCT nsp.provider_name)
                 FROM ncen_service_provider nsp
                 WHERE nsp.cik = t.cik
                   AND nsp.provider_role = 'adviser') as advisers
            FROM share_class sc
            JOIN series s ON sc.series_id = s.series_id
            JOIN trust t ON s.cik = t.cik
            ORDER BY t.cik, s.series_id, sc.class_id
        """).fetchall()

    if not rows:
        print("No data to export. Run the pipeline first.")
        return

    # Column order of the CSV follows the SELECT list.
    fieldnames = list(rows[0].keys())
    with open(output_path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(dict(row) for row in rows)

    print(f"Exported {len(rows):,} records to {output_path}")


def _build_parser() -> argparse.ArgumentParser:
    """Create the top-level argument parser with one sub-parser per command."""
    parser = argparse.ArgumentParser(
        description="SEC Fund Data Pipeline",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    parser.add_argument("--db", default="fund_data.db", help="Database path")
    parser.add_argument("-v", "--verbose", action="store_true")

    subparsers = parser.add_subparsers(dest="command", help="Pipeline command")

    # status
    subparsers.add_parser("status", help="Show database statistics")

    # universe
    p_univ = subparsers.add_parser("universe", help="Load fund universe from CSV")
    p_univ.add_argument("--enrich", action="store_true",
                        help="Also enrich from Submissions API")
    p_univ.add_argument("--enrich-limit", type=int, default=0)

    # filings
    p_filings = subparsers.add_parser("filings", help="Fetch prospectus filings")
    p_filings.add_argument("--limit", type=int, default=0,
                           help="Max CIKs to process")
    p_filings.add_argument("--ciks", nargs="*")
    p_filings.add_argument("--max-prospectus", type=int, default=3)
    p_filings.add_argument("--max-supplements", type=int, default=3)
    p_filings.add_argument("--no-text", action="store_true")
    p_filings.add_argument("--text-only", action="store_true")
    p_filings.add_argument("--backfill-html", action="store_true",
                           help="Re-download raw HTML for filings missing it")

    # xbrl-rr
    p_xbrl = subparsers.add_parser("xbrl-rr", help="Load XBRL Risk/Return data")
    p_xbrl.add_argument("--quarters", nargs="+",
                        default=["2025q1", "2025q2", "2025q3", "2025q4"])
    p_xbrl.add_argument("--data-dir", default="data/xbrl_rr")

    # nport
    p_nport = subparsers.add_parser("nport", help="Load N-PORT data")
    p_nport.add_argument("--quarters", nargs="+", default=["2025q3"])
    p_nport.add_argument("--data-dir", default="data/nport")
    p_nport.add_argument("--skip-holdings", action="store_true")
    p_nport.add_argument("--filter-ciks", action="store_true",
                         help="Only load holdings for CIKs in DB")

    # ncen
    p_ncen = subparsers.add_parser("ncen", help="Load N-CEN data")
    p_ncen.add_argument("--quarters", nargs="+", default=["2025q3"])
    p_ncen.add_argument("--data-dir", default="data/ncen")

    # all
    p_all = subparsers.add_parser("all", help="Run full pipeline")
    p_all.add_argument("--limit", type=int, default=100,
                       help="Max CIKs for filing fetch (default 100)")
    p_all.add_argument("--enrich", action="store_true", default=False)
    p_all.add_argument("--enrich-limit", type=int, default=0)
    p_all.add_argument("--ciks", nargs="*")
    p_all.add_argument("--max-prospectus", type=int, default=3)
    p_all.add_argument("--max-supplements", type=int, default=3)
    p_all.add_argument("--no-text", action="store_true")
    p_all.add_argument("--text-only", action="store_true", default=False)
    p_all.add_argument("--quarters", nargs="+", default=["2025q3"])
    p_all.add_argument("--data-dir", default="data")
    p_all.add_argument("--skip-holdings", action="store_true")
    # CIK filtering is on by default for the full run. --filter-ciks is kept
    # for backward compatibility; --no-filter-ciks is the only way to turn
    # the default off.
    p_all.add_argument("--filter-ciks", dest="filter_ciks",
                       action="store_true", default=True,
                       help="Only load holdings for CIKs in DB (default)")
    p_all.add_argument("--no-filter-ciks", dest="filter_ciks",
                       action="store_false",
                       help="Load holdings for all CIKs")

    # search
    p_search = subparsers.add_parser("search", help="Search funds")
    p_search.add_argument("query", nargs="*")
    p_search.add_argument("--limit", type=int, default=50)

    # export
    p_export = subparsers.add_parser("export", help="Export summary CSV")
    p_export.add_argument("--output-csv", default="fund_summary.csv")

    return parser


def main() -> None:
    """Parse the command line, configure logging and dispatch the command.

    Without a sub-command the help text is printed and no database is opened.
    """
    parser = _build_parser()
    args = parser.parse_args()

    log_level = logging.DEBUG if args.verbose else logging.INFO
    logging.basicConfig(level=log_level,
                        format="%(asctime)s [%(levelname)s] %(message)s")

    if not args.command:
        parser.print_help()
        return

    db = FundDatabase(args.db)

    commands = {
        "status": cmd_status,
        "universe": cmd_universe,
        "filings": cmd_filings,
        "xbrl-rr": cmd_xbrl_rr,
        "nport": cmd_nport,
        "ncen": cmd_ncen,
        "all": cmd_all,
        "search": cmd_search,
        "export": cmd_export,
    }

    handler = commands.get(args.command)
    if handler:
        handler(db, args)
    else:
        parser.print_help()


if __name__ == "__main__":
    main()