# Source code for allensdk.internal.brain_observatory.roi_filter

import itertools
from six.moves import cPickle
import logging
from allensdk.internal.brain_observatory import roi_filter_utils
import allensdk.internal.brain_observatory.mask_set as mask_set
from allensdk.brain_observatory.roi_masks import create_roi_mask_array

try:
    from sklearn.model_selection import cross_val_score
except ImportError:
    from sklearn.cross_validation import cross_val_score
from sklearn import __version__ as sklearn_version
import numpy as np
import pandas as pd


class ROIClassifier(object):
    '''Wrapper for machine learning classifier.

    Provides an underlying classifier model implementing `fit`, `score`,
    and `predict`. Tracks additional information for constructing the
    feature array from input datastreams, as well as training data used
    and cross validation scores generated.

    Parameters
    ----------
    model_data : dictionary
        Dictionary of classifier properties. Missing keys fall back to
        empty defaults.
        `sklearn_version`: Version of sklearn used for training.
        `model`: Underlying classifier.
        `training_features`: Feature set used to train model.
        `training_labels`: Label set used to train model.
        `trimmed_features`: Features to remove from input data.
        `structure_ids`: Structure ID set used for training.
        `drivers`: Driver set used for training.
        `reporters`: Reporter set used for training.
        `other_appended_labels`: Labels appended outside model.
        `cross_validation_scores`: Cross validation if generated.
    '''

    def __init__(self, model_data=None):
        '''Constructor.

        Logs a warning when the running sklearn version differs from the
        version recorded in `model_data` (including when none is
        recorded).
        '''
        if model_data is None:
            model_data = {}
        # The instance always records the sklearn version it is running
        # under; the version stored with the model is only compared.
        self.sklearn_version = sklearn_version
        model_sklearn = model_data.get("sklearn_version", None)
        if sklearn_version != model_sklearn:
            logging.warning("Using sklearn %s, model trained using %s",
                            sklearn_version, model_sklearn)
        self.model = model_data.get("model", None)
        self.training_features = model_data.get("training_features",
                                                pd.DataFrame())
        self.training_labels = model_data.get("training_labels",
                                              pd.DataFrame())
        self.trimmed_features = model_data.get("trimmed_features", [])
        self.structure_ids = model_data.get("structure_ids", [])
        self.drivers = model_data.get("drivers", [])
        self.reporters = model_data.get("reporters", [])
        self.other_appended_labels = model_data.get("other_appended_labels",
                                                    [])
        # this is a harsh score for multilabel because it requires ALL
        # labels predicted
        self.cross_validation_scores = model_data.get(
            "cross_validation_scores", None)
        # Populated by `get_labels` with inputs the model was not trained on.
        self.unexpected_features = []

    @property
    def model_data(self):
        '''The classifier properties as a dictionary.

        The dictionary uses the same keys accepted by the constructor, so
        it can be passed back to `ROIClassifier` (this is what `save`
        pickles).
        '''
        data = {"model": self.model,
                "training_features": self.training_features,
                "training_labels": self.training_labels,
                "trimmed_features": self.trimmed_features,
                "structure_ids": self.structure_ids,
                "drivers": self.drivers,
                "reporters": self.reporters,
                "other_appended_labels": self.other_appended_labels,
                "sklearn_version": self.sklearn_version,
                "cross_validation_scores": self.cross_validation_scores}
        return data

    @property
    def label_names(self):
        '''Return label names for the classifier.'''
        return self.training_labels.columns

    def create_feature_array(self, object_data, depth,
                             structure_id, drivers, reporters):
        '''Creates feature array from input data.

        Delegates to the module-level `create_feature_array` using this
        classifier's `model_data`.

        Returns
        -------
        pandas.DataFrame
            Feature array ordered like the training features.

        See Also
        --------
        create_feature_array : Create a feature array given model and inputs
        '''
        features = create_feature_array(self.model_data, object_data, depth,
                                        structure_id, drivers, reporters)
        # The computed array used to be dropped, so callers got None.
        return features

    def get_labels(self, object_data, depth, structure_id, drivers,
                   reporters):
        '''Generate labels from input data.

        Also stores the inputs that were not part of the training data
        in `self.unexpected_features`.

        Returns
        -------
        Output of `predict` on the generated feature array.

        See Also
        --------
        ROIClassifier.create_feature_array
        '''
        features = create_feature_array(self.model_data, object_data, depth,
                                        structure_id, drivers, reporters)
        self.unexpected_features = get_unexpected_features(
            self.model_data, object_data, structure_id, drivers, reporters)
        return self.predict(features)

    def fit(self, features, labels):
        '''Fit model to data.

        The training data is stored on the classifier before fitting so
        that feature ordering and label names are available later.

        Parameters
        ----------
        features : pandas.DataFrame
            Training feature set.
        labels : pandas.DataFrame
            Training labels.
        '''
        self.training_features = features
        self.training_labels = labels
        self.model.fit(features, labels)

    def score(self, features, labels):
        '''Calculate classifier score on data.'''
        return self.model.score(features, labels)

    def predict(self, features):
        '''Generate classification labels given features.'''
        return self.model.predict(features)

    def cross_validate(self, features, labels, n_folds=5, n_jobs=1):
        '''Generate cross-validation scores for the classifier.

        The scores are also stored in `self.cross_validation_scores`.

        Parameters
        ----------
        features : pandas.DataFrame
            Set of features for classification.
        labels : pandas.DataFrame
            Set of ground truth labels for training and evaluation.
        n_folds : int
            Number of folds for K-Fold cross-validation.
        n_jobs : int
            Number of CPUs to use.

        Returns
        -------
        numpy.ndarray
            `n_folds` cross-validation scores.
        '''
        self.cross_validation_scores = cross_val_score(
            self.model, features, labels, cv=n_folds, n_jobs=n_jobs)
        return self.cross_validation_scores

    def save(self, filename):
        '''Save the classifier to file by pickling.'''
        with open(filename, "wb") as f:
            cPickle.dump(self.model_data, f)

    @staticmethod
    def from_file(filename):
        '''Load an ROIClassifier from file.

        NOTE(review): unpickling can execute arbitrary code; only load
        classifier files from trusted locations.
        '''
        with open(filename, "rb") as f:
            return ROIClassifier(cPickle.load(f))
