# Source code for ETIA.CRV.confidence.confidence

from joblib import Parallel, delayed
import numpy as np
from ...CausalLearning.CausalModel.utils import matrix_to_pywhy_graph, pywhy_graph_to_matrix
from sklearn.utils import resample

def is_consistent_edge_L(m1_ij, m1_ji, m2_ij, m2_ji):
    """
    Decide whether an edge in a second graph is consistent with the same edge
    in a first (reference) graph.

    Each edge is described by a pair of integer marks: the entry at ``[i, j]``
    and the entry at ``[j, i]`` of the corresponding matrix. A mark of ``0``
    is treated as "no edge" by this function; the meaning of the codes 1, 2
    and 3 is defined by the matrix encoding used by the caller
    (NOTE(review): presumably circle / arrowhead / tail - confirm).

    Parameters
    ----------
    m1_ij : int
        Mark at ``[i, j]`` in the first (reference) matrix.
    m1_ji : int
        Mark at ``[j, i]`` in the first (reference) matrix.
    m2_ij : int
        Mark at ``[i, j]`` in the second matrix.
    m2_ji : int
        Mark at ``[j, i]`` in the second matrix.

    Returns
    -------
    bool
        True if the second edge is identical to the reference edge or is one
        of the mark pairs accepted for that reference edge, False otherwise.
    """
    # An edge is always consistent with an identical copy of itself.
    if m1_ij == m2_ij and m1_ji == m2_ji:
        return True

    # A (1, 1) reference edge accepts any edge that is present in both
    # directions of the second matrix.
    if m1_ij == 1 and m1_ji == 1:
        return m2_ij != 0 and m2_ji != 0

    # For the remaining reference edges, list the non-identical mark pairs
    # that still count as consistent. Reference edges missing from this table
    # (including absent edges) are only consistent with themselves.
    accepted_by_reference = {
        (2, 1): ((2, 2), (2, 3), (1, 1)),
        (1, 2): ((2, 2), (3, 2), (1, 1)),
        (2, 3): ((2, 1), (1, 1)),
        (3, 2): ((1, 2), (1, 1)),
        (2, 2): ((1, 1), (2, 1), (1, 2)),
    }
    accepted = accepted_by_reference.get((m1_ij, m1_ji), ())
    return (m2_ij, m2_ji) in accepted
def bootstrapping_causal_graph_parallel(input_data, config, tiers, is_cat_var):
    """
    Run one bootstrap repetition: resample the data and learn a causal graph
    on the resampled data.

    This is the unit of work dispatched by ``bootstrapping_causal_graph``.

    Parameters
    ----------
    input_data : numpy.ndarray
        The input data. It is resampled along its first axis.
    config : dict
        The configuration for the causal model. ``config['model']`` must
        expose ``run(data, config)`` returning a
        ``(mec_graph, graph, var_map)`` triple.
    tiers : list
        Tiers for variable selection. Currently unused by this function.
    is_cat_var : list of bool
        Boolean array indicating if the variable is categorical. Currently
        unused by this function.

    Returns
    -------
    list
        ``[bootstrapped_sample, matrix_graph, matrix_mec_graph]``: the
        resampled data, the learned graph as a matrix, and the learned MEC
        graph as a matrix.
    """
    # Draw a bootstrap sample with replacement (same number of rows as the
    # input, which is the default of sklearn's ``resample``).
    bootstrapped_ = resample(input_data, replace=True)

    # Learn the graph on the *resampled* data. Running the model on the
    # original data would make every repetition see identical input and
    # defeat the purpose of bootstrapping.
    mec_graph, graph, _var_map = config['model'].run(bootstrapped_, config)

    matrix_mec_graph = pywhy_graph_to_matrix(mec_graph)
    matrix_graph = pywhy_graph_to_matrix(graph)
    return [bootstrapped_, matrix_graph, matrix_mec_graph]
