Source code for AFQ.viz.plot

import os.path as op
import os
import logging
import tempfile

import numpy as np
from scipy.stats import sem
import pandas as pd
from tqdm.auto import tqdm

import AFQ.viz.utils as vut
from AFQ.viz.utils import display_string
from AFQ.utils.stats import contrast_index as calc_contrast_index
from AFQ.data.utils import BUNDLE_RECO_2_AFQ, BUNDLE_MAT_2_PYTHON

try:
    from pingouin import intraclass_corr, corr
    import seaborn as sns
    import matplotlib.pyplot as plt
    from matplotlib.lines import Line2D
    from matplotlib.patches import Patch
    import IPython.display as display
except (ImportError, ModuleNotFoundError):
    raise ImportError(vut.viz_import_msg_error("plot"))

__all__ = ["visualize_tract_profiles", "visualize_gif_inline"]


[docs]def visualize_tract_profiles(tract_profiles, scalar="dti_fa", ylim=None, n_boot=1000, file_name=None, positions=vut.POSITIONS): """ Visualize all tract profiles for a scalar in one plot Parameters ---------- tract_profiles : string Path to CSV containing tract_profiles. scalar : string, optional Scalar to use in plots. Default: "dti_fa". ylim : list of 2 floats, optional Minimum and maximum value used for y-axis bounds. If None, ylim is not set. Default: None n_boot : int, optional Number of bootstrap resamples for seaborn to use to estimate the ci. Default: 1000 file_name : string, optional File name to save figure to if not None. Default: None positions : dictionary, optional Dictionary that maps bundle names to position in plot. Default: vut.POSITIONS Returns ------- Matplotlib figure and axes. """ csv_comparison = GroupCSVComparison( None, [tract_profiles], ["my_tract_profiles"], remove_model=False, scalar_bounds={'lb': {}, 'ub': {}}) df = csv_comparison.tract_profiles( scalar=scalar, ylim=ylim, positions=positions, out_file=file_name, plot_subject_lines=False, n_boot=n_boot) return df
class BrainAxes(): ''' Helper class. Creates and handles a grid of axes. Each axis corresponds to a bundle. Axis placement should roughly correspond to the actual bundle placement in the brain. ''' def __init__(self, size=(5, 5), positions=vut.POSITIONS, fig=None): self.size = size self.positions = positions self.on_grid = np.zeros((5, 5), dtype=bool) if fig is None: self.fig = plt.figure() label = "1" self.twinning = False else: # we are twinning with another BrainAxes self.fig = fig label = "2" self.twinning = True self.axes = self.fig.subplots( self.size[0], self.size[1], subplot_kw={"label": label, "frame_on": False}) self.fig.subplots_adjust( left=None, bottom=None, right=None, top=None, wspace=0.4, hspace=0.6) self.fig.set_size_inches((18, 18)) self.temp_fig, self.temp_axis = plt.subplots() self.temp_axis_owner = None def get_axis(self, bundle, axes_dict={}): ''' Given a bundle, turn on and get an axis. If bundle not positioned, will claim the temporary axis. If bundle in axes_dict, onyl return relevant axis. ''' if bundle in axes_dict.keys(): return axes_dict[bundle] elif bundle in self.positions.keys(): self.on_grid[self.positions[bundle]] = True return self.axes[self.positions[bundle]] else: if self.temp_axis_owner != bundle: plt.close(self.temp_fig) self.temp_fig, self.temp_axis = plt.subplots() self.temp_axis_owner = bundle return self.temp_axis def plot_line(self, bundle, x, y, data, ylabel, ylim, n_boot, alpha, lineplot_kwargs, plot_subject_lines=True, ax=None): ''' Given a dataframe data with at least columns x, y, and subjectID, plot the mean of subjects with ci of 95 in alpha and the individual subjects in alpha-0.2 using sns.lineplot() ''' if ax is None: ax = self.get_axis(bundle) lineplot_kwargs_mean = lineplot_kwargs.copy() sns.set(style="whitegrid", rc={"lines.linewidth": 1}) ax.hlines(0, 0, 95, linestyles='dashed', color='red') if plot_subject_lines: lineplot_kwargs_mean["hue"] = None lineplot_kwargs_mean["palette"] = None lineplot_kwargs_mean["color"] = "lightgray" sns.set(style="whitegrid", rc={"lines.linewidth": 0.5}) sns.lineplot( x=x, y=y, data=data, ci=None, estimator=None, units='subjectID', legend=False, ax=ax, alpha=alpha - 0.2, style=[True] * len(data.index), **lineplot_kwargs) sns.set(style="whitegrid", rc={"lines.linewidth": 3}) sns.lineplot( x=x, y=y, data=data, estimator='mean', errorbar=('ci', 95), n_boot=n_boot, legend=False, ax=ax, alpha=alpha, style=[True] * len(data.index), **lineplot_kwargs_mean) ax.set_title(display_string(bundle), fontsize=vut.large_font) ax.set_ylabel(ylabel, fontsize=vut.medium_font) ax.set_ylim(ylim) def format(self, disable_x=True, disable_y=True): ''' Call this functions once after all axes that you intend to use have been plotted on. Automatically formats brain axes. ''' for i in range(self.size[0]): for j in range(self.size[1]): if self.twinning: self.axes[i, j].xaxis.tick_top() self.axes[i, j].yaxis.tick_right() self.axes[i, j].xaxis.set_label_position('top') self.axes[i, j].yaxis.set_label_position('right') self.axes[i, j].tick_params( axis='y', which='major', labelsize=vut.small_font) self.axes[i, j].tick_params( axis='x', which='major', labelsize=vut.small_font) if not self.on_grid[i, j]: self.axes[i, j].axis("off") if self.twinning: if j != self.size[1] - 1 and self.on_grid[i][j + 1]: self.axes[i, j].set_yticklabels([]) self.axes[i, j].set_ylabel("") self.axes[i, j].set_xticklabels([]) self.axes[i, j].set_xlabel("") else: if disable_y and (j != 0 and self.on_grid[i][j - 1]): self.axes[i, j].set_yticklabels([]) self.axes[i, j].set_ylabel("") if disable_x or (i != self.size[0] - 1 and self.on_grid[i + 1][j]): self.axes[i, j].set_xticklabels([]) self.axes[i, j].set_xlabel("") self.fig.tight_layout() def save_temp_fig(self, o_folder, o_file, save_func): ''' If using a temporary axis, save it out and clear it. Returns True if temporary axis was saved, false if no temporary axis was in use ''' if self.temp_axis_owner is None: return False self.temp_fig.tight_layout() save_func(self.temp_fig, o_folder, o_file) plt.close(self.temp_fig) self.temp_axis_owner = None return True def is_using_temp_axis(self): return (self.temp_axis_owner is not None) def close_all(self): ''' Close all associated figures. ''' plt.close(self.temp_fig) plt.close(self.fig) class GroupCSVComparison(): """ Compare different CSVs, using: tract profiles, contrast indices, scan-rescan reliability using ICC. """ def __init__(self, out_folder, csv_fnames, names, is_special="", subjects=None, scalar_bounds={'lb': {'FA': 0.2}, 'ub': {'MD': 0.002}}, bundles=None, percent_nan_tol=10, percent_edges_removed=10, remove_model=True, mat_bundle_converter=BUNDLE_MAT_2_PYTHON, mat_column_converter=vut.CSV_MAT_2_PYTHON, mat_scale_converter=vut.SCALE_MAT_2_PYTHON, bundle_converter=BUNDLE_RECO_2_AFQ, ICC_func="ICC2"): """ Load in csv files, converting from matlab if necessary. Parameters ---------- out_folder : path Folder where outputs of this class's methods will be saved. csv_fnames : list of filenames Filenames for the two CSVs containing tract profiles to compare. Will obtain subject list from the first file. names : list of strings Name to use to identify each CSV dataset. is_special : str or list of strs, optional Whether or not the csv needs special attention. Can be "", "mat" if the csv was generated using mAFQ, or "reco" if the csv was generated using Recobundles. Default: "" subjects : list of num, optional List of subjects to consider. If None, will use all subjects in first dataset. Default: None scalar_bounds : dictionary, optional A dictionary with a lower bound and upper bound containting a series of scalar / threshold pairs used as a white-matter mask on the profiles (any values outside of the threshold will be marked NaN and not used or set to 0, depending on the case). Default: {'lb': {'FA': 0.2}, 'ub': {'MD': 0.002}} bundles : list of strings, optional Bundles to compare. If None, use all bundles in the first profile group. Default: None percent_nan_tol : int, optional Percentage of NaNs tolerable. If a profile has less than this percentage of NaNs, NaNs are interpolated. If it has more, the profile is thrown out. Default: 10 percent_edges_removed : int, optional Percentage of nodes to remove from the edges of the profile. Scalar values often change dramatically at the boundary between the grey matter and the white matter, and these effects can dominate plots. However, they are generally not interesting to us, and have low intersubject reliability. In a profile of 100 nodes, percent_edges_removed=10 would remove 5 nodes from each edge. Default: 10 remove_model : bool, optional Whether to remove prefix of scalars which specify model i.e., dti_fa => FA. Default: True mat_bundle_converter : dictionary, optional Dictionary that maps matlab bundle names to python bundle names. Default: BUNDLE_MAT_2_PYTHON mat_column_converter : dictionary, optional Dictionary that maps matlab column names to python column names. Default: CSV_MAT_2_PYTHON mat_scale_converter : dictionary, optional Dictionary that maps scalar names to how they should be scaled to match pyAFQ's scale for that scalar. Default: SCALE_MAT_2_PYTHON bundle_converter : dictionary, optional Dictionary that maps bundle names to more standard bundle names. Unlike mat_bundle_converter, this converter is applied to all CSVs Default: BUNDLE_RECO_2_AFQ ICC_func : string, optional ICC function to use to calculate correlations. Can be 'ICC1, 'ICC2', 'ICC3', 'ICC1k', 'ICC2k', 'ICC3k'. Default: "ICC2" """ self.logger = logging.getLogger('AFQ') self.ICC_func = ICC_func if "k" in self.ICC_func: self.ICC_func_name = f"ICC({self.ICC_func[3]},k)" else: self.ICC_func_name = f"ICC({self.ICC_func[3]},1)" self.out_folder = out_folder self.percent_nan_tol = percent_nan_tol if not isinstance(is_special, list): is_special = [is_special] * len(csv_fnames) self.profile_dict = {} for i, fname in enumerate(csv_fnames): profile = pd.read_csv(fname) if 'subjectID' in profile.columns: profile['subjectID'] = \ profile['subjectID'].apply( lambda x: int( ''.join(c for c in x if c.isdigit()) ) if isinstance(x, str) else x) else: profile['subjectID'] = 0 if is_special[i] == "mat": profile.rename( columns=mat_column_converter, inplace=True) profile['tractID'] = \ profile['tractID'].apply( lambda x: mat_bundle_converter[x]) for scalar, scale in mat_scale_converter.items(): profile[scalar] = \ profile[scalar].apply(lambda x: x * scale) profile.replace({"tractID": bundle_converter}, inplace=True) if is_special[i] == "reco": def reco_flip(df): if df.tractID in vut.RECO_FLIP: return 99 - df.nodeID else: return df.nodeID profile["nodeID"] = profile.apply(reco_flip, axis=1) if remove_model: profile.rename( columns=vut.SCALAR_REMOVE_MODEL, inplace=True) for bound, constraint in scalar_bounds.items(): for scalar, threshold in constraint.items(): profile[scalar] = \ profile[scalar].apply( lambda x: self._threshold_scalar( bound, threshold, x)) if percent_edges_removed > 0: profile = profile.drop(profile[np.logical_or( (profile["nodeID"] < percent_nan_tol // 2), (profile["nodeID"] >= 100 - (percent_nan_tol // 2)) )].index) self.profile_dict[names[i]] = profile if subjects is None: self.subjects = self.profile_dict[names[0]]['subjectID'].unique() else: self.subjects = subjects self.prof_len = 100 - (percent_nan_tol // 2) * 2 if bundles is None: self.bundles = self.profile_dict[names[0]]['tractID'].unique() self.bundles.sort() else: self.bundles = bundles self.color_dict = vut.gen_color_dict([*self.bundles, "median"]) # TODO: make these parameters self.scalar_markers = ["o", "x"] self.patterns = ( None, '/', 'o', 'x', '-', '.', '+', '//', '\\', '*', 'O', '|') def _threshold_scalar(self, bound, threshold, val): """ Threshold scalars by a lower and upper bound. """ if bound == "lb": if val > threshold: return val else: return np.nan elif bound == "ub": if val < threshold: return val else: return np.nan else: raise RuntimeError("scalar_bounds dictionary " + " formatted incorrectly. See" + " the default for reference") def _save_fig(self, fig, folder, f_name): """ Get file to save to, and generate the folder if it does not exist. """ if self.out_folder is None and folder is None: f_folder = None elif self.out_folder is None: f_folder = folder elif folder is None: f_folder = self.out_folder else: f_folder = op.join( self.out_folder, folder) if f_folder is None: fig.savefig(f_name + ".png") fig.savefig(f_name + ".svg", format='svg', dpi=300) else: os.makedirs(f_folder, exist_ok=True) fig.savefig(op.join(f_folder, f_name) + ".png") fig.savefig(op.join(f_folder, f_name) + ".svg", format='svg', dpi=300) def _get_profile(self, name, bundle, subject, scalar): """ Get a single profile, then handle not found / NaNs """ profile = self.profile_dict[name] single_profile = profile[ (profile['subjectID'] == subject) & (profile['tractID'] == bundle) ].sort_values("nodeID")[scalar].to_numpy() nans = np.isnan(single_profile) percent_nan = (np.sum(nans) * 100) // self.prof_len if len(single_profile) < 1: self.logger.warning( 'No scalars found for scalar ' + scalar + ' for subject ' + str(subject) + ' for bundle ' + bundle + ' for CSV ' + name) return None if percent_nan > 0: message = ( f'{percent_nan}% NaNs found in scalar ' + scalar + ' for subject ' + str(subject) + ' for bundle ' + bundle + ' for CSV ' + name) if np.sum(nans) > self.percent_nan_tol: self.logger.warn(message + '. Profile ignored. ') return None else: self.logger.info(message + '. NaNs interpolated. ') non_nan = np.logical_not(nans) single_profile[nans] = np.interp( nans.nonzero()[0], non_nan.nonzero()[0], single_profile[non_nan]) return single_profile def _alpha(self, alpha): ''' Keep alpha in a reasonable range Useful when calculating alpha automatically ''' if alpha < 0.3: return 0.3 if alpha > 1: return 1 return alpha def _array_to_df(self, arr): ''' Converts a 2xn array to a pandas dataframe with columns x, y Useful for plotting with seaborn. ''' df = pd.DataFrame() df['x'] = arr[0] df['y'] = arr[1] return df def masked_corr(self, arr, corrtype): ''' Mask arr for NaNs before calling np.corrcoef ''' mask = np.logical_not( np.logical_or( np.isnan(arr[0, ...]), np.isnan(arr[1, ...]))) if np.sum(mask) < 2: return np.nan, np.nan, np.nan arr = arr[:, mask] if corrtype == "ICC": data = pd.DataFrame({ "targets": np.concatenate( (np.arange(arr.shape[1]), np.arange(arr.shape[1]))), "raters": np.concatenate( (np.zeros(arr.shape[1]), np.ones(arr.shape[1]))), "ratings": np.concatenate( (arr[0], arr[1]))}) stats = intraclass_corr( data=data, targets="targets", raters="raters", ratings="ratings") row = stats[stats["Type"] == self.ICC_func].iloc[0] return row["ICC"], row["ICC"] - row["CI95%"][0],\ row["CI95%"][1] - row["ICC"] elif corrtype == "Srho": stats = corr( x=arr[0], y=arr[1], method="spearman") row = stats.iloc[0] return row["r"], row["r"] - row["CI95%"][0],\ row["CI95%"][1] - row["r"] else: raise ValueError("corrtype not recognized") def tract_profiles(self, names=None, scalar="FA", ylim=[0.0, 1.0], show_plots=False, positions=vut.POSITIONS, out_file=None, n_boot=1000, plot_subject_lines=True, axes_dict={}): """ Compare all tract profiles for a scalar from different CSVs. Plots tract profiles for all in one plot. Bundles taken from positions argument. Parameters ---------- names : list of strings, optional Names of datasets to plot profiles of. If None, all datasets are used. Default: None scalar : string, optional Scalar to use in plots. Default: "FA". ylim : list of 2 floats, optional Minimum and maximum value used for y-axis bounds. If None, ylim is not set. Default: [0.0, 1.0] out_file : str, optional Path to save the figure to. If None, use the default naming convention in self.out_folder Default: None n_boot : int, optional Number of bootstrap resamples for seaborn to use to estimate the ci. Default: 1000 show_plots : bool, optional Whether to show plots if in an interactive environment. Default: False positions : dictionary, optional Dictionary that maps bundle names to position in plot. Default: vut.POSITIONS plot_subject_lines : bool, optional Whether to plot individual subject lines with a smaller width. Default: True axes_dict : dictionary of axes, optional Plot contrast index for bundles that are keys of axes_dict on the corresponding axis. Default: {} """ if not show_plots: plt.ioff() if names is None: names = list(self.profile_dict.keys()) if out_file is None: o_folder = f"tract_profiles/{scalar}" o_file = f"{'_'.join(names)}" else: o_folder = None o_file = out_file ba = BrainAxes(positions=positions) labels = [] self.logger.info("Calculating means and CIs...") for j, bundle in enumerate(tqdm(self.bundles)): labels_temp = [] for i, name in enumerate(names): if i == 0: plot_kwargs = { "hue": "tractID", "palette": [self.color_dict[bundle]]} else: plot_kwargs = { "dashes": [(2**(i - 1), 2**(i - 1))], "hue": "tractID", "palette": [self.color_dict[bundle]]} profile = self.profile_dict[name] profile = profile[profile['tractID'] == bundle] ba.plot_line( bundle, "nodeID", scalar, profile, display_string(scalar), ylim, n_boot, self._alpha( 0.6 + 0.2 * i), plot_kwargs, plot_subject_lines=plot_subject_lines, ax=axes_dict.get(bundle)) if j == 0: line = Line2D( [], [], color=[0, 0, 0]) line.set_dashes((2**(i + 1), 2**(i + 1))) if ba.is_using_temp_axis(): labels_temp.append(line) else: labels.append(line) if ba.is_using_temp_axis(): ba.temp_fig.legend( labels_temp, names, fontsize=vut.medium_font) ba.save_temp_fig( o_folder, f"{o_file}_{bundle}", self._save_fig) if len(names) > 1: ba.fig.legend( labels, names, loc='center', fontsize=vut.medium_font) ba.format() self._save_fig(ba.fig, o_folder, o_file) if not show_plots: ba.close_all() plt.ion() def _contrast_index_df_maker(self, bundles, names, scalar): ci_df = pd.DataFrame(columns=["subjectID", "nodeID", "diff"]) for subject in self.subjects: profiles = [None] * 2 both_found = True for i, name in enumerate(names): for j, bundle in enumerate(bundles): profiles[i + j] = self._get_profile( name, bundle, subject, scalar) if profiles[i + j] is None: both_found = False if both_found: this_contrast_index = \ calc_contrast_index(profiles[0], profiles[1]) for i, diff in enumerate(this_contrast_index): ci_df = ci_df.append({ "subjectID": subject, "nodeID": i, "diff": diff}, ignore_index=True) return ci_df def contrast_index(self, names=None, scalar="FA", show_plots=False, n_boot=1000, ylim=(-0.5, 0.5), show_legend=False, positions=vut.POSITIONS, plot_subject_lines=True, axes_dict={}): """ Calculate the contrast index for each bundle in two datasets. Parameters ---------- names : list of strings, optional Names of datasets to plot profiles of. If None, all datasets are used. Should be a total of only two datasets. Default: None scalar : string, optional Scalar to use for the contrast index. Default: "FA". show_plots : bool, optional Whether to show plots if in an interactive environment. Default: False n_boot : int, optional Number of bootstrap resamples for seaborn to use to estimate the ci. Default: 1000 ylim : list of 2 floats, optional Minimum and maximum value used for y-axis bounds. If None, ylim is not set. Default: None show_legend : bool, optional Show legend in center with single entry denoting the scalar used. Default: False positions : dictionary, optional Dictionary that maps bundle names to position in plot. Default: vut.POSITIONS plot_subject_lines : bool, optional Whether to plot individual subject lines with a smaller width. Default: True axes_dict : dictionary of axes, optional Plot contrast index for bundles that are keys of axes_dict on the corresponding axis. Default: {} """ if not show_plots: plt.ioff() if names is None: names = list(self.profile_dict.keys()) if len(names) != 2: self.logger.error("To calculate the contrast index, " + "only two dataset names should be given") return None ba = BrainAxes(positions=positions) ci_all_df = {} for j, bundle in enumerate(tqdm(self.bundles)): ci_df = self._contrast_index_df_maker( [bundle], names, scalar) ba.plot_line( bundle, "nodeID", "diff", ci_df, "ACI", ylim, n_boot, 1.0, {"color": self.color_dict[bundle]}, plot_subject_lines=plot_subject_lines, ax=axes_dict.get(bundle)) ci_all_df[bundle] = ci_df ba.save_temp_fig( f"contrast_plots/{scalar}/", f"{names[0]}_vs_{names[1]}_contrast_index_{bundle}", self._save_fig) if show_legend: ba.fig.legend([scalar], loc='center', fontsize=vut.medium_font) ba.format() self._save_fig( ba.fig, f"contrast_plots/{scalar}/", f"{names[0]}_vs_{names[1]}_contrast_index") if not show_plots: ba.close_all() plt.ion() return ba.fig, ba.axes, ci_all_df def lateral_contrast_index(self, name, scalar="FA", show_plots=False, n_boot=1000, ylim=(-1, 1), positions=vut.POSITIONS, plot_subject_lines=True): """ Calculate the lateral contrast index for each bundle in a given dataset, for each dataset in names. Parameters ---------- name : string Names of dataset to plot profiles of. scalar : string, optional Scalar to use for the contrast index. Default: "FA". show_plots : bool, optional Whether to show plots if in an interactive environment. Default: False n_boot : int, optional Number of bootstrap resamples for seaborn to use to estimate the ci. Default: 1000 ylim : list of 2 floats, optional Minimum and maximum value used for y-axis bounds. If None, ylim is not set. Default: None positions : dictionary, optional Dictionary that maps bundle names to position in plot. Default: vut.POSITIONS plot_subject_lines : bool, optional Whether to plot individual subject lines with a smaller width. Default: True """ if not show_plots: plt.ioff() ba = BrainAxes(positions=positions) for j, bundle in enumerate(tqdm(self.bundles)): other_bundle = list(bundle) if other_bundle[-1] == 'L': other_bundle[-1] = 'R' elif other_bundle[-1] == 'R': other_bundle[-1] = 'L' else: continue other_bundle = "".join(other_bundle) if other_bundle not in self.bundles: continue ci_df = self._contrast_index_df_maker( [bundle, other_bundle], [name], scalar) ba.plot_line( bundle, "nodeID", "diff", ci_df, "ACI", ylim, n_boot, 1.0, {"color": self.color_dict[bundle]}, plot_subject_lines=plot_subject_lines) ba.save_temp_fig( f"contrast_plots/{scalar}/", f"{name}_lateral_contrast_index_{bundle}", self._save_fig) ba.fig.legend([scalar], loc='center', fontsize=vut.medium_font) ba.format() self._save_fig( ba.fig, f"contrast_plots/{scalar}/", f"{name}_lateral_contrast_index") if not show_plots: ba.close_all() if not show_plots: plt.ion() def reliability_plots(self, names=None, scalars=["FA", "MD"], ylims=[0.0, 1.0], show_plots=False, only_plot_above_thr=None, rotate_y_labels=False, rtype="Reliability", positions=vut.POSITIONS, fig_axes=None, prof_axes_dict={}, sub_axes_dict={}): """ Plot the scan-rescan reliability using ICC for 2 scalars. Parameters ---------- names : list of strings, optional Names of datasets to plot profiles of. If None, all datasets are used. Should be a total of only two datasets. Default: None scalars : list of strings, optional Scalars to correlate. Default: ["FA", "MD"]. ylims : 2-tuple of floats, optional Limits of the y-axis. Useful to synchronize axes across graphs. Default: [0.0, 1.0]. show_plots : bool, optional Whether to show plots if in an interactive environment. Default: False only_plot_above_thr : int or None, optional Only plot bundles with intrersubject reliability above this threshold on the final reliability bar plots. If None, plot all. Default: None rotate_y_labels : bool, optional Rotate y labels on final reliability plots. Default: False rtype : str, optional Type of reliability to name the y axis of the reliability bar charts. Default: "Reliability" positions : dictionary, optional Dictionary that maps bundle names to position in plot. Default: vut.POSITIONS fig_axes : tuple of matplotlib figure and axes, optional If not None, the resulting reliability plots will use this figure and axes. Default: None prof_axes_dict : dictionary of axes, optional Plot profile reliability histograms for bundles that are keys of prof_axes_dict on the corresponding axis. Default: {} sub_axes_dict : dictionary of axes, optional Plot subject reliability scatter plots for bundles that are keys of sub_axes_dict on the corresponding axis. Default: {} Returns ------- Returns 8 objects: 1. Matplotlib figure 2. Matplotlib axes 3. A dictionary containing the number of missing bundles for each dataset. 4. A list of bundles with sufficient correlation 5. A pandas dataframe describing the intersubject reliabilities, per bundle 6. A numpy array describing the intersubject reliability errors, per bundle 7. A pandas dataframe desribing the profile reliabilities, per bundle 8. A numpy array desribing the profile reliability errors, per bundle """ if not show_plots: plt.ioff() if names is None: names = list(self.profile_dict.keys()) if len(names) != 2: self.logger.error("To plot correlations, " + "only two dataset names should be given") return None # extract relevant statistics / data from profiles N = len(self.subjects) all_sub_coef = np.zeros((len(scalars), len(self.bundles))) all_sub_coef_err = np.zeros((len(scalars), len(self.bundles), 2)) all_sub_means = np.zeros( (len(scalars), len(self.bundles), 2, N)) all_profile_coef = \ np.zeros((len(scalars), len(self.bundles), N)) all_node_coef = np.zeros( (len(scalars), len(self.bundles), self.prof_len)) miss_counts = pd.DataFrame(0, index=self.bundles, columns=[ f"miss_count{names[0]}", f"miss_count{names[1]}"]) for m, scalar in enumerate(scalars): for k, bundle in enumerate(tqdm(self.bundles)): bundle_profiles =\ np.zeros((2, N, self.prof_len)) for j, name in enumerate(names): for i, subject in enumerate(self.subjects): single_profile = self._get_profile( name, bundle, subject, scalar) if single_profile is None: bundle_profiles[j, i] = np.nan miss_counts.at[bundle, f"miss_count{name}"] =\ miss_counts.at[ bundle, f"miss_count{name}"] + 1 else: bundle_profiles[j, i] = single_profile all_sub_means[m, k] = np.nanmean(bundle_profiles, axis=2) all_sub_coef[m, k], all_sub_coef_err[m, k, 0],\ all_sub_coef_err[m, k, 1] =\ self.masked_corr(all_sub_means[m, k], "Srho") if np.isnan(all_sub_coef[m, k]).all(): self.logger.error(( f"Not enough non-nan profiles" f"for scalar {scalar} for bundle {bundle}")) all_sub_coef[m, k] = 0 bundle_coefs = np.zeros(N) for i in range(N): bundle_coefs[i], _, _ = \ self.masked_corr(bundle_profiles[:, i, :], "ICC") all_profile_coef[m, k] = bundle_coefs node_coefs = np.zeros(self.prof_len) for i in range(self.prof_len): node_coefs[i], _, _ =\ self.masked_corr(bundle_profiles[:, :, i], "ICC") all_node_coef[m, k] = node_coefs # plot histograms of subject ICC maxi = np.nanmax(all_profile_coef) mini = np.nanmin(all_profile_coef) bins = np.linspace(mini, maxi, 10) ba = BrainAxes(positions=positions) for k, bundle in enumerate(self.bundles): ax = ba.get_axis(bundle, axes_dict=prof_axes_dict) for m, scalar in enumerate(scalars): bundle_coefs = all_profile_coef[m, k] bundle_coefs = bundle_coefs[~np.isnan(bundle_coefs)] sns.set(style="whitegrid") sns.histplot( data=bundle_coefs, bins=bins, alpha=0.5, color=self.color_dict[bundle], hatch=self.patterns[m], label=scalar, ax=ax) ax.set_title(display_string(bundle), fontsize=vut.large_font) ax.set_xlabel(self.ICC_func_name, fontsize=vut.medium_font) ax.set_ylabel("Subject count", fontsize=vut.medium_font) ba.temp_fig.legend( display_string(scalars), fontsize=vut.medium_font) ba.save_temp_fig( f"rel_plots/{'_'.join(scalars)}/verbose", (f"{names[0]}_vs_{names[1]}_profile_r_distributions" f"_{bundle}"), self._save_fig) legend_labels = [] for m, _ in enumerate(scalars): legend_labels.append(Patch( facecolor='k', hatch=self.patterns[m])) ba.fig.legend( legend_labels, display_string(scalars), loc='center', fontsize=vut.medium_font) ba.format(disable_x=False) self._save_fig( ba.fig, f"rel_plots/{'_'.join(scalars)}/verbose", f"{names[0]}_vs_{names[1]}_profile_r_distributions") if not show_plots: ba.close_all() # plot node reliability profile all_node_coef[np.isnan(all_node_coef)] = 0 if ylims is None: maxi = all_node_coef.max() mini = all_node_coef.min() else: maxi = ylims[1] mini = ylims[0] ba = BrainAxes(positions=positions) for k, bundle in enumerate(self.bundles): ax = ba.get_axis(bundle) for m, scalar in enumerate(scalars): sns.set(style="whitegrid") sns.lineplot( data=all_node_coef[m, k], label=scalar, color=vut.tableau_20[m * 2], ax=ax, legend=False, ci=None, estimator=None) ax.set_ylim([mini, maxi]) ax.set_title(display_string(bundle), fontsize=vut.large_font) ax.set_ylabel(self.ICC_func_name, fontsize=vut.medium_font) ba.temp_fig.legend( display_string(scalars), fontsize=vut.medium_font) ba.save_temp_fig( f"rel_plots/{'_'.join(scalars)}/verbose", (f"{names[0]}_vs_{names[1]}_node_profiles" f"_{bundle}"), self._save_fig) ba.fig.legend(display_string(scalars), loc='center', fontsize=vut.medium_font) ba.format() self._save_fig( ba.fig, f"rel_plots/{'_'.join(scalars)}/verbose", f"{names[0]}_vs_{names[1]}_node_profiles") if not show_plots: ba.close_all() # plot mean profile scatter plots for m, scalar in enumerate(scalars): maxi = np.nanmax(all_sub_means[m]) mini = np.nanmin(all_sub_means[m]) if len(scalars) == 2: twinning_next = (m == 0) twinning = (m == 1) else: twinning = False twinning_next = False if twinning: ba = BrainAxes(positions=positions, fig=ba.fig) else: ba = BrainAxes(positions=positions) for k, bundle in enumerate(self.bundles): if twinning: fc = 'w' ec = self.color_dict[bundle] else: fc = self.color_dict[bundle] ec = 'w' ax = ba.get_axis(bundle, axes_dict=sub_axes_dict) sns.set(style="whitegrid") if not twinning: ax.plot( [[0, 0], [1, 1]], [[0, 0], [1, 1]], '--', color='red') ax.scatter( all_sub_means[m, k, 0], all_sub_means[m, k, 1], label=scalar, marker=self.scalar_markers[m - twinning], facecolors=fc, edgecolors=ec, s=vut.marker_size, linewidth=1) if twinning or twinning_next: twinning_color = 'k' if twinning_next: ax.spines['bottom'].set_color(twinning_color) ax.spines['left'].set_color(twinning_color) else: ax.spines['top'].set_color(twinning_color) ax.spines['right'].set_color(twinning_color) ax.xaxis.label.set_color(twinning_color) ax.tick_params(axis='x', colors=twinning_color) ax.xaxis.label.set_color(twinning_color) ax.yaxis.label.set_color(twinning_color) ax.tick_params(axis='y', colors=twinning_color) ax.yaxis.label.set_color(twinning_color) if not twinning: ax.set_title( display_string(bundle), fontsize=vut.large_font) ax.set_xlabel(names[0], fontsize=vut.medium_font) ax.set_ylabel(names[1], fontsize=vut.medium_font) ax.set_ylim([mini, maxi]) ax.set_xlim([mini, maxi]) ba.temp_fig.legend( [scalar], fontsize=vut.medium_font) ba.save_temp_fig( f"rel_plots/{'_'.join(scalars)}/verbose", (f"{names[0]}_vs_{names[1]}_{scalar}_mean_profiles" f"_{bundle}"), self._save_fig) if twinning: legend_labels = [ Line2D( [], [], markerfacecolor='k', markeredgecolor='w', marker=self.scalar_markers[0], linewidth=0, markersize=15), Line2D( [], [], markeredgecolor='k', markerfacecolor='w', marker=self.scalar_markers[0], linewidth=0, markersize=15)] ba.fig.legend( legend_labels, display_string(scalars), loc='center', fontsize=vut.medium_font) elif not twinning_next: ba.fig.legend([scalar], loc='center', fontsize=vut.medium_font) self._save_fig( ba.fig, f"rel_plots/{'_'.join(scalars)}/verbose", f"{names[0]}_vs_{names[1]}_{scalar}_mean_profiles") ba.format(disable_x=False) if not (show_plots or twinning_next): ba.close_all() # plot bar plots of ICC if fig_axes is None: fig, axes = plt.subplots(2, 1) fig.set_size_inches((8, 8)) else: fig = fig_axes[0] axes = fig_axes[1] bundle_prof_means = np.nanmean(all_profile_coef, axis=2) bundle_prof_stds = 1.95 * \ sem(all_profile_coef, axis=2, nan_policy='omit') if ylims is None: maxi = np.maximum(bundle_prof_means.max(), all_sub_coef.max()) mini = np.minimum(bundle_prof_means.min(), all_sub_coef.min()) else: maxi = ylims[1] mini = ylims[0] if only_plot_above_thr is not None: is_removed_bundle =\ np.logical_not( np.logical_and( np.all(all_sub_coef > only_plot_above_thr, axis=0), np.all(bundle_prof_means > only_plot_above_thr, axis=0))) removal_idx = np.where(is_removed_bundle)[0] bundle_prof_means_removed = np.delete( bundle_prof_means, removal_idx, axis=1) bundle_prof_stds_removed = np.delete( bundle_prof_stds, removal_idx, axis=1) all_sub_coef_removed = np.delete( all_sub_coef, removal_idx, axis=1) all_sub_coef_err_removed = np.delete( all_sub_coef_err, removal_idx, axis=1) else: is_removed_bundle = [False] * len(self.bundles) bundle_prof_means_removed = bundle_prof_means bundle_prof_stds_removed = bundle_prof_stds all_sub_coef_err_removed = all_sub_coef_err all_sub_coef_removed = all_sub_coef updated_bundles = [] for k, bundle in enumerate(self.bundles): if not is_removed_bundle[k]: if bundle == "CC_ForcepsMinor": updated_bundles.append("CC_FMi") else: updated_bundles.append(bundle) updated_bundles.append("median") sns.set(style="whitegrid") width = 0.6 spacing = 1.5 x = np.arange(len(updated_bundles)) * spacing x_shift = np.linspace(-0.5 * width, 0.5 * width, num=len(scalars)) bundle_prof_means_removed = np.pad( bundle_prof_means_removed, [(0, 0), (0, 1)]) bundle_prof_stds_removed = np.pad( bundle_prof_stds_removed, [(0, 0), (0, 1)]) all_sub_coef_removed = np.pad( all_sub_coef_removed, [(0, 0), (0, 1)]) all_sub_coef_err_removed = np.transpose(np.pad( all_sub_coef_err_removed, [(0, 0), (0, 1), (0, 0)])) for m, scalar in enumerate(scalars): bundle_prof_means_removed[m, -1] = np.median( bundle_prof_means_removed[m, :-1]) all_sub_coef_removed[m, -1] = np.median( all_sub_coef_removed[m, :-1]) # This code can be used as a baseline to make violin plots # # mask = ~np.isnan(all_profile_coef[m].T) # all_profile_coef_m_removed =\ # [d[k] for d, k in zip(all_profile_coef[m], mask.T)] # vl_parts = axes[0].violinplot( # all_profile_coef_m_removed, # positions=x[:-1] + x_shift[m], # showmedians=True # ) # color_list = list(self.color_dict.values()) # for c, pc in enumerate(vl_parts['bodies']): # pc.set_facecolor(color_list[c]) # pc.set_edgecolor(color_list[c]) # pc.set_alpha(1) # pc.set_hatch(self.patterns[m]) # vl_parts['cbars'].set_color(color_list) axes[0].bar( x + x_shift[m], bundle_prof_means_removed[m], width, label=scalar, yerr=bundle_prof_stds_removed[m], hatch=self.patterns[m], color=self.color_dict.values()) axes[1].bar( x + x_shift[m], all_sub_coef_removed[m], width, label=scalar, yerr=all_sub_coef_err_removed[:, :, m], hatch=self.patterns[m], color=self.color_dict.values()) if len(updated_bundles) > 20: xaxis_font_size = vut.small_font - 6 else: xaxis_font_size = vut.small_font axes[0].set_title("A", fontsize=vut.large_font) axes[0].set_ylabel(f'Profile {rtype}', fontsize=vut.medium_font) axes[0].set_ylim([mini, maxi]) axes[0].set_xlabel("") axes[0].set_yticklabels( [0, 0.2, 0.4, 0.6, 0.8, 1.0], fontsize=vut.small_font - 8) axes[0].set_xticks(x + 0.5) axes[0].set_xticklabels( display_string(updated_bundles), fontsize=xaxis_font_size) axes[1].set_title("B", fontsize=vut.large_font) axes[1].set_ylabel(f'Subject {rtype}', fontsize=vut.medium_font) axes[1].set_ylim([mini, maxi]) axes[1].set_xlabel("") axes[1].set_yticklabels( [0, 0.2, 0.4, 0.6, 0.8, 1.0], fontsize=vut.small_font - 8) axes[1].set_xticks(x + 0.5) axes[1].set_xticklabels( display_string(updated_bundles), fontsize=xaxis_font_size) plt.setp(axes[0].get_xticklabels(), rotation=65, horizontalalignment='right') plt.setp(axes[1].get_xticklabels(), rotation=65, horizontalalignment='right') if rotate_y_labels: plt.setp(axes[0].get_yticklabels(), rotation=90) plt.setp(axes[1].get_yticklabels(), rotation=90) fig.tight_layout() legend_labels = [] for m, _ in enumerate(scalars): legend_labels.append(Patch( facecolor='k', hatch=self.patterns[m])) fig.legend( legend_labels, display_string(scalars), fontsize=vut.small_font, bbox_to_anchor=(1.25, 0.5)) self._save_fig( fig, f"rel_plots/{'_'.join(scalars)}", f"{names[0]}_vs_{names[1]}") if not show_plots: plt.close(fig) plt.ion() return fig, axes, miss_counts, updated_bundles,\ all_sub_coef_removed, all_sub_coef_err_removed,\ bundle_prof_means_removed, bundle_prof_stds_removed def compare_reliability(self, reliability1, reliability2, analysis_label1, analysis_label2, bundles, errors1=None, errors2=None, scalars=["FA", "MD"], rtype="Subject Reliability", show_plots=False, show_legend=True, fig_ax=None): """ Plot a comparison of scan-rescan reliability between two analyses. Parameters ---------- reliability1, reliability2 : numpy arrays numpy arrays of reliabilities. Typically, each of this will be outputs of separate calls to reliability_plots. analysis_label1, analysis_label2 : Strings Names of the analyses used to obtain each dataset. Used to label the x and y axes. bundles : list of str List of bundles that correspond to the second dimension of the reliability arrays. errors1, errors2 : numpy arrays or None Numpy arrays describing the errors. Typically, each of this will be outputs of separate calls to reliability_plots. If None, errors are not shown. Default is None. scalars : list of str, optional Lsit of scalars that correspond to the first dimension of the reliability arrays. Default: ["FA", "MD"] rtype : str type of reliability. Can be any string; used in x axis lavel. Default: Subject Reliability show_plots : bool, optional Whether to show plots if in an interactive environment. Default: False show_legend : bool, optional Show legend for the plot, off to the right hand side. Default: True fig_ax : tuple of matplotlib figure and axis, optional If not None, the resulting reliability plots will use this figure and axis. Default: None Returns ------- Returns a Matplotlib figure and axes. """ show_error = ((errors1 is not None) and (errors2 is not None)) if fig_ax is None: fig, ax = plt.subplots() else: fig = fig_ax[0] ax = fig_ax[1] legend_labels = [] for i, scalar in enumerate(scalars): marker = self.scalar_markers[i] if marker == "x": marker = marker.upper() for j, bundle in enumerate(bundles): ax.scatter( reliability1[i, j], reliability2[i, j], s=vut.marker_size, c=[self.color_dict[bundle]], marker=marker ) if show_error: if len(errors1.shape) > 2: xerr = errors1[:, j, i].reshape((2, 1)) else: xerr = errors1[i, j] if len(errors2.shape) > 2: yerr = errors2[:, j, i].reshape((2, 1)) else: yerr = errors2[i, j] ax.errorbar( reliability1[i, j], reliability2[i, j], xerr=xerr, yerr=yerr, c=[self.color_dict[bundle]], alpha=0.5, fmt="none" ) if i == 0: legend_labels.append(Patch( facecolor=self.color_dict[bundle], label=bundle)) legend_labels.append(Line2D( [0], [0], marker=marker, color='k', lw=0, markersize=10, label=scalar)) ax.set_xlabel(f"{analysis_label1} {rtype}", fontsize=vut.medium_font) ax.set_ylabel(f"{analysis_label2} {rtype}", fontsize=vut.medium_font) ax.tick_params( axis='x', which='major', labelsize=vut.medium_font) ax.tick_params( axis='y', which='major', labelsize=vut.medium_font) ax.set_ylim(0.2, 1) ax.set_xlim(0.2, 1) ax.plot([[0, 0], [1, 1]], [[0, 0], [1, 1]], '--', color='red') legend_labels.append(Line2D( [0], [0], linewidth=3, linestyle='--', color='red', label='X=Y')) if show_legend: fig.legend( handles=legend_labels, fontsize=vut.small_font - 6, bbox_to_anchor=(1.5, 2.0)) fig.tight_layout() return fig, ax
[docs]def visualize_gif_inline(fname, use_s3fs=False): """Display a gif inline, possible from s3fs """ if use_s3fs: import s3fs fs = s3fs.S3FileSystem() tdir = tempfile.gettempdir() fname_remote = fname fname = op.join(tdir, "fig.gif") fs.get(fname_remote, fname) display.display(display.Image(fname))
def show_anatomical_slices(img_data, title): """ display anatomical slices from midpoint based on: https://nipy.org/nibabel/coordinate_systems.html """ axial_slice = img_data[:, :, int(img_data.shape[2] / 2)] coronal_slice = img_data[:, int(img_data.shape[1] / 2), :] sagittal_slice = img_data[int(img_data.shape[0] / 2), :, :] fig = plt.figure(constrained_layout=False) gs = fig.add_gridspec(nrows=3, ncols=2, wspace=0.01, hspace=0.01) ax1 = fig.add_subplot(gs[:-1, :]) ax1.imshow(axial_slice.T, cmap="gray", origin="lower") ax1.axis('off') ax2 = fig.add_subplot(gs[2, 0]) ax2.imshow(coronal_slice.T, cmap="gray", origin="lower") ax2.axis('off') ax3 = fig.add_subplot(gs[2, 1]) ax3.imshow(sagittal_slice.T, cmap="gray", origin="lower") ax3.axis('off') plt.suptitle(title) plt.show()