import itertools
from six.moves import cPickle
import logging
from allensdk.internal.brain_observatory import roi_filter_utils
import allensdk.internal.brain_observatory.mask_set as mask_set
from allensdk.brain_observatory.roi_masks import create_roi_mask_array
try:
from sklearn.model_selection import cross_val_score
except ImportError:
from sklearn.cross_validation import cross_val_score
from sklearn import __version__ as sklearn_version
import numpy as np
import pandas as pd
class ROIClassifier(object):
    '''Wrapper for machine learning classifier.

    Provides an underlying classifier model implementing `fit`,
    `score`, and `predict`. Tracks additional information for
    constructing the feature array from input datastreams, as well
    as training data used and cross validation scores generated.

    Parameters
    ----------
    model_data : dictionary
        Dictionary of classifier properties
        `sklearn_version`: Version of sklearn used for training.
        `model`: Underlying classifier.
        `training_features`: Feature set used to train model.
        `training_labels`: Label set used to train model.
        `trimmed_features`: Features to remove from input data.
        `structure_ids`: Structure ID set used for training.
        `drivers`: Driver set used for training.
        `reporters`: Reporter set used for training.
        `other_appended_labels`: Labels appended outside model.
        `cross_validation_scores`: Cross validation if generated.
    '''
    def __init__(self, model_data=None):
        '''Constructor.'''
        if model_data is None:
            model_data = {}
        self.sklearn_version = sklearn_version
        model_sklearn = model_data.get("sklearn_version", None)
        # A model unpickled under a different sklearn version may not
        # behave identically to how it behaved when trained, so surface
        # any version mismatch in the logs.
        if sklearn_version != model_sklearn:
            logging.warning("Using sklearn %s, model trained using %s",
                            sklearn_version, model_sklearn)
        self.model = model_data.get("model", None)
        self.training_features = model_data.get("training_features",
                                                pd.DataFrame())
        self.training_labels = model_data.get("training_labels",
                                              pd.DataFrame())
        self.trimmed_features = model_data.get("trimmed_features", [])
        self.structure_ids = model_data.get("structure_ids", [])
        self.drivers = model_data.get("drivers", [])
        self.reporters = model_data.get("reporters", [])
        self.other_appended_labels = model_data.get("other_appended_labels",
                                                    [])
        # this is a harsh score for multilabel because it requires ALL
        # labels predicted
        self.cross_validation_scores = model_data.get(
            "cross_validation_scores", None)
        # Populated by `get_labels` with any incoming features that were
        # not present in the training data.
        self.unexpected_features = []

    @property
    def model_data(self):
        '''The classifier properties as a dictionary.'''
        data = {"model": self.model,
                "training_features": self.training_features,
                "training_labels": self.training_labels,
                "trimmed_features": self.trimmed_features,
                "structure_ids": self.structure_ids,
                "drivers": self.drivers,
                "reporters": self.reporters,
                "other_appended_labels": self.other_appended_labels,
                "sklearn_version": self.sklearn_version,
                "cross_validation_scores": self.cross_validation_scores}
        return data

    @property
    def label_names(self):
        '''Return label names for the classifier.'''
        return self.training_labels.columns

    def create_feature_array(self, object_data, depth, structure_id, drivers,
                             reporters):
        '''Creates feature array from input data.

        See Also
        --------
        create_feature_array : Create a feature array given model and inputs
        '''
        # BUG FIX: the computed feature array was previously assigned to a
        # local and discarded, so this method always returned None.
        return create_feature_array(self.model_data, object_data, depth,
                                    structure_id, drivers, reporters)

    def get_labels(self, object_data, depth, structure_id, drivers,
                   reporters):
        '''Generate labels from input data.

        See Also
        --------
        ROIClassifier.create_feature_array
        '''
        features = create_feature_array(self.model_data, object_data, depth,
                                        structure_id, drivers, reporters)
        # Record any input features the model has never seen, for later
        # inspection by callers.
        self.unexpected_features = get_unexpected_features(
            self.model_data, object_data, structure_id, drivers, reporters)
        return self.predict(features)

    def fit(self, features, labels):
        '''Fit model to data.

        Parameters
        ----------
        features : pandas.DataFrame
            Training feature set.
        labels : pandas.DataFrame
            Training labels.
        '''
        self.training_features = features
        self.training_labels = labels
        self.model.fit(features, labels)

    def score(self, features, labels):
        '''Calculate classifier score on data.'''
        return self.model.score(features, labels)

    def predict(self, features):
        '''Generate classification labels given features.'''
        return self.model.predict(features)

    def cross_validate(self, features, labels, n_folds=5, n_jobs=1):
        '''Generate cross-validation scores for the classifier.

        Parameters
        ----------
        features : pandas.DataFrame
            Set of features for classification.
        labels : pandas.DataFrame
            Set of ground truth labels for training and evaluation.
        n_folds : int
            Number of folds for K-Fold cross-validation.
        n_jobs : int
            Number of CPUs to use.

        Returns
        -------
        numpy.ndarray
            `n_folds` cross-validation scores.
        '''
        self.cross_validation_scores = cross_val_score(
            self.model, features, labels, cv=n_folds, n_jobs=n_jobs)
        return self.cross_validation_scores

    def save(self, filename):
        '''Save the classifier to file by pickling.'''
        with open(filename, "wb") as f:
            cPickle.dump(self.model_data, f)

    @staticmethod
    def from_file(filename):
        '''Load an ROIClassifier from file.'''
        # NOTE: unpickling executes arbitrary code; only load trusted files.
        with open(filename, "rb") as f:
            return ROIClassifier(cPickle.load(f))
