AISE501_CLASS/AST Files/sample_stats.py
2026-05-03 20:27:09 +02:00

296 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
sample_stats.py Statistical Analysis Module
==============================================
This module provides classes and functions for loading, cleaning,
analysing, and visualising numerical data. It is used as the target
program for the AST-analysis exercises in AISE501.
Dependencies: numpy, scipy
"""
import os
import csv
import json
from typing import List, Dict, Optional, Tuple
import numpy as np
from scipy import stats
from scipy.optimize import curve_fit
# ── Helper functions ────────────────────────────────────────────────────────
def load_csv(filepath: str) -> List[List[str]]:
"""Read a CSV file and return its rows as lists of strings."""
rows = []
with open(filepath, newline="") as fh:
reader = csv.reader(fh)
for row in reader:
rows.append(row)
return rows
def save_json(data: dict, filepath: str) -> None:
"""Serialise *data* to a JSON file."""
with open(filepath, "w") as fh:
json.dump(data, fh, indent=2)
def validate_data(values: List[float]) -> bool:
"""Return True if all values are finite numbers."""
return all(np.isfinite(v) for v in values)
# ── Data cleaning ───────────────────────────────────────────────────────────
class DataCleaner:
"""Handles missing-value imputation and outlier removal."""
def __init__(self, raw_data: List[float]):
self.raw_data = raw_data
self.cleaned: Optional[np.ndarray] = None
def remove_nans(self) -> np.ndarray:
"""Drop NaN entries from the data."""
arr = np.array(self.raw_data, dtype=float)
self.cleaned = arr[~np.isnan(arr)]
return self.cleaned
def impute_mean(self) -> np.ndarray:
"""Replace NaN entries with the column mean."""
arr = np.array(self.raw_data, dtype=float)
mean_val = np.nanmean(arr)
arr[np.isnan(arr)] = mean_val
self.cleaned = arr
return self.cleaned
def remove_outliers(self, z_threshold: float = 3.0) -> np.ndarray:
"""Remove values whose z-score exceeds *z_threshold*."""
if self.cleaned is None:
self.remove_nans()
z_scores = np.abs(stats.zscore(self.cleaned))
self.cleaned = self.cleaned[z_scores < z_threshold]
return self.cleaned
def get_summary(self) -> Dict[str, float]:
"""Return a summary dict of the cleaned data."""
data = self.cleaned if self.cleaned is not None else np.array(self.raw_data)
return {
"count": len(data),
"mean": float(np.mean(data)),
"std": float(np.std(data, ddof=1)),
"min": float(np.min(data)),
"max": float(np.max(data)),
}
# ── Descriptive statistics ──────────────────────────────────────────────────
class DescriptiveStats:
"""Compute descriptive statistics on cleaned data."""
def __init__(self, data: np.ndarray):
self.data = data
def mean(self) -> float:
return float(np.mean(self.data))
def median(self) -> float:
return float(np.median(self.data))
def variance(self) -> float:
return float(np.var(self.data, ddof=1))
def std_dev(self) -> float:
return float(np.std(self.data, ddof=1))
def skewness(self) -> float:
return float(stats.skew(self.data))
def kurtosis(self) -> float:
return float(stats.kurtosis(self.data))
def percentile(self, q: float) -> float:
return float(np.percentile(self.data, q))
def full_report(self) -> Dict[str, float]:
"""Build a complete descriptive-statistics report."""
return {
"mean": self.mean(),
"median": self.median(),
"variance": self.variance(),
"std_dev": self.std_dev(),
"skewness": self.skewness(),
"kurtosis": self.kurtosis(),
"q25": self.percentile(25),
"q75": self.percentile(75),
}
# ── Hypothesis testing ──────────────────────────────────────────────────────
class HypothesisTester:
"""Perform common hypothesis tests."""
def __init__(self, sample_a: np.ndarray, sample_b: Optional[np.ndarray] = None):
self.sample_a = sample_a
self.sample_b = sample_b
def t_test_one_sample(self, pop_mean: float = 0.0) -> Tuple[float, float]:
"""One-sample t-test against *pop_mean*."""
stat, p_value = stats.ttest_1samp(self.sample_a, pop_mean)
return float(stat), float(p_value)
def t_test_two_sample(self) -> Tuple[float, float]:
"""Independent two-sample t-test (equal variance assumed)."""
if self.sample_b is None:
raise ValueError("sample_b is required for a two-sample test.")
stat, p_value = stats.ttest_ind(self.sample_a, self.sample_b)
return float(stat), float(p_value)
def chi_squared_test(self, observed: np.ndarray,
expected: np.ndarray) -> Tuple[float, float]:
"""Chi-squared goodness-of-fit test."""
stat, p_value = stats.chisquare(observed, f_exp=expected)
return float(stat), float(p_value)
def normality_test(self) -> Tuple[float, float]:
"""ShapiroWilk test for normality on sample_a."""
stat, p_value = stats.shapiro(self.sample_a)
return float(stat), float(p_value)
def interpret_result(self, p_value: float, alpha: float = 0.05) -> str:
"""Return a plain-English interpretation of *p_value*."""
if p_value < alpha:
return f"Reject H0 (p={p_value:.4f} < alpha={alpha})"
return f"Fail to reject H0 (p={p_value:.4f} >= alpha={alpha})"
# ── Curve fitting ───────────────────────────────────────────────────────────
class CurveFitter:
"""Fit parametric models to (x, y) data."""
def __init__(self, x: np.ndarray, y: np.ndarray):
self.x = x
self.y = y
self.params: Optional[np.ndarray] = None
self.covariance: Optional[np.ndarray] = None
@staticmethod
def linear_model(x, a, b):
return a * x + b
@staticmethod
def quadratic_model(x, a, b, c):
return a * x**2 + b * x + c
@staticmethod
def exponential_model(x, a, b):
return a * np.exp(b * x)
def fit(self, model_func=None) -> Tuple[np.ndarray, np.ndarray]:
"""Fit *model_func* (default: linear) via least squares."""
if model_func is None:
model_func = self.linear_model
self.params, self.covariance = curve_fit(model_func, self.x, self.y)
return self.params, self.covariance
def predict(self, x_new: np.ndarray, model_func=None) -> np.ndarray:
"""Predict y values for *x_new* using fitted parameters."""
if self.params is None:
raise ValueError("Call fit() before predict().")
if model_func is None:
model_func = self.linear_model
return model_func(x_new, *self.params)
def r_squared(self, model_func=None) -> float:
"""Coefficient of determination R^2."""
y_pred = self.predict(self.x, model_func)
ss_res = np.sum((self.y - y_pred) ** 2)
ss_tot = np.sum((self.y - np.mean(self.y)) ** 2)
return float(1 - ss_res / ss_tot)
# ── Report generation ──────────────────────────────────────────────────────
class ReportGenerator:
"""Compose a statistical report from the above components."""
def __init__(self, title: str):
self.title = title
self.sections: List[Dict] = []
def add_descriptive(self, desc: DescriptiveStats) -> None:
"""Append a descriptive-statistics section."""
self.sections.append({
"type": "descriptive",
"content": desc.full_report(),
})
def add_hypothesis(self, tester: HypothesisTester,
test_name: str, result: Tuple[float, float]) -> None:
"""Append a hypothesis-test section."""
stat, p = result
self.sections.append({
"type": "hypothesis",
"test": test_name,
"statistic": stat,
"p_value": p,
"interpretation": tester.interpret_result(p),
})
def add_curve_fit(self, fitter: CurveFitter, model_name: str) -> None:
"""Append a curve-fitting section."""
self.sections.append({
"type": "curve_fit",
"model": model_name,
"params": fitter.params.tolist() if fitter.params is not None else [],
"r_squared": fitter.r_squared() if fitter.params is not None else None,
})
def to_dict(self) -> dict:
"""Return the full report as a nested dictionary."""
return {"title": self.title, "sections": self.sections}
def save(self, filepath: str) -> None:
"""Write the report to a JSON file."""
save_json(self.to_dict(), filepath)
# ── Pipeline function ──────────────────────────────────────────────────────
def run_analysis_pipeline(raw_data: List[float],
title: str = "Analysis Report") -> dict:
"""End-to-end pipeline: clean -> describe -> test -> report."""
# Step 1: clean
cleaner = DataCleaner(raw_data)
cleaner.remove_nans()
cleaner.remove_outliers()
# Step 2: descriptive statistics
desc = DescriptiveStats(cleaner.cleaned)
report_data = desc.full_report()
# Step 3: normality test
tester = HypothesisTester(cleaner.cleaned)
norm_stat, norm_p = tester.normality_test()
# Step 4: assemble report
report = ReportGenerator(title)
report.add_descriptive(desc)
report.add_hypothesis(tester, "Shapiro-Wilk", (norm_stat, norm_p))
return report.to_dict()
# ── Main ────────────────────────────────────────────────────────────────────
if __name__ == "__main__":
np.random.seed(42)
sample = list(np.random.normal(loc=50, scale=10, size=200))
sample += [float("nan"), float("nan"), 999.0] # inject NaN + outlier
result = run_analysis_pipeline(sample, title="Demo Report")
print(json.dumps(result, indent=2))