Source code for ETIA.CRV.causal_graph_utils.mag_to_pag


import pandas as pd
from .orientation_rules import *
from .enforce_stationarity import *
from .get_unshielded_triples import *


# Functions to convert MAG to PAG
# Author: kbiza@csd.uoc.gr, based on the matlab code by striant@csd.uoc.gr


def FCI_rules_mag(G, mag, verbose):
    '''
    Orient the graph G by running the FCI orientation rules against a MAG.

    R0 is applied once, on the unshielded triples of G. Afterwards R1, R2_,
    R3 and R4 are applied in that order, pass after pass, for as long as the
    flag threaded through the rules is truthy at the end of a pass. The same
    scheme is then repeated with R8 and R9_R10.

    Parameters
    ----------
        G (numpy matrix): the matrix of the graph
        mag (numpy matrix): the matrix of the MAG
        verbose (bool): forwarded unchanged to every rule

    Returns
    -------
        G (numpy matrix): the matrix of the graph as returned by the last rule
        dnc (dictionary): second value returned by R0
        flagcount (int): sum of int(flag) taken at the end of every pass
    '''

    # NOTE(review): the block nesting was reconstructed from a listing whose
    # whitespace had been collapsed; the two loops are assumed to be
    # sequential and the counter update is assumed to sit inside each loop
    # -- confirm against the packaged module.
    triples = get_unshielded_triples(G)
    G, dnc = R0(G, triples, mag, verbose)

    flagcount = 0

    # First group of rules: R1, R2_, R3, R4.
    while True:
        changed = False
        G, changed = R1(G, changed, verbose)
        G, changed = R2_(G, changed, verbose)
        G, changed = R3(G, changed, verbose)
        G, changed = R4(G, mag, changed, verbose)
        flagcount += int(changed)
        if not changed:
            break

    # Second group of rules: R8 and R9_R10 (the latter also receives dnc).
    while True:
        changed = False
        G, changed = R8(G, changed, verbose)
        G, changed = R9_R10(G, dnc, changed, verbose)
        flagcount += int(changed)
        if not changed:
            break

    return G, dnc, flagcount
def mag_to_pag(mag_pd, verbose, n_lags=None):
    '''
    Build the PAG that corresponds to a given MAG.

    The MAG is copied, every non-zero entry of the copy is set to 1, and
    FCI_rules_mag is run on it. When n_lags is an int, the two
    stationarity-enforcing helpers are applied and FCI_rules_mag is run
    once more.

    Parameters
    ----------
        mag_pd (pandas Dataframe): the matrix of the MAG
        verbose (bool): forwarded unchanged to the helpers
        n_lags (int): the maximum number of previous time lags in case of
            time-lagged graphs; any value that is not an int (such as the
            default None) skips the stationarity step

    Returns
    -------
        pag_pd (pandas Dataframe): the matrix of the PAG, carrying the same
            index and columns as mag_pd
    '''

    mag = mag_pd.to_numpy()

    # Starting point: a copy of the MAG with every non-zero entry set to 1.
    pag = mag.copy()
    nonzero_entries = pag != 0
    pag[nonzero_entries] = 1

    # Only the oriented matrix is needed; the other two results are dropped.
    pag, _, _ = FCI_rules_mag(pag, mag, verbose)

    # NOTE(review): the extent of this branch was reconstructed from a
    # listing whose whitespace had been collapsed; the second FCI_rules_mag
    # call is assumed to belong to it -- confirm against the packaged module.
    if isinstance(n_lags, int):
        pag = enforce_stationarity_arrowheads(pag, mag_pd, n_lags, verbose)
        pag = enforce_stationarity_tails_and_orientation(pag, mag_pd, n_lags, verbose)
        pag, _, _ = FCI_rules_mag(pag, mag, verbose)

    return pd.DataFrame(pag, columns=mag_pd.columns, index=mag_pd.index)