def mean_gray_to_sigma(meanInt0, snpoffsetstdv):
    '''Calculate intensity variation used in prior code.

    Parameters
    ----------
    meanInt0 : pandas.Series
        Array of intensity averages.
    snpoffsetstdv : pandas.Series
        Array of soma-neuropil standard deviations.

    Returns
    -------
    pandas.Series
        meanInt0/snpoffsetstdv, preventing Inf (returns as 0).
    '''
    denominator = snpoffsetstdv.astype(float)
    ratio = meanInt0 / denominator
    # zero standard deviation would give Inf; report 0 instead
    ratio[denominator == 0.0] = 0
    return ratio
def create_feature_array(model_data, object_data, depth, structure_id,
                         drivers, reporters):
    '''Create feature array from input data.

    This creates the feature array with column ordering matching what
    the classifier was trained on.

    Parameters
    ----------
    model_data : dictionary
        Dictionary containing information about the machine learning
        model and training set.
    object_data : pandas.DataFrame
        Object list data.
    depth : float
        Imaging depth of the experiment.
    structure_id : string
        Targeted structure id.
    drivers : list
        List of drivers for the mouse.
    reporters : list
        List of reporters for the mouse.

    Returns
    -------
    pandas.DataFrame
        Feature array with one row per object and columns ordered as in
        the training feature set.

    Raises
    ------
    KeyError
        If a training feature cannot be derived from the input data.
    '''
    training_features = model_data["training_features"].columns
    if np.isnan(depth):
        depth = 0
    meanGrayToSigma = mean_gray_to_sigma(
        object_data["meanInt0"], object_data["snpoffsetstdv"])
    # BUG FIX: seed the frame with the object index so that scalar
    # (broadcast) columns like `depth` fill correctly even when they are
    # the first training feature; an empty DataFrame has a zero-length
    # index and scalar assignment would silently produce an empty result.
    features = pd.DataFrame(index=object_data.index)
    for column in training_features:
        if column == "depth":
            features[column] = depth
        # special case that isn't in object list
        elif column == "meanGrayToSigma":
            features[column] = meanGrayToSigma
        elif column in model_data["structure_ids"]:
            # one-hot indicator for the targeted structure
            features[column] = int(structure_id == column)
        elif column in model_data["drivers"]:
            features[column] = int(column in drivers)
        elif column in model_data["reporters"]:
            features[column] = int(column in reporters)
        elif column in object_data.columns:
            features[column] = object_data[column]
        else:
            logging.error("Feature %s missing from input data", column)
            raise KeyError(
                "Feature {} missing from input data".format(column))
    return features
def get_unexpected_features(model_data, object_data, structure_id, drivers,
                            reporters):
    '''Get list of incoming features that weren't in traning data.

    Parameters
    ----------
    model_data : dictionary
        Dictionary containing information about the machine learning
        model and training set.
    object_data : pandas.DataFrame
        Object list data.
    structure_id : string
        Targeted structure id.
    drivers : list
        List of drivers for the mouse.
    reporters : list
        List of reporters for the mouse.
    '''
    # features known to the model, either used in training or trimmed out
    known = set(model_data["training_features"].columns)
    known.update(model_data["trimmed_features"])
    incoming = itertools.chain(object_data.columns, [structure_id],
                               drivers, reporters)
    return [feature for feature in incoming if feature not in known]
def label_unions_and_duplicates(rois, overlap_threshold):
    '''Detect unions and duplicates and label ROIs.'''
    masks = create_roi_mask_array(rois)
    keep = np.ones(masks.shape[0], dtype=bool)
    # detect and label duplicates
    duplicate_pairs = mask_set.MaskSet(masks=masks).detect_duplicates(
        overlap_threshold)
    for pair in duplicate_pairs:
        first = pair[0]
        if "duplicate" not in rois[first].labels:
            rois[first].labels.append("duplicate")
        keep[first] = False
    # detect and label unions only for remaining valid masks
    remaining = np.where(keep)
    union_set = mask_set.MaskSet(masks=masks[remaining].astype(bool))
    unions = union_set.detect_unions()
    if unions:
        # map indices within the reduced set back to original positions
        original_idxs = remaining[0][list(unions.keys())]
        for idx in original_idxs:
            if "union" not in rois[idx].labels:
                rois[idx].labels.append("union")
    return rois
def apply_labels(rois, label_array, label_names):
    '''Apply labels to rois.

    Parameters
    ----------
    rois : list
        List of RoiMask objects sorted to `label_array` order.
    label_array : numpy.ndarray
        Label array output from classifier.
    label_names : list
        Names to apply to columns of `label_array`.

    Returns
    -------
    list
        List of ROIs with labels appended.
    '''
    label_df = pd.DataFrame(data=label_array, columns=label_names)
    for position, roi in enumerate(rois):
        row = label_df.iloc[position]
        # append the name of every column flagged 1 for this roi
        matched = list(label_df.columns[(row == 1).values])
        roi.labels.extend(matched)
    return rois
def _column_match(column):
return column == 1
def _compress_to_list(row, names):
'''Get names that have value 1 in row.'''
return list(names[row.values])