def bootstrapping_causal_graph(n_bootstraps, input_data, tiers, best_config, is_cat_var, n_jobs=8):
    """
    Learn a causal graph on ``n_bootstraps`` bootstrap samples in parallel.

    Parameters
    ----------
    n_bootstraps : int
        Number of bootstrap repetitions. Must be at least 1.
    input_data : numpy.ndarray
        The input data.
    tiers : list
        Tiers for variable selection (forwarded to each repetition).
    best_config : dict
        The best causal configuration to estimate the bootstrapped graphs.
    is_cat_var : list of bool
        Boolean array indicating if the variable is categorical.
    n_jobs : int, optional
        Number of joblib workers used to run the repetitions. Default is 8.

    Returns
    -------
    list
        ``[bootstrapped_mec_matrix, bootstrapped_matrix]`` where
        ``bootstrapped_mec_matrix`` is a 1-D object array holding one MEC
        matrix per repetition and ``bootstrapped_matrix`` is the graph matrix
        of the first repetition.

    Raises
    ------
    ValueError
        If ``n_bootstraps`` is smaller than 1.
    """
    if n_bootstraps < 1:
        raise ValueError("n_bootstraps must be a positive integer")

    results = Parallel(n_jobs=n_jobs)(
        delayed(bootstrapping_causal_graph_parallel)(input_data, best_config, tiers, is_cat_var)
        for _ in range(n_bootstraps)
    )

    # Each result is [sample, graph_matrix, mec_matrix] with differently
    # shaped members. Converting the whole list with np.array() builds a
    # ragged array, which recent NumPy versions reject, so the pieces are
    # pulled out explicitly instead.
    bootstrapped_mec_matrix = np.empty(len(results), dtype=object)
    for idx, (_sample, _graph_matrix, mec_matrix) in enumerate(results):
        bootstrapped_mec_matrix[idx] = mec_matrix

    # NOTE(review): only the first repetition's graph matrix is returned;
    # this mirrors the previous behaviour and is kept for compatibility.
    bootstrapped_matrix = results[0][1]

    return [bootstrapped_mec_matrix, bootstrapped_matrix]
def edge_metrics_on_bootstraps(best_mec_matrix, isPAG, bootstrapped_mec_matrix):
    """
    Measure how often each edge of the best MEC graph is reproduced across
    the bootstrapped MEC matrices.

    Parameters
    ----------
    best_mec_matrix : object
        The best MEC graph; it is converted with ``pywhy_graph_to_matrix``
        before use.
    isPAG : bool
        True if the matrix is a PAG, False otherwise. Currently unused by
        this function.
    bootstrapped_mec_matrix : list of numpy.ndarray
        Bootstrapped MEC matrices, one per repetition.

    Returns
    -------
    tuple
        ``(edge_consistency, edge_similarity)``. Both are arrays of shape
        ``(n_edges, 1)`` holding, per edge of the best graph (ordered by
        upper-triangle position, row by row), the fraction of bootstraps in
        which the edge is consistent with, respectively identical to, the
        best graph's edge.
    """
    reference = pywhy_graph_to_matrix(best_mec_matrix)
    total_runs = len(bootstrapped_mec_matrix)
    node_count = reference.shape[0]

    # Every edge is expected to occupy two non-zero cells ([i, j] and [j, i]).
    edge_total = int(np.count_nonzero(reference) / 2)
    consistent_hits = np.zeros((edge_total, 1), dtype=int)
    identical_hits = np.zeros((edge_total, 1), dtype=int)

    slot = 0
    # Walk the strict upper triangle row by row so each node pair is visited once.
    for row, col in zip(*np.triu_indices(node_count, k=1)):
        forward = reference[row, col]
        if forward == 0:
            continue
        backward = reference[col, row]

        for run in range(total_runs):
            sample = bootstrapped_mec_matrix[run]
            if is_consistent_edge_L(forward, backward, sample[row, col], sample[col, row]):
                consistent_hits[slot] += 1
            if sample[row, col] == forward and sample[col, row] == backward:
                identical_hits[slot] += 1
        slot += 1

    # Turn the raw counts into per-edge frequencies.
    edge_consistency = consistent_hits / total_runs
    edge_similarity = identical_hits / total_runs
    return edge_consistency, edge_similarity
def calculate_confidence(dataset, opt_conf, n_bootstraps=50):
    """
    Estimate edge confidences for the optimal configuration via bootstrapping.

    The bootstrapped MEC matrices are also written to disk with
    ``np.save('bootstraped_graphs', ...)`` in the current working directory.

    Parameters
    ----------
    dataset : object
        The dataset; its ``get_dataset()`` result is used as the input data.
    opt_conf : object
        The optimal configuration. Its ``data_type_info['var_type']`` and
        ``matrix_mec_graph`` attributes are read here.
    n_bootstraps : int, optional
        Number of bootstrap repetitions. Default is 50.

    Returns
    -------
    tuple
        Edge consistency and edge similarity, as returned by
        ``edge_metrics_on_bootstraps``.

    Raises
    ------
    RuntimeError
        If ``opt_conf`` is None.
    """
    if opt_conf is None:
        raise RuntimeError("You need to have an optimal configuration before you can calculate the edge confidences")

    # Only the MEC matrices are needed below; the graph matrix is discarded.
    mec_samples, _graph_matrix = bootstrapping_causal_graph(
        n_bootstraps=n_bootstraps,
        input_data=dataset.get_dataset(),
        tiers=None,
        best_config=opt_conf,
        is_cat_var=opt_conf.data_type_info['var_type'],
    )

    # Persist the bootstrapped graphs as a side effect.
    np.save('bootstraped_graphs', mec_samples)

    consistency, similarity = edge_metrics_on_bootstraps(
        best_mec_matrix=opt_conf.matrix_mec_graph,
        isPAG=True,
        bootstrapped_mec_matrix=mec_samples,
    )
    return consistency, similarity