# Source code for allensdk.brain_observatory.r_neuropil

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import numpy as np
import scipy.sparse as sparse
from scipy.linalg import solve_banded
import logging


def get_diagonals_from_sparse(mat):
    ''' Extract the diagonals of a sparse matrix, keyed by offset.

    Parameters
    ----------
    mat: scipy.sparse matrix

    Returns
    -------
    dictionary: diagonal arrays keyed by their offsets
    '''
    # DIA format stores one data row per diagonal, aligned with a
    # parallel array of offsets, so the two zip together directly.
    dia = mat.todia()
    return {offset: row for offset, row in zip(dia.offsets, dia.data)}
def ab_from_diagonals(mat_dict):
    ''' Construct the banded-storage matrix `ab` expected by
    scipy.linalg.solve_banded.

    Parameters
    ----------
    mat_dict: dictionary of diagonals keyed by offsets

    Returns
    -------
    ab: banded representation for scipy.linalg.solve_banded
    '''
    offsets = list(mat_dict.keys())
    n_lower = -min(offsets)
    n_upper = max(offsets)

    # every stored diagonal has the same length, so any one fixes the
    # number of columns
    n_cols = len(next(iter(mat_dict.values())))

    ab = np.zeros((n_lower + n_upper + 1, n_cols))
    # solve_banded convention: row (u - offset) holds that diagonal
    for offset, diag in mat_dict.items():
        ab[n_upper - offset] = diag
    return ab
def error_calc(F_M, F_N, F_C, r):
    ''' Root-mean-square residual of the model F_C = F_M - r * F_N,
    normalized by the mean of the measured trace F_M.
    '''
    residual = F_C - (F_M - r * F_N)
    rms = np.sqrt(np.mean(np.square(residual)))
    return rms / np.mean(F_M)
def error_calc_outlier(F_M, F_N, F_C, r):
    ''' Same normalized RMS residual as error_calc, but restricted to
    outlier samples of F_M: those more than two standard deviations
    above its mean.
    '''
    threshold = np.mean(F_M) + 2. * np.std(F_M)
    mask = F_M > threshold
    fm, fn, fc = F_M[mask], F_N[mask], F_C[mask]
    rms = np.sqrt(np.mean(np.square(fc - (fm - r * fn))))
    return rms / np.mean(fm)
def ab_from_T(T, lam, dt):
    ''' Build the banded system matrix (I + lam * D^T D) for a trace of
    length T, in the `ab` form expected by scipy.linalg.solve_banded.

    Parameters
    ----------
    T: number of time samples
    lam: weight of the finite-difference smoothness penalty
    dt: sample spacing used to scale the difference operator

    Returns
    -------
    ab: banded representation of the regularized system matrix
    '''
    # first-difference operator, (T-1) x T; csr keeps the products fast
    diff = (sparse.eye(T - 1, T, 1, format='csr') -
            sparse.eye(T - 1, T, format='csr')) / dt

    # I + lam * D^T D is tridiagonal, so it has a compact banded form
    system = sparse.eye(T) + lam * diff.T.dot(diff)
    return ab_from_diagonals(get_diagonals_from_sparse(system))
def normalize_F(F_M, F_N):
    ''' Rescale both traces with the affine map that sends F_N onto [0, 1].

    F_N is mapped exactly onto [0, 1]; F_M gets the same offset and
    scale, so the two traces remain directly comparable.
    '''
    lo = float(np.amin(F_N))
    span = float(np.amax(F_N)) - lo
    return (F_M - lo) / span, (F_N - lo) / span
def alpha_filter(A=1.0, alpha=0.05, beta=0.25, T=100):
    ''' Difference-of-exponentials ("alpha") kernel of length T:
    A * exp(-alpha * t) - exp(-beta * t) evaluated at t = 0 .. T-1.
    '''
    t = np.arange(T)
    return A * np.exp(-alpha * t) - np.exp(-beta * t)
def validate_with_synthetic_F(T, N):
    """ Sanity-check the estimator against synthetic data.

    Generates N synthetic trace sets of length T with known contamination
    ratios, runs the estimator on each, and collects the true and
    estimated ratios side by side.

    Returns
    -------
    (list of true r values, list of estimated r values)
    """
    kernel_cell = alpha_filter()
    kernel_neuropil = alpha_filter(alpha=0.1, beta=0.5)

    truths, estimates = [], []
    for _ in range(N):
        F_M, F_N, _F_C, r_true = synthesize_F(T, kernel_cell,
                                              kernel_neuropil)
        truths.append(r_true)
        estimates.append(estimate_contamination_ratios(F_M, F_N)['r'])

    return truths, estimates