def mean_gray_to_sigma(meanInt0, snpoffsetstdv):
    '''Calculate intensity variation used in prior code.

    Divides the mean intensity by the soma-neuropil standard deviation
    and forces the result to 0 wherever that standard deviation is
    exactly 0, so no Inf (or 0/0 NaN) is returned for those entries.

    Parameters
    ----------
    meanInt0 : pandas.Series
        Array of intensity averages.
    snpoffsetstdv : pandas.Series
        Array of soma-neuropil standard deviations.

    Returns
    -------
    pandas.Series
        meanInt0/snpoffsetstdv, preventing Inf (returns as 0).
    '''
    zero_sigma = snpoffsetstdv == 0.0
    ratio = meanInt0 / snpoffsetstdv.astype(float)
    # The division produces a new Series, so the inputs are not modified.
    ratio[zero_sigma] = 0
    return ratio
def create_feature_array(model_data, object_data, depth, structure_id,
                         drivers, reporters):
    '''Create feature array from input data.

    This creates the feature array with column ordering matching what the
    classifier was trained on.

    Parameters
    ----------
    model_data : dictionary
        Dictionary containing information about the machine learning
        model and training set.
    object_data : pandas.DataFrame
        Object list data. Must contain the `meanInt0` and `snpoffsetstdv`
        columns.
    depth : float
        Imaging depth of the experiment. NaN or None is replaced by 0.
    structure_id : string
        Targeted structure id.
    drivers : list
        List of drivers for the mouse.
    reporters : list
        List of reporters for the mouse.

    Returns
    -------
    pandas.DataFrame
        One row per row of `object_data`, one column per training
        feature, in training order.

    Raises
    ------
    KeyError
        If a training feature cannot be derived from the inputs.
    '''
    training_features = model_data["training_features"].columns
    # A missing depth is encoded as 0; None is treated like NaN because
    # np.isnan(None) would raise a TypeError.
    if depth is None or np.isnan(depth):
        depth = 0
    meanGrayToSigma = mean_gray_to_sigma(
        object_data["meanInt0"], object_data["snpoffsetstdv"])
    # Seed the frame with the object index. On an index-less frame a
    # scalar assignment creates a zero-row column, which becomes NaN as
    # soon as the first Series column is added, so scalar features that
    # preceded any per-object feature in the training order were lost.
    features = pd.DataFrame(index=object_data.index)
    for column in training_features:
        if column == "depth":
            features[column] = depth
        # special case that isn't in object list
        elif column == "meanGrayToSigma":
            features[column] = meanGrayToSigma
        # One-hot style indicator columns for categorical metadata.
        elif column in model_data["structure_ids"]:
            features[column] = int(structure_id == column)
        elif column in model_data["drivers"]:
            features[column] = int(column in drivers)
        elif column in model_data["reporters"]:
            features[column] = int(column in reporters)
        elif column in object_data.columns:
            features[column] = object_data[column]
        else:
            logging.error("Feature %s missing from input data", column)
            raise KeyError(
                "Feature {} missing from input data".format(column))
    return features
