Source code for pymars.mars

"""
Main MARS regression class
==========================

Public API for Multivariate Adaptive Regression Splines.
"""

import numpy as np
from typing import Optional, Dict, List
from .basis import BasisFunction, build_design_matrix
from .model import ForwardPass, BackwardPass
from .gcv import GCVCalculator, calculate_mse, calculate_r2
from .utils import (standardize_data, unstandardize_predictions,
                   calculate_minspan, calculate_endspan, solve_least_squares)


[docs] class MARS: """ Multivariate Adaptive Regression Splines (MARS) Nonparametric regression method that automatically selects relevant variables and detects interactions through adaptive basis function construction. Parameters ---------- max_terms : int, default=30 Maximum number of basis functions to create in forward pass. The final model will typically have fewer due to pruning. max_degree : int, default=1 Maximum interaction order (number of variables in a product). - max_degree=1: Additive model (no interactions) - max_degree=2: Pairwise interactions allowed - max_degree=n: Full interactions allowed penalty : float, default=3.0 GCV penalty parameter (d in Friedman 1991). - penalty=2: Recommended for additive models - penalty=3: Recommended for general MARS Larger values produce smoother models. minspan : int or 'auto', default='auto' Minimum number of observations between knots. 'auto' uses Friedman's formula: -log2(alpha/n) / 2.5 endspan : int or 'auto', default='auto' Minimum observations from data endpoints. 'auto' uses Friedman's formula: 3 - log2(alpha/n) alpha : float, default=0.05 Significance level for automatic span calculation. Only used if minspan or endspan is 'auto'. standardize : bool, default=True Whether to standardize features before fitting. Recommended for numerical stability. verbose : bool, default=True Whether to print progress messages during fitting. Attributes ---------- basis_functions_ : list of BasisFunction Selected basis functions after pruning coefficients_ : array Fitted coefficients for basis functions gcv_score_ : float Final GCV score n_features_in_ : int Number of input features feature_importances_ : array Importance score for each feature Examples -------- >>> from pymars import MARS >>> import numpy as np >>> >>> # Generate synthetic data >>> X = np.random.randn(100, 5) >>> y = X[:, 0]**2 + 2*X[:, 1] + np.random.randn(100)*0.1 >>> >>> # Fit MARS model >>> model = MARS(max_terms=20, max_degree=2) >>> model.fit(X, y) >>> >>> # Make predictions >>> y_pred = model.predict(X) >>> >>> # View summary >>> model.summary() """
[docs] def __init__(self, max_terms: int = 30, max_degree: int = 1, penalty: float = 3.0, minspan: str = 'auto', endspan: str = 'auto', alpha: float = 0.05, standardize: bool = True, verbose: bool = True,smooth: bool = False ): self.max_terms = max_terms self.max_degree = max_degree self.penalty = penalty self.minspan = minspan self.endspan = endspan self.alpha = alpha self.standardize = standardize self.verbose = verbose # Fitted attributes (set during fit) self.basis_functions_: Optional[List[BasisFunction]] = None self.coefficients_: Optional[np.ndarray] = None self.gcv_score_: Optional[float] = None self.n_features_in_: Optional[int] = None self.feature_importances_: Optional[np.ndarray] = None # Standardization parameters self._x_mean: Optional[np.ndarray] = None self._x_std: Optional[np.ndarray] = None self._y_mean: float = 0.0 self._y_std: float = 1.0 # Training metrics self._train_mse: Optional[float] = None self._train_r2: Optional[float] = None self.smooth = smooth # Enable cubic conversion self.cubic_basis_ = None # Store cubic basis if used
[docs] def fit(self, X: np.ndarray, y: np.ndarray) -> 'MARS': """ Fit MARS model to data Parameters ---------- X : array-like, shape (n_samples, n_features) Training input features y : array-like, shape (n_samples,) Training target values Returns ------- self : MARS Fitted estimator """ # Validate and convert inputs X, y = self._validate_data(X, y) n_samples, n_features = X.shape self.n_features_in_ = n_features if self.verbose: print(f"\n{'='*60}") print(f"MARS Fitting") print(f"{'='*60}") print(f"Samples: {n_samples}, Features: {n_features}") print(f"Max terms: {self.max_terms}, Max degree: {self.max_degree}") print(f"Penalty: {self.penalty}") # Standardize if requested if self.standardize: X_scaled, self._x_mean, self._x_std = standardize_data(X) y_scaled, y_mean, y_std = standardize_data(y.reshape(-1, 1)) y_scaled = y_scaled.ravel() self._y_mean = y_mean[0] self._y_std = y_std[0] else: X_scaled = X.copy() y_scaled = y.copy() # Calculate span parameters if auto if self.minspan == 'auto': minspan = calculate_minspan(n_samples, self.alpha) else: minspan = self.minspan if self.endspan == 'auto': endspan = calculate_endspan(n_samples, n_features, self.alpha) else: endspan = self.endspan if self.verbose: print(f"Minspan: {minspan}, Endspan: {endspan}") print() # Forward pass: build large model forward = ForwardPass( max_terms=self.max_terms, max_degree=self.max_degree, minspan=minspan, endspan=endspan ) basis_functions, coefficients = forward.fit(X_scaled, y_scaled) # Backward pass: prune to optimal size gcv_calc = GCVCalculator(penalty=self.penalty) backward = BackwardPass(gcv_calculator=gcv_calc) self.basis_functions_, self.coefficients_, self.gcv_score_ = backward.prune( X_scaled, y_scaled, basis_functions, coefficients ) # Calculate feature importances self.feature_importances_ = self._calculate_feature_importance() # Calculate training metrics y_pred = self.predict(X) self._train_mse = calculate_mse(y, y_pred) self._train_r2 = calculate_r2(y, y_pred) if self.verbose: print(f"\n{'='*60}") print(f"Training complete") print(f"Final model: {len(self.basis_functions_)} basis functions") print(f"GCV score: {self.gcv_score_:.6f}") print(f"Training MSE: {self._train_mse:.6f}") print(f"Training R²: {self._train_r2:.6f}") print(f"{'='*60}\n") # NOUVEAU: Convert to cubic if requested if self.smooth: from .cubic import convert_to_cubic self.cubic_basis_, self.coefficients_ = convert_to_cubic( self.basis_functions_, X_scaled, y_scaled ) return self
[docs] def predict(self, X: np.ndarray) -> np.ndarray: """ Make predictions using fitted MARS model Parameters ---------- X : array-like, shape (n_samples, n_features) Input features (in original scale, not standardized) Returns ------- y_pred : array, shape (n_samples,) Predicted values Notes ----- All knot locations in basis_functions_ are stored in the standardized domain (after mean/std transformation if self.standardize=True). Input X is automatically scaled to match before evaluation. """ if self.basis_functions_ is None: raise ValueError("Model not fitted. Call fit() first.") X = self._validate_input(X) # Standardize if needed (knots are in standardized domain) if self.standardize: X_scaled = (X - self._x_mean) / self._x_std else: X_scaled = X # Evaluate basis functions B = build_design_matrix(X_scaled, self.basis_functions_) y_pred = B @ self.coefficients_ if self.smooth and self.cubic_basis_ is not None: # Use cubic basis from .cubic import evaluate_cubic_basis B = evaluate_cubic_basis(self.cubic_basis_, X_scaled) else: # Use linear basis B = build_design_matrix(X_scaled, self.basis_functions_) y_pred = B @ self.coefficients_ # Unstandardize predictions if self.standardize: y_pred = unstandardize_predictions(y_pred, self._y_mean, self._y_std) return y_pred
[docs] def score(self, X: np.ndarray, y: np.ndarray) -> float: """ Calculate R² score on test data Parameters ---------- X : array-like, shape (n_samples, n_features) Test features y : array-like, shape (n_samples,) True values Returns ------- r2 : float R² score """ y_pred = self.predict(X) return calculate_r2(y, y_pred)
[docs] def summary(self) -> Dict: """ Get model summary with statistics and basis functions Returns ------- summary : dict Model information including: - n_basis: Number of basis functions - gcv_score: GCV score - train_mse: Training MSE (if available) - train_r2: Training R² (if available) - basis_info: List of basis function descriptions - feature_importances: Feature importance scores """ if self.basis_functions_ is None or len(self.basis_functions_) == 0: raise ValueError("Model not fitted. Call fit() first.") # Collect basis function info basis_info = [] for i, (basis, coef) in enumerate(zip(self.basis_functions_, self.coefficients_)): info = { 'index': i, 'coefficient': float(coef), 'degree': basis.degree, 'variables': basis.variables, 'description': str(basis) } basis_info.append(info) # Calculate max degree safely max_degree = max((b.degree for b in self.basis_functions_), default=0) # Handle optional training metrics safely train_mse = float(self._train_mse) if hasattr(self, '_train_mse') and self._train_mse is not None else None train_r2 = float(self._train_r2) if hasattr(self, '_train_r2') and self._train_r2 is not None else None summary = { 'n_basis': len(self.basis_functions_), 'max_degree_achieved': max_degree, 'n_features': self.n_features_in_, 'gcv_score': float(self.gcv_score_) if self.gcv_score_ is not None else None, 'train_mse': train_mse, 'train_r2': train_r2, 'basis_functions': basis_info, 'feature_importances': self.feature_importances_.tolist() } # Print summary print(f"\n{'='*70}") print(f"MARS Model Summary") print(f"{'='*70}") print(f"Number of basis functions: {summary['n_basis']}") print(f"Number of features: {summary['n_features']}") print(f"Maximum degree: {summary['max_degree_achieved']}") if summary['gcv_score'] is not None: print(f"GCV score: {summary['gcv_score']:.6f}") if train_mse is not None and train_r2 is not None: print(f"Training MSE: {train_mse:.6f}") print(f"Training R²: {train_r2:.6f}") print(f"Training MSE: {summary['train_mse']:.6f}") print(f"Training R²: {summary['train_r2']:.6f}") print(f"\nFeature Importances:") for i, imp in enumerate(summary['feature_importances']): print(f" x{i}: {imp:.4f}") print(f"\nBasis Functions:") for info in basis_info[:10]: # Show first 10 print(f" [{info['index']}] coef={info['coefficient']:8.4f} {info['description']}") if len(basis_info) > 10: print(f" ... ({len(basis_info) - 10} more)") print(f"{'='*70}\n") return summary
[docs] def get_anova_decomposition(self) -> Dict[int, List[BasisFunction]]: """ Get ANOVA decomposition of model Returns functions grouped by interaction order: - order 0: constant - order 1: main effects (single variables) - order 2: two-way interactions - etc. Returns ------- anova : dict Maps interaction order to list of basis functions """ if self.basis_functions_ is None: raise ValueError("Model not fitted. Call fit() first.") anova = {} for basis in self.basis_functions_: order = basis.degree if order not in anova: anova[order] = [] anova[order].append(basis) return anova
def _calculate_feature_importance(self) -> np.ndarray: """ Calculate feature importance scores Importance = sum of |coefficient| for all basis functions containing that variable. The constant term (first basis) is excluded. """ if self.basis_functions_ is None or len(self.basis_functions_) == 0: return np.zeros(self.n_features_in_) importance = np.zeros(self.n_features_in_) # Skip the constant term (first basis function) for i, (basis, coef) in enumerate(zip(self.basis_functions_[1:], self.coefficients_[1:]), 1): # Only count non-zero coefficients for non-constant terms if abs(coef) > 0: for var in basis.variables: importance[var] += abs(coef) # Normalize to [0, 1] if importance.sum() > 0: importance /= importance.sum() # Ensure shape matches n_features_in_ importance = importance[:self.n_features_in_] return importance def _validate_data(self, X, y): """Validate and convert training data""" X = np.asarray(X, dtype=np.float64) y = np.asarray(y, dtype=np.float64).ravel() if X.ndim != 2: raise ValueError(f"X must be 2D, got shape {X.shape}") if len(y) != X.shape[0]: raise ValueError(f"X and y must have same length: {X.shape[0]} vs {len(y)}") return X, y def _validate_input(self, X): """Validate prediction input""" X = np.asarray(X, dtype=np.float64) if X.ndim == 1: X = X.reshape(1, -1) if X.shape[1] != self.n_features_in_: raise ValueError(f"Expected {self.n_features_in_} features, got {X.shape[1]}") return X