# Source code for ETIA.CausalLearning.CausalLearner

# ETIA/CausalLearning/CausalLearner.py

import os
import pickle
import time
from typing import Optional, Union

import pandas as pd

from .algorithms import causaldiscoveryalgorithms
from ..data.Dataset import Dataset
from .configurations import Configurations, configurations
from .configurations.causal_configs import causal_configs
from .CDHPO import OCT
from ..utils import get_logger
from .CausalModel.utils import pywhy_graph_to_matrix
from ..utils.jvm_manager import *  # Import the JVMManager

class CausalLearner:
    """
    CausalLearner class for automated causal discovery.

    Parameters
    ----------
    dataset_input : str, Dataset or pandas.DataFrame, optional
        A file path to the dataset, a Dataset instance, or a plain DataFrame
        (wrapped in a Dataset named 'Preloaded Dataset'). Only consulted when
        ``configurations`` is None; otherwise the dataset attached to the
        configurations is used.
    configurations : Configurations, optional
        A Configurations object containing experiment configurations.
        If None, configurations are built from the dataset.
    verbose : bool, optional
        If True, prints detailed logs. Default is False.
    n_jobs : int, optional
        Number of jobs for parallel processing. Default is ``os.cpu_count()``.
    random_seed : int, optional
        Seed for ``random`` and ``numpy.random``. Default is None (no seeding).

    Methods
    -------
    learn_model()
        Runs the causal discovery process.
    print_results(opt_conf=None)
        Prints the results of the causal discovery process.
    set_dataset(dataset)
        Sets the dataset for the causal learner.
    set_configurations(configurations)
        Sets the configurations for the causal learner.
    save_progress(path=None)
        Saves the progress of the experiment to a file.
    load_progress(path)
        Loads the progress of the experiment from a file.
    add_configurations_from_file(filename)
        Adds additional configurations to the experiment from a JSON file.
    update_learnt_model()
        Updates the learnt model with new configurations.
    get_best_model_between_algorithms(algorithms)
        Gets the best model between specified algorithms.
    get_best_model_between_family(**kwargs)
        Gets the best model within a family of algorithms based on specified criteria.
    """

    # Keys of an optimal configuration that print_results() does not display.
    _HIDDEN_CONF_KEYS = ('is_cat_var', 'model', 'var_type', 'indexes')

    def __init__(
        self,
        dataset_input: Optional[Union[str, "Dataset", pd.DataFrame]] = None,
        configurations: Optional["Configurations"] = None,
        verbose: bool = False,
        n_jobs: Optional[int] = None,
        random_seed: Optional[int] = None,
    ):
        if n_jobs is None:
            n_jobs = os.cpu_count()
        # The JVM is started before any other component is constructed.
        start_jvm()

        self.verbose = verbose
        self.n_jobs = n_jobs
        self.random_seed = random_seed

        # Setup logging
        self.logger = get_logger(name=__name__, verbose=self.verbose)
        self.logger.debug('Initializing CausalLearner')

        if configurations is None:
            # No configurations supplied: build them from the dataset input.
            self.dataset = self._build_dataset(dataset_input)
            self.configurations = Configurations(
                dataset=self.dataset, verbose=self.verbose, n_jobs=n_jobs
            )
        elif not isinstance(configurations, Configurations):
            self.logger.error('Configurations must be of type Configurations.')
            raise TypeError('Configurations must be of type Configurations.')
        else:
            # Supplied configurations carry their own dataset; dataset_input
            # is not consulted on this path.
            self.configurations = configurations
            self.dataset = configurations.dataset

        self.results_folder = self.configurations.results_folder

        # Initialize CDHPO (Combined Discovery and Hyperparameter Optimization)
        self.cdhpo = OCT(
            oct_params=self.configurations.cdhpo_params,
            data=self.dataset,
            results_folder=self.results_folder,
            verbose=self.verbose,
        )

        # Result attributes, filled in by learn_model().
        self.opt_conf = None
        self.matrix_mec_graph = None
        self.matrix_graph = None
        self.run_time = None

        # Set random seed for reproducibility
        if self.random_seed is not None:
            import random

            import numpy as np

            random.seed(self.random_seed)
            np.random.seed(self.random_seed)

    @staticmethod
    def _build_dataset(dataset_input):
        """Turn a Dataset, DataFrame or file path into a Dataset instance.

        Raises
        ------
        ValueError
            If ``dataset_input`` is none of the supported types (including None).
        """
        if isinstance(dataset_input, Dataset):
            return dataset_input
        if isinstance(dataset_input, pd.DataFrame):
            # A plain DataFrame is wrapped as a non-time-series dataset with a default name.
            return Dataset(
                data=dataset_input,
                data_time_info={'n_lags': 0, 'time_lagged': False},
                time_series=False,
                dataset_name='Preloaded Dataset',
            )
        if isinstance(dataset_input, str):
            return Dataset(filename=dataset_input)
        raise ValueError(
            "dataset_input must be either a file path, a Dataset instance, or a pandas DataFrame"
        )

    def learn_model(self):
        """
        Runs the causal discovery process using the OCT algorithm.

        The results are stored on the instance, printed via ``print_results()``
        and returned.

        Returns
        -------
        dict
            With keys:
            - 'optimal_conf': the optimal configuration found.
            - 'matrix_mec_graph': the MEC graph matrix.
            - 'matrix_graph': the graph matrix.
            - 'run_time': elapsed wall-clock seconds of the CDHPO run, as a string.
            - 'library_results': results from the causal discovery libraries.

        Raises
        ------
        Exception
            Any error raised by the CDHPO run is logged and re-raised unchanged.
        """
        self.logger.debug('Starting OCT Run')
        start_time = time.time()
        try:
            (
                self.opt_conf,
                self.matrix_mec_graph,
                self.matrix_graph,
                library_results,
            ) = self.cdhpo.run()
        except AttributeError as e:
            self.logger.error(f"Attribute error during CDHPO run: {e}")
            raise
        except Exception as e:
            self.logger.error(f"Error during CDHPO run: {e}")
            raise
        end_time = time.time()

        # Kept as a string because that is what callers have always received.
        self.run_time = str(end_time - start_time)
        self.logger.debug(f'CDHPO Runtime: {self.run_time}')
        self.print_results()
        return {
            'optimal_conf': self.opt_conf,
            'matrix_mec_graph': self.matrix_mec_graph,
            'matrix_graph': self.matrix_graph,
            'run_time': self.run_time,
            'library_results': library_results,
        }

    def print_results(self, opt_conf=None):
        """
        Prints the results of the causal discovery process.

        Parameters
        ----------
        opt_conf : dict, optional
            The optimal configuration to print. If None, uses self.opt_conf.

        Raises
        ------
        RuntimeError
            If no configuration is given and none has been learnt yet.
        """
        if opt_conf is None:
            opt_conf = self.opt_conf
        if opt_conf is None:
            # Fail with a clear message instead of an AttributeError on None.
            raise RuntimeError(
                'No optimal configuration available to print; '
                'run learn_model() first or pass opt_conf.'
            )

        print('Best Causal Discovery configuration was:', opt_conf.get('name'))
        print('With parameters:')
        for par, val in opt_conf.items():
            if par not in self._HIDDEN_CONF_KEYS:
                print(f'{par}: {val}')
        print('The MEC matrix graph is:')
        print(self.matrix_mec_graph)

    def set_dataset(self, dataset):
        """
        Sets the dataset for the causal learner.

        Parameters
        ----------
        dataset : Dataset
            The Dataset object to set.

        Raises
        ------
        TypeError
            If dataset is not of type Dataset.
        """
        if not isinstance(dataset, Dataset):
            raise TypeError('Dataset must be of type Dataset.')
        self.dataset = dataset

    def set_configurations(self, configurations):
        """
        Sets the configurations (and their dataset) for the causal learner.

        Parameters
        ----------
        configurations : Configurations
            The Configurations object to set.

        Raises
        ------
        TypeError
            If configurations is not of type Configurations, or if its dataset
            is not of type Dataset.
        """
        if not isinstance(configurations, Configurations):
            raise TypeError('Configurations must be of type Configurations.')
        # Validate and store the dataset first so that a bad dataset does not
        # leave the learner with new configurations but the old dataset.
        self.set_dataset(configurations.dataset)
        self.configurations = configurations
        # NOTE(review): results_folder and the OCT instance built in __init__
        # are not refreshed here -- confirm whether that is intended.

    def save_progress(self, path=None):
        """
        Saves the progress of the experiment to a file by pickling this object.

        Parameters
        ----------
        path : str, optional
            The file path to save the progress to. If None, saves to
            'Experiment.pkl' in results_folder.
        """
        if path is None:
            path = os.path.join(self.results_folder, 'Experiment.pkl')
        with open(path, 'wb') as f:
            pickle.dump(self, f)
        self.logger.debug(f'Progress saved to {path}')

    @staticmethod
    def load_progress(path):
        """
        Loads the progress of the experiment from a file.

        Parameters
        ----------
        path : str
            The file path to load the progress from.

        Returns
        -------
        CausalLearner
            The loaded CausalLearner object.
        """
        # SECURITY: unpickling executes arbitrary code embedded in the file.
        # Only load progress files that come from a trusted source.
        with open(path, 'rb') as f:
            learner = pickle.load(f)
        # NOTE(review): __init__ (and therefore start_jvm()) is not run for an
        # unpickled learner -- confirm the JVM is started elsewhere if needed.
        learner.logger.debug(f'Progress loaded from {path}')
        return learner

    def add_configurations_from_file(self, filename):
        """
        Adds additional configurations to the experiment from a JSON file.

        Parameters
        ----------
        filename : str
            The filename of the JSON file containing configurations.
        """
        self.configurations.add_configurations_from_file(filename)
        self.logger.debug(f'Configurations added from {filename}')

    def update_learnt_model(self):
        """
        Updates the learnt model with the new configurations.

        Stores the first two values returned by ``cdhpo.run_new()`` as the
        optimal configuration and the MEC graph matrix; the third is discarded.
        """
        self.logger.debug('Updating learnt model with new configurations')
        # NOTE(review): run() is unpacked into four values in learn_model() but
        # run_new() into three, and matrix_graph is not updated here -- confirm
        # against the OCT implementation.
        self.opt_conf, self.matrix_mec_graph, _ = self.cdhpo.run_new()

    def get_best_model_between_algorithms(self, algorithms):
        """
        Gets the best model between specified algorithms.

        Parameters
        ----------
        algorithms : list
            A list of algorithm names to consider.

        Returns
        -------
        dict
            The best configuration among the specified algorithms.
        """
        best_config = self.cdhpo.find_best_config(algorithms)
        self.logger.debug(f'Best configuration among specified algorithms: {best_config}')
        return best_config

    @staticmethod
    def _filter_algorithms(configs, required, optional=None):
        """Return the names in ``configs`` whose properties match the criteria.

        ``required`` and ``optional`` map a property key to the wanted value;
        entries whose wanted value is None are ignored. A required key must
        exist in every algorithm entry (KeyError otherwise). An optional key is
        only compared for entries that define it.
        """
        strict = {key: want for key, want in required.items() if want is not None}
        lenient = {key: want for key, want in (optional or {}).items() if want is not None}

        selected = []
        for algo in configs:
            props = configs[algo]
            if any(props[key] != want for key, want in strict.items()):
                continue
            if any(key in props and props[key] != want for key, want in lenient.items()):
                continue
            selected.append(algo)
        return selected

    def get_best_model_between_family(
        self,
        causal_sufficiency=None,
        assume_faithfulness=None,
        is_output_mec=None,
        accepts_missing_values=None
    ):
        """
        Gets the best model within a family of algorithms based on specified criteria.

        Each criterion left as None is not used for filtering.

        Parameters
        ----------
        causal_sufficiency : bool, optional
            Keep algorithms whose 'causal_sufficiency' property equals this value.
        assume_faithfulness : bool, optional
            Keep algorithms whose 'assume_faithfulness' property equals this value.
        is_output_mec : bool, optional
            Keep algorithms whose 'is_output_mec' property equals this value.
            Algorithms that do not declare that property are kept.
        accepts_missing_values : bool, optional
            Keep algorithms whose 'missing_values' property equals this value.

        Returns
        -------
        dict
            The best configuration among the filtered algorithms.
        """
        required = {
            'causal_sufficiency': causal_sufficiency,
            'assume_faithfulness': assume_faithfulness,
            'missing_values': accepts_missing_values,
        }
        # This criterion used to be accepted but silently ignored.
        # NOTE(review): the 'is_output_mec' key name is assumed -- confirm it
        # against causal_configs; entries without it are left unfiltered.
        optional = {'is_output_mec': is_output_mec}

        algorithms = self._filter_algorithms(causal_configs, required, optional)
        best_config = self.cdhpo.find_best_config(algorithms)
        self.logger.debug(f'Best configuration among filtered algorithms: {best_config}')
        return best_config