# Source code for ETIA.AFS.AFS

import os
import random
import uuid
from multiprocessing import cpu_count
from typing import Any, Dict, List, Optional, Tuple, Union

import numpy as np
import pandas as pd
from sklearn.metrics import r2_score, roc_auc_score
import logging

from joblib import Parallel, delayed

from .feature_selector import FeatureSelector
from .oos import OOS
from .predictive_configurator import PredictiveConfigurator
from .predictive_model import PredictiveModel
from .preprocessor import Preprocessor


class AFS:
    """
    Automated Feature Selection (AFS) class.

    Parameters
    ----------
    depth : int, optional
        The depth of the feature selection process. Default is 1.
    verbose : bool, optional
        If True, prints detailed logs. Default is False.
    num_processors : int, optional
        Number of processors to use for parallel processing.
        Default is the number of CPU cores.
    oos_protocol : dict, optional
        A dictionary specifying the out-of-sample protocol.
        Default is ``{"name": "KFoldCV", "folds": 5, "folds_to_run": 2}``.
    random_seed : int, optional
        Seed for random number generators to ensure reproducibility.
        Default is None.

    Methods
    -------
    run_AFS(data, target_features, pred_configs=None, dataset_name='dataset')
        Runs the AFS process on the provided data and target features.
    """

    def __init__(
        self,
        depth: int = 1,
        verbose: bool = False,
        num_processors: Optional[int] = None,
        oos_protocol: Optional[Dict[str, Any]] = None,
        random_seed: Optional[int] = None,
    ):
        if depth < 0:
            raise ValueError("Depth must be a non-negative integer.")

        # Set default oos_protocol if none is provided
        self.oos_protocol = oos_protocol if oos_protocol else {
            "name": "KFoldCV",
            "folds": 5,
            "folds_to_run": 2,
        }
        self.csv_path = os.path.dirname(__file__)
        self.depth = depth
        self.verbose = verbose
        self.num_processors = num_processors if num_processors else cpu_count()
        self.random_seed = random_seed

        # Setup logging. basicConfig() does nothing when the root logger
        # already has handlers, so the level is also set on this logger to
        # make `verbose` effective in that situation.
        log_level = logging.INFO if self.verbose else logging.WARNING
        self.logger = logging.getLogger(__name__)
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(levelname)s - %(message)s',
        )
        self.logger.setLevel(log_level)

        # Set random seed for reproducibility
        if self.random_seed is not None:
            random.seed(self.random_seed)
            np.random.seed(self.random_seed)

    @staticmethod
    def _empty_result() -> Dict[str, Any]:
        """Return the result used when no feature selection could be run."""
        return {
            'bbc_score': -float('inf'),
            'ci': None,
            'selected_features': [],
            'best_config': None,
        }

    @staticmethod
    def _infer_target_type(y: Any, declared_type: str) -> str:
        """Resolve an 'unknown' target type from the dtype of the target column."""
        if declared_type != 'unknown':
            return declared_type
        if pd.api.types.is_numeric_dtype(y):
            return 'continuous'
        return 'categorical'

    @staticmethod
    def _load_data(data: Union[str, pd.DataFrame, np.ndarray]) -> pd.DataFrame:
        """Turn the user-supplied data into a non-empty DataFrame."""
        if isinstance(data, str):
            # Assume it's a filename
            frame = pd.read_csv(data)
        elif isinstance(data, pd.DataFrame):
            frame = data.copy()
        elif isinstance(data, np.ndarray):
            # Only structured arrays carry column names
            if data.dtype.names is None:
                raise ValueError("When data is a NumPy array, it must have named columns.")
            frame = pd.DataFrame(data)
        else:
            raise ValueError("Data must be a filename (str), pandas DataFrame, or NumPy array.")

        if frame.empty:
            raise ValueError("Dataframe is empty.")
        return frame

    def _resolve_pred_configs(
        self,
        pred_configs: Optional[Union[List[Dict[str, Any]], float]],
    ) -> List[Dict[str, Any]]:
        """Return the list of predictive configurations to evaluate."""
        if isinstance(pred_configs, list):
            # A list provided by the user is used as is
            return pred_configs
        if pred_configs is not None and not isinstance(pred_configs, float):
            raise ValueError("pred_configs must be a list of configurations or a float between 0 and 1.")
        # Validate the sampling rate before building the default configurations
        if pred_configs is not None and not (0 < pred_configs <= 1):
            raise ValueError("pred_configs as a float must be between 0 and 1.")

        configurator = PredictiveConfigurator()
        all_configs = configurator.create_predictive_configs()
        if pred_configs is None:
            # Use all configurations
            return all_configs

        total_configs = len(all_configs)
        sample_size = max(1, int(total_configs * pred_configs))
        sampled = random.sample(all_configs, sample_size)
        self.logger.info(f"Sampled {sample_size} out of {total_configs} configurations.")
        return sampled

    @staticmethod
    def _score_predictions(target_type: str, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Score predictions with ROC AUC for categorical targets, R^2 otherwise."""
        if target_type == 'categorical':
            return roc_auc_score(y_true, y_pred)
        return r2_score(y_true, y_pred)

    def run_AFS(
        self,
        data: Union[str, pd.DataFrame, np.ndarray],
        target_features: Union[List[str], Dict[str, str]],
        pred_configs: Optional[Union[List[Dict[str, Any]], float]] = None,
        dataset_name: str = 'dataset',
    ) -> Dict[str, Any]:
        """
        Runs the AFS process on the provided data and target features.

        Parameters
        ----------
        data : str or pd.DataFrame or np.ndarray
            The dataset to use. Can be a filename (str), a pandas DataFrame,
            or a NumPy structured array (one with named fields).
        target_features : Union[Dict[str, str], List[str]]
            A dictionary mapping feature names to their types, or a list of
            feature names (in which case the types are inferred). The first
            entry is the main target: its score, configuration and model are
            the ones reported.
        pred_configs : Union[List[Dict[str, Any]], float], optional
            - If list, it is a list of predictive configurations provided by the user.
            - If float (between 0 and 1), it indicates the percentage of default
              configurations to sample and run.
            - If None, all default configurations are used.
        dataset_name : str, optional
            The name of the dataset (used for saving intermediate files).
            Default is 'dataset'.

        Returns
        -------
        dict
            A dictionary containing:
            - 'original_data': The original dataset
            - 'reduced_data': The dataset with only the selected features and
              target features, in the column order of the original dataset
            - 'best_config': The best configuration found for the main target
            - 'bbc_score': The bootstrap score of the main target
            - 'ci': The bootstrap confidence interval of the main target
            - 'trained_model': The model fitted on the reduced data, or None
              when nothing could be fitted
            - 'selected_features': The selected features for each target

        Raises
        ------
        ValueError
            If `target_features`, `data` or `pred_configs` is of an unsupported
            type or value, if `target_features` is empty, or if the data is empty.

        Examples
        --------
        To run feature selection on a dataset:

        >>> afs = AFS()
        >>> result = afs.run_AFS(data="data.csv", target_features=["feature1", "feature2"])
        >>> print(result["selected_features"])
        """
        if not isinstance(target_features, (list, dict)):
            raise ValueError("target_features must be a list or dictionary.")
        if not target_features:
            raise ValueError("target_features must contain at least one target.")

        original_data = self._load_data(data)

        # Handle target features being either a list or a dictionary
        if isinstance(target_features, list):
            target_features = {feature: 'unknown' for feature in target_features}

        pred_configs = self._resolve_pred_configs(pred_configs)

        # Use joblib's Parallel to process targets in parallel
        target_items = list(target_features.items())
        results = Parallel(n_jobs=self.num_processors)(
            delayed(self._process_target)(
                data=original_data,
                target_feature=name,
                target_type=declared_type,
                pred_configs=pred_configs,
                dataset_name=dataset_name,
                depth=self.depth,
                visited_features=None,
            )
            for name, declared_type in target_items
        )

        selected_features = {
            name: result['selected_features']
            for (name, _), result in zip(target_items, results)
        }

        # The first target is the main one; only its score/config are reported
        main_target = target_items[0][0]
        main_result = results[0]
        best_score = main_result['bbc_score']
        best_ci = main_result['ci']
        best_config = main_result['best_config']

        # Collect all selected features across all targets and depths,
        # plus the targets themselves
        keep = set(target_features)
        for features in selected_features.values():
            keep.update(features)

        # Follow the original column order so the output is deterministic
        # (iterating a set of strings is not stable across runs)
        reduced_data = original_data[[col for col in original_data.columns if col in keep]]
        reduced_without_target = reduced_data.loc[:, reduced_data.columns != main_target]

        summary = {
            'original_data': original_data,
            'reduced_data': reduced_data,
            'best_config': best_config,
            'bbc_score': best_score,
            'ci': best_ci,
            'trained_model': None,
            'selected_features': selected_features,
        }

        if reduced_without_target.empty or best_config is None:
            # Nothing to fit the final model on (or with)
            self.logger.info(f"No features selected for target '{main_target}'")
            return summary

        # Resolve 'unknown' the same way the selection step does, so the
        # final model is fitted with the same target type
        main_type = self._infer_target_type(
            reduced_data[main_target], target_features[main_target]
        )
        pm = PredictiveModel()
        pm.fit(
            best_config,
            reduced_without_target.values,
            reduced_data[main_target].values,
            None,
            None,
            main_type,
        )
        summary['trained_model'] = pm
        return summary

    def _process_target(
        self,
        data: pd.DataFrame,
        target_feature: str,
        target_type: str,
        pred_configs: List[Dict[str, Any]],
        dataset_name: str,
        depth: int,
        visited_features: Optional[set] = None,
    ) -> Dict[str, Any]:
        """
        Helper function to process each target in parallel.

        Logs the start of the run and delegates to `recursive_fs_for_target`.
        """
        self.logger.info(f"Starting feature selection for target '{target_feature}'")
        return self.recursive_fs_for_target(
            data=data,
            target_feature=target_feature,
            target_type=target_type,
            pred_configs=pred_configs,
            dataset_name=dataset_name,
            depth=depth,
            visited_features=visited_features,
        )

    def recursive_fs_for_target(
        self,
        data: pd.DataFrame,
        target_feature: str,
        target_type: str,
        pred_configs: List[Dict[str, Any]],
        dataset_name: str,
        depth: int,
        visited_features: Optional[set] = None,
    ) -> Dict[str, Any]:
        """
        Recursively runs feature selection for a specific target feature up to
        the specified depth.

        Parameters
        ----------
        data : pd.DataFrame
            The full dataset.
        target_feature : str
            The column to select features for.
        target_type : str
            The target type; 'unknown' is resolved from the column dtype.
        pred_configs : list of dict
            The predictive configurations to evaluate.
        dataset_name : str
            Base name passed on to the feature selector.
        depth : int
            Remaining recursion depth; 0 stops the recursion.
        visited_features : set, optional
            Features already used as targets on this path. The set passed in
            is updated with `target_feature`.

        Returns
        -------
        dict
            'bbc_score', 'ci', 'selected_features' and 'best_config' for this
            target. 'selected_features' also contains the features selected
            for each selected feature at deeper levels.
        """
        if depth == 0:
            return self._empty_result()

        if visited_features is None:
            visited_features = set()
        visited_features.add(target_feature)

        level = self.depth - depth + 1
        self.logger.info(f"Depth {level}: Feature selection for '{target_feature}'")

        # Candidate features: everything not yet visited (the target itself
        # was just added to the visited set)
        feature_columns = [col for col in data.columns if col not in visited_features]
        if not feature_columns:
            self.logger.warning(f"No features left to select for target '{target_feature}' at depth {level}.")
            return self._empty_result()

        # Generate the folds once and use them for every configuration
        oos = OOS()
        X = data[feature_columns]
        y = data[target_feature]
        target_type = self._infer_target_type(y, target_type)
        train_inds, test_inds = oos.data_split(
            self.oos_protocol, X.values, y.values, target_type=target_type
        )

        # Run feature selection and model training for each configuration in parallel
        results = Parallel(n_jobs=self.num_processors)(
            delayed(self._process_config)(
                data=data,
                target_feature=target_feature,
                target_type=target_type,
                config=config,
                dataset_name=dataset_name,
                train_inds=train_inds,
                test_inds=test_inds,
                feature_columns=feature_columns,
            )
            for config in pred_configs
        )

        # Configurations that produced no score come back as None
        evaluated = [result for result in results if result is not None]
        if not evaluated:
            self.logger.warning(f"No valid configurations for target '{target_feature}' at depth {level}.")
            return self._empty_result()

        # Identify the best configuration based on average scores
        idx_best_config = int(np.argmax([mean_score for _, mean_score, _, _ in evaluated]))
        best_config, _, best_fold_predictions, _ = evaluated[idx_best_config]

        # Collect predictions and true values
        best_conf_predictions = [
            (pred, true) for pred, true, _, _, _ in best_fold_predictions
        ]
        self.logger.info(f"Target: {target_feature} with optimal config: {best_config}")

        # Bootstrap the pooled predictions of the best configuration
        bbc_score, ci = self.bootstrap_bias_correction(best_conf_predictions, target_type)

        # Keep only the features selected in every fold; 'sel' holds
        # positions into feature_columns
        fold_selections = [set(fold_info[2]['sel']) for fold_info in best_fold_predictions]
        if fold_selections:
            common_indices = sorted(set.intersection(*fold_selections))
            selected_feature_names = [feature_columns[idx] for idx in common_indices]
        else:
            selected_feature_names = []

        # Recursive feature selection on selected features
        collected = set()
        for feature in selected_feature_names:
            if feature not in visited_features:
                result = self.recursive_fs_for_target(
                    data,
                    target_feature=feature,
                    target_type='unknown',
                    pred_configs=pred_configs,
                    dataset_name=dataset_name,
                    depth=depth - 1,
                    # Each branch gets its own copy so siblings do not block
                    # each other's candidates
                    visited_features=visited_features.copy(),
                )
                collected.update(result['selected_features'])
            collected.add(feature)

        return {
            'bbc_score': bbc_score,
            'ci': ci,
            # Column order of the data keeps the output deterministic
            'selected_features': [col for col in data.columns if col in collected],
            'best_config': best_config,
        }

    def _process_config(
        self,
        data: pd.DataFrame,
        target_feature: str,
        target_type: str,
        config: Dict[str, Any],
        dataset_name: str,
        train_inds: List[np.ndarray],
        test_inds: List[np.ndarray],
        feature_columns: List[str],
    ) -> Optional[Tuple[Dict[str, Any], float, List[Tuple[np.ndarray, np.ndarray, Dict[str, Any], Any, Optional["Preprocessor"]]], pd.DataFrame]]:
        """
        Helper function to process each configuration in parallel.

        Returns ``(config, mean_score, fold_predictions, selected_features_df)``,
        or None when no fold produced a score.
        """
        scores, fold_predictions, selected_features_df = self.run_fs_for_config(
            data,
            target_feature,
            target_type,
            config,
            dataset_name,
            train_inds,
            test_inds,
            feature_columns,
        )
        if not scores:
            return None
        return (config, np.mean(scores), fold_predictions, selected_features_df)

    def run_fs_for_config(
        self,
        data: pd.DataFrame,
        target_feature: str,
        target_type: str,
        config: Dict[str, Any],
        dataset_name: str,
        train_inds: List[np.ndarray],
        test_inds: List[np.ndarray],
        feature_columns: List[str],
    ) -> Tuple[List[float], List[Tuple[np.ndarray, np.ndarray, Dict[str, Any], Any, Optional["Preprocessor"]]], pd.DataFrame]:
        """
        Runs the feature selection process for a specific configuration.

        For every fold: optionally preprocess, select features on the training
        part, fit a model on the selected features and score it on the test part.

        Returns
        -------
        tuple
            ``(scores, fold_predictions, selected_features_df)`` where
            `scores` holds one score per successfully scored fold,
            `fold_predictions` holds one
            ``(predictions, true_values, selected_features_fold_df, model, preprocessor)``
            tuple per fitted fold, and `selected_features_df` is the selection
            of the first scored fold (None if no fold was scored).
        """
        scores = []
        fold_predictions = []
        selected_features_df = None
        config_id = str(uuid.uuid4())
        columns = feature_columns + [target_feature]

        for fold_num, (train_index, test_index) in enumerate(zip(train_inds, test_inds)):
            train_data = data.iloc[train_index]
            test_data = data.iloc[test_index]

            # Preprocessing
            if config.get('preprocess_method'):
                preprocessor = Preprocessor(method=config['preprocess_method'])
                train_df = pd.DataFrame(
                    preprocessor.fit_transform(train_data[columns]), columns=columns
                )
                test_df = pd.DataFrame(
                    preprocessor.transform(test_data[columns]), columns=columns
                )
            else:
                preprocessor = None
                train_df = train_data[columns].copy()
                test_df = test_data[columns].copy()

            # Initialize the FeatureSelector with the path to Rscript
            fs = FeatureSelector(r_path=config.get('r_path', 'Rscript'))

            # Perform feature selection
            try:
                unique_dataset_name = f"{dataset_name}_{target_feature}_{config_id}_fold{fold_num}"
                selected_features_fold_df = fs.feature_selection(
                    config=config,
                    target_name=target_feature,
                    data_pd=train_df,
                    dataset_name=unique_dataset_name,
                    verbose=self.verbose,
                )
            except RuntimeError as e:
                self.logger.error(f"Feature selection failed for target '{target_feature}' with config {config}: {e}")
                continue  # Skip this fold due to error

            if selected_features_fold_df.empty:
                self.logger.warning(
                    f"No features selected for target '{target_feature}' with config {config} in fold {fold_num}. Skipping this fold."
                )
                continue

            # 'sel' holds positions into feature_columns
            feature_names = [
                feature_columns[idx] for idx in selected_features_fold_df['sel'].tolist()
            ]

            # Prepare training and testing data
            train_X = train_df[feature_names]
            train_y = train_df[target_feature]
            test_X = test_df[feature_names]
            test_y = test_df[target_feature]

            # Model Training
            pm = PredictiveModel()
            pm.fit(
                config,
                train_X.values,
                train_y.values,
                None,
                preprocessor,
                target_type,
            )
            predictions = pm.predict(test_X.values)
            fold_predictions.append(
                (predictions, test_y.values, selected_features_fold_df, pm, preprocessor)
            )

            # Scoring
            try:
                score = self._score_predictions(target_type, test_y.values, predictions)
            except ValueError:
                if target_type != 'categorical':
                    raise
                self.logger.warning(
                    f"ROC AUC score could not be computed for fold {fold_num} due to insufficient classes."
                )
                continue
            scores.append(score)

            # Store selected features from the first scored fold
            if selected_features_df is None:
                selected_features_df = selected_features_fold_df

        return scores, fold_predictions, selected_features_df

    def bootstrap_bias_correction(
        self,
        fold_predictions: List[Tuple[np.ndarray, np.ndarray]],
        target_type: str,
        B: int = 1000,
        conf_interval: float = 0.95,
    ) -> Tuple[float, Optional[np.ndarray]]:
        """
        Bootstraps the pooled fold predictions to estimate a score and its
        confidence interval.

        Parameters
        ----------
        fold_predictions : list of (predictions, true_values)
            One pair of arrays per fold; they are concatenated before resampling.
        target_type : str
            'categorical' scores with ROC AUC, anything else with R^2.
        B : int, optional
            Number of bootstrap resamples. Default is 1000.
        conf_interval : float, optional
            Confidence level of the interval. Default is 0.95.

        Returns
        -------
        tuple
            ``(bbc_score, ci)``: the mean bootstrap score and the
            ``[lower, upper]`` percentile interval. ``(nan, None)`` is returned
            when there are no predictions or no resample could be scored.
        """
        if not fold_predictions:
            # Same shape as the normal return so callers can always unpack
            return float('nan'), None

        all_predictions = np.concatenate([pred for pred, _ in fold_predictions])
        all_true_values = np.concatenate([true for _, true in fold_predictions])
        n_samples = len(all_predictions)

        # A local generator makes a seeded run reproducible even when this
        # executes in a worker process; without a seed the global NumPy RNG
        # is used as before.
        seed = getattr(self, 'random_seed', None)
        rng = np.random.default_rng(seed) if seed is not None else np.random

        b_scores = []
        for _ in range(B):
            indices = rng.choice(n_samples, n_samples, replace=True)
            try:
                score = self._score_predictions(
                    target_type, all_true_values[indices], all_predictions[indices]
                )
            except ValueError:
                if target_type != 'categorical':
                    raise
                # ROC AUC is undefined for this resample; skip it
                continue
            b_scores.append(score)

        if not b_scores:
            self.logger.warning("No bootstrap sample could be scored.")
            return float('nan'), None

        # Compute confidence interval
        lower_bound = (1.0 - conf_interval) / 2.0
        upper_bound = 1.0 - lower_bound
        ci = np.percentile(b_scores, [100 * lower_bound, 100 * upper_bound])
        self.logger.info(f'Confidence interval: {ci}')

        bbc_score = np.mean(b_scores)
        return bbc_score, ci