Source code for allensdk.internal.ephys.plot_qc_figures
import matplotlib
matplotlib.use('agg')
import logging
import allensdk.internal.core.lims_utilities as lims_utilities
import allensdk.core.json_utilities as json_utilities
from allensdk.core.nwb_data_set import NwbDataSet
import allensdk.ephys.ephys_features as ft
from allensdk.ephys.extract_cell_features import get_square_stim_characteristics, get_ramp_stim_characteristics, get_stim_characteristics
import sys
import argparse
import os
import json
import h5py
import numpy as np
from six import iteritems
from scipy.optimize import curve_fit
import scipy.signal as sg
import scipy.misc
import datetime
import matplotlib.pyplot as plt
#import seaborn as sns
AXIS_Y_RANGE = [ -110, 60 ]
[docs]def get_spikes(sweep_features, sweep_number):
return get_features(sweep_features, sweep_number)["spikes"]
[docs]def get_features(sweep_features, sweep_number):
try:
return sweep_features[int(sweep_number)]
except KeyError:
return sweep_features[str(sweep_number)]
[docs]def load_experiment(file_name, sweep_number):
ds = NwbDataSet(file_name)
sweep = ds.get_sweep(sweep_number)
r = sweep['index_range']
v = sweep['response'] * 1e3
i = sweep['stimulus'] * 1e12
dt = 1.0 / sweep['sampling_rate']
t = np.arange(0, len(v)) * dt
return (v, i, t, r, dt)
[docs]def plot_single_ap_values(nwb_file, sweep_numbers, lims_features, sweep_features, cell_features, type_name):
figs = [ plt.figure() for f in range(3+len(sweep_numbers)) ]
v, i, t, r, dt = load_experiment(nwb_file, sweep_numbers[0])
if type_name == "short_square" or type_name == "long_square":
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
elif type_name == "ramp":
stim_start, start_idx = get_ramp_stim_characteristics(i, t)
gen_features = ["threshold", "peak", "trough", "fast_trough", "slow_trough"]
voltage_features = ["threshold_v", "peak_v", "trough_v", "fast_trough_v", "slow_trough_v"]
time_features = ["threshold_t", "peak_t", "trough_t", "fast_trough_t", "slow_trough_t"]
for sn in sweep_numbers:
spikes = get_spikes(sweep_features, sn)
if (len(spikes) < 1):
logging.warning("no spikes in sweep %d" % sn)
continue
if type_name != "long_square":
voltages = [spikes[0][f] for f in voltage_features]
times = [spikes[0][f] for f in time_features]
else:
rheo_sn = cell_features["long_squares"]["rheobase_sweep"]["id"]
rheo_spike = get_spikes(sweep_features, rheo_sn)[0]
voltages = [ rheo_spike[f] for f in voltage_features]
times = [ rheo_spike[f] for f in time_features]
plt.figure(figs[0].number)
plt.scatter(range(len(voltages)), voltages, color='gray')
plt.tight_layout()
plt.figure(figs[1].number)
plt.scatter(range(len(times)), times, color='gray')
plt.tight_layout()
plt.figure(figs[2].number)
plt.scatter([0], [spikes[0]['upstroke'] / (-spikes[0]['downstroke'])], color='gray')
plt.tight_layout()
plt.figure(figs[0].number)
yvals = [float(lims_features[k + "_v_" + type_name]) for k in gen_features if lims_features[k + "_v_" + type_name] is not None]
xvals = range(len(yvals))
plt.scatter(xvals, yvals, color='blue', marker='_', s=40, zorder=100)
plt.xticks(xvals, ['thr', 'pk', 'tr', 'ftr', 'str'])
plt.title(type_name + ": voltages")
plt.figure(figs[1].number)
yvals = [float(lims_features[k + "_t_" + type_name]) for k in gen_features if lims_features[k + "_t_" + type_name] is not None]
xvals = range(len(yvals))
plt.scatter(xvals, yvals, color='blue', marker='_', s=40, zorder=100)
plt.xticks(xvals, ['thr', 'pk', 'tr', 'ftr', 'str'])
plt.title(type_name + ": times")
plt.figure(figs[2].number)
if lims_features["upstroke_downstroke_ratio_" + type_name] is not None:
plt.scatter([0], [float(lims_features["upstroke_downstroke_ratio_" + type_name])], color='blue', marker='_', s=40, zorder=100)
plt.xticks([])
plt.title(type_name + ": up/down")
for index, sn in enumerate(sweep_numbers):
plt.figure(figs[3 + index].number)
v, i, t, r, dt = load_experiment(nwb_file, sn)
plt.plot(t, v, color='black')
plt.title(str(sn))
spikes = get_spikes(sweep_features, sn)
nspikes = len(spikes)
if type_name != "long_square" and nspikes:
if nspikes == 0:
logging.warning("no spikes in sweep %d" % sn)
continue
voltages = [spikes[0][f] for f in voltage_features]
times = [spikes[0][f] for f in time_features]
else:
rheo_sn = cell_features["long_squares"]["rheobase_sweep"]["id"]
rheo_spike = get_spikes(sweep_features, rheo_sn)[0]
voltages = [ rheo_spike[f] for f in voltage_features ]
times = [ rheo_spike[f] for f in time_features ]
plt.scatter(times, voltages, color='red', zorder=20)
delta_v = 5.0
if nspikes:
plt.plot([spikes[0]['upstroke_t'] - 1e-3 * (delta_v / spikes[0]['upstroke']),
spikes[0]['upstroke_t'] + 1e-3 * (delta_v / spikes[0]['upstroke'])],
[spikes[0]['upstroke_v'] - delta_v, spikes[0]['upstroke_v'] + delta_v], color='red')
if 'downstroke_t' in spikes[0]:
plt.plot([spikes[0]['downstroke_t'] - 1e-3 * (delta_v / spikes[0]['downstroke']),
spikes[0]['downstroke_t'] + 1e-3 * (delta_v / spikes[0]['downstroke'])],
[spikes[0]['downstroke_v'] - delta_v, spikes[0]['downstroke_v'] + delta_v], color='red')
else:
logging.warning("spike has no downstroke time, clipped")
if type_name == "ramp":
if nspikes:
plt.xlim(spikes[0]["threshold_t"] - 0.002, spikes[0]["fast_trough_t"] + 0.01)
elif type_name == "short_square":
plt.xlim(stim_start - 0.002, stim_start + stim_dur + 0.01)
elif type_name == "long_square":
plt.xlim(times[0]- 0.002, times[-2] + 0.002)
plt.tight_layout()
return figs
[docs]def plot_sweep_figures(nwb_file, ephys_roi_result, image_dir, sizes):
sweeps = ephys_roi_result["specimens"][0]["ephys_sweeps"]
vclamp_sweep_numbers = sorted([ s['sweep_number'] for s in sweeps if s['stimulus_units'] == 'Amps' or s['stimulus_units'] == 'pA' ])
image_file_sets = {}
tp_set = []
exp_set = []
prev_sweep_number = None
tp_len = 0.035
tp_steps = int(tp_len * 200000)
b, a = sg.bessel(4, 0.1, "low")
for i, sweep_number in enumerate(vclamp_sweep_numbers):
logging.info("plotting sweep %d" % sweep_number)
if i == 0:
v_init, i_init, t_init, r_init, dt_init = load_experiment(nwb_file, sweep_number)
tp_fig = plt.figure()
axTP = plt.gca()
axTP.set_yticklabels([])
axTP.set_xticklabels([])
axTP.set_xlabel(str(sweep_number))
axTP.set_ylabel('')
xTP = t_init[0:tp_steps]
yTP = v_init[0:tp_steps]
axTP.plot(xTP, yTP, linewidth=1)
axTP.set_xlim(0, tp_len)
# sns.despine()
exp_fig = plt.figure()
axDP = plt.gca()
axDP.set_yticklabels([])
axDP.set_xticklabels([])
axDP.set_xlabel(str(sweep_number))
axDP.set_ylabel('')
v_exp = v_init[r_init[0]:]
t_exp = t_init[r_init[0]:]
yDP = sg.filtfilt(b, a, v_exp, axis=0)
xDP = t_exp
baseline = yDP[5000:9000]
baselineMean = np.mean(baseline)
baselineV = (np.ones(len(xDP))) * baselineMean
axDP.plot(xDP, yDP, linewidth=1)
axDP.plot(xDP, baselineV, linewidth=1)
axDP.set_xlim(t_exp[0], t_exp[-1])
# sns.despine()
v_prev, i_prev, t_prev, r_prev = v_init, i_init, t_init, r_init
else:
v, i, t, r, dt = load_experiment(nwb_file, sweep_number)
tp_fig = plt.figure()
axTP = plt.gca()
axTP.set_yticklabels([])
axTP.set_xticklabels([])
axTP.set_xlabel(str(sweep_number))
axTP.set_ylabel('')
yTP = v[:tp_steps]
xTP = t[:tp_steps]
TPBL = np.mean(yTP[0:100])
yTPN = yTP - TPBL
yTPp = v_prev[:tp_steps]
TPpBL = np.mean(yTPp[0:100])
yTPpN = yTPp - TPpBL
yTPi = v_init[:tp_steps]
TPiBL = np.mean(yTPi[0:100])
yTPiN = yTPi - TPiBL
axTP.plot(xTP, yTPiN, linewidth=1)
axTP.plot(xTP, yTPpN, linewidth=1)
axTP.plot(xTP, yTPN, linewidth=1)
axTP.set_xlim(0, tp_len)
# sns.despine()
exp_fig = plt.figure()
axDP = plt.gca()
axDP.set_yticklabels([])
axDP.set_xticklabels([])
axDP.set_xlabel(str(sweep_number))
axDP.set_ylabel('')
v_exp = v[r[0]:]
t_exp = t[r[0]:]
yDP = sg.filtfilt(b, a, v_exp, axis=0)
xDP = t_exp
baseline = yDP[5000:9000]
baselineMean = np.mean(baseline)
baselineV = (np.ones(len(xDP))) * baselineMean
axDP.plot(xDP, yDP, linewidth=1)
axDP.plot(xDP, baselineV, linewidth=1)
axDP.set_xlim(t_exp[0], t_exp[-1])
# sns.despine()
v_prev, i_prev, t_prev, r_prev = v, i, t, r
prev_sweep_number = sweep_number
save_figure(tp_fig, 'test_pulse_%d' % sweep_number, 'test_pulses', image_dir, sizes, image_file_sets)
save_figure(exp_fig, 'experiment_%d' % sweep_number, 'experiments', image_dir, sizes, image_file_sets)
return image_file_sets
[docs]def save_figure(fig, image_name, image_set_name, image_dir, sizes, image_sets, scalew=1, scaleh=1, ext='jpg'):
plt.figure(fig.number)
if image_set_name not in image_sets:
image_sets[image_set_name] = { size_name: [] for size_name in sizes }
for size_name, size in iteritems(sizes):
fig.set_size_inches(size*scalew, size*scaleh)
image_file = os.path.join(image_dir, "%s_%s.%s" % (image_name, size_name, ext))
plt.savefig(image_file, bbox_inches="tight")
image_sets[image_set_name][size_name].append(image_file)
plt.close()
[docs]def plot_images(ephys_roi_result, image_dir, sizes, image_sets):
wkfs = [ f for f in ephys_roi_result['well_known_files'] if f['filename'].endswith('tif') ]
paths = [ os.path.join(f['storage_directory'], f['filename']) for f in wkfs ]
paths = [ lims_utilities.safe_system_path(p) for p in paths ]
image_set_name = "images"
image_sets[image_set_name] = { size_name: [] for size_name in sizes }
for i, path in enumerate(paths):
image_data = plt.imread(path)
image_data = np.array(image_data, dtype=np.float32)
vmin = image_data.min()
vmax = image_data.max()
image_data = np.array((image_data - vmin) / (vmax - vmin) * 255.0, dtype=np.uint8)
for size_name, size in iteritems(sizes):
if size:
s = image_data.shape
skip = int(s[0] / size)
sdata = image_data[::skip, ::skip]
else:
sdata = image_data
filename = os.path.join(image_dir, "image_%d_%s.jpg" % (i, size_name))
scipy.misc.imsave(filename, sdata)
image_sets['images'][size_name].append(filename)
[docs]def plot_subthreshold_long_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
lsq_sweeps = cell_features["long_squares"]["sweeps"]
sub_sweeps = cell_features["long_squares"]["subthreshold_sweeps"]
tau_sweeps = cell_features["long_squares"]["subthreshold_membrane_property_sweeps"]
# 0a - Plot VI curve and linear fit, along with vrest
x = np.array([ s['stim_amp'] for s in sub_sweeps ])
y = np.array([ s['peak_deflect'][0] for s in sub_sweeps ])
i = np.array([ s['stim_amp'] for s in tau_sweeps ])
fig = plt.figure()
plt.scatter(x, y, color='black')
plt.plot([x.min(), x.max()], [lims_features["vrest"], lims_features["vrest"]], color="blue", linewidth=2)
plt.plot(i, i * 1e-3 * lims_features["ri"] + lims_features["vrest"], color="red", linewidth=2)
plt.xlabel("pA")
plt.ylabel("mV")
plt.title("ri = {:.1f}, vrest = {:.1f}".format(lims_features["ri"], lims_features["vrest"]))
plt.tight_layout()
save_figure(fig, 'VI_curve', 'subthreshold_long_squares', image_dir, sizes, cell_image_files)
# 0b - Plot tau curve and average
fig = plt.figure()
x = np.array([ s['stim_amp'] for s in tau_sweeps ])
y = np.array([ s['tau'] for s in tau_sweeps ])
plt.scatter(x, y, color='black')
i = np.array([ s['stim_amp'] for s in tau_sweeps ])
plt.plot([i.min(), i.max()], [cell_features["long_squares"]["tau"], cell_features["long_squares"]["tau"]], color="red", linewidth=2)
plt.xlabel("pA")
ylim = plt.ylim()
plt.ylim(0, ylim[1])
plt.ylabel("tau (s)")
plt.tight_layout()
save_figure(fig, 'tau_curve', 'subthreshold_long_squares', image_dir, sizes, cell_image_files)
subthresh_dict = {s['id']:s for s in tau_sweeps}
# 0c - Plot the subthreshold squares
tau_sweeps = [ s['id'] for s in tau_sweeps ]
tau_figs = [ plt.figure() for i in range(len(tau_sweeps)) ]
for index, s in enumerate(tau_sweeps):
v, i, t, r, dt = load_experiment(nwb_file, s)
plt.figure(tau_figs[index].number)
plt.plot(t, v, color="black")
if index == 0:
min_y, max_y = plt.ylim()
else:
ylims = plt.ylim()
if min_y > ylims[0]:
min_y = ylims[0]
if max_y < ylims[1]:
max_y = ylims[1]
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
plt.xlim(stim_start - 0.05, stim_start + stim_dur + 0.05)
peak_idx = subthresh_dict[s]['peak_deflect'][1]
peak_t = peak_idx*dt
plt.scatter([peak_t], [subthresh_dict[s]['peak_deflect'][0]], color='red', zorder=10)
popt = ft.fit_membrane_time_constant(v, t, stim_start, peak_t)
plt.title(str(s))
plt.plot(t[start_idx:peak_idx], exp_curve(t[start_idx:peak_idx] - t[start_idx], *popt), color='blue')
for index, s in enumerate(tau_sweeps):
plt.figure(tau_figs[index].number)
plt.ylim(min_y, max_y)
plt.tight_layout()
for index, tau_fig in enumerate(tau_figs):
save_figure(tau_figs[index], 'tau_%d' % index, 'subthreshold_long_squares', image_dir, sizes, cell_image_files)
[docs]def plot_short_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
repeat_amp = cell_features["short_squares"].get("stimulus_amplitude", None)
if repeat_amp is not None:
short_square_sweep_nums = [ s['id'] for s in cell_features["short_squares"]["common_amp_sweeps"] ]
figs = plot_single_ap_values(nwb_file, short_square_sweep_nums,
lims_features, sweep_features, cell_features,
"short_square")
for index, fig in enumerate(figs):
save_figure(fig, 'short_squares_%d' % index, 'short_squares', image_dir, sizes, cell_image_files)
fig = plot_instantaneous_threshold_thumbnail(nwb_file, short_square_sweep_nums,
cell_features, lims_features, sweep_features)
save_figure(fig, 'instantaneous_threshold_thumbnail', 'short_squares', image_dir, sizes, cell_image_files)
else:
logging.warning("No short square figures to plot.")
[docs]def plot_instantaneous_threshold_thumbnail(nwb_file, sweep_numbers, cell_features, lims_features, sweep_features, color='red'):
min_sweep_number = None
for sn in sorted(sweep_numbers):
spikes = get_spikes(sweep_features, sn)
if len(spikes) > 0:
min_sweep_number = sn if min_sweep_number is None else min(min_sweep_number, sn)
fig = plt.figure(frameon=False)
ax = plt.Axes(fig, [0., 0., 1., 1.])
ax.set_axis_off()
fig.add_axes(ax)
ax.set_yticklabels([])
ax.set_xticklabels([])
ax.set_xlabel('')
ax.set_ylabel('')
v, i, t, r, dt = load_experiment(nwb_file, sn)
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
tstart = stim_start - 0.002
tend = stim_start + stim_dur + 0.005
tscale = 0.005
plt.plot(t, v, linewidth=1, color=color)
plt.ylim(AXIS_Y_RANGE[0], AXIS_Y_RANGE[1])
plt.xlim(tstart, tend)
return fig
[docs]def plot_ramp_figures(nwb_file, cell_specimen, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
sweeps = cell_specimen['ephys_sweeps']
ramps_sweeps = [ s["sweep_number"] for s in sweeps if s["workflow_state"].endswith("passed") and s["ephys_stimulus"]["description"][:10] == "C1RP25PR1S"]
figs = []
if len(ramps_sweeps) > 0:
figs = plot_single_ap_values(nwb_file, ramps_sweeps, lims_features, sweep_features, cell_features, "ramp")
for index, fig in enumerate(figs):
save_figure(fig, 'ramps_%d' % index, 'ramps', image_dir, sizes, cell_image_files)
[docs]def plot_rheo_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
rheo_sweeps = [ lims_features["rheobase_sweep_num"] ]
figs = plot_single_ap_values(nwb_file, rheo_sweeps, lims_features, sweep_features, cell_features, "long_square")
for index, fig in enumerate(figs):
save_figure(fig, 'rheo_%d' % index, 'rheo', image_dir, sizes, cell_image_files)
[docs]def plot_hero_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
fig = plt.figure()
v, i, t, r, dt = load_experiment(nwb_file, int(lims_features["thumbnail_sweep_num"]))
plt.plot(t, v, color='black')
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
plt.xlim(stim_start - 0.05, stim_start + stim_dur + 0.05)
plt.ylim(-110, 50)
spike_times = [spk['threshold_t'] for spk in get_spikes(sweep_features, lims_features["thumbnail_sweep_num"])]
isis = np.diff(np.array(spike_times))
plt.title("thumbnail {:d}, amp = {:.1f}".format(lims_features["thumbnail_sweep_num"], stim_amp))
plt.tight_layout()
save_figure(fig, 'thumbnail_0', 'thumbnail', image_dir, sizes, cell_image_files, scalew=2)
fig = plt.figure()
plt.plot(range(len(isis)), isis)
plt.ylabel("ISI (ms)")
if lims_features.get("adaptation", None) is not None:
plt.title("adapt = {:.3g}".format(lims_features["adaptation"]))
else:
plt.title("adapt = not defined")
for k in ["has_delay", "has_burst", "has_pause"]:
if lims_features.get(k, None) is None:
lims_features[k] = False
plt.tight_layout()
save_figure(fig, 'thumbnail_1', 'thumbnail', image_dir, sizes, cell_image_files)
yvals = [
float(lims_features["has_delay"]),
float(lims_features["has_burst"]),
float(lims_features["has_pause"]),
]
xvals = range(len(yvals))
fig = plt.figure()
plt.scatter(xvals, yvals, color='red')
plt.xticks(xvals, ['Delay', 'Burst', 'Pause'])
plt.title("flags")
plt.tight_layout()
save_figure(fig, 'thumbnail_2', 'thumbnail', image_dir, sizes, cell_image_files)
summary_fig = plot_long_square_summary(nwb_file, cell_features, lims_features, sweep_features)
save_figure(summary_fig, 'ephys_summary', 'thumbnail', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_long_square_summary(nwb_file, cell_features, lims_features, sweep_features):
long_square_sweeps = cell_features['long_squares']['sweeps']
long_square_sweep_numbers = [ int(s['id']) for s in long_square_sweeps ]
thumbnail_summary_fig = plot_sweep_set_summary(nwb_file, int(lims_features['thumbnail_sweep_num']), long_square_sweep_numbers)
plt.figure(thumbnail_summary_fig.number)
return thumbnail_summary_fig
[docs]def plot_fi_curve_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
fig = plt.figure()
fi_sorted = sorted(cell_features["long_squares"]["spiking_sweeps"], key=lambda s:s['stim_amp'])
x = [d['stim_amp'] for d in fi_sorted]
y = [d['avg_rate'] for d in fi_sorted]
last_zero_idx = np.nonzero(y)[0][0] - 1
plt.scatter(x, y, color='black')
plt.plot(x[last_zero_idx:], cell_features["long_squares"]["fi_fit_slope"] * (np.array(x[last_zero_idx:]) - x[last_zero_idx]), color='red')
plt.xlabel("pA")
plt.ylabel("spikes/sec")
plt.title("slope = {:.3g}".format(lims_features["f_i_curve_slope"]))
rheo_hero_sweeps = [int(lims_features["rheobase_sweep_num"]), int(lims_features["thumbnail_sweep_num"])]
rheo_hero_x = []
for s in rheo_hero_sweeps:
v, i, t, r, dt = load_experiment(nwb_file, s)
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
rheo_hero_x.append(stim_amp)
rheo_hero_y = [ len(get_spikes(sweep_features, s)) for s in rheo_hero_sweeps ]
plt.scatter(rheo_hero_x, rheo_hero_y, zorder=20)
plt.tight_layout()
save_figure(fig, 'fi_curve', 'fi_curve', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_sag_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files):
fig = plt.figure()
for d in cell_features["long_squares"]["subthreshold_sweeps"]:
if d['peak_deflect'][0] == lims_features["vm_for_sag"]:
v, i, t, r, dt = load_experiment(nwb_file, int(d['id']))
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
plt.plot(t, v, color='black')
plt.scatter(d['peak_deflect'][1], d['peak_deflect'][0], color='red', zorder=10)
#plt.plot([stim_start + stim_dur - 0.1, stim_start + stim_dur], [d['steady'], d['steady']], color='red', zorder=10)
plt.xlim(stim_start - 0.25, stim_start + stim_dur + 0.25)
plt.title("sag = {:.3g}".format(lims_features['sag']))
plt.tight_layout()
save_figure(fig, 'sag', 'sag', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_sweep_value_figures(cell_specimen, image_dir, sizes, cell_image_files):
sweeps = sorted(cell_specimen['ephys_sweeps'], key=lambda s: s['sweep_number'] )
# plot bridge balance
data = np.array([ [ s['bridge_balance_mohm'], s['sweep_number'] ] for s in sweeps ]).T
mask_nulls(data)
fig = plt.figure()
plt.title('bridge balance')
plt.plot(data[1,:], data[0,:], marker='.')
save_figure(fig, 'bridge_balance', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2)
# plot pre_vm_mv, no blowout sweep
data = np.array([ [ s['pre_vm_mv'], s['sweep_number'] ]
for s in sweeps
if not s['ephys_stimulus']['description'].startswith('EXTPBLWOUT')]).T
mask_nulls(data)
fig = plt.figure()
plt.title('pre vm')
plt.plot(data[1,:], data[0,:], marker='.')
save_figure(fig, 'pre_vm_mv', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2)
# plot bias current
data = np.array([ [ s['leak_pa'], s['sweep_number'] ] for s in sweeps ]).T
mask_nulls(data)
fig = plt.figure()
plt.title('leak')
plt.plot(data[1,:], data[0,:], marker='.')
save_figure(fig, 'leak', 'sweep_values', image_dir, sizes, cell_image_files, scalew=2)
[docs]def plot_cell_figures(nwb_file, ephys_roi_result, image_dir, sizes):
cell_image_files = {}
plt.style.use('ggplot')
cell_specimen = ephys_roi_result["specimens"][0]
cell_features = cell_specimen["cell_ephys_features"]
lims_features = cell_specimen["ephys_features"][0]
sweep_features = cell_specimen["sweep_ephys_features"]
logging.info("saving sweep feature figures")
plot_sweep_value_figures(cell_specimen, image_dir, sizes, cell_image_files)
logging.info("saving tau and vi figs")
plot_subthreshold_long_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving short square figs")
plot_short_square_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving ramps")
plot_ramp_figures(nwb_file, cell_specimen, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving rheo figs")
plot_rheo_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving thumbnail figs")
plot_hero_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving fi curve figs")
plot_fi_curve_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
logging.info("saving sag figs")
plot_sag_figures(nwb_file, cell_features, lims_features, sweep_features, image_dir, sizes, cell_image_files)
return cell_image_files
[docs]def plot_sweep_set_summary(nwb_file, highlight_sweep_number, sweep_numbers,
highlight_color='#0779BE', background_color='#dddddd'):
fig = plt.figure(frameon=False)
ax = plt.Axes(fig, [0., 0., 1., 1.])
ax.set_axis_off()
fig.add_axes(ax)
ax.set_yticklabels([])
ax.set_xticklabels([])
ax.set_xlabel('')
ax.set_ylabel('')
for sn in sweep_numbers:
v, i, t, r, dt = load_experiment(nwb_file, sn)
ax.plot(t, v, linewidth=0.5, color=background_color)
v, i, t, r, dt = load_experiment(nwb_file, highlight_sweep_number)
plt.plot(t, v, linewidth=1, color=highlight_color)
stim_start, stim_dur, stim_amp, start_idx, end_idx = get_square_stim_characteristics(i, t)
tstart = stim_start - 0.05
tend = stim_start + stim_dur + 0.25
ax.set_ylim(AXIS_Y_RANGE[0], AXIS_Y_RANGE[1])
ax.set_xlim(tstart, tend)
return fig
[docs]def make_sweep_html(sweep_files, file_name):
html = "<html><body>"
html += "<a href='index.html'>Cell QC Figures</a>"
html += "<p>page created at: %s</p>" % get_time_string()
html += "<div style='position:absolute;width:50%;left:0;top:40'>"
if 'test_pulses' in sweep_files:
for small_img, large_img in zip(sweep_files['test_pulses']['small'],
sweep_files['test_pulses']['large']):
html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img),
os.path.basename(small_img) )
html += "</div>"
html += "<div style='position:absolute;width:50%;right:0;top:40'>"
if 'experiments' in sweep_files:
for small_img, large_img in zip(sweep_files['experiments']['small'],
sweep_files['experiments']['large']):
html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img),
os.path.basename(small_img) )
html += "</div>"
html += "</body></html>"
with open(file_name, 'w') as f:
f.write(html)
[docs]def make_cell_html(image_files, ephys_roi_result, file_name, relative_sweep_link):
html = "<html><body>"
specimen = ephys_roi_result['specimens'][0]
html += "<h3>Specimen %d: %s</h3>" % ( specimen['id'], specimen['name'] )
html += "<p>page created at: %s</p>" % get_time_string()
if relative_sweep_link:
html += "<p><a href='sweep.html' target='_blank'> Sweep QC Figures </a></p>"
else:
sweep_qc_link = '/'.join([ephys_roi_result['storage_directory'], 'qc_figures', 'sweep.html'])
sweep_qc_link = lims_utilities.safe_system_path(sweep_qc_link)
html += "<p><a href='%s' target='_blank'> Sweep QC Figures </a></p>" % sweep_qc_link
fields_to_show = [ 'electrode_0_pa', 'seal_gohm', 'initial_access_resistance_mohm', 'input_resistance_mohm' ]
html += "<table>"
for field in fields_to_show:
html += "<tr><td>%s</td><td>%s</td></tr>" % (field, ephys_roi_result.get(field,None))
html += "</table>"
for image_file_set_name in image_files:
html += "<h3>%s</h3>" % image_file_set_name
image_set_files = image_files[image_file_set_name]
for small_img, large_img in zip(image_set_files['small'], image_set_files['large']):
html += "<a href='%s' target='_blank'><img src='%s'></img></a>" % ( os.path.basename(large_img),
os.path.basename(small_img) )
html += ("</body></html>")
with open(file_name, 'w') as f:
f.write(html)
[docs]def make_sweep_page(nwb_file, ephys_roi_result, working_dir):
sizes = { 'small': 2.0, 'large': 6.0 }
sweep_files = plot_sweep_figures(nwb_file, ephys_roi_result, working_dir, sizes)
make_sweep_html(sweep_files,
os.path.join(working_dir, 'sweep.html'))
[docs]def make_cell_page(nwb_file, ephys_roi_result, working_dir, save_cell_plots=True):
if save_cell_plots:
sizes = { 'small': 2.0, 'large': 6.0 }
cell_files = plot_cell_figures(nwb_file, ephys_roi_result, working_dir, sizes)
else:
cell_files = {}
logging.info("saving images")
sizes = { 'small': 200, 'large': None }
plot_images(ephys_roi_result, working_dir, sizes, cell_files)
sweep_page = os.path.join(working_dir, 'sweep.html')
relative_sweep_link = os.path.exists(sweep_page)
if not relative_sweep_link:
logging.info("sweep page doesn't exist, point to production sweep page")
make_cell_html(cell_files, ephys_roi_result,
os.path.join(working_dir, 'index.html'),
relative_sweep_link)
[docs]def exp_curve(x, a, inv_tau, y0):
''' Function used for tau curve fitting '''
return y0 + a * np.exp(-inv_tau * x)
[docs]def main():
parser = argparse.ArgumentParser(description='analyze specimens for cell-wide features')
parser.add_argument('nwb_file')
parser.add_argument('feature_json')
parser.add_argument('--output_directory', default='.')
parser.add_argument('--no-sweep-page', action='store_false', dest='sweep_page')
parser.add_argument('--no-cell-page', action='store_false', dest='cell_page')
parser.add_argument('--log_level')
args = parser.parse_args()
if args.log_level:
logging.getLogger().setLevel(args.log_level)
ephys_roi_result = json_utilities.read(args.feature_json)
if args.sweep_page:
logging.debug("making sweep page")
make_sweep_page(args.nwb_file, ephys_roi_result, args.output_directory)
if args.cell_page:
logging.debug("making cell page")
make_cell_page(args.nwb_file, ephys_roi_result, args.output_directory, True)
if __name__ == '__main__': main()