import os
import json
import logging
import scipy.ndimage.measurements as measurements
from scipy.spatial import cKDTree
from allensdk.brain_observatory.roi_masks import create_roi_mask
import pandas as pd
import numpy as np
CRITERIA_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)),
"resources",
"roi_filter_training_criteria.json")
_CRITERIA = None
[docs]def CRITERIA():
global _CRITERIA
if _CRITERIA is None:
with open(CRITERIA_FILE, "r") as f:
_CRITERIA = json.load(f)
return _CRITERIA
[docs]class TrainingLabelClassifier(object):
'''Very basic threshold_based classifier.
Has a decision function that is just the number of distinct
criteria met by the classifier. Criteria are defined as a list
of strings used with pandas.DataFrame.eval.
Parameters
----------
criteria : list
List of evaluation strings.
'''
def __init__(self, criteria):
'''Constructor.'''
if criteria is None:
self.criteria = []
else:
self.criteria = criteria
[docs] def decision_function(self, X):
'''Get the distance from the decision boundary.
Parameters
----------
X : array-like
Features for each ROI.
Returns
-------
T : array-like
Distance for each sample from the decision boundary.
'''
T = np.zeros((X.shape[0],), dtype=int)
for crit in self.criteria:
T[X.eval(crit).as_matrix()] += 1
return T
[docs]class TrainingMultiLabelClassifier(object):
'''Multilabel classifier using groups of TrainingLabelClassifiers.
This was used to generate labeling for training the original SVM
for classification.
Parameters
----------
criteria : dictionary
Label names and criteria for each label.
'''
def __init__(self, criteria=None):
'''Constructor.'''
if criteria is None:
criteria = CRITERIA()
i = 0
self.labels = sorted(criteria.keys())
self._codes = {}
self._classifiers = {}
for label in self.labels:
label_criteria = criteria[label]
self._codes[label] = 2**i
self._classifiers[2**i] = TrainingLabelClassifier(label_criteria)
i += 1
def _labels_as_columns(self, label_codes):
'''Convert label series to boolean columns for each label.
Parameters
----------
label_codes : pandas.Series
Label codes.
Returns
-------
pandas.DataFrame
Dataframe where each column is a label, and values are
True for labeled or False otherwise.
'''
output = pd.DataFrame()
for name in self.labels:
number = self._codes[name]
output[name] = (label_codes & number) > 0
return output
def _map_code_to_list(self, label_code):
output = []
for name in self.labels:
number = self._codes[name]
if (label_code & number) > 0:
output.append(name)
return output
[docs] def get_eXcluded(self, X):
'''Get the calculated value of the eXcluded column.
This is useful for comparison with the original classifier
implementation.
Parameters
----------
X : pandas.DataFrame
Object features from the object list file.
Returns
-------
numpy.ndarray
Calculated eXcluded score from the classifier.
'''
eXcluded = np.zeros((X.shape[0],), dtype=X["eXcluded"].dtype)
for classifier in self._classifiers.values():
eXcluded += classifier.decision_function(X)
# match the object list values
eXcluded[eXcluded > 0] += 10
eXcluded[X["eXcluded"] == 1] = 1
eXcluded[X["eXcluded"] == 2] = 2
return eXcluded.as_matrix()
[docs] def label_data(self, X, as_columns=True):
'''Generate labels for each row in X.
Parameters
----------
X : pandas.DataFrame
Object features from the object list file.
Returns
-------
numpy.ndarray
Array of label codes representing the combination of labels
found for each row.
'''
labels = np.zeros((X.shape[0],), dtype=int)
for label, classifier in self._classifiers.items():
labels[classifier.decision_function(X) > 0] += label
if as_columns:
return self._labels_as_columns(labels)
else:
return pd.Series(labels).apply(self._map_code_to_list)
[docs]def calculate_max_border(motion_df, max_shift):
'''Calculate motion boundary from frame offsets.
When the motion correction algorithm fails to find sufficient
matches, it generates very large frame offsets. The use of
`max_shift` avoids filtering too many cells due to the large
offsets, with the tradeoff that those frames will be noise.
Parameters
----------
motion_df : pandas.DataFrame
Dataframe containing the x, y offsets from motion correction.
max_shift : float
Maximum shift to allow when considering motion correction. Any
larger shifts are considered outliers.
Returns
-------
list
[right_shift, left_shift, down_shift, up_shift]
'''
# strip outliers
x_no_outliers = motion_df["x"][(motion_df["x"] >= -max_shift)
& (motion_df["x"] <= max_shift)]
y_no_outliers = motion_df["y"][(motion_df["y"] >= -max_shift)
& (motion_df["y"] <= max_shift)]
right_shift = np.max(-1*x_no_outliers.min(), 0)
left_shift = np.max(x_no_outliers.max(), 0)
down_shift = np.max(-1*y_no_outliers.min(), 0)
up_shift = np.max(y_no_outliers.max(), 0)
border = [right_shift, left_shift, down_shift, up_shift]
if np.any(np.isnan(np.array(border))):
raise ValueError("Motion correction failed.")
return border
[docs]def order_rois_by_object_list(object_data, rois):
'''Reorder rois by matching bounding boxes to object list.
Parameters
----------
object_data : pandas.DataFrame
Object list data.
rois : list
List of RoiMasks.
Returns
-------
list
The list of rois reordered to index the same as object_data.
'''
object_points = object_data[["minx",
"miny",
"maxx",
"maxy",
"area"]].copy()
object_points["maxx"] += 1
object_points["maxy"] += 1
roi_points = []
for roi in rois:
roi_points.append([roi.x, roi.y, roi.x+roi.width, roi.y+roi.height,
roi.mask.sum()])
reorder_index = get_indices_by_distance(object_points,
np.array(roi_points))
multi_mapped = set()
if len(set(reorder_index)) != reorder_index.shape[0]:
unique, counts = np.unique(reorder_index, return_counts=True)
multi_mapped = set(unique[counts > 1])
not_mapped = set(np.setdiff1d(np.arange(reorder_index.shape[0]),
reorder_index))
logging.warning("ROIs don't uniquely map to object_list")
for idx in (multi_mapped | not_mapped):
logging.warning(
"%s has ambiguous mapping to object list" % rois[idx].label)
out_rois = []
for i in reorder_index:
roi = rois[i]
if i in multi_mapped:
roi.labels.append("duplicate")
out_rois.append(roi)
return out_rois
[docs]def get_rois(segmentation_stack, border=None):
'''Extract a list of rois from the segmentation data array.
Parameters
----------
segmentation_stack : numpy.ndarray
The array from the maxInt_masks file showing the object masks.
border : list
[right_shift, left_shift, down_shift, up_shift] bounding box
determined from motion correction.
Returns
-------
list
List of RoiMask objects.
'''
rois = []
if border is None:
border = [0, 0, 0, 0]
height = segmentation_stack.shape[1]
width = segmentation_stack.shape[2]
for i in range(segmentation_stack.shape[0]):
page = segmentation_stack[i, :, :]
label_mask, num_labels = measurements.label(
page, structure=[[1, 1, 1], [1, 1, 1], [1, 1, 1]])
for label in range(1, num_labels + 1):
img_mask = label_mask == label
mask = create_roi_mask(width, height, border,
roi_mask=img_mask,
label="ROI {}:{}".format(i, label),
mask_group=i)
mask.labels = []
if mask.overlaps_motion_border:
mask.labels.append("motion_border")
rois.append(mask)
return rois
[docs]def get_indices_by_distance(object_list_points, mask_points):
'''Find indices of nearest neighbor matches.
Require a distance of 0 (perfect match) and a unique match between
masks and object_list entries.
'''
if np.array(mask_points).ndim != 2:
raise ValueError("number of dimensions is incorrect. Expected 2 "
f"got {np.array(mask_points).ndim}")
tree = cKDTree(mask_points)
distance, indices = tree.query(object_list_points)
if distance.max() > 0:
logging.error("An ROI did not match object list exactly.")
raise AssertionError("Max match distance greater than 0")
return indices