Source code for allensdk.internal.ephys.plot_qc_figures

import matplotlib

matplotlib.use('agg')

import logging

import allensdk.internal.core.lims_utilities as lims_utilities
import allensdk.core.json_utilities as json_utilities

from allensdk.core.nwb_data_set import NwbDataSet
import allensdk.ephys.ephys_features as ft
from allensdk.ephys.extract_cell_features import get_square_stim_characteristics, get_ramp_stim_characteristics, get_stim_characteristics

import sys
import argparse
import os
import json
import h5py
import numpy as np
from six import iteritems

from scipy.optimize import curve_fit
import scipy.signal as sg
import scipy.misc

import datetime
import matplotlib.pyplot as plt
#import seaborn as sns

AXIS_Y_RANGE = [ -110, 60 ]

[docs]def get_time_string(): return datetime.datetime.now().strftime("%I:%M%p %B %d, %Y")
[docs]def get_spikes(sweep_features, sweep_number): return get_features(sweep_features, sweep_number)["spikes"]
[docs]def get_features(sweep_features, sweep_number): try: return sweep_features[int(sweep_number)] except KeyError: return sweep_features[str(sweep_number)]
[docs]def load_experiment(file_name, sweep_number): ds = NwbDataSet(file_name) sweep = ds.get_sweep(sweep_number) r = sweep['index_range'] v = sweep['response'] * 1e3 i = sweep['stimulus'] * 1e12 dt = 1.0 / sweep['sampling_rate'] t = np.arange(0, len(v)) * dt return (v, i, t, r, dt)
[docs]def plot_single_ap_values(nwb_file, sweep_numbers, lims_features, sweep_features, cell_features, type_name): figs = [ plt.figure() for f in range(3+len(sweep_numbers)) ] v, i, t, r, dt = load_experiment(nwb_file, sweep_numbers[0]) if type_name == "short_square" or type_name == "long_square": stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) elif type_name == "ramp": stim_start, start_idx = get_ramp_stim_characteristics(i, t) gen_features = ["threshold", "peak", "trough", "fast_trough", "slow_trough"] voltage_features = ["threshold_v", "peak_v", "trough_v", "fast_trough_v", "slow_trough_v"] time_features = ["threshold_t", "peak_t", "trough_t", "fast_trough_t", "slow_trough_t"] for sn in sweep_numbers: spikes = get_spikes(sweep_features, sn) if (len(spikes) < 1): logging.warning("no spikes in sweep %d" % sn) continue if type_name != "long_square": voltages = [spikes[0][f] for f in voltage_features] times = [spikes[0][f] for f in time_features] else: rheo_sn = cell_features["long_squares"]["rheobase_sweep"]["id"] rheo_spike = get_spikes(sweep_features, rheo_sn)[0] voltages = [ rheo_spike[f] for f in voltage_features] times = [ rheo_spike[f] for f in time_features] plt.figure(figs[0].number) plt.scatter(range(len(voltages)), voltages, color='gray') plt.tight_layout() plt.figure(figs[1].number) plt.scatter(range(len(times)), times, color='gray') plt.tight_layout() plt.figure(figs[2].number) plt.scatter([0], [spikes[0]['upstroke'] / (-spikes[0]['downstroke'])], color='gray') plt.tight_layout() plt.figure(figs[0].number) yvals = [float(lims_features[k + "_v_" + type_name]) for k in gen_features if lims_features[k + "_v_" + type_name] is not None] xvals = range(len(yvals)) plt.scatter(xvals, yvals, color='blue', marker='_', s=40, zorder=100) plt.xticks(xvals, ['thr', 'pk', 'tr', 'ftr', 'str']) plt.title(type_name + ": voltages") plt.figure(figs[1].number) yvals = [float(lims_features[k + "_t_" + type_name]) for k in gen_features if lims_features[k + "_t_" + type_name] is not None] xvals = range(len(yvals)) plt.scatter(xvals, yvals, color='blue', marker='_', s=40, zorder=100) plt.xticks(xvals, ['thr', 'pk', 'tr', 'ftr', 'str']) plt.title(type_name + ": times") plt.figure(figs[2].number) if lims_features["upstroke_downstroke_ratio_" + type_name] is not None: plt.scatter([0], [float(lims_features["upstroke_downstroke_ratio_" + type_name])], color='blue', marker='_', s=40, zorder=100) plt.xticks([]) plt.title(type_name + ": up/down") for index, sn in enumerate(sweep_numbers): plt.figure(figs[3 + index].number) v, i, t, r, dt = load_experiment(nwb_file, sn) plt.plot(t, v, color='black') plt.title(str(sn)) spikes = get_spikes(sweep_features, sn) nspikes = len(spikes) if type_name != "long_square" and nspikes: if nspikes == 0: logging.warning("no spikes in sweep %d" % sn) continue voltages = [spikes[0][f] for f in voltage_features] times = [spikes[0][f] for f in time_features] else: rheo_sn = cell_features["long_squares"]["rheobase_sweep"]["id"] rheo_spike = get_spikes(sweep_features, rheo_sn)[0] voltages = [ rheo_spike[f] for f in voltage_features ] times = [ rheo_spike[f] for f in time_features ] plt.scatter(times, voltages, color='red', zorder=20) delta_v = 5.0 if nspikes: plt.plot([spikes[0]['upstroke_t'] - 1e-3 * (delta_v / spikes[0]['upstroke']), spikes[0]['upstroke_t'] + 1e-3 * (delta_v / spikes[0]['upstroke'])], [spikes[0]['upstroke_v'] - delta_v, spikes[0]['upstroke_v'] + delta_v], color='red') if 'downstroke_t' in spikes[0]: plt.plot([spikes[0]['downstroke_t'] - 1e-3 * (delta_v / spikes[0]['downstroke']), spikes[0]['downstroke_t'] + 1e-3 * (delta_v / spikes[0]['downstroke'])], [spikes[0]['downstroke_v'] - delta_v, spikes[0]['downstroke_v'] + delta_v], color='red') else: logging.warning("spike has no downstroke time, clipped") if type_name == "ramp": if nspikes: plt.xlim(spikes[0]["threshold_t"] - 0.002, spikes[0]["fast_trough_t"] + 0.01) elif type_name == "short_square": plt.xlim(stim_start - 0.002, stim_start + stim_dur + 0.01) elif type_name == "long_square": plt.xlim(times[0]- 0.002, times[-2] + 0.002) plt.tight_layout() return figs
[docs]def plot_sweep_figures(nwb_file, ephys_roi_result, image_dir, sizes): sweeps = ephys_roi_result["specimens"][0]["ephys_sweeps"] vclamp_sweep_numbers = sorted([ s['sweep_number'] for s in sweeps if s['stimulus_units'] == 'Amps' or s['stimulus_units'] == 'pA' ]) image_file_sets = {} tp_set = [] exp_set = [] prev_sweep_number = None tp_len = 0.035 tp_steps = int(tp_len * 200000) b, a = sg.bessel(4, 0.1, "low") for i, sweep_number in enumerate(vclamp_sweep_numbers): logging.info("plotting sweep %d" % sweep_number) if i == 0: v_init, i_init, t_init, r_init, dt_init = load_experiment(nwb_file, sweep_number) tp_fig = plt.figure() axTP = plt.gca() axTP.set_yticklabels([]) axTP.set_xticklabels([]) axTP.set_xlabel(str(sweep_number)) axTP.set_ylabel('') xTP = t_init[0:tp_steps] yTP = v_init[0:tp_steps] axTP.plot(xTP, yTP, linewidth=1) axTP.set_xlim(0, tp_len) # sns.despine() exp_fig = plt.figure() axDP = plt.gca() axDP.set_yticklabels([]) axDP.set_xticklabels([]) axDP.set_xlabel(str(sweep_number)) axDP.set_ylabel('') v_exp = v_init[r_init[0]:] t_exp = t_init[r_init[0]:] yDP = sg.filtfilt(b, a, v_exp, axis=0) xDP = t_exp baseline = yDP[5000:9000] baselineMean = np.mean(baseline) baselineV = (np.ones(len(xDP))) * baselineMean axDP.plot(xDP, yDP, linewidth=1) axDP.plot(xDP, baselineV, linewidth=1) axDP.set_xlim(t_exp[0], t_exp[-1]) # sns.despine() v_prev, i_prev, t_prev, r_prev = v_init, i_init, t_init, r_init else: v, i, t, r, dt = load_experiment(nwb_file, sweep_number) tp_fig = plt.figure() axTP = plt.gca() axTP.set_yticklabels([]) axTP.set_xticklabels([]) axTP.set_xlabel(str(sweep_number)) axTP.set_ylabel('') yTP = v[:tp_steps] xTP = t[:tp_steps] TPBL = np.mean(yTP[0:100]) yTPN = yTP - TPBL yTPp = v_prev[:tp_steps] TPpBL = np.mean(yTPp[0:100]) yTPpN = yTPp - TPpBL yTPi = v_init[:tp_steps] TPiBL = np.mean(yTPi[0:100]) yTPiN = yTPi - TPiBL axTP.plot(xTP, yTPiN, linewidth=1) axTP.plot(xTP, yTPpN, linewidth=1) axTP.plot(xTP, yTPN, linewidth=1) axTP.set_xlim(0, tp_len) # sns.despine() exp_fig = plt.figure() axDP = plt.gca() axDP.set_yticklabels([]) axDP.set_xticklabels([]) axDP.set_xlabel(str(sweep_number)) axDP.set_ylabel('') v_exp = v[r[0]:] t_exp = t[r[0]:] yDP = sg.filtfilt(b, a, v_exp, axis=0) xDP = t_exp baseline = yDP[5000:9000] baselineMean = np.mean(baseline) baselineV = (np.ones(len(xDP))) * baselineMean axDP.plot(xDP, yDP, linewidth=1) axDP.plot(xDP, baselineV, linewidth=1) axDP.set_xlim(t_exp[0], t_exp[-1]) # sns.despine() v_prev, i_prev, t_prev, r_prev = v, i, t, r prev_sweep_number = sweep_number save_figure(tp_fig, 'test_pulse_%d' % sweep_number, 'test_pulses', image_dir, sizes, image_file_sets) save_figure(exp_fig, 'experiment_%d' % sweep_number, 'experiments', image_dir, sizes, image_file_sets) return image_file_sets
[docs]def save_figure(fig, image_name, image_set_name, image_dir, sizes, image_sets, scalew=1, scaleh=1, ext='jpg'): plt.figure(fig.number) if image_set_name not in image_sets: image_sets[image_set_name] = { size_name: [] for size_name in sizes } for size_name, size in iteritems(sizes): fig.set_size_inches(size*scalew, size*scaleh) image_file = os.path.join(image_dir, "%s_%s.%s" % (image_name, size_name, ext)) plt.savefig(image_file, bbox_inches="tight") image_sets[image_set_name][size_name].append(image_file) plt.close()
[docs]def plot_images(ephys_roi_result, image_dir, sizes, image_sets): wkfs = [ f for f in ephys_roi_result['well_known_files'] if f['filename'].endswith('tif') ] paths = [ os.path.join(f['storage_directory'], f['filename']) for f in wkfs ] paths = [ lims_utilities.safe_system_path(p) for p in paths ] image_set_name = "images" image_sets[image_set_name] = { size_name: [] for size_name in sizes } for i, path in enumerate(paths): image_data = plt.imread(path) image_data = np.array(image_data, dtype=np.float32) vmin = image_data.min() vmax = image_data.max() image_data = np.array((image_data - vmin) / (vmax - vmin) * 255.0, dtype=np.uint8) for size_name, size in iteritems(sizes): if size: s = image_data.shape skip = int(s[0] / size) sdata = image_data[::skip, ::skip] else: sdata = image_data filename = os.path.join(image_dir, "image_%d_%s.jpg" % (i, size_name)) scipy.misc.imsave(filename, sdata) image_sets['images'][size_name].append(filename)
[docs]def plot_subthreshold_long_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): lsq_sweeps = cell_features["long_squares"]["sweeps"] sub_sweeps = cell_features["long_squares"]["subthreshold_sweeps"] tau_sweeps = cell_features["long_squares"]["subthreshold_membrane_property_sweeps"] # 0a - Plot VI curve and linear fit, along with vrest x = np.array([ s['stim_amp'] for s in sub_sweeps ]) y = np.array([ s['peak_deflect'][0] for s in sub_sweeps ]) i = np.array([ s['stim_amp'] for s in tau_sweeps ]) fig = plt.figure() plt.scatter(x, y, color='black') plt.plot([x.min(), x.max()], [lims_features["vrest"], lims_features["vrest"]], color="blue", linewidth=2) plt.plot(i, i * 1e-3 * lims_features["ri"] + lims_features["vrest"], color="red", linewidth=2) plt.xlabel("pA") plt.ylabel("mV") plt.title("ri = {:.1f}, vrest = {:.1f}".format(lims_features["ri"], lims_features["vrest"])) plt.tight_layout() save_figure(fig, 'VI_curve', 'subthreshold_long_squares', image_dir, sizes, cell_image_files) # 0b - Plot tau curve and average fig = plt.figure() x = np.array([ s['stim_amp'] for s in tau_sweeps ]) y = np.array([ s['tau'] for s in tau_sweeps ]) plt.scatter(x, y, color='black') i = np.array([ s['stim_amp'] for s in tau_sweeps ]) plt.plot([i.min(), i.max()], [cell_features["long_squares"]["tau"], cell_features["long_squares"]["tau"]], color="red", linewidth=2) plt.xlabel("pA") ylim = plt.ylim() plt.ylim(0, ylim[1]) plt.ylabel("tau (s)") plt.tight_layout() save_figure(fig, 'tau_curve', 'subthreshold_long_squares', image_dir, sizes, cell_image_files) subthresh_dict = {s['id']:s for s in tau_sweeps} # 0c - Plot the subthreshold squares tau_sweeps = [ s['id'] for s in tau_sweeps ] tau_figs = [ plt.figure() for i in range(len(tau_sweeps)) ] for index, s in enumerate(tau_sweeps): v, i, t, r, dt = load_experiment(nwb_file, s) plt.figure(tau_figs[index].number) plt.plot(t, v, color="black") if index == 0: min_y, max_y = plt.ylim() else: ylims = plt.ylim() if min_y > ylims[0]: min_y = ylims[0] if max_y < ylims[1]: max_y = ylims[1] stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) plt.xlim(stim_start - 0.05, stim_start + stim_dur + 0.05) peak_idx = subthresh_dict[s]['peak_deflect'][1] peak_t = peak_idx*dt plt.scatter([peak_t], [subthresh_dict[s]['peak_deflect'][0]], color='red', zorder=10) popt = ft.fit_membrane_time_constant(v, t, stim_start, peak_t) plt.title(str(s)) plt.plot(t[start_idx:peak_idx], exp_curve(t[start_idx:peak_idx] - t[start_idx], *popt), color='blue') for index, s in enumerate(tau_sweeps): plt.figure(tau_figs[index].number) plt.ylim(min_y, max_y) plt.tight_layout() for index, tau_fig in enumerate(tau_figs): save_figure(tau_figs[index], 'tau_%d' % index, 'subthreshold_long_squares', image_dir, sizes, cell_image_files)
[docs]def plot_short_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): repeat_amp = cell_features["short_squares"].get("stimulus_amplitude", None) if repeat_amp is not None: short_square_sweep_nums = [ s['id'] for s in cell_features["short_squares"]["common_amp_sweeps"] ] figs = plot_single_ap_values(nwb_file, short_square_sweep_nums, lims_features, sweep_features, cell_features, "short_square") for index, fig in enumerate(figs): save_figure(fig, 'short_squares_%d' % index, 'short_squares', image_dir, sizes, cell_image_files) fig = plot_instantaneous_threshold_thumbnail(nwb_file, short_square_sweep_nums, cell_features, lims_features, sweep_features) save_figure(fig, 'instantaneous_threshold_thumbnail', 'short_squares', image_dir, sizes, cell_image_files) else: logging.warning("No short square figures to plot.")
[docs]def plot_instantaneous_threshold_thumbnail(nwb_file, sweep_numbers, cell_features, lims_features, sweep_features, color='red'): min_sweep_number = None for sn in sorted(sweep_numbers): spikes = get_spikes(sweep_features, sn) if len(spikes) > 0: min_sweep_number = sn if min_sweep_number is None else min(min_sweep_number, sn) fig = plt.figure(frameon=False) ax = plt.Axes(fig, [0., 0., 1., 1.]) ax.set_axis_off() fig.add_axes(ax) ax.set_yticklabels([]) ax.set_xticklabels([]) ax.set_xlabel('') ax.set_ylabel('') v, i, t, r, dt = load_experiment(nwb_file, sn) stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) tstart = stim_start - 0.002 tend = stim_start + stim_dur + 0.005 tscale = 0.005 plt.plot(t, v, linewidth=1, color=color) plt.ylim(AXIS_Y_RANGE[0], AXIS_Y_RANGE[1]) plt.xlim(tstart, tend) return fig
[docs]def plot_ramp_figures(nwb_file, cell_specimen, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): sweeps = cell_specimen['ephys_sweeps'] ramps_sweeps = [ s["sweep_number"] for s in sweeps if s["workflow_state"].endswith("passed") and s["ephys_stimulus"]["description"][:10] == "C1RP25PR1S"] figs = [] if len(ramps_sweeps) > 0: figs = plot_single_ap_values(nwb_file, ramps_sweeps, lims_features, sweep_features, cell_features, "ramp") for index, fig in enumerate(figs): save_figure(fig, 'ramps_%d' % index, 'ramps', image_dir, sizes, cell_image_files)
[docs]def plot_rheo_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): rheo_sweeps = [ lims_features["rheobase_sweep_num"] ] figs = plot_single_ap_values(nwb_file, rheo_sweeps, lims_features, sweep_features, cell_features, "long_square") for index, fig in enumerate(figs): save_figure(fig, 'rheo_%d' % index, 'rheo', image_dir, sizes, cell_image_files)
[docs]def plot_hero_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): fig = plt.figure() v, i, t, r, dt = load_experiment(nwb_file, int(lims_features["thumbnail_sweep_num"])) plt.plot(t, v, color='black') stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) plt.xlim(stim_start - 0.05, stim_start + stim_dur + 0.05) plt.ylim(-110, 50) spike_times = [spk['threshold_t'] for spk in get_spikes(sweep_features, lims_features["thumbnail_sweep_num"])] isis = np.diff(np.array(spike_times)) plt.title("thumbnail {:d}, amp = {:.1f}".format(lims_features["thumbnail_sweep_num"], stim_amp)) plt.tight_layout() save_figure(fig, 'thumbnail_0', 'thumbnail', image_dir, sizes, cell_image_files, scalew=2) fig = plt.figure() plt.plot(range(len(isis)), isis) plt.ylabel("ISI (ms)") if lims_features.get("adaptation", None) is not None: plt.title("adapt = {:.3g}".format(lims_features["adaptation"])) else: plt.title("adapt = not defined") for k in ["has_delay", "has_burst", "has_pause"]: if lims_features.get(k, None) is None: lims_features[k] = False plt.tight_layout() save_figure(fig, 'thumbnail_1', 'thumbnail', image_dir, sizes, cell_image_files) yvals = [ float(lims_features["has_delay"]), float(lims_features["has_burst"]), float(lims_features["has_pause"]), ] xvals = range(len(yvals)) fig = plt.figure() plt.scatter(xvals, yvals, color='red') plt.xticks(xvals, ['Delay', 'Burst', 'Pause']) plt.title("flags") plt.tight_layout() save_figure(fig, 'thumbnail_2', 'thumbnail', image_dir, sizes, cell_image_files) summary_fig = plot_long_square_summary(nwb_file, cell_features, lims_features, sweep_features) save_figure(summary_fig, 'ephys_summary', 'thumbnail', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_long_square_summary(nwb_file, cell_features, lims_features, sweep_features): long_square_sweeps = cell_features['long_squares']['sweeps'] long_square_sweep_numbers = [ int(s['id']) for s in long_square_sweeps ] thumbnail_summary_fig = plot_sweep_set_summary(nwb_file, int(lims_features['thumbnail_sweep_num']), long_square_sweep_numbers) plt.figure(thumbnail_summary_fig.number) return thumbnail_summary_fig
[docs]def plot_fi_curve_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): fig = plt.figure() fi_sorted = sorted(cell_features["long_squares"]["spiking_sweeps"], key=lambda s:s['stim_amp']) x = [d['stim_amp'] for d in fi_sorted] y = [d['avg_rate'] for d in fi_sorted] last_zero_idx = np.nonzero(y)[0][0] - 1 plt.scatter(x, y, color='black') plt.plot(x[last_zero_idx:], cell_features["long_squares"]["fi_fit_slope"] * (np.array(x[last_zero_idx:]) - x[last_zero_idx]), color='red') plt.xlabel("pA") plt.ylabel("spikes/sec") plt.title("slope = {:.3g}".format(lims_features["f_i_curve_slope"])) rheo_hero_sweeps = [int(lims_features["rheobase_sweep_num"]), int(lims_features["thumbnail_sweep_num"])] rheo_hero_x = [] for s in rheo_hero_sweeps: v, i, t, r, dt = load_experiment(nwb_file, s) stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) rheo_hero_x.append(stim_amp) rheo_hero_y = [ len(get_spikes(sweep_features, s)) for s in rheo_hero_sweeps ] plt.scatter(rheo_hero_x, rheo_hero_y, zorder=20) plt.tight_layout() save_figure(fig, 'fi_curve', 'fi_curve', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_sag_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files): fig = plt.figure() for d in cell_features["long_squares"]["subthreshold_sweeps"]: if d['peak_deflect'][0] == lims_features["vm_for_sag"]: v, i, t, r, dt = load_experiment(nwb_file, int(d['id'])) stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) plt.plot(t, v, color='black') plt.scatter(d['peak_deflect'][1], d['peak_deflect'][0], color='red', zorder=10) #plt.plot([stim_start + stim_dur - 0.1, stim_start + stim_dur], [d['steady'], d['steady']], color='red', zorder=10) plt.xlim(stim_start - 0.25, stim_start + stim_dur + 0.25) plt.title("sag = {:.3g}".format(lims_features['sag'])) plt.tight_layout() save_figure(fig, 'sag', 'sag', image_dir, sizes, cell_image_files, scalew=2)
[docs]def mask_nulls(data): data[0, np.equal(data[0,:], None) | np.equal(data[0,:],0)] = np.nan
[docs]def plot_sweep_value_figures(cell_specimen, image_dir, sizes, cell_image_files): sweeps = sorted(cell_specimen['ephys_sweeps'], key=lambda s: s['sweep_number'] ) # plot bridge balance data = np.array([ [ s['bridge_balance_mohm'], s['sweep_number'] ] for s in sweeps ]).T mask_nulls(data) fig = plt.figure() plt.title('bridge balance') plt.plot(data[1,:], data[0,:], marker='.') save_figure(fig, 'bridge_balance', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2) # plot pre_vm_mv, no blowout sweep data = np.array([ [ s['pre_vm_mv'], s['sweep_number'] ] for s in sweeps if not s['ephys_stimulus']['description'].startswith('EXTPBLWOUT')]).T mask_nulls(data) fig = plt.figure() plt.title('pre vm') plt.plot(data[1,:], data[0,:], marker='.') save_figure(fig, 'pre_vm_mv', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2) # plot bias current data = np.array([ [ s['leak_pa'], s['sweep_number'] ] for s in sweeps ]).T mask_nulls(data) fig = plt.figure() plt.title('leak') plt.plot(data[1,:], data[0,:], marker='.') save_figure(fig, 'leak', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_cell_figures(nwb_file, ephys_roi_result, image_dir, sizes): cell_image_files = {} plt.style.use('ggplot') cell_specimen = ephys_roi_result["specimens"][0] cell_features = cell_specimen["cell_ephys_features"] lims_features = cell_specimen["ephys_features"][0] sweep_features = cell_specimen["sweep_ephys_features"] logging.info("saving sweep feature figures") plot_sweep_value_figures(cell_specimen, image_dir, sizes, cell_image_files) logging.info("saving tau and vi figs") plot_subthreshold_long_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving short square figs") plot_short_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving ramps") plot_ramp_figures(nwb_file, cell_specimen, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving rheo figs") plot_rheo_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving thumbnail figs") plot_hero_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving fi curve figs") plot_fi_curve_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) logging.info("saving sag figs") plot_sag_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files) return cell_image_files
[docs]def plot_sweep_set_summary(nwb_file, highlight_sweep_number, sweep_numbers, highlight_color='#0779BE', background_color='#dddddd'): fig = plt.figure(frameon=False) ax = plt.Axes(fig, [0., 0., 1., 1.]) ax.set_axis_off() fig.add_axes(ax) ax.set_yticklabels([]) ax.set_xticklabels([]) ax.set_xlabel('') ax.set_ylabel('') for sn in sweep_numbers: v, i, t, r, dt = load_experiment(nwb_file, sn) ax.plot(t, v, linewidth=0.5, color=background_color) v, i, t, r, dt = load_experiment(nwb_file, highlight_sweep_number) plt.plot(t, v, linewidth=1, color=highlight_color) stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t) tstart = stim_start - 0.05 tend = stim_start + stim_dur + 0.25 ax.set_ylim(AXIS_Y_RANGE[0], AXIS_Y_RANGE[1]) ax.set_xlim(tstart, tend) return fig
[docs]def make_sweep_html(sweep_files, file_name): html = "<html><body>" html += "<a href='index.html'>Cell QC Figures</a>" html += "<p>page created at: %s</p>" % get_time_string() html += "<div style='position:absolute;width:50%;left:0;top:40'>" if 'test_pulses' in sweep_files: for small_img, large_img in zip(sweep_files['test_pulses']['small'], sweep_files['test_pulses']['large']): html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img), os.path.basename(small_img) ) html += "</div>" html += "<div style='position:absolute;width:50%;right:0;top:40'>" if 'experiments' in sweep_files: for small_img, large_img in zip(sweep_files['experiments']['small'], sweep_files['experiments']['large']): html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img), os.path.basename(small_img) ) html += "</div>" html += "</body></html>" with open(file_name, 'w') as f: f.write(html)
[docs]def make_cell_html(image_files, ephys_roi_result, file_name, relative_sweep_link): html = "<html><body>" specimen = ephys_roi_result['specimens'][0] html += "<h3>Specimen %d: %s</h3>" % ( specimen['id'], specimen['name'] ) html += "<p>page created at: %s</p>" % get_time_string() if relative_sweep_link: html += "<p><a href='sweep.html' target='_blank'> Sweep QC Figures </a></p>" else: sweep_qc_link = '/'.join([ephys_roi_result['storage_directory'], 'qc_figures', 'sweep.html']) sweep_qc_link = lims_utilities.safe_system_path(sweep_qc_link) html += "<p><a href='%s' target='_blank'> Sweep QC Figures </a></p>" % sweep_qc_link fields_to_show = [ 'electrode_0_pa', 'seal_gohm', 'initial_access_resistance_mohm', 'input_resistance_mohm' ] html += "<table>" for field in fields_to_show: html += "<tr><td>%s</td><td>%s</td></tr>" % (field, ephys_roi_result.get(field,None)) html += "</table>" for image_file_set_name in image_files: html += "<h3>%s</h3>" % image_file_set_name image_set_files = image_files[image_file_set_name] for small_img, large_img in zip(image_set_files['small'], image_set_files['large']): html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img), os.path.basename(small_img) ) html += ("</body></html>") with open(file_name, 'w') as f: f.write(html)
[docs]def make_sweep_page(nwb_file, ephys_roi_result, working_dir): sizes = { 'small': 2.0, 'large': 6.0 } sweep_files = plot_sweep_figures(nwb_file, ephys_roi_result, working_dir, sizes) make_sweep_html(sweep_files, os.path.join(working_dir, 'sweep.html'))
[docs]def make_cell_page(nwb_file, ephys_roi_result, working_dir, save_cell_plots=True): if save_cell_plots: sizes = { 'small': 2.0, 'large': 6.0 } cell_files = plot_cell_figures(nwb_file, ephys_roi_result, working_dir, sizes) else: cell_files = {} logging.info("saving images") sizes = { 'small': 200, 'large': None } plot_images(ephys_roi_result, working_dir, sizes, cell_files) sweep_page = os.path.join(working_dir, 'sweep.html') relative_sweep_link = os.path.exists(sweep_page) if not relative_sweep_link: logging.info("sweep page doesn't exist, point to production sweep page") make_cell_html(cell_files, ephys_roi_result, os.path.join(working_dir, 'index.html'), relative_sweep_link)
[docs]def exp_curve(x, a, inv_tau, y0): ''' Function used for tau curve fitting ''' return y0 + a * np.exp(-inv_tau * x)
[docs]def main(): parser = argparse.ArgumentParser(description='analyze specimens for cell-wide features') parser.add_argument('nwb_file') parser.add_argument('feature_json') parser.add_argument('--output_directory', default='.') parser.add_argument('--no-sweep-page', action='store_false', dest='sweep_page') parser.add_argument('--no-cell-page', action='store_false', dest='cell_page') parser.add_argument('--log_level') args = parser.parse_args() if args.log_level: logging.getLogger().setLevel(args.log_level) ephys_roi_result = json_utilities.read(args.feature_json) if args.sweep_page: logging.debug("making sweep page") make_sweep_page(args.nwb_file, ephys_roi_result, args.output_directory) if args.cell_page: logging.debug("making cell page") make_cell_page(args.nwb_file, ephys_roi_result, args.output_directory, True)
if __name__ == '__main__': main()