Source code for allensdk.brain_observatory.locally_sparse_noise

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2016-2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import allensdk.brain_observatory.stimulus_info as stimulus_info
import h5py
import numpy as np
import pandas as pd
import scipy.ndimage
from .receptive_field_analysis.receptive_field import \
    compute_receptive_field_with_postprocessing
from .receptive_field_analysis.visualization import plot_receptive_field_data

from . import circle_plots as cplots
from . import observatory_plots as oplots
from .brain_observatory_exceptions import MissingStimulusException
from .stimulus_analysis import StimulusAnalysis
from .receptive_field_analysis.tools import dict_generator, read_h5_group


[docs]class LocallySparseNoise(StimulusAnalysis): """ Perform tuning analysis specific to the locally sparse noise stimulus. Parameters ---------- data_set: BrainObservatoryNwbDataSet object stimulus: string Name of locally sparse noise stimulus. See brain_observatory.stimulus_info. nrows: int Number of rows in the stimulus template ncol: int Number of columns in the stimulus template """ LSN_ON = 255 LSN_OFF = 0 LSN_GREY = 127 LSN_OFF_SCREEN = 64 def __init__(self, data_set, stimulus=None, **kwargs): super(LocallySparseNoise, self).__init__(data_set, **kwargs) if stimulus is None: self.stimulus = stimulus_info.LOCALLY_SPARSE_NOISE else: self.stimulus = stimulus try: lsn_dims = stimulus_info.LOCALLY_SPARSE_NOISE_DIMENSIONS[ self.stimulus] except KeyError: raise KeyError("Unknown stimulus name: %s" % self.stimulus) self.nrows = lsn_dims[0] self.ncols = lsn_dims[1] self._LSN = LocallySparseNoise._PRELOAD self._LSN_mask = LocallySparseNoise._PRELOAD self._sweeplength = LocallySparseNoise._PRELOAD self._interlength = LocallySparseNoise._PRELOAD self._extralength = LocallySparseNoise._PRELOAD self._mean_response = LocallySparseNoise._PRELOAD self._receptive_field = LocallySparseNoise._PRELOAD self._cell_index_receptive_field_analysis_data = \ LocallySparseNoise._PRELOAD @property def LSN(self): if self._LSN is LocallySparseNoise._PRELOAD: self.populate_stimulus_table() return self._LSN @property def LSN_mask(self): if self._LSN_mask is LocallySparseNoise._PRELOAD: self.populate_stimulus_table() return self._LSN_mask @property def sweeplength(self): if self._sweeplength is LocallySparseNoise._PRELOAD: self.populate_stimulus_table() return self._sweeplength @property def interlength(self): if self._interlength is LocallySparseNoise._PRELOAD: self.populate_stimulus_table() return self._interlength @property def extralength(self): if self._extralength is LocallySparseNoise._PRELOAD: self.populate_stimulus_table() return self._extralength @property def receptive_field(self): if self._receptive_field is LocallySparseNoise._PRELOAD: self._receptive_field = self.get_receptive_field() return self._receptive_field @property def cell_index_receptive_field_analysis_data(self): if self._cell_index_receptive_field_analysis_data is \ LocallySparseNoise._PRELOAD: self._cell_index_receptive_field_analysis_data = \ self.get_receptive_field_analysis_data() return self._cell_index_receptive_field_analysis_data @property def mean_response(self): if self._mean_response is LocallySparseNoise._PRELOAD: self._mean_response = self.get_mean_response() return self._mean_response
[docs] def get_peak(self): LocallySparseNoise._log.info('Calculating peak response properties') peak = pd.DataFrame(index=range(self.numbercells), columns=( 'rf_center_on_x_lsn', 'rf_center_on_y_lsn', 'rf_center_off_x_lsn', 'rf_center_off_y_lsn', 'rf_area_on_lsn', 'rf_area_off_lsn', 'rf_distance_lsn', 'rf_overlap_index_lsn', 'rf_chi2_lsn', 'cell_specimen_id')) csids = self.data_set.get_cell_specimen_ids() df = self.get_receptive_field_attribute_df() peak.cell_specimen_id = csids for nc in range(self.numbercells): peak['rf_chi2_lsn'].iloc[nc] = \ df['chi_squared_analysis/min_p'].iloc[nc] # find the index of the largest on subunit, if it exists on_i = None if 'on/gaussian_fit/area' in df.columns: area_on = df['on/gaussian_fit/area'].iloc[nc] # watch out for NaNs and Nones if isinstance(area_on, np.ndarray): area_on[np.equal(area_on, None)] = np.nan if not np.all(np.isnan(area_on.astype(float))): on_i = np.nanargmax(area_on) else: on_i = None if on_i is None: peak['rf_area_on_lsn'].iloc[nc] = np.nan peak['rf_center_on_x_lsn'].iloc[nc] = np.nan peak['rf_center_on_y_lsn'].iloc[nc] = np.nan else: peak['rf_area_on_lsn'].iloc[nc] = \ df['on/gaussian_fit/area'].iloc[nc][on_i] peak['rf_center_on_x_lsn'].iloc[nc] = \ df['on/gaussian_fit/center_x'].iloc[nc][on_i] peak['rf_center_on_y_lsn'].iloc[nc] = \ df['on/gaussian_fit/center_y'].iloc[nc][on_i] # find the index of the largest off subunit, if it exists off_i = None if 'off/gaussian_fit/area' in df.columns: area_off = df['off/gaussian_fit/area'].iloc[nc] # watch out for NaNs and Nones if isinstance(area_off, np.ndarray): area_off[np.equal(area_off, None)] = np.nan if not np.all(np.isnan(area_off.astype(float))): off_i = np.nanargmax(area_off) else: off_i = None if off_i is None: peak['rf_area_off_lsn'].iloc[nc] = np.nan peak['rf_center_off_x_lsn'].iloc[nc] = np.nan peak['rf_center_off_y_lsn'].iloc[nc] = np.nan else: peak['rf_area_off_lsn'].iloc[nc] = \ df['off/gaussian_fit/area'].iloc[nc][off_i] peak['rf_center_off_x_lsn'].iloc[nc] = \ df['off/gaussian_fit/center_x'].iloc[nc][off_i] peak['rf_center_off_y_lsn'].iloc[nc] = \ df['off/gaussian_fit/center_y'].iloc[nc][off_i] if on_i is not None and off_i is not None: peak['rf_distance_lsn'].iloc[nc] = \ df['on/gaussian_fit/distance'].iloc[nc][on_i][off_i] peak['rf_overlap_index_lsn'].iloc[nc] = \ df['on/gaussian_fit/overlap'].iloc[nc][on_i][off_i] else: peak['rf_distance_lsn'].iloc[nc] = np.nan peak['rf_overlap_index_lsn'].iloc[nc] = np.nan return peak
[docs] def populate_stimulus_table(self): self._stim_table = self.data_set.get_stimulus_table(self.stimulus) self._LSN, self._LSN_mask = \ self.data_set.get_locally_sparse_noise_stimulus_template( self.stimulus, mask_off_screen=False) self._sweeplength = ( self._stim_table['end'][1] - self._stim_table['start'][1]) self._interlength = 4 * self._sweeplength self._extralength = self._sweeplength
[docs] def get_mean_response(self): logging.debug("Calculating mean responses") mean_response = np.empty( (self.nrows, self.ncols, self.numbercells + 1, 2)) for xp in range(self.nrows): for yp in range(self.ncols): on_frame = np.where(self.LSN[:, xp, yp] == self.LSN_ON)[0] off_frame = np.where(self.LSN[:, xp, yp] == self.LSN_OFF)[0] subset_on = self.mean_sweep_response[ self.stim_table.frame.isin(on_frame)] subset_off = self.mean_sweep_response[ self.stim_table.frame.isin(off_frame)] mean_response[xp, yp, :, 0] = subset_on.mean(axis=0) mean_response[xp, yp, :, 1] = subset_off.mean(axis=0) return mean_response
[docs] def get_receptive_field(self): ''' Calculates receptive fields for each cell ''' receptive_field = np.zeros( (self.nrows, self.ncols, self.numbercells, 2)) for cell_index in range( len(self.cell_index_receptive_field_analysis_data)): curr_rf = self.cell_index_receptive_field_analysis_data[ str(cell_index)] rf_on = curr_rf['on']['rts_convolution']['data'].copy() rf_off = curr_rf['off']['rts_convolution']['data'].copy() rf_on[np.logical_not( curr_rf['on']['fdr_mask']['data'].sum(axis=0))] = np.nan rf_off[np.logical_not( curr_rf['off']['fdr_mask']['data'].sum(axis=0))] = np.nan receptive_field[:, :, cell_index, 0] = rf_on receptive_field[:, :, cell_index, 1] = rf_off return receptive_field
[docs] def get_receptive_field_analysis_data(self): ''' Calculates receptive fields for each cell ''' csid_rf = {} for cell_index in range(self.data_set.number_of_cells): csid_rf[str(cell_index)] = \ compute_receptive_field_with_postprocessing( self.data_set, cell_index, self.stimulus, alpha=.05, number_of_shuffles=10000) return csid_rf
[docs] def plot_receptive_field_analysis_data(self, cell_index, **kwargs): rf = self._cell_index_receptive_field_analysis_data[str(cell_index)] return plot_receptive_field_data(rf, self, **kwargs)
[docs] def get_receptive_field_attribute_df(self): df_list = [] for cell_index_as_str, rf in \ self.cell_index_receptive_field_analysis_data.items(): attribute_dict = {} for x in dict_generator(rf): if x[-3] == 'attrs': if len(x[:-3]) == 0: key = x[-2] else: key = '/'.join(['/'.join(x[:-3]), x[-2]]) attribute_dict[key] = x[-1] massaged_dict = {} for key, val in attribute_dict.items(): massaged_dict[key] = [val] massaged_dict['oeid'] = self.data_set.get_metadata()[ 'ophys_experiment_id'] curr_df = pd.DataFrame.from_dict(massaged_dict) df_list.append(curr_df) attribute_df = pd.concat(df_list, sort=True) return attribute_df.sort_values('cell_index')
[docs] @staticmethod def merge_mean_response(rc1, rc2): """ Move out of this class, to session analysis """ # make sure that rc1 is the larger one if rc2.shape[0] > rc1.shape[0]: rc1, rc2 = rc2, rc1 shape_mult = np.array(rc1.shape) / np.array(rc2.shape) rc2_zoom = scipy.ndimage.zoom(rc2, shape_mult, order=0) return rc1 + rc2_zoom
[docs] def plot_cell_receptive_field(self, on, cell_specimen_id=None, color_map=None, clim=None, mask=None, cell_index=None, scalebar=True): if color_map is None: color_map = 'Reds' if on else 'Blues' onst = 'on' if on else 'off' cell_idx = self.row_from_cell_id(cell_specimen_id, cell_index) rf = self.cell_index_receptive_field_analysis_data[str(cell_idx)] rts = rf[onst]['rts']['data'] rts[np.logical_not(rf[onst]['fdr_mask']['data'].sum(axis=0))] = np.nan oplots.plot_receptive_field(rts, color_map=color_map, clim=clim, mask=mask, scalebar=scalebar)
[docs] def plot_population_receptive_field(self, color_map='RdPu', clim=None, mask=None, scalebar=True): rf = np.nansum(self.receptive_field, axis=(2, 3)) oplots.plot_receptive_field(rf, color_map=color_map, clim=clim, mask=mask, scalebar=scalebar)
[docs] def sort_trials(self): ds = self.data_set lsn_movie, lsn_mask = ds.get_locally_sparse_noise_stimulus_template( self.stimulus, mask_off_screen=False) baseline_trials = np.unique( np.where(lsn_movie[:, -5:, -1] != LocallySparseNoise.LSN_GREY)[0]) valid_indices = pd.Index(set(baseline_trials) & set(self.mean_sweep_response.index.tolist())) baseline_df = self.mean_sweep_response.loc[valid_indices] cell_baselines = np.nanmean(baseline_df.values, axis=0) lsn_movie[:, ~lsn_mask] = LocallySparseNoise.LSN_OFF_SCREEN trials = {} for row in range(self.nrows): for col in range(self.ncols): on_trials = np.where( lsn_movie[:, row, col] == LocallySparseNoise.LSN_ON) off_trials = np.where( lsn_movie[:, row, col] == LocallySparseNoise.LSN_OFF) trials[(col, row, True)] = on_trials trials[(col, row, False)] = off_trials return trials, cell_baselines
[docs] def open_pincushion_plot(self, on, cell_specimen_id=None, color_map=None, cell_index=None): cell_index = self.row_from_cell_id(cell_specimen_id, cell_index) trials, baselines = self.sort_trials() data = self.mean_sweep_response[str(cell_index)].values cplots.make_pincushion_plot(data, trials, on, self.nrows, self.ncols, clim=[baselines[cell_index], data.mean() + data.std() * 3], color_map=color_map, radius=1.0 / 16.0)
[docs] @staticmethod def from_analysis_file(data_set, analysis_file, stimulus): lsn = LocallySparseNoise(data_set, stimulus) lsn.populate_stimulus_table() if stimulus == stimulus_info.LOCALLY_SPARSE_NOISE: stimulus_suffix = stimulus_info.LOCALLY_SPARSE_NOISE_SHORT elif stimulus == stimulus_info.LOCALLY_SPARSE_NOISE_4DEG: stimulus_suffix = stimulus_info.LOCALLY_SPARSE_NOISE_4DEG_SHORT elif stimulus == stimulus_info.LOCALLY_SPARSE_NOISE_8DEG: stimulus_suffix = stimulus_info.LOCALLY_SPARSE_NOISE_8DEG_SHORT try: with h5py.File(analysis_file, "r") as f: k = "analysis/mean_response_%s" % stimulus_suffix if k in f: lsn._mean_response = f[k][()] lsn._sweep_response = pd.read_hdf(analysis_file, "analysis/sweep_response_%s" % stimulus_suffix) lsn._mean_sweep_response = pd.read_hdf( analysis_file, "analysis/mean_sweep_response_%s" % stimulus_suffix) with h5py.File(analysis_file, "r") as f: lsn._cell_index_receptive_field_analysis_data = \ LocallySparseNoise.\ read_cell_index_receptive_field_analysis(f, stimulus) except Exception as e: raise MissingStimulusException(e.args) return lsn
[docs] @staticmethod def save_cell_index_receptive_field_analysis( cell_index_receptive_field_analysis_data, new_nwb, prefix): attr_list = [] file_handle = h5py.File(new_nwb.nwb_file, 'a') if prefix in file_handle['analysis']: del file_handle['analysis'][prefix] f = file_handle.create_group('analysis/%s' % prefix) for x in dict_generator(cell_index_receptive_field_analysis_data): if x[-2] == 'data': f['/'.join(x[:-1])] = x[-1] elif x[-3] == 'attrs': attr_list.append(x) else: raise Exception for x in attr_list: # replace None => nan before writing # set array type to float for ii, item in enumerate(x): if isinstance(item, np.ndarray): if item.dtype == np.dtype('O'): item[item == None] = np.nan # noqa E711 x[ii] = np.array(item, dtype=float) if len(x) > 3: f['/'.join(x[:-3])].attrs[x[-2]] = x[-1] else: assert len(x) == 3 if x[-1] is None: f.attrs[x[-2]] = np.NaN else: f.attrs[x[-2]] = x[-1] file_handle.close()
[docs] @staticmethod def read_cell_index_receptive_field_analysis(file_handle, prefix, path=None): k = 'analysis/%s' % prefix if k in file_handle: f = file_handle['analysis/%s' % prefix] if path is None: rf = read_h5_group(f) else: rf = read_h5_group(f[path]) return rf else: return None