Source code for allensdk.brain_observatory.chisquare_categorical

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Wed Jun  5 15:52:22 2019

@author: dan
"""
# TODO: Fix header

import numpy as np
import warnings


[docs]def chisq_from_stim_table(stim_table, columns, mean_sweep_events, num_shuffles=1000, verbose=False): # stim_table is a pandas DataFrame with len = num_sweeps # columns is a list of column names that define the categories (e.g. ['Ori','Contrast']) # mean_sweep_events is a numpy array with shape (num_sweeps,num_cells) sweep_categories = stim_table_to_categories(stim_table,columns,verbose=verbose) p_vals = compute_chi_shuffle(mean_sweep_events,sweep_categories,num_shuffles=num_shuffles) return p_vals
[docs]def compute_chi_shuffle(mean_sweep_events, sweep_categories, num_shuffles=1000): # mean_sweep_events is a numpy array with shape (num_sweeps,num_cells) # sweep_conditions is a numpy array with shape (num_sweeps) # sweep_conditions gives the category label for each sweep (num_sweeps,num_cells) = np.shape(mean_sweep_events) if len(sweep_categories) != num_sweeps: warnings.warn('sweep_categories and num_sweeps do not match') return np.nan sweep_categories_dummy = make_category_dummy(sweep_categories) expected = compute_expected(mean_sweep_events,sweep_categories_dummy) observed = compute_observed(mean_sweep_events,sweep_categories_dummy) chi_actual = compute_chi(observed,expected) chi_shuffle = np.zeros((num_cells,num_shuffles)) for ns in range(num_shuffles): shuffle_sweeps = np.random.choice(num_sweeps,size=(num_sweeps,)) shuffle_sweep_events = mean_sweep_events[shuffle_sweeps] shuffle_expected = compute_expected(shuffle_sweep_events,sweep_categories_dummy) shuffle_observed = compute_observed(shuffle_sweep_events,sweep_categories_dummy) chi_shuffle[:,ns] = compute_chi(shuffle_observed,shuffle_expected) p_vals = np.mean(chi_actual.reshape(num_cells,1)<chi_shuffle,axis=1) return p_vals
[docs]def stim_table_to_categories(stim_table, columns, verbose=False): # get the categories for all sweeps with each unique combination of # parameters in 'columns' being one category # sweeps with non-finite values in ANY column (e.g. np.NaN) are labeled # as blank sweeps (category = -1) # TODO: Replace with EcephysSession.get_stimulus_conditions num_sweeps = len(stim_table) num_params = len(columns) unique_params = [] options_per_column = [] max_combination = 1 for column in columns: column_params = np.unique(stim_table[column].values) # column_params = column_params[np.isfinite(column_params)] unique_params.append(column_params) options_per_column.append(len(column_params)) max_combination*=len(column_params) category = 0 sweep_categories = -1*np.ones((num_sweeps,)) curr_combination = np.zeros((num_params,),dtype=np.int) options_per_column = np.array(options_per_column).astype(np.int) all_tried = False while not all_tried: matches_combination = np.ones((num_sweeps,),dtype=np.bool) for i_col,column in enumerate(columns): param = unique_params[i_col][curr_combination[i_col]] matches_param = stim_table[column].values == param matches_combination *= matches_param if np.any(matches_combination): sweep_categories[matches_combination] = category if verbose: print('Category ' + str(category)) for i_col,column in enumerate(columns): param = unique_params[i_col][curr_combination[i_col]] print(column + ': ' + str(param)) category+=1 #advance the combination curr_combination = advance_combination(curr_combination,options_per_column) all_tried = curr_combination[0]==options_per_column[0] if verbose: blank_sweeps = sweep_categories==-1 print('num blank: ' + str(blank_sweeps.sum())) return sweep_categories
[docs]def advance_combination(curr_combination, options_per_column): num_cols = len(curr_combination) might_carry = True col = num_cols-1 while might_carry: curr_combination[col] += 1 if col==0 or curr_combination[col]<options_per_column[col]: might_carry = False else: curr_combination[col] = 0 col-=1 return curr_combination
[docs]def make_category_dummy(sweep_categories): #makes a dummy variable version of the sweep category list num_sweeps = len(sweep_categories) categories = np.unique(sweep_categories) num_categories = len(categories) sweep_category_mat = np.zeros((num_sweeps,num_categories),dtype=np.bool) for i_cat,category in enumerate(categories): category_idx = np.argwhere(sweep_categories==category)[:,0] sweep_category_mat[category_idx,i_cat] = True return sweep_category_mat
[docs]def compute_observed(mean_sweep_events,sweep_conditions): (num_sweeps,num_conditions) = np.shape(sweep_conditions) num_cells = np.shape(mean_sweep_events)[1] observed_mat = (mean_sweep_events.T).reshape(num_cells,num_sweeps,1) * sweep_conditions.reshape(1,num_sweeps,num_conditions) observed = np.sum(observed_mat,axis=1) return observed
[docs]def compute_expected(mean_sweep_events,sweep_conditions): num_conditions = np.shape(sweep_conditions)[1] num_cells = np.shape(mean_sweep_events)[1] sweeps_per_condition = np.sum(sweep_conditions,axis=0) events_per_sweep = np.mean(mean_sweep_events,axis=0) expected = sweeps_per_condition.reshape(1,num_conditions) * events_per_sweep.reshape(num_cells,1) return expected
[docs]def compute_chi(observed,expected): chi = (observed - expected) ** 2 /expected chi = np.where(expected>0,chi,0.0) return np.sum(chi,axis=1)
# %%