from __future__ import division
import logging
import sys
import functools
import numpy as np
from scipy.ndimage.interpolation import zoom
from six import iteritems
from allensdk.mouse_connectivity.grid.utilities import image_utilities as iu
#==============================================================================
[docs]class SubImage(object):
@property
def pixel_counter(self):
if not hasattr(self, '_pixel_counter'):
self._pixel_counter = self.make_pixel_counter()
return self._pixel_counter
def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing,
*args, **kwargs):
self.reduce_level = reduce_level
self.in_dims = np.around(in_dims / 2**reduce_level).astype(int)
self.in_spacing = in_spacing * 2**reduce_level
self.coarse_spacing = coarse_spacing
self.blocks, self.coarse_dims = iu.grid_image_blocks(
self.in_dims, self.in_spacing, self.coarse_spacing)
self.images = {}
self.accumulators = {}
[docs] def setup_images(self):
pass
[docs] def compute_coarse_planes(self):
raise NotImplementedError()
[docs] def binarize(self, image_name):
logging.info('binarizing {0}'.format(image_name))
self.images[image_name][np.nonzero(self.images[image_name])] = 1
[docs] def apply_mask(self, image_name, mask_name, positive=True):
logging.info('applying {0} mask to {1}'.format(mask_name, image_name))
mask = self.images[mask_name]
if not positive:
mask = np.logical_not(mask)
mask = mask.astype(np.uint8)
self.images[image_name] = np.multiply(self.images[image_name], mask)
[docs] def make_pixel_counter(self):
fn = lambda x: np.sum(x) * 2 ** ( self.reduce_level + 1) # additional x2 <- is an area
return functools.partial(iu.block_apply, out_shape=self.coarse_dims,
dtype=np.float32, blocks=self.blocks,
fn=fn)
[docs] def apply_pixel_counter(self, accumulator_name, image):
self.accumulators[accumulator_name] = self.pixel_counter(image)
#==============================================================================
[docs]class SegmentationSubImage(SubImage):
required_segmentations = []
def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing,
segmentation_paths, *args, **kwargs):
super(SegmentationSubImage, self).__init__(
reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs)
self.segmentation_paths = segmentation_paths
if 'filter_bit' in kwargs and kwargs['filter_bit'] is not None:
self.filter = 2 ** kwargs['filter_bit']
[docs] def setup_images(self):
super(SegmentationSubImage, self).setup_images()
self.get_segmentation()
[docs] def get_segmentation(self):
for name in self.__class__.required_segmentations:
self.read_segmentation_image(name)
self.process_segmentation()
[docs] def process_segmentation(self):
pass
[docs] def read_segmentation_image(self, segmentation_name='segmentation'):
'''
Notes
-----
We downsample in memory rather than using the jp2 pyramid because the
segmentation is a label image.
'''
path = self.segmentation_paths[segmentation_name]
logging.info('loading {} from {}'.format(segmentation_name, path))
segmentation = iu.read_segmentation_image(path)
logging.info('{} shape: {}'.format(segmentation_name, segmentation.shape))
if self.reduce_level > 0:
logging.info('downsampling {0}'.format(segmentation_name))
segmentation = zoom(segmentation, 1.0 / 2**self.reduce_level, order=0)
self.images[segmentation_name] = segmentation
#==============================================================================
[docs]class IntensitySubImage(SubImage):
required_intensities = []
def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing, intensity_paths,
*args, **kwargs):
super(IntensitySubImage, self).__init__(reduce_level, in_dims,
in_spacing, coarse_spacing, *args, **kwargs)
self.intensity_paths = intensity_paths
[docs] def get_intensity(self):
for name in self.__class__.required_intensities:
info = self.intensity_paths[name]
logging.info('loading {} intensities from {}'.format(name, info['path']))
self.images[name] = iu.read_intensity_image(info['path'], self.reduce_level, info['channel'])
logging.info('loaded {} intensities to image of shape: {}'.format(name, self.images[name].shape))
[docs] def setup_images(self):
super(IntensitySubImage, self).setup_images()
self.get_intensity()
#==============================================================================
[docs]class PolygonSubImage(SubImage):
required_polys = []
optional_polys = []
def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing,
polygon_info, *args, **kwargs):
super(PolygonSubImage, self).__init__(
reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs)
self.polygon_info = polygon_info
[docs] def setup_images(self):
super(PolygonSubImage, self).setup_images()
self.get_polygons()
[docs] def get_polygons(self):
polygon_keys = []
polygon_keys.extend(self.__class__.optional_polys)
polygon_keys.extend(self.__class__.required_polys)
for key in polygon_keys:
logging.info('rasterizing {0} polygon'.format(key))
points = self.polygon_info[key]
self.images[key] = iu.rasterize_polygons(self.in_dims.astype(int)[::-1],
[1.0 / 2**self.reduce_level,
1.0 / 2**self.reduce_level],
points).T
#==============================================================================
[docs]def run_subimage(input_data):
# TODO: not propagating the log level from the calling thread
logging.getLogger('').setLevel(logging.INFO)
index = input_data.pop('specimen_tissue_index')
cls = input_data.pop('cls')
logging.info('handling {0} at index {1}'.format(cls.__name__, index))
si = cls(**input_data)
si.setup_images()
si.compute_coarse_planes()
return index, si.accumulators