296 lines
11 KiB
Python
296 lines
11 KiB
Python
"""
|
||
sample_stats.py – Statistical Analysis Module
|
||
==============================================
|
||
This module provides classes and functions for loading, cleaning,
|
||
analysing, and visualising numerical data. It is used as the target
|
||
program for the AST-analysis exercises in AISE501.
|
||
|
||
Dependencies: numpy, scipy
|
||
"""
|
||
|
||
import os
|
||
import csv
|
||
import json
|
||
from typing import List, Dict, Optional, Tuple
|
||
|
||
import numpy as np
|
||
from scipy import stats
|
||
from scipy.optimize import curve_fit
|
||
|
||
|
||
# ── Helper functions ────────────────────────────────────────────────────────
|
||
|
||
def load_csv(filepath: str) -> List[List[str]]:
|
||
"""Read a CSV file and return its rows as lists of strings."""
|
||
rows = []
|
||
with open(filepath, newline="") as fh:
|
||
reader = csv.reader(fh)
|
||
for row in reader:
|
||
rows.append(row)
|
||
return rows
|
||
|
||
|
||
def save_json(data: dict, filepath: str) -> None:
|
||
"""Serialise *data* to a JSON file."""
|
||
with open(filepath, "w") as fh:
|
||
json.dump(data, fh, indent=2)
|
||
|
||
|
||
def validate_data(values: List[float]) -> bool:
|
||
"""Return True if all values are finite numbers."""
|
||
return all(np.isfinite(v) for v in values)
|
||
|
||
|
||
# ── Data cleaning ───────────────────────────────────────────────────────────
|
||
|
||
class DataCleaner:
|
||
"""Handles missing-value imputation and outlier removal."""
|
||
|
||
def __init__(self, raw_data: List[float]):
|
||
self.raw_data = raw_data
|
||
self.cleaned: Optional[np.ndarray] = None
|
||
|
||
def remove_nans(self) -> np.ndarray:
|
||
"""Drop NaN entries from the data."""
|
||
arr = np.array(self.raw_data, dtype=float)
|
||
self.cleaned = arr[~np.isnan(arr)]
|
||
return self.cleaned
|
||
|
||
def impute_mean(self) -> np.ndarray:
|
||
"""Replace NaN entries with the column mean."""
|
||
arr = np.array(self.raw_data, dtype=float)
|
||
mean_val = np.nanmean(arr)
|
||
arr[np.isnan(arr)] = mean_val
|
||
self.cleaned = arr
|
||
return self.cleaned
|
||
|
||
def remove_outliers(self, z_threshold: float = 3.0) -> np.ndarray:
|
||
"""Remove values whose z-score exceeds *z_threshold*."""
|
||
if self.cleaned is None:
|
||
self.remove_nans()
|
||
z_scores = np.abs(stats.zscore(self.cleaned))
|
||
self.cleaned = self.cleaned[z_scores < z_threshold]
|
||
return self.cleaned
|
||
|
||
def get_summary(self) -> Dict[str, float]:
|
||
"""Return a summary dict of the cleaned data."""
|
||
data = self.cleaned if self.cleaned is not None else np.array(self.raw_data)
|
||
return {
|
||
"count": len(data),
|
||
"mean": float(np.mean(data)),
|
||
"std": float(np.std(data, ddof=1)),
|
||
"min": float(np.min(data)),
|
||
"max": float(np.max(data)),
|
||
}
|
||
|
||
|
||
# ── Descriptive statistics ──────────────────────────────────────────────────
|
||
|
||
class DescriptiveStats:
|
||
"""Compute descriptive statistics on cleaned data."""
|
||
|
||
def __init__(self, data: np.ndarray):
|
||
self.data = data
|
||
|
||
def mean(self) -> float:
|
||
return float(np.mean(self.data))
|
||
|
||
def median(self) -> float:
|
||
return float(np.median(self.data))
|
||
|
||
def variance(self) -> float:
|
||
return float(np.var(self.data, ddof=1))
|
||
|
||
def std_dev(self) -> float:
|
||
return float(np.std(self.data, ddof=1))
|
||
|
||
def skewness(self) -> float:
|
||
return float(stats.skew(self.data))
|
||
|
||
def kurtosis(self) -> float:
|
||
return float(stats.kurtosis(self.data))
|
||
|
||
def percentile(self, q: float) -> float:
|
||
return float(np.percentile(self.data, q))
|
||
|
||
def full_report(self) -> Dict[str, float]:
|
||
"""Build a complete descriptive-statistics report."""
|
||
return {
|
||
"mean": self.mean(),
|
||
"median": self.median(),
|
||
"variance": self.variance(),
|
||
"std_dev": self.std_dev(),
|
||
"skewness": self.skewness(),
|
||
"kurtosis": self.kurtosis(),
|
||
"q25": self.percentile(25),
|
||
"q75": self.percentile(75),
|
||
}
|
||
|
||
|
||
# ── Hypothesis testing ──────────────────────────────────────────────────────
|
||
|
||
class HypothesisTester:
|
||
"""Perform common hypothesis tests."""
|
||
|
||
def __init__(self, sample_a: np.ndarray, sample_b: Optional[np.ndarray] = None):
|
||
self.sample_a = sample_a
|
||
self.sample_b = sample_b
|
||
|
||
def t_test_one_sample(self, pop_mean: float = 0.0) -> Tuple[float, float]:
|
||
"""One-sample t-test against *pop_mean*."""
|
||
stat, p_value = stats.ttest_1samp(self.sample_a, pop_mean)
|
||
return float(stat), float(p_value)
|
||
|
||
def t_test_two_sample(self) -> Tuple[float, float]:
|
||
"""Independent two-sample t-test (equal variance assumed)."""
|
||
if self.sample_b is None:
|
||
raise ValueError("sample_b is required for a two-sample test.")
|
||
stat, p_value = stats.ttest_ind(self.sample_a, self.sample_b)
|
||
return float(stat), float(p_value)
|
||
|
||
def chi_squared_test(self, observed: np.ndarray,
|
||
expected: np.ndarray) -> Tuple[float, float]:
|
||
"""Chi-squared goodness-of-fit test."""
|
||
stat, p_value = stats.chisquare(observed, f_exp=expected)
|
||
return float(stat), float(p_value)
|
||
|
||
def normality_test(self) -> Tuple[float, float]:
|
||
"""Shapiro–Wilk test for normality on sample_a."""
|
||
stat, p_value = stats.shapiro(self.sample_a)
|
||
return float(stat), float(p_value)
|
||
|
||
def interpret_result(self, p_value: float, alpha: float = 0.05) -> str:
|
||
"""Return a plain-English interpretation of *p_value*."""
|
||
if p_value < alpha:
|
||
return f"Reject H0 (p={p_value:.4f} < alpha={alpha})"
|
||
return f"Fail to reject H0 (p={p_value:.4f} >= alpha={alpha})"
|
||
|
||
|
||
# ── Curve fitting ───────────────────────────────────────────────────────────
|
||
|
||
class CurveFitter:
|
||
"""Fit parametric models to (x, y) data."""
|
||
|
||
def __init__(self, x: np.ndarray, y: np.ndarray):
|
||
self.x = x
|
||
self.y = y
|
||
self.params: Optional[np.ndarray] = None
|
||
self.covariance: Optional[np.ndarray] = None
|
||
|
||
@staticmethod
|
||
def linear_model(x, a, b):
|
||
return a * x + b
|
||
|
||
@staticmethod
|
||
def quadratic_model(x, a, b, c):
|
||
return a * x**2 + b * x + c
|
||
|
||
@staticmethod
|
||
def exponential_model(x, a, b):
|
||
return a * np.exp(b * x)
|
||
|
||
def fit(self, model_func=None) -> Tuple[np.ndarray, np.ndarray]:
|
||
"""Fit *model_func* (default: linear) via least squares."""
|
||
if model_func is None:
|
||
model_func = self.linear_model
|
||
self.params, self.covariance = curve_fit(model_func, self.x, self.y)
|
||
return self.params, self.covariance
|
||
|
||
def predict(self, x_new: np.ndarray, model_func=None) -> np.ndarray:
|
||
"""Predict y values for *x_new* using fitted parameters."""
|
||
if self.params is None:
|
||
raise ValueError("Call fit() before predict().")
|
||
if model_func is None:
|
||
model_func = self.linear_model
|
||
return model_func(x_new, *self.params)
|
||
|
||
def r_squared(self, model_func=None) -> float:
|
||
"""Coefficient of determination R^2."""
|
||
y_pred = self.predict(self.x, model_func)
|
||
ss_res = np.sum((self.y - y_pred) ** 2)
|
||
ss_tot = np.sum((self.y - np.mean(self.y)) ** 2)
|
||
return float(1 - ss_res / ss_tot)
|
||
|
||
|
||
# ── Report generation ──────────────────────────────────────────────────────
|
||
|
||
class ReportGenerator:
|
||
"""Compose a statistical report from the above components."""
|
||
|
||
def __init__(self, title: str):
|
||
self.title = title
|
||
self.sections: List[Dict] = []
|
||
|
||
def add_descriptive(self, desc: DescriptiveStats) -> None:
|
||
"""Append a descriptive-statistics section."""
|
||
self.sections.append({
|
||
"type": "descriptive",
|
||
"content": desc.full_report(),
|
||
})
|
||
|
||
def add_hypothesis(self, tester: HypothesisTester,
|
||
test_name: str, result: Tuple[float, float]) -> None:
|
||
"""Append a hypothesis-test section."""
|
||
stat, p = result
|
||
self.sections.append({
|
||
"type": "hypothesis",
|
||
"test": test_name,
|
||
"statistic": stat,
|
||
"p_value": p,
|
||
"interpretation": tester.interpret_result(p),
|
||
})
|
||
|
||
def add_curve_fit(self, fitter: CurveFitter, model_name: str) -> None:
|
||
"""Append a curve-fitting section."""
|
||
self.sections.append({
|
||
"type": "curve_fit",
|
||
"model": model_name,
|
||
"params": fitter.params.tolist() if fitter.params is not None else [],
|
||
"r_squared": fitter.r_squared() if fitter.params is not None else None,
|
||
})
|
||
|
||
def to_dict(self) -> dict:
|
||
"""Return the full report as a nested dictionary."""
|
||
return {"title": self.title, "sections": self.sections}
|
||
|
||
def save(self, filepath: str) -> None:
|
||
"""Write the report to a JSON file."""
|
||
save_json(self.to_dict(), filepath)
|
||
|
||
|
||
# ── Pipeline function ──────────────────────────────────────────────────────
|
||
|
||
def run_analysis_pipeline(raw_data: List[float],
|
||
title: str = "Analysis Report") -> dict:
|
||
"""End-to-end pipeline: clean -> describe -> test -> report."""
|
||
# Step 1: clean
|
||
cleaner = DataCleaner(raw_data)
|
||
cleaner.remove_nans()
|
||
cleaner.remove_outliers()
|
||
|
||
# Step 2: descriptive statistics
|
||
desc = DescriptiveStats(cleaner.cleaned)
|
||
report_data = desc.full_report()
|
||
|
||
# Step 3: normality test
|
||
tester = HypothesisTester(cleaner.cleaned)
|
||
norm_stat, norm_p = tester.normality_test()
|
||
|
||
# Step 4: assemble report
|
||
report = ReportGenerator(title)
|
||
report.add_descriptive(desc)
|
||
report.add_hypothesis(tester, "Shapiro-Wilk", (norm_stat, norm_p))
|
||
|
||
return report.to_dict()
|
||
|
||
|
||
# ── Main ────────────────────────────────────────────────────────────────────
|
||
|
||
if __name__ == "__main__":
|
||
np.random.seed(42)
|
||
sample = list(np.random.normal(loc=50, scale=10, size=200))
|
||
sample += [float("nan"), float("nan"), 999.0] # inject NaN + outlier
|
||
|
||
result = run_analysis_pipeline(sample, title="Demo Report")
|
||
print(json.dumps(result, indent=2))
|