def synthesize_F(T, af1, af2, p1=0.05, p2=0.1):
    """ Generate one synthetic trace set of length T.

    Sparse Bernoulli spike trains (rates p1 and p2) are convolved with
    the kernels af1 / af2 to form the cell and neuropil signals, and the
    measured trace mixes them with a ratio r drawn uniformly from [0, 2).

    Returns
    -------
    (F_M, F_N, F_C, r): measured, neuropil, and cell traces, plus the
    true contamination ratio
    """
    # NOTE: the RNG draw order (cell spikes, neuropil spikes, r) is part
    # of the reproducible behavior under a fixed seed.
    spikes_cell = np.random.random(T) < p1
    F_C = np.convolve(af1, spikes_cell, mode='full')[:T]

    spikes_neuropil = np.random.random(T) < p2
    F_N = np.convolve(af2, spikes_neuropil, mode='full')[:T]

    r = 2.0 * np.random.random()
    F_M = F_C + r * F_N
    return F_M, F_N, F_C, r
class NeuropilSubtract(object):
    """ Estimate the neuropil contamination ratio r such that the
    corrected trace is F_C = F_M - r * F_N, by minimizing a
    smoothness-regularized reconstruction error across folds.

    Parameters
    ----------
    lam: weight of the smoothness (finite-difference) penalty on F_C
    dt: sample spacing used by the difference operator
    folds: number of equal-length folds the traces are split into
    """

    def __init__(self, lam=0.05, dt=1.0, folds=4):
        self.lam = lam
        self.dt = dt
        self.folds = folds

        # trace length and per-fold length; set by set_F
        self.T = None
        self.T_f = None
        # banded system matrix for solve_banded, sized for one fold
        self.ab = None

        # per-fold trace slices; set by set_F
        self.F_M = None
        self.F_N = None

        # search history and results; set by fit / fit_block_coordinate_desc
        self.r_vals = None
        self.error_vals = None
        self.r = None
        self.error = None

    def set_F(self, F_M, F_N):
        """ Store the traces, split into the number of folds specified in
        the constructor, rebuilding the banded system matrix if the trace
        length changed.  (Per-fold normalization is currently disabled.)

        Raises
        ------
        Exception: if F_M and F_N differ in length
        """
        F_M_len = len(F_M)
        F_N_len = len(F_N)

        if F_M_len != F_N_len:
            raise Exception(
                "F_M and F_N must have the same length (%d vs %d)"
                % (F_M_len, F_N_len))

        # only rebuild the (fold-sized) banded matrix when T changes
        if self.T != F_M_len:
            logging.debug("updating ab matrix for new T=%d", F_M_len)
            self.T = F_M_len
            self.T_f = int(self.T / self.folds)
            self.ab = ab_from_T(self.T_f, self.lam, self.dt)

        self.F_M = []
        self.F_N = []
        for fi in range(self.folds):
            self.F_M.append(F_M[fi * self.T_f:(fi + 1) * self.T_f])
            self.F_N.append(F_N[fi * self.T_f:(fi + 1) * self.T_f])

    def fit_block_coordinate_desc(self, r_init=5.0, min_delta_r=0.00000001):
        """ Alternate between solving for the smoothed corrected trace F_C
        at a fixed r, and the least-squares optimal r at a fixed F_C, until
        the relative change in r drops below min_delta_r.
        """
        F_M = np.concatenate(self.F_M)
        F_N = np.concatenate(self.F_N)

        r_vals = []
        error_vals = []
        r = r_init
        delta_r = None
        it = 0

        # full-length system matrix (self.ab is sized for a single fold)
        ab = ab_from_T(self.T, self.lam, self.dt)
        while delta_r is None or delta_r > min_delta_r:
            # smoothed corrected trace for the current r
            F_C = solve_banded((1, 1), ab, F_M - r * F_N)
            # closed-form least-squares update of r given F_C
            new_r = - np.sum((F_C - F_M) * F_N) / np.sum(np.square(F_N))

            error = self.estimate_error(new_r)
            error_vals.append(error)
            r_vals.append(new_r)

            delta_r = np.abs(r - new_r) / r
            r = new_r
            it += 1

        self.r_vals = r_vals
        self.error_vals = error_vals
        self.r = r_vals[-1]
        # bug fix: error_vals is a plain list, which has no .min() method
        self.error = min(error_vals)

    def fit(self, r_range=(0.0, 2.0), iterations=3, dr=0.1, dr_factor=0.1):
        """ Estimate error values for a grid of r values, then zoom the
        search range in around the minimum and repeat.

        Parameters
        ----------
        r_range: (low, high) initial search interval for r
        iterations: number of zoom-in passes
        dr: grid step of the first pass
        dr_factor: multiplicative shrink applied to dr at each zoom
        """
        global_min_error = None
        global_min_r = None

        r_vals = []
        error_vals = []

        it_range = r_range
        it = 0
        it_dr = dr
        while it < iterations:
            it_errors = []

            # build a set of r values evenly distributed in the current range
            rs = np.arange(it_range[0], it_range[1], it_dr)

            # estimate the error for each candidate r
            for r in rs:
                error = self.estimate_error(r)
                it_errors.append(error)

                r_vals.append(r)
                error_vals.append(error)

            # find the minimum in this range and update the global minimum
            min_i = np.argmin(it_errors)
            min_error = it_errors[min_i]

            if global_min_error is None or min_error < global_min_error:
                global_min_error = min_error
                global_min_r = rs[min_i]

            logging.debug("iteration %d, r=%0.4f, e=%.6e", it,
                          global_min_r, global_min_error)

            # if the minimum error is on the upper boundary,
            # extend the boundary and redo this iteration
            if min_i == len(it_errors) - 1:
                logging.debug(
                    "minimum error found on upper r bound, extending range")
                it_range = [rs[-1], rs[-1] + (rs[-1] - rs[0])]
            else:
                # error is somewhere on either side of the minimum error
                # index; zoom in with a finer step
                it_range = [rs[max(min_i - 1, 0)],
                            rs[min(min_i + 1, len(rs) - 1)]]
                it_dr *= dr_factor
                it += 1

        self.r_vals = r_vals
        self.error_vals = error_vals
        self.r = global_min_r
        self.error = global_min_error

    def estimate_error(self, r):
        """ Estimate the error for a given r on each fold and return
        the mean across folds.
        """
        errors = np.zeros(self.folds)
        for fi in range(self.folds):
            F_M = self.F_M[fi]
            F_N = self.F_N[fi]
            F_C = solve_banded((1, 1), self.ab, F_M - r * F_N)
            errors[fi] = abs(error_calc(F_M, F_N, F_C, r))
        return np.mean(errors)
