Source code for allensdk.internal.brain_observatory.roi_filter_utils

import os
import json
import logging
import scipy.ndimage.measurements as measurements
from scipy.spatial import cKDTree
from allensdk.brain_observatory.roi_masks import create_roi_mask
import pandas as pd
import numpy as np

CRITERIA_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                             "resources",
                             "roi_filter_training_criteria.json")
_CRITERIA = None


[docs]def CRITERIA(): global _CRITERIA if _CRITERIA is None: with open(CRITERIA_FILE, "r") as f: _CRITERIA = json.load(f) return _CRITERIA
[docs]class TrainingLabelClassifier(object): '''Very basic threshold_based classifier. Has a decision function that is just the number of distinct criteria met by the classifier. Criteria are defined as a list of strings used with pandas.DataFrame.eval. Parameters ---------- criteria : list List of evaluation strings. ''' def __init__(self, criteria): '''Constructor.''' if criteria is None: self.criteria = [] else: self.criteria = criteria
[docs] def decision_function(self, X): '''Get the distance from the decision boundary. Parameters ---------- X : array-like Features for each ROI. Returns ------- T : array-like Distance for each sample from the decision boundary. ''' T = np.zeros((X.shape[0],), dtype=int) for crit in self.criteria: T[X.eval(crit).as_matrix()] += 1 return T
[docs]class TrainingMultiLabelClassifier(object): '''Multilabel classifier using groups of TrainingLabelClassifiers. This was used to generate labeling for training the original SVM for classification. Parameters ---------- criteria : dictionary Label names and criteria for each label. ''' def __init__(self, criteria=None): '''Constructor.''' if criteria is None: criteria = CRITERIA() i = 0 self.labels = sorted(criteria.keys()) self._codes = {} self._classifiers = {} for label in self.labels: label_criteria = criteria[label] self._codes[label] = 2**i self._classifiers[2**i] = TrainingLabelClassifier(label_criteria) i += 1 def _labels_as_columns(self, label_codes): '''Convert label series to boolean columns for each label. Parameters ---------- label_codes : pandas.Series Label codes. Returns ------- pandas.DataFrame Dataframe where each column is a label, and values are True for labeled or False otherwise. ''' output = pd.DataFrame() for name in self.labels: number = self._codes[name] output[name] = (label_codes & number) > 0 return output def _map_code_to_list(self, label_code): output = [] for name in self.labels: number = self._codes[name] if (label_code & number) > 0: output.append(name) return output
[docs] def get_eXcluded(self, X): '''Get the calculated value of the eXcluded column. This is useful for comparison with the original classifier implementation. Parameters ---------- X : pandas.DataFrame Object features from the object list file. Returns ------- numpy.ndarray Calculated eXcluded score from the classifier. ''' eXcluded = np.zeros((X.shape[0],), dtype=X["eXcluded"].dtype) for classifier in self._classifiers.values(): eXcluded += classifier.decision_function(X) # match the object list values eXcluded[eXcluded > 0] += 10 eXcluded[X["eXcluded"] == 1] = 1 eXcluded[X["eXcluded"] == 2] = 2 return eXcluded.as_matrix()
[docs] def label_data(self, X, as_columns=True): '''Generate labels for each row in X. Parameters ---------- X : pandas.DataFrame Object features from the object list file. Returns ------- numpy.ndarray Array of label codes representing the combination of labels found for each row. ''' labels = np.zeros((X.shape[0],), dtype=int) for label, classifier in self._classifiers.items(): labels[classifier.decision_function(X) > 0] += label if as_columns: return self._labels_as_columns(labels) else: return pd.Series(labels).apply(self._map_code_to_list)
[docs]def calculate_max_border(motion_df, max_shift): '''Calculate motion boundary from frame offsets. When the motion correction algorithm fails to find sufficient matches, it generates very large frame offsets. The use of `max_shift` avoids filtering too many cells due to the large offsets, with the tradeoff that those frames will be noise. Parameters ---------- motion_df : pandas.DataFrame Dataframe containing the x, y offsets from motion correction. max_shift : float Maximum shift to allow when considering motion correction. Any larger shifts are considered outliers. Returns ------- list [right_shift, left_shift, down_shift, up_shift] ''' # strip outliers x_no_outliers = motion_df["x"][(motion_df["x"] >= -max_shift) & (motion_df["x"] <= max_shift)] y_no_outliers = motion_df["y"][(motion_df["y"] >= -max_shift) & (motion_df["y"] <= max_shift)] right_shift = np.max(-1*x_no_outliers.min(), 0) left_shift = np.max(x_no_outliers.max(), 0) down_shift = np.max(-1*y_no_outliers.min(), 0) up_shift = np.max(y_no_outliers.max(), 0) border = [right_shift, left_shift, down_shift, up_shift] if np.any(np.isnan(np.array(border))): raise ValueError("Motion correction failed.") return border
[docs]def order_rois_by_object_list(object_data, rois): '''Reorder rois by matching bounding boxes to object list. Parameters ---------- object_data : pandas.DataFrame Object list data. rois : list List of RoiMasks. Returns ------- list The list of rois reordered to index the same as object_data. ''' object_points = object_data[["minx", "miny", "maxx", "maxy", "area"]].copy() object_points["maxx"] += 1 object_points["maxy"] += 1 roi_points = [] for roi in rois: roi_points.append([roi.x, roi.y, roi.x+roi.width, roi.y+roi.height, roi.mask.sum()]) reorder_index = get_indices_by_distance(object_points, np.array(roi_points)) multi_mapped = set() if len(set(reorder_index)) != reorder_index.shape[0]: unique, counts = np.unique(reorder_index, return_counts=True) multi_mapped = set(unique[counts > 1]) not_mapped = set(np.setdiff1d(np.arange(reorder_index.shape[0]), reorder_index)) logging.warning("ROIs don't uniquely map to object_list") for idx in (multi_mapped | not_mapped): logging.warning( "%s has ambiguous mapping to object list" % rois[idx].label) out_rois = [] for i in reorder_index: roi = rois[i] if i in multi_mapped: roi.labels.append("duplicate") out_rois.append(roi) return out_rois
[docs]def get_rois(segmentation_stack, border=None): '''Extract a list of rois from the segmentation data array. Parameters ---------- segmentation_stack : numpy.ndarray The array from the maxInt_masks file showing the object masks. border : list [right_shift, left_shift, down_shift, up_shift] bounding box determined from motion correction. Returns ------- list List of RoiMask objects. ''' rois = [] if border is None: border = [0, 0, 0, 0] height = segmentation_stack.shape[1] width = segmentation_stack.shape[2] for i in range(segmentation_stack.shape[0]): page = segmentation_stack[i, :, :] label_mask, num_labels = measurements.label( page, structure=[[1, 1, 1], [1, 1, 1], [1, 1, 1]]) for label in range(1, num_labels + 1): img_mask = label_mask == label mask = create_roi_mask(width, height, border, roi_mask=img_mask, label="ROI {}:{}".format(i, label), mask_group=i) mask.labels = [] if mask.overlaps_motion_border: mask.labels.append("motion_border") rois.append(mask) return rois
[docs]def get_indices_by_distance(object_list_points, mask_points): '''Find indices of nearest neighbor matches. Require a distance of 0 (perfect match) and a unique match between masks and object_list entries. ''' if np.array(mask_points).ndim != 2: raise ValueError("number of dimensions is incorrect. Expected 2 " f"got {np.array(mask_points).ndim}") tree = cKDTree(mask_points) distance, indices = tree.query(object_list_points) if distance.max() > 0: logging.error("An ROI did not match object list exactly.") raise AssertionError("Max match distance greater than 0") return indices