# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2017. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
from __future__ import division, print_function, absolute_import
from collections import defaultdict
import operator as op
import functools
import os
import csv
from scipy.ndimage.interpolation import zoom
import numpy as np
import nrrd
import pandas as pd
from allensdk.core.structure_tree import StructureTree
[docs]class ReferenceSpace(object):
@property
def direct_voxel_map(self):
if not hasattr(self, '_direct_voxel_map'):
self.direct_voxel_counts()
return self._direct_voxel_map
@direct_voxel_map.setter
def direct_voxel_map(self, data):
self._direct_voxel_map = data
@property
def total_voxel_map(self):
if not hasattr(self, '_total_voxel_map'):
self.total_voxel_counts()
return self._total_voxel_map
@total_voxel_map.setter
def total_voxel_map(self, data):
self._total_voxel_map = data
def __init__(self, structure_tree, annotation, resolution):
'''Handles brain structures in a 3d reference space
Parameters
----------
structure_tree : StructureTree
Defines the heirarchy and properties of the brain structures.
annotation : numpy ndarray
3d volume whose elements are structure ids.
resolution : length-3 tuple of numeric
Resolution of annotation voxels along each dimension.
'''
self.structure_tree = structure_tree
self.resolution = resolution
self.annotation = np.ascontiguousarray(annotation)
[docs] def direct_voxel_counts(self):
'''Determines the number of voxels directly assigned to one or more
structures.
Returns
-------
dict :
Keys are structure ids, values are the number of voxels directly
assigned to those structures.
'''
uniques = np.unique(self.annotation, return_counts=True)
found = {k: v for k, v in zip(*uniques) if k != 0}
self._direct_voxel_map = {k: (found[k] if k in found else 0) for k
in self.structure_tree.node_ids()}
[docs] def total_voxel_counts(self):
'''Determines the number of voxels assigned to a structure or its
descendants
Returns
-------
dict :
Keys are structure ids, values are the number of voxels assigned
to structures' descendants.
'''
self._total_voxel_map = {}
for stid in self.structure_tree.node_ids():
desc_ids = self.structure_tree.descendant_ids([stid])[0]
self._total_voxel_map[stid] = sum([self.direct_voxel_map[dscid]
for dscid in desc_ids])
[docs] def remove_unassigned(self, update_self=True):
'''Obtains a structure tree consisting only of structures that have
at least one voxel in the annotation.
Parameters
----------
update_self : bool, optional
If True, the contained structure tree will be replaced,
Returns
-------
list of dict :
elements are filtered structures
'''
structures = self.structure_tree.filter_nodes(
lambda x: self.total_voxel_map[x['id']] > 0)
if update_self:
self.structure_tree = StructureTree(structures)
return structures
[docs] def make_structure_mask(self, structure_ids, direct_only=False):
'''Return an indicator array for one or more structures
Parameters
----------
structure_ids : list of int
Make a mask that indicates the union of these structures' voxels
direct_only : bool, optional
If True, only include voxels directly assigned to a structure in
the mask. Otherwise include voxels assigned to descendants.
Returns
-------
numpy ndarray :
Same shape as annotation. 1 inside mask, 0 outside.
'''
if direct_only:
mask = np.zeros(self.annotation.shape, dtype=np.uint8, order='C')
for stid in structure_ids:
if self.direct_voxel_map[stid] == 0:
continue
mask[self.annotation == stid] = True
return mask
else:
structure_ids = self.structure_tree.descendant_ids(structure_ids)
structure_ids = set(functools.reduce(op.add, structure_ids))
return self.make_structure_mask(structure_ids, direct_only=True)
[docs] def many_structure_masks(self, structure_ids, output_cb=None,
direct_only=False):
'''Build one or more structure masks and do something with them
Parameters
----------
structure_ids : list of int
Specify structures to be masked
output_cb : function, optional
Must have the following signature: output_cb(structure_id, fn).
On each requested id, fn will be curried to make a mask for that
id. Defaults to returning the structure id and mask.
direct_only : bool, optional
If True, only include voxels directly assigned to a structure in
the mask. Otherwise include voxels assigned to descendants.
Yields
-------
Return values of output_cb called on each structure_id, structure_mask
pair.
Notes
-----
output_cb is called on every yield, so any side-effects (such as
writing to a file) will be carried out regardless of what you do with
the return values. You do actually have to iterate through the output,
though.
'''
if output_cb is None:
output_cb = ReferenceSpace.return_mask_cb
for stid in structure_ids:
yield output_cb(stid, functools.partial(self.make_structure_mask,
[stid], direct_only))
[docs] def check_coverage(self, structure_ids, domain_mask):
'''Determines whether a spatial domain is completely covered by
structures in a set.
Parameters
----------
structure_ids : list of int
Specifies the set of structures to check.
domain_mask : numpy ndarray
Same shape as annotation. 1 inside the mask, 0 out. Specifies
spatial domain.
Returns
-------
numpy ndarray :
1 where voxels are missing from the candidate, 0 where the
candidate exceeds the domain
'''
candidate_mask = self.make_structure_mask(structure_ids)
return domain_mask - candidate_mask
[docs] def validate_structures(self, structure_ids, domain_mask):
'''Determines whether a set of structures produces an exact and
nonoverlapping tiling of a spatial domain
Parameters
----------
structure_ids : list of int
Specifies the set of structures to check.
domain_mask : numpy ndarray
Same shape as annotation. 1 inside the mask, 0 out. Specifies
spatial domain.
Returns
-------
set :
Ids of structures that are the ancestors of other structures in
the supplied set.
numpy ndarray :
Indicator for missing voxels.
'''
return [self.structure_tree.has_overlaps(structure_ids),
self.check_coverage(structure_ids, domain_mask)]
[docs] def downsample(self, target_resolution):
'''Obtain a smaller reference space by downsampling
Parameters
----------
target_resolution : tuple of numeric
Resolution in microns of the output space.
interpolator : string
Method used to interpolate the volume. Currently only 'nearest'
is supported
Returns
-------
ReferenceSpace :
A new ReferenceSpace with the same structure tree and a
downsampled annotation.
'''
factors = [ float(ii / jj) for ii, jj in zip(self.resolution,
target_resolution)]
target = zoom(self.annotation, factors, order=0)
return ReferenceSpace(self.structure_tree, target, target_resolution)
[docs] def get_slice_image(self, axis, position, cmap=None):
'''Produce a AxBx3 RGB image from a slice in the annotation
Parameters
----------
axis : int
Along which to slice the annotation volume. 0 is coronal, 1 is
horizontal, and 2 is sagittal.
position : int
In microns. Take the slice from this far along the specified axis.
cmap : dict, optional
Keys are structure ids, values are rgb triplets. Defaults to
structure rgb_triplets.
Returns
-------
np.ndarray :
RGB image array.
Notes
-----
If you assign a custom colormap, make sure that you take care of the
background in addition to the structures.
'''
if cmap is None:
cmap = self.structure_tree.get_colormap()
cmap[0] = [0, 0, 0]
position = int(np.around(position / self.resolution[axis]))
image = np.squeeze(self.annotation.take([position], axis=axis))
return np.reshape([cmap[point] for point in image.flat],
list(image.shape) + [3]).astype(np.uint8)
[docs] def export_itksnap_labels(self, id_type=np.uint16, label_description_kwargs=None):
'''Produces itksnap labels, remapping large ids if needed.
Parameters
----------
id_type : np.integer, optional
Used to determine the type of the output annotation and whether ids need to be remapped to smaller values.
label_description_kwargs : dict, optional
Keyword arguments passed to StructureTree.export_label_description
Returns
-------
np.ndarray :
Annotation volume, remapped if needed
pd.DataFrame
label_description dataframe
'''
if label_description_kwargs is None:
label_description_kwargs = {}
label_description = self.structure_tree.export_label_description(**label_description_kwargs)
if np.any(label_description['IDX'].values > np.iinfo(id_type).max):
label_description = label_description.sort_values(by='LABEL')
label_description = label_description.reset_index(drop=True)
new_annotation = np.zeros(self.annotation.shape, dtype=id_type)
id_map = {}
for ii, idx in enumerate(label_description['IDX'].values):
id_map[idx] = ii + 1
new_annotation[self.annotation == idx] = ii + 1
label_description['IDX'] = label_description.apply(lambda row: id_map[row['IDX']], axis=1)
return new_annotation, label_description
return self.annotation, label_description
[docs] def write_itksnap_labels(self, annotation_path, label_path, **kwargs):
'''Generate a label file (nrrd) and a label_description file (csv) for use with ITKSnap
Parameters
----------
annotation_path : str
write generated label file here
label_path : str
write generated label_description file here
**kwargs :
will be passed to self.export_itksnap_labels
'''
annotation, labels = self.export_itksnap_labels(**kwargs)
nrrd.write(annotation_path, annotation, header={'spacings': self.resolution})
labels.to_csv(label_path, sep=' ', index=False, header=False, quoting=csv.QUOTE_NONNUMERIC)
[docs] @staticmethod
def return_mask_cb(structure_id, fn):
'''A basic callback for many_structure_masks
'''
return structure_id, fn()
[docs] @staticmethod
def check_and_write(base_dir, structure_id, fn):
'''A many_structure_masks callback that writes the mask to a nrrd file
if the file does not already exist.
'''
mask_path = os.path.join(base_dir,
'structure_{0}.nrrd'.format(structure_id))
if not os.path.exists(mask_path):
nrrd.write(mask_path, fn())
return structure_id