def get_unexpected_features(model_data, object_data, structure_id,
                            drivers, reporters):
    '''Get list of incoming features that weren't in training data.

    The candidate inputs are the columns of `object_data`, followed by
    `structure_id`, the drivers and the reporters. A candidate is
    unexpected when it is neither a training feature nor a trimmed
    feature.

    Parameters
    ----------
    model_data : dictionary
        Dictionary containing information about the machine learning
        model and training set.
    object_data : pandas.DataFrame
        Object list data.
    structure_id : string
        Targeted structure id.
    drivers : list
        List of drivers for the mouse.
    reporters : list
        List of reporters for the mouse.

    Returns
    -------
    list
        Unexpected inputs, in the order they were encountered.
    '''
    known = model_data["training_features"].columns
    ignored = model_data["trimmed_features"]
    candidates = itertools.chain(object_data.columns, [structure_id],
                                 drivers, reporters)
    return [name for name in candidates
            if not (name in known or name in ignored)]
def label_unions_and_duplicates(rois, overlap_threshold):
    '''Detect unions and duplicates and label ROIs.

    Duplicates are detected first over all masks; unions are then
    detected only among the masks that were not flagged as duplicates.
    The labels "duplicate" and "union" are appended to the `labels` list
    of the affected ROIs (once each), and the same `rois` list is
    returned.
    '''
    masks = create_roi_mask_array(rois)
    keep = np.ones(masks.shape[0], dtype=bool)

    # detect and label duplicates
    duplicate_entries = mask_set.MaskSet(masks=masks).detect_duplicates(
        overlap_threshold)
    for entry in duplicate_entries:
        # The first element of each entry is used as the index of the ROI
        # to flag (presumably the redundant member -- confirm in MaskSet).
        dup_idx = entry[0]
        roi_labels = rois[dup_idx].labels
        if "duplicate" not in roi_labels:
            roi_labels.append("duplicate")
        keep[dup_idx] = False

    # detect and label unions only for remaining valid masks
    kept_positions = np.flatnonzero(keep)
    remaining = mask_set.MaskSet(masks=masks[kept_positions].astype(bool))
    unions = remaining.detect_unions()
    if not unions:
        return rois

    # Union keys index into the reduced mask set; map them back to
    # positions in the original `rois` list.
    for roi_idx in kept_positions[list(unions.keys())]:
        roi_labels = rois[roi_idx].labels
        if "union" not in roi_labels:
            roi_labels.append("union")
    return rois
def apply_labels(rois, label_array, label_names):
    '''Apply labels to rois.

    For every row of `label_array`, the names of the columns whose value
    equals 1 are appended to the `labels` list of the ROI at the same
    position.

    Parameters
    ----------
    rois : list
        List of RoiMask objects sorted to `label_array` order.
    label_array : numpy.ndarray
        Label array output from classifier.
    label_names : list
        Names to apply to columns of `label_array`.

    Returns
    -------
    list
        List of ROIs with labels appended.

    Raises
    ------
    KeyError
        If `rois` has more entries than `label_array` has rows.
    '''
    label_df = pd.DataFrame(data=label_array, columns=label_names)
    names = label_df.columns
    # Iterate the boolean mask directly instead of a row-wise
    # DataFrame.apply: apply's return type depends on the frame's shape
    # (an empty or zero-column frame comes back as a DataFrame), which
    # broke the per-ROI lookup for degenerate inputs.
    is_set = (label_df == 1).values.astype(bool)
    labels_by_row = dict(zip(
        label_df.index, (list(names[flags]) for flags in is_set)))
    for position, roi in enumerate(rois):
        roi.labels.extend(labels_by_row[position])
    return rois
def _column_match(column): return column == 1 def _compress_to_list(row, names): '''Get names that have value 1 in row.''' return list(names[row.values])