def estimate_contamination_ratios(F_M, F_N,
                                  lam=0.05, folds=4, iterations=3,
                                  r_range=(0.0, 2.0), dr=0.1, dr_factor=0.1):
    ''' Calculates neuropil contamination of ROI

    Parameters
    ----------
    F_M: ROI trace
    F_N: Neuropil trace
    lam: smoothness penalty weight for the corrected trace
    folds: number of folds used when estimating the error
    iterations: number of grid-refinement passes in the fit
    r_range: (low, high) initial search interval for r
    dr: initial grid step for r
    dr_factor: factor by which the grid step shrinks each pass

    Returns
    -------
    dictionary: key-value pairs
        * 'r': the contamination ratio -- corrected trace = M - r*N
          (negative estimates are clamped to 0)
        * 'r_vals': every r value evaluated during the search
        * 'err': minimum error found
        * 'err_vals': error at each evaluated r value
        * 'min_error': same as 'err'
        * 'it': number of r values evaluated
    '''
    ns = NeuropilSubtract(lam=lam, folds=folds)

    ns.set_F(F_M, F_N)

    ns.fit(r_range=r_range, iterations=iterations,
           dr=dr, dr_factor=dr_factor)

    # a negative contamination ratio is not physically meaningful
    if ns.r < 0:
        logging.warning("r is negative (%f). return 0.0.", ns.r)
        ns.r = 0

    return {
        "r": ns.r,
        "r_vals": ns.r_vals,
        "err": ns.error,
        "err_vals": ns.error_vals,
        "min_error": ns.error,
        "it": len(ns.r_vals)
    }