Source code for ETIA.CausalLearning.CDHPO.OCT.OCT

import os
import copy
import pickle
import numpy as np
from typing import Any, Dict, List, Optional, Tuple

from joblib import Parallel, delayed
from sklearn.metrics import mutual_info_score
from sklearn.model_selection import ParameterGrid

from ...utils import get_logger
from .utils import mutual_info_continuous, is_dict_in_array
from ...CDHPO.CDHPOBase import CDHPOBase
from ...CausalModel.utils import pywhy_graph_to_matrix
from ....CRV.causal_graph_utils.markov_boundary import markov_boundary

[docs] class OCT(CDHPOBase): """ Class for performing Order-Based Causal Transfer (OCT) procedure. Parameters ---------- oct_params : CDHPOParameters Object containing the parameters required for the OCT procedure. data : Dataset Data to be used for the OCT procedure. results_folder : str Path to the folder where results will be saved. verbose : bool, optional If True, enables verbose logging. Default is False. Methods ------- run() Executes the OCT procedure. run_new() Continues the OCT procedure with new configurations. find_best_config(algorithms) Finds the best configuration among specified algorithms. save_progress() Saves the current state of the OCT object to a file. load_progress(path) Loads the OCT object state from a file. fold_fit(target, c, mec_graphs_configs, train_indexes, test_indexes, fold) Performs Markov boundary identification and predictive modeling for a specific fold. nodes_parallel(target, c, mec_graphs_configs, train_indexes, test_indexes) Calculates the mutual information between the true values and predicted values of a target node in parallel. config_parallel(c, mec_graphs_configs, train_indexes, test_indexes) Calculates the mutual information scores for all target nodes in parallel. permutations(node, poolYhat_best, poolYhat_cur, idxs, poolY) Calculates the mutual information scores after swapping predictions between best and current configurations. permutations_nodes(node, c) Performs permutations for a single node across all permutations. calculate_pvalues(c) Calculates p-values to compare the current configuration with the best one. """ def __init__(self, oct_params: Any, data: Any, results_folder: str, verbose=False): """ Initializes the OCT class. Parameters ---------- oct_params : CDHPOParameters Object containing the parameters required for the OCT procedure. data : Dataset Data to be used for the OCT procedure. results_folder : str Path to the folder where results will be saved. verbose : bool, optional If True, enables verbose logging. Default is False. """ super().__init__(oct_params, data) self.oct_params = copy.deepcopy(oct_params) self.data = copy.deepcopy(data) self.results_folder = results_folder self.verbose = verbose self.logger = get_logger(__name__, verbose=self.verbose) self.n_samples, self.n_nodes = self.data.get_dataset().shape self.data_type_info = self.data.get_data_type_info() self.logger.debug('OCT object has been initialized') # Instance-level attributes to ensure no shared references self.results = [] self.mb_configs = [] self.saved_mb_configs = {} self.pred_configs = [] self.saved_pred_configs = {} self.mu_configs = [] self.mb_size = np.array([]) self.total_configs_run = 0 self.configs_ran = [] self.saved_y_test = {} self.matrix_mec_graph = None self.matrix_graph = None self.var_map = None self.mec_graphs_configs = [] self.p_values = np.array([]) self.is_equal = np.array([]) self.mean_mb = np.array([]) self.y_test_nodes = [] self.idxs = []
[docs] def save_progress(self): """ Saves the current state of the OCT object to a file. """ path = os.path.join(self.results_folder, 'OCT.pkl') with open(path, 'wb') as f: pickle.dump(self, f) self.logger.debug(f'OCT progress saved to {path}')
[docs] @staticmethod def load_progress(path: str) -> 'OCT': """ Loads the OCT object state from a file. Parameters ---------- path : str The file path to load the progress from. Returns ------- OCT The loaded OCT object. """ with open(path, 'rb') as f: oct_instance = pickle.load(f) oct_instance.logger.debug(f'OCT progress loaded from {path}') return oct_instance
[docs] def fold_fit( self, target: int, c: int, mec_graphs_configs: List[Any], train_indexes: List[np.ndarray], test_indexes: List[np.ndarray], fold: int, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: """ Performs Markov boundary identification and predictive modeling for a specific fold of a target variable. Parameters ---------- target : int Target variable index. c : int Configuration index. mec_graphs_configs : list MEC graphs configurations. train_indexes : list List of training indices for each fold. test_indexes : list List of testing indices for each fold. fold : int Fold index. Returns ------- mb : np.ndarray Markov boundary indices. prediction : np.ndarray Predicted values. y_test : np.ndarray Actual target values for the test data. """ self.saved_mb_configs.setdefault(c, {}).setdefault(target, {}) self.saved_pred_configs.setdefault(c, {}).setdefault(target, {}) self.saved_y_test.setdefault(c, {}).setdefault(target, {}) if fold in self.saved_mb_configs[c][target]: return ( self.saved_mb_configs[c][target][fold], self.saved_pred_configs[c][target][fold], self.saved_y_test[c][target][fold], ) mb = markov_boundary(target, self.graphs_configs[c][fold]) self.mb_size[c, fold, target] = len(mb) data_ = self.data.get_dataset() X_train = data_.iloc[train_indexes[fold]].to_numpy()[:, mb] y_train = data_.iloc[train_indexes[fold]].to_numpy()[:, target] X_test = data_.iloc[test_indexes[fold]].to_numpy()[:, mb] y_test = data_.iloc[test_indexes[fold]].to_numpy()[:, target] if self.data_type_info['var_type'][target] == 'categorical': if len(mb) > 0: clf = copy.deepcopy(self.oct_params.regressor.regressor) clf.fit(X_train, y_train) prediction = clf.predict(X_test) else: values, counts = np.unique(y_train, return_counts=True) prediction = np.full(y_test.shape, values[np.argmax(counts)]) else: if len(mb) > 0: clf = copy.deepcopy(self.oct_params.regressor.regressor) clf.fit(X_train, y_train) prediction = clf.predict(X_test) else: prediction = np.random.uniform(np.min(y_train), np.max(y_train), y_test.shape) self.logger.debug(f'Fold {fold} for variable {target} is fitted') self.saved_mb_configs[c][target][fold] = mb self.saved_pred_configs[c][target][fold] = prediction self.saved_y_test[c][target][fold] = y_test return mb, prediction, y_test
[docs] def nodes_parallel( self, target: int, c: int, mec_graphs_configs: List[Any], train_indexes: List[np.ndarray], test_indexes: List[np.ndarray], ) -> Tuple[float, List[np.ndarray], List[np.ndarray], List[np.ndarray]]: """ Calculates the mutual information between the true values and predicted values of a target node in parallel. Parameters ---------- target : int Target variable index. c : int Configuration index. mec_graphs_configs : list MEC graphs configurations. train_indexes : list List of training indices for each fold. test_indexes : list List of testing indices for each fold. Returns ------- mu : float Mutual information score between the true values and predicted values. mb_folds : list List of Markov boundaries for each fold. pred_folds : list List of predictions for each fold. y_test_folds : list List of true values for each fold. """ results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.fold_fit)( target, c, mec_graphs_configs, train_indexes, test_indexes, fold, ) for fold in range(self.oct_params.oos_protocol.protocol.folds_to_run) ) mb_folds, pred_folds, y_test_folds = zip(*results) pred_folds_np = np.concatenate(pred_folds, axis=0) y_test_folds_np = np.concatenate(y_test_folds, axis=0) if self.data_type_info['var_type'][target] == 'categorical': mu = mutual_info_score(y_test_folds_np, pred_folds_np) else: mu = mutual_info_continuous(y_test_folds_np, pred_folds_np) self.logger.debug(f'All folds for target {target} have been fitted') return mu, list(mb_folds), list(pred_folds), list(y_test_folds)
[docs] def config_parallel( self, c: int, mec_graphs_configs: List[Any], train_indexes: List[np.ndarray], test_indexes: List[np.ndarray], ) -> Tuple[np.ndarray, List[List[np.ndarray]], List[List[np.ndarray]], List[List[np.ndarray]]]: """ Calculates the mutual information scores for all target nodes in parallel. Parameters ---------- c : int Configuration index. mec_graphs_configs : list MEC graphs configurations. train_indexes : list List of training indices for each fold. test_indexes : list List of testing indices for each fold. Returns ------- mu_list : np.ndarray Array of mutual information scores for each target node. mb_list : list List of Markov boundaries for each target node. pred_list : list List of predictions for each target node. y_test_list : list List of true values for each target node. """ results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.nodes_parallel)( target, c, mec_graphs_configs, train_indexes, test_indexes, ) for target in range(self.n_nodes) ) mu_list, mb_list, pred_list, y_test_list = zip(*results) self.logger.debug(f'Mutual information calculated for all nodes for config {c}') self.save_progress() return np.array(mu_list), list(mb_list), list(pred_list), list(y_test_list)
[docs] def permutations( self, node: int, poolYhat_best: np.ndarray, poolYhat_cur: np.ndarray, idxs: np.ndarray, poolY: np.ndarray, ) -> Tuple[float, float]: """ Calculates the mutual information scores after swapping predictions between best and current configurations. Parameters ---------- node : int Node index. poolYhat_best : np.ndarray Predictions from the best configuration. poolYhat_cur : np.ndarray Predictions from the current configuration. idxs : np.ndarray Indices for permutation. poolY : np.ndarray Actual target values. Returns ------- x : float Mutual information score for the best configuration after swap. y : float Mutual information score for the current configuration after swap. """ swap_best = poolYhat_best.copy() swap_cur = poolYhat_cur.copy() swap_best[idxs] = poolYhat_cur[idxs] swap_cur[idxs] = poolYhat_best[idxs] if self.data_type_info['var_type'][node] == 'categorical': x = mutual_info_score(poolY, swap_best) y = mutual_info_score(poolY, swap_cur) else: x = mutual_info_continuous(poolY, swap_best) y = mutual_info_continuous(poolY, swap_cur) return x, y
[docs] def permutations_nodes(self, node: int, c: int) -> Tuple[np.ndarray, np.ndarray]: """ Performs permutations for a single node across all permutations. Parameters ---------- node : int Node index. c : int Configuration index. Returns ------- swap_best_metric : np.ndarray Array of mutual information scores for the best configuration after swaps. swap_cur_metric : np.ndarray Array of mutual information scores for the current configuration after swaps. """ poolYhat_best = np.concatenate(self.pred_configs[self.opt_config][node], axis=0) poolYhat_cur = np.concatenate(self.pred_configs[c][node], axis=0) poolY = np.concatenate(self.y_test_nodes[node], axis=0) results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.permutations)( node, poolYhat_best, poolYhat_cur, self.idxs[i], poolY, ) for i in range(self.oct_params.n_permutations) ) swap_best_metric, swap_cur_metric = zip(*results) self.logger.debug(f'All permutations for node {node} have been completed') return np.array(swap_best_metric), np.array(swap_cur_metric)
[docs] def calculate_pvalues(self, c: int): """ Calculates p-values to compare the current configuration with the best one. Parameters ---------- c : int Configuration index. """ if c == self.opt_config: return swap_best_metric = np.zeros((self.oct_params.n_permutations, self.n_nodes)) swap_cur_metric = np.zeros((self.oct_params.n_permutations, self.n_nodes)) results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.permutations_nodes)(node, c) for node in range(self.n_nodes) ) for node, (best_metric, cur_metric) in enumerate(results): swap_best_metric[:, node] = best_metric swap_cur_metric[:, node] = cur_metric cur_metric = self.mean_mu_configs[c] best_metric = np.max(self.mean_mu_configs) obs_t_stat = best_metric - cur_metric t_stat = np.mean(swap_best_metric, axis=1) - np.mean(swap_cur_metric, axis=1) p_val = np.count_nonzero(t_stat >= obs_t_stat) / self.oct_params.n_permutations self.p_values[c] = p_val self.logger.debug(f'P-value for configuration {c} calculated: {p_val}') self.is_equal[c] = p_val > self.oct_params.alpha
[docs] def run(self) -> Tuple[Dict[str, Any], np.ndarray, Any]: """ Executes the OCT procedure. Returns ------- opt_config : dict The optimal configuration found. matrix_mec_graph : np.ndarray The MEC graph matrix of the optimal configuration. matrix_graph : nd.nd.array The graph matrix of optimal configuration library_results : Any Results from the causal discovery library. """ if self.configs_ran: self.logger.error('Previous configurations have already been run. Use run_new() to continue.') raise RuntimeError('Cannot run OCT when previous configurations have already been run.') self.oct_params.oos_protocol.protocol.init_protocol(self.data) mec_graphs_ = [] graphs_ = [] for algo, algo_configs in self.oct_params.configs.items(): cd_configs = list(ParameterGrid(algo_configs)) self.logger.info(f'Running algorithm: {algo}') for params in cd_configs: params['var_type'] = self.data_type_info['var_type'] matrix_mec_graph, matrix_graph, _ = self.oct_params.oos_protocol.protocol.run_protocol( self.data, params['model'], params ) mec_graphs = [mec_graph.to_numpy() for mec_graph in matrix_mec_graph] graphs = [graph.to_numpy() for graph in matrix_graph] mec_graphs_.append(mec_graphs) graphs_.append(graphs) self.save_progress() self.logger.debug(f'Protocol for params {params} has been run') params['name'] = algo self.configs_ran.append(params) self.mec_graphs_configs = mec_graphs_ self.graphs_configs = graphs_ num_configs = len(self.configs_ran) num_folds = self.oct_params.oos_protocol.protocol.folds_to_run self.mb_size = np.zeros((num_configs, num_folds, self.n_nodes)) config_results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.config_parallel)( c, self.mec_graphs_configs, self.oct_params.oos_protocol.protocol.train_indexes, self.oct_params.oos_protocol.protocol.test_indexes, ) for c in range(num_configs) ) self.mu_configs, self.mb_configs, self.pred_configs, y_test_nodes_list = zip(*config_results) self.y_test_nodes = y_test_nodes_list[0] self.mu_configs_np = np.stack(self.mu_configs, axis=0) self.mean_mu_configs = np.nanmean(self.mu_configs_np, axis=1) self.opt_config = int(np.argmax(self.mean_mu_configs)) pred_co = np.concatenate(self.pred_configs[0][0], axis=0) self.idxs = [np.random.choice(len(pred_co), len(pred_co) // 2, replace=False) for _ in range(self.oct_params.n_permutations)] self.p_values = np.ones(num_configs) self.is_equal = np.ones(num_configs, dtype=bool) self.mean_mb = np.mean(np.mean(self.mb_size, axis=2), axis=1) Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.calculate_pvalues)(c) for c in range(num_configs) ) OCTs_c = self.opt_config for c in range(num_configs): if self.is_equal[c] and self.mean_mb[c] < self.mean_mb[OCTs_c]: OCTs_c = c self.logger.debug(f'Best causal configuration is {self.configs_ran[OCTs_c]}') self.matrix_mec_graph, self.graph, library_results = self.configs_ran[OCTs_c]['model'].run( self.data, self.configs_ran[OCTs_c] ) self.save_progress() np.save(os.path.join(self.results_folder, 'matrix_mec_graph.npy'), self.matrix_mec_graph) return self.configs_ran[OCTs_c], self.matrix_mec_graph, self.graph, library_results
[docs] def run_new(self) -> Tuple[Dict[str, Any], np.ndarray, Any]: """ Continues the OCT procedure with new configurations. Returns ------- opt_config : dict The optimal configuration found. matrix_mec_graph : np.ndarray The MEC graph matrix of the optimal configuration. library_results : Any Results from the causal discovery library. """ if not self.configs_ran: self.logger.error('No previous configurations have been run. Use run() to start the OCT procedure.') raise RuntimeError('No previous run!') self.oct_params.oos_protocol.protocol.init_protocol(self.data) initial_config_count = len(self.configs_ran) mec_graphs_ = [] graphs_ = [] for algo, algo_configs in self.oct_params.configs.items(): cd_configs = list(ParameterGrid(algo_configs)) self.logger.debug(f'Running new configurations for algorithm: {algo}') for params in cd_configs: params['var_type'] = self.data_type_info['var_type'] params['name'] = algo if is_dict_in_array(params, self.configs_ran): continue matrix_mec_graph, matrix_graph, _ = self.oct_params.oos_protocol.protocol.run_protocol( self.data, params['model'], params ) mec_graphs = [mec_graph.to_numpy() for mec_graph in matrix_mec_graph] graphs = [graph.to_numpy() for graph in matrix_graph] mec_graphs_.append(mec_graphs) graphs_.append(graphs) self.save_progress() self.configs_ran.append(params) if not mec_graphs_: self.logger.warning('No new configurations to run.') return self.configs_ran[self.opt_config], self.matrix_mec_graph, None self.mec_graphs_configs.extend(mec_graphs_) self.graphs_configs.extend(graphs_) num_configs = len(self.configs_ran) num_folds = self.oct_params.oos_protocol.protocol.folds_to_run self.mb_size = np.zeros((num_configs, num_folds, self.n_nodes)) config_results = Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.config_parallel)( c, self.mec_graphs_configs, self.oct_params.oos_protocol.protocol.train_indexes, self.oct_params.oos_protocol.protocol.test_indexes, ) for c in range(num_configs) ) self.mu_configs, self.mb_configs, self.pred_configs, y_test_nodes_list = zip(*config_results) self.y_test_nodes = y_test_nodes_list[0] self.mu_configs_np = np.stack(self.mu_configs, axis=0) self.mean_mu_configs = np.nanmean(self.mu_configs_np, axis=1) self.opt_config = int(np.argmax(self.mean_mu_configs)) self.p_values = np.concatenate([self.p_values, np.ones(num_configs - initial_config_count)]) self.is_equal = np.concatenate([self.is_equal, np.ones(num_configs - initial_config_count, dtype=bool)]) self.mean_mb = np.concatenate([self.mean_mb, np.mean(np.mean(self.mb_size, axis=2), axis=1)]) Parallel(n_jobs=self.oct_params.n_jobs)( delayed(self.calculate_pvalues)(c) for c in range(initial_config_count, num_configs) ) OCTs_c = self.opt_config for c in range(num_configs): if self.is_equal[c] and self.mean_mb[c] < self.mean_mb[OCTs_c]: OCTs_c = c self.matrix_mec_graph, library_results = self.configs_ran[OCTs_c]['model'].run( self.data, self.configs_ran[OCTs_c] ) self.logger.info(f'Best causal configuration is {self.configs_ran[OCTs_c]}') self.save_progress() np.save(os.path.join(self.results_folder, 'matrix_mec_graph.npy'), self.matrix_mec_graph) return self.configs_ran[OCTs_c], self.matrix_mec_graph, library_results
[docs] def find_best_config(self, algorithms: List[str]) -> Tuple[Dict[str, Any], np.ndarray, Any]: """ Finds the best configuration among specified algorithms. Parameters ---------- algorithms : list List of algorithm names to consider. Returns ------- best_config : dict The best configuration among the specified algorithms. matrix_mec_graph : np.ndarray The MEC graph matrix of the best configuration. library_results : Any Results from the causal discovery library. Raises ------ RuntimeError If no configurations have been run for the specified algorithms. """ indices = [i for i, config in enumerate(self.configs_ran) if config['name'] in algorithms] if not indices: self.logger.error('Cannot find best configuration among algorithms that have not been run yet.') raise RuntimeError('Cannot find best configuration among algorithms that have not been run yet.') best_index = indices[np.argmax(self.mean_mu_configs[indices])] OCTs_c = best_index for c in indices: if self.is_equal[c] and self.mean_mb[c] < self.mean_mb[OCTs_c]: OCTs_c = c matrix_mec_graph, library_results = self.configs_ran[OCTs_c]['model'].run( self.data, self.configs_ran[OCTs_c] ) self.logger.info(f'Best configuration among specified algorithms is {self.configs_ran[OCTs_c]}') return self.configs_ran[OCTs_c], matrix_mec_graph, library_results