fund_rfid_data/fund_db.py
Florian Herzog 1993658fb2 Add SEC fund prospectus -> RDF triple dataset pipeline
Builds a relationship-rich finance dataset for text-to-RDF-triple extraction
from SEC fund disclosures, the dataset for the thesis 'Magical RDF Triples and
how to synthetize them'.

- build_rdf_dataset.py: gold (N-CEN graphs), fetch (EDGAR prospectus prose,
  all books per trust), samples (per-fund segmentation, marker + plain
  serializations), split (trust-level 80/10/10, no leakage)
- score_baseline.py: no-model string-match baseline + strong-model scorer
- dataset_description.{tex,pdf}: scientific description of the dataset
- data/rdf_poc/gold_graphs.jsonl: structured gold knowledge graph (2025Q3)
- Large prose/sample files and raw SEC downloads are gitignored (reproducible)

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-03 10:31:35 +02:00

485 lines
18 KiB
Python

"""
SQLite database for SEC fund universe.
Stores:
- Fund universe (trusts, series, share classes) from SEC Series/Class CSV
- Filing metadata and prospectus text from EDGAR
- Structured fee/performance data from XBRL Risk/Return
- Holdings snapshots from N-PORT
- Service provider data from N-CEN
"""
import logging
import sqlite3
from contextlib import contextmanager
from pathlib import Path
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Full DDL for the fund database, run as one script by FundDatabase._init_db.
# Every statement is CREATE ... IF NOT EXISTS, so re-running the script against
# an existing database leaves already-created tables and indexes untouched
# (which also means it does not alter an existing table to match later edits).
# NOTE(review): the REFERENCES clauses below are only enforced by SQLite on
# connections that run "PRAGMA foreign_keys=ON"; nothing visible in this file
# issues that pragma -- confirm whether enforcement is intended.
# NOTE(review): nport_holding declares no UNIQUE constraint, so the
# "INSERT OR IGNORE" used by FundDatabase.bulk_insert_holdings has no
# uniqueness conflict to ignore and re-loading a filing would add its rows again.
SCHEMA = """
-- ===== Fund Universe =====
CREATE TABLE IF NOT EXISTS trust (
cik TEXT PRIMARY KEY,
trust_name TEXT NOT NULL,
file_number TEXT,
state_of_inc TEXT,
fiscal_year_end TEXT,
sic_code TEXT,
entity_type TEXT,
website TEXT,
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS series (
series_id TEXT PRIMARY KEY,
cik TEXT NOT NULL REFERENCES trust(cik),
series_name TEXT NOT NULL,
status TEXT DEFAULT 'active',
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS share_class (
class_id TEXT PRIMARY KEY,
series_id TEXT NOT NULL REFERENCES series(series_id),
cik TEXT NOT NULL REFERENCES trust(cik),
class_name TEXT,
ticker TEXT,
cusip TEXT,
isin TEXT,
inception_date TEXT,
status TEXT DEFAULT 'active',
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_series_cik ON series(cik);
CREATE INDEX IF NOT EXISTS idx_class_series ON share_class(series_id);
CREATE INDEX IF NOT EXISTS idx_class_cik ON share_class(cik);
CREATE INDEX IF NOT EXISTS idx_class_ticker ON share_class(ticker);
-- ===== Filings & Prospectus Text =====
CREATE TABLE IF NOT EXISTS filing (
accession_number TEXT PRIMARY KEY,
cik TEXT NOT NULL REFERENCES trust(cik),
form_type TEXT NOT NULL,
filing_date TEXT NOT NULL,
primary_document TEXT,
document_url TEXT,
description TEXT,
text_length INTEGER DEFAULT 0,
fetched_at TEXT
);
CREATE TABLE IF NOT EXISTS filing_text (
accession_number TEXT PRIMARY KEY REFERENCES filing(accession_number),
text_content TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS filing_html (
accession_number TEXT PRIMARY KEY REFERENCES filing(accession_number),
html_content TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_filing_cik ON filing(cik);
CREATE INDEX IF NOT EXISTS idx_filing_form ON filing(form_type);
CREATE INDEX IF NOT EXISTS idx_filing_date ON filing(filing_date);
-- ===== XBRL Risk/Return Structured Data =====
CREATE TABLE IF NOT EXISTS xbrl_fee (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT,
cik TEXT NOT NULL,
series_id TEXT,
class_id TEXT,
filing_date TEXT,
fund_name TEXT,
share_class_name TEXT,
max_sales_charge_pct REAL,
max_deferred_charge_pct REAL,
redemption_fee_pct REAL,
management_fee_pct REAL,
dist_12b1_fee_pct REAL,
other_expenses_pct REAL,
acquired_fund_fees_pct REAL,
total_expenses_pct REAL,
fee_waiver_pct REAL,
net_expenses_pct REAL,
expense_example_1yr REAL,
expense_example_3yr REAL,
expense_example_5yr REAL,
expense_example_10yr REAL,
UNIQUE(cik, series_id, class_id, filing_date)
);
CREATE TABLE IF NOT EXISTS xbrl_performance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT,
cik TEXT NOT NULL,
series_id TEXT,
class_id TEXT,
filing_date TEXT,
fund_name TEXT,
share_class_name TEXT,
inception_date TEXT,
return_year_1 REAL,
return_year_5 REAL,
return_year_10 REAL,
return_since_incep REAL,
best_quarter_return REAL,
best_quarter_label TEXT,
worst_quarter_return REAL,
worst_quarter_label TEXT,
portfolio_turnover REAL,
UNIQUE(cik, series_id, class_id, filing_date)
);
CREATE TABLE IF NOT EXISTS xbrl_objective (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT,
cik TEXT NOT NULL,
series_id TEXT,
filing_date TEXT,
fund_name TEXT,
objective_text TEXT,
strategy_text TEXT,
risk_text TEXT,
UNIQUE(cik, series_id, filing_date)
);
-- ===== N-PORT Holdings & Fund-Level Data =====
CREATE TABLE IF NOT EXISTS nport_fund_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT NOT NULL,
cik TEXT NOT NULL,
series_id TEXT,
report_date TEXT NOT NULL,
total_assets REAL,
total_liabilities REAL,
net_assets REAL,
borrowing_pay_within_1yr REAL,
borrowing_pay_after_1yr REAL,
controlled_affiliates_val REAL,
is_non_cash_collateral INTEGER,
credit_spread_risk_inv_grade_dv01 REAL,
credit_spread_risk_non_inv_dv01 REAL,
UNIQUE(accession_number)
);
CREATE TABLE IF NOT EXISTS nport_monthly_return (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT NOT NULL,
cik TEXT NOT NULL,
class_id TEXT,
report_date TEXT NOT NULL,
month1_return REAL,
month2_return REAL,
month3_return REAL,
UNIQUE(accession_number, class_id)
);
CREATE TABLE IF NOT EXISTS nport_holding (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT NOT NULL,
cik TEXT NOT NULL,
report_date TEXT NOT NULL,
holding_name TEXT,
lei TEXT,
cusip TEXT,
isin TEXT,
ticker TEXT,
asset_category TEXT,
issuer_category TEXT,
inv_country TEXT,
currency TEXT,
quantity REAL,
value_usd REAL,
pct_val REAL,
is_debt INTEGER DEFAULT 0,
coupon_rate REAL,
maturity_date TEXT,
is_default INTEGER,
fair_value_level TEXT
);
CREATE INDEX IF NOT EXISTS idx_holding_acc ON nport_holding(accession_number);
CREATE INDEX IF NOT EXISTS idx_holding_cik ON nport_holding(cik);
CREATE INDEX IF NOT EXISTS idx_holding_date ON nport_holding(report_date);
-- ===== N-CEN Service Provider & Classification =====
CREATE TABLE IF NOT EXISTS ncen_fund_info (
id INTEGER PRIMARY KEY AUTOINCREMENT,
accession_number TEXT NOT NULL,
cik TEXT NOT NULL,
series_id TEXT,
report_period TEXT,
is_etf INTEGER,
is_index_fund INTEGER,
index_name TEXT,
is_fund_of_funds INTEGER,
is_money_market INTEGER,
fund_type TEXT,
UNIQUE(accession_number, series_id)
);
CREATE TABLE IF NOT EXISTS ncen_service_provider (
id INTEGER PRIMARY KEY AUTOINCREMENT,
cik TEXT NOT NULL,
report_period TEXT,
provider_role TEXT NOT NULL,
provider_name TEXT NOT NULL,
provider_lei TEXT,
provider_country TEXT,
UNIQUE(cik, report_period, provider_role, provider_name)
);
-- ===== Pipeline Status Tracking =====
CREATE TABLE IF NOT EXISTS pipeline_status (
cik TEXT NOT NULL,
stage TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
started_at TEXT,
completed_at TEXT,
error_message TEXT,
items_processed INTEGER DEFAULT 0,
PRIMARY KEY (cik, stage)
);
CREATE TABLE IF NOT EXISTS bulk_download (
source TEXT PRIMARY KEY,
quarter TEXT,
downloaded_at TEXT,
file_path TEXT,
record_count INTEGER
);
"""
class FundDatabase:
    """SQLite database wrapper for SEC fund data.

    Every data-access method opens its own short-lived connection through
    :meth:`conn`, which commits on success and rolls back on error.
    """

    # Optional ``trust`` columns accepted by ``upsert_trust`` as keyword
    # arguments. Column names are spliced into the SQL text, so they are
    # checked against this fixed set instead of being trusted blindly.
    _TRUST_OPTIONAL_COLUMNS = frozenset({
        "file_number",
        "state_of_inc",
        "fiscal_year_end",
        "sic_code",
        "entity_type",
        "website",
    })

    # Tables whose row counts are reported by ``get_stats``.
    _STATS_TABLES = (
        "trust", "series", "share_class", "filing",
        "filing_text", "filing_html", "xbrl_fee",
        "xbrl_performance", "nport_fund_info",
        "nport_holding", "ncen_fund_info",
        "ncen_service_provider",
    )

    def __init__(self, db_path: str = "fund_data.db"):
        """Open the database at *db_path*, creating it if necessary.

        Missing parent directories are created, then the schema is applied.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._init_db()

    def _init_db(self):
        """Apply the schema script and switch the file to WAL journaling."""
        with self.conn() as c:
            c.executescript(SCHEMA)
            # journal_mode=WAL is recorded in the database file itself, so
            # setting it once here is enough. The per-connection pragmas are
            # applied in conn() instead.
            c.execute("PRAGMA journal_mode=WAL")
        log.info("Database initialized at %s", self.db_path)

    @contextmanager
    def conn(self):
        """Yield a connection; commit on success, roll back on error.

        Rows come back as ``sqlite3.Row`` so columns are addressable by name.
        The connection is always closed when the ``with`` block exits.
        """
        con = sqlite3.connect(str(self.db_path), timeout=30)
        try:
            con.row_factory = sqlite3.Row
            # These two settings are scoped to a single connection and are
            # not stored in the database file, so they must be re-applied on
            # every connection to have any effect.
            con.execute("PRAGMA synchronous=NORMAL")
            con.execute("PRAGMA cache_size=-64000")  # negative = KiB, ~64MB
            yield con
            con.commit()
        except Exception:
            con.rollback()
            raise
        finally:
            con.close()

    # ----- Trust / Series / Class -----
    def upsert_trust(self, cik, trust_name, **kwargs):
        """Insert a trust row, or overwrite the supplied columns if it exists.

        Args:
            cik: Primary key of the trust.
            trust_name: Trust name; always overwritten on conflict.
            **kwargs: Optional column values. Allowed keys are
                ``file_number``, ``state_of_inc``, ``fiscal_year_end``,
                ``sic_code``, ``entity_type`` and ``website``. Only the
                columns passed here are overwritten on conflict.

        Raises:
            ValueError: If a keyword is not one of the allowed columns.
        """
        unknown = sorted(set(kwargs) - self._TRUST_OPTIONAL_COLUMNS)
        if unknown:
            raise ValueError(f"Unknown trust column(s): {', '.join(unknown)}")

        cols = ["cik", "trust_name", *kwargs]
        placeholders = ", ".join("?" for _ in cols)
        set_clause = ", ".join(
            f"{col}=excluded.{col}" for col in cols if col != "cik"
        )
        # updated_at is computed by SQLite rather than bound as a parameter.
        sql = f"""
            INSERT INTO trust ({', '.join(cols)}, updated_at)
            VALUES ({placeholders}, datetime('now'))
            ON CONFLICT(cik) DO UPDATE SET
                {set_clause},
                updated_at=excluded.updated_at
        """
        with self.conn() as c:
            c.execute(sql, [cik, trust_name, *kwargs.values()])

    def upsert_trust_simple(self, cik, trust_name, file_number="", state_of_inc="",
                            fiscal_year_end="", entity_type="", website=""):
        """Insert or update a trust without erasing known values.

        On conflict ``trust_name`` is always overwritten; for the other
        columns an empty string leaves the stored value unchanged.
        """
        with self.conn() as c:
            c.execute("""
                INSERT INTO trust (cik, trust_name, file_number, state_of_inc,
                                   fiscal_year_end, entity_type, website)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(cik) DO UPDATE SET
                    trust_name=excluded.trust_name,
                    file_number=COALESCE(NULLIF(excluded.file_number,''), trust.file_number),
                    state_of_inc=COALESCE(NULLIF(excluded.state_of_inc,''), trust.state_of_inc),
                    fiscal_year_end=COALESCE(NULLIF(excluded.fiscal_year_end,''), trust.fiscal_year_end),
                    entity_type=COALESCE(NULLIF(excluded.entity_type,''), trust.entity_type),
                    website=COALESCE(NULLIF(excluded.website,''), trust.website),
                    updated_at=datetime('now')
            """, (cik, trust_name, file_number, state_of_inc,
                  fiscal_year_end, entity_type, website))

    def upsert_series(self, series_id, cik, series_name, status="active"):
        """Insert a series, or refresh its name and status if it exists.

        The ``cik`` of an existing series is not changed on conflict.
        """
        with self.conn() as c:
            c.execute("""
                INSERT INTO series (series_id, cik, series_name, status)
                VALUES (?, ?, ?, ?)
                ON CONFLICT(series_id) DO UPDATE SET
                    series_name=excluded.series_name,
                    status=excluded.status,
                    updated_at=datetime('now')
            """, (series_id, cik, series_name, status))

    def upsert_share_class(self, class_id, series_id, cik, class_name="",
                           ticker="", cusip="", isin="", inception_date=""):
        """Insert or update a share class.

        On conflict ``class_name`` is always overwritten (even with an empty
        string), while ``ticker``, ``cusip``, ``isin`` and ``inception_date``
        keep their stored value when the new value is empty. ``series_id``
        and ``cik`` of an existing row are left untouched.
        """
        with self.conn() as c:
            c.execute("""
                INSERT INTO share_class (class_id, series_id, cik, class_name,
                                         ticker, cusip, isin, inception_date)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ON CONFLICT(class_id) DO UPDATE SET
                    class_name=excluded.class_name,
                    ticker=COALESCE(NULLIF(excluded.ticker,''), share_class.ticker),
                    cusip=COALESCE(NULLIF(excluded.cusip,''), share_class.cusip),
                    isin=COALESCE(NULLIF(excluded.isin,''), share_class.isin),
                    inception_date=COALESCE(NULLIF(excluded.inception_date,''), share_class.inception_date),
                    updated_at=datetime('now')
            """, (class_id, series_id, cik, class_name, ticker, cusip, isin, inception_date))

    # ----- Filings -----
    def insert_filing(self, accession_number, cik, form_type, filing_date,
                      primary_document="", document_url="", description=""):
        """Record filing metadata; an already-known accession number is a no-op."""
        with self.conn() as c:
            c.execute("""
                INSERT OR IGNORE INTO filing
                    (accession_number, cik, form_type, filing_date, primary_document,
                     document_url, description)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (accession_number, cik, form_type, filing_date,
                  primary_document, document_url, description))

    def save_filing_text(self, accession_number, text_content, html_content=None):
        """Store extracted text (and optionally HTML) for a filing.

        Existing text/HTML for the accession number is replaced. The matching
        ``filing`` row, if any, gets its ``text_length`` and ``fetched_at``
        updated. All three statements run in one transaction.
        """
        with self.conn() as c:
            c.execute("""
                INSERT OR REPLACE INTO filing_text (accession_number, text_content)
                VALUES (?, ?)
            """, (accession_number, text_content))
            # HTML is optional; empty or None means "do not touch filing_html".
            if html_content:
                c.execute("""
                    INSERT OR REPLACE INTO filing_html (accession_number, html_content)
                    VALUES (?, ?)
                """, (accession_number, html_content))
            c.execute("""
                UPDATE filing SET text_length=?, fetched_at=datetime('now')
                WHERE accession_number=?
            """, (len(text_content), accession_number))

    # ----- Pipeline status -----
    def set_pipeline_status(self, cik, stage, status, error_message="", items_processed=0):
        """Create or update the status row for ``(cik, stage)``.

        A status of ``"running"`` stamps ``started_at``; every other status
        stamps ``completed_at``. The other timestamp is left as it was.
        """
        # ts_col is one of two fixed literals, so it is safe to interpolate.
        ts_col = "started_at" if status == "running" else "completed_at"
        with self.conn() as c:
            c.execute(f"""
                INSERT INTO pipeline_status (cik, stage, status, {ts_col}, error_message, items_processed)
                VALUES (?, ?, ?, datetime('now'), ?, ?)
                ON CONFLICT(cik, stage) DO UPDATE SET
                    status=excluded.status,
                    {ts_col}=datetime('now'),
                    error_message=excluded.error_message,
                    items_processed=excluded.items_processed
            """, (cik, stage, status, error_message, items_processed))

    def get_pipeline_status(self, cik, stage):
        """Return the stored status for ``(cik, stage)``, or ``None`` if absent."""
        with self.conn() as c:
            row = c.execute(
                "SELECT status FROM pipeline_status WHERE cik=? AND stage=?",
                (cik, stage),
            ).fetchone()
        return row["status"] if row else None

    def get_pending_ciks(self, stage, limit=1000):
        """Return up to *limit* trust CIKs still pending for *stage*.

        A trust counts as pending when it has no status row for the stage or
        when that row's status is ``'pending'``. Results are ordered by CIK.
        """
        with self.conn() as c:
            rows = c.execute("""
                SELECT t.cik FROM trust t
                LEFT JOIN pipeline_status ps ON t.cik = ps.cik AND ps.stage = ?
                WHERE ps.status IS NULL OR ps.status = 'pending'
                ORDER BY t.cik
                LIMIT ?
            """, (stage, limit)).fetchall()
        return [r["cik"] for r in rows]

    # ----- Bulk inserts -----
    def bulk_insert_holdings(self, holdings):
        """Insert a batch of N-PORT holdings in a single transaction.

        Each holding is a dict keyed by ``nport_holding`` column name; keys
        that are missing are inserted as NULL. An empty batch is a no-op.
        """
        if not holdings:
            return
        cols = [
            "accession_number", "cik", "report_date", "holding_name", "lei",
            "cusip", "isin", "ticker", "asset_category", "issuer_category",
            "inv_country", "currency", "quantity", "value_usd", "pct_val",
            "is_debt", "coupon_rate", "maturity_date", "is_default", "fair_value_level",
        ]
        placeholders = ", ".join(["?"] * len(cols))
        sql = f"INSERT OR IGNORE INTO nport_holding ({', '.join(cols)}) VALUES ({placeholders})"
        rows = [tuple(h.get(col) for col in cols) for h in holdings]
        with self.conn() as c:
            c.executemany(sql, rows)

    def record_bulk_download(self, source, quarter, file_path, record_count):
        """Record (or overwrite) the bookkeeping row for a bulk download *source*."""
        with self.conn() as c:
            c.execute("""
                INSERT OR REPLACE INTO bulk_download
                    (source, quarter, downloaded_at, file_path, record_count)
                VALUES (?, ?, datetime('now'), ?, ?)
            """, (source, quarter, file_path, record_count))

    # ----- Queries -----
    def get_stats(self):
        """Return a ``{table_name: row_count}`` dict for the main data tables."""
        with self.conn() as c:
            # Table names come from the fixed class-level tuple, never from
            # callers, so interpolating them into the SQL text is safe.
            return {
                table: c.execute(f"SELECT COUNT(*) as cnt FROM {table}").fetchone()["cnt"]
                for table in self._STATS_TABLES
            }

    def get_ciks_with_series(self):
        """Return every trust that has at least one series.

        Each item is a dict with ``cik``, ``trust_name`` and ``num_series``,
        ordered by series count (largest first), then by CIK. Series status
        is not considered.
        """
        with self.conn() as c:
            rows = c.execute("""
                SELECT t.cik, t.trust_name, COUNT(s.series_id) as num_series
                FROM trust t JOIN series s ON t.cik = s.cik
                GROUP BY t.cik
                ORDER BY num_series DESC, t.cik
            """).fetchall()
        return [dict(r) for r in rows]

    def get_all_ciks(self):
        """Return the CIK of every trust, sorted ascending."""
        with self.conn() as c:
            rows = c.execute("SELECT cik FROM trust ORDER BY cik").fetchall()
        return [r["cik"] for r in rows]

    def search_funds(self, query, limit=50):
        """Find share classes whose ticker, series name or trust name contains *query*.

        Matching uses SQL ``LIKE '%query%'``, so ``%`` and ``_`` inside
        *query* act as wildcards. Returns at most *limit* dicts with
        ``cik``, ``trust_name``, ``series_id``, ``series_name``,
        ``class_id``, ``class_name`` and ``ticker``.
        """
        pattern = f"%{query}%"
        with self.conn() as c:
            rows = c.execute("""
                SELECT t.cik, t.trust_name, s.series_id, s.series_name,
                       sc.class_id, sc.class_name, sc.ticker
                FROM share_class sc
                JOIN series s ON sc.series_id = s.series_id
                JOIN trust t ON s.cik = t.cik
                WHERE sc.ticker LIKE ? OR s.series_name LIKE ? OR t.trust_name LIKE ?
                LIMIT ?
            """, (pattern, pattern, pattern, limit)).fetchall()
        return [dict(r) for r in rows]