Source code for allensdk.mouse_connectivity.grid.subimage.base_subimage

from __future__ import division
import logging
import sys
import functools

import numpy as np
from scipy.ndimage.interpolation import zoom
from six import iteritems

from allensdk.mouse_connectivity.grid.utilities import image_utilities as iu


#==============================================================================


[docs]class SubImage(object): @property def pixel_counter(self): if not hasattr(self, '_pixel_counter'): self._pixel_counter = self.make_pixel_counter() return self._pixel_counter def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs): self.reduce_level = reduce_level self.in_dims = np.around(in_dims / 2**reduce_level).astype(int) self.in_spacing = in_spacing * 2**reduce_level self.coarse_spacing = coarse_spacing self.blocks, self.coarse_dims = iu.grid_image_blocks( self.in_dims, self.in_spacing, self.coarse_spacing) self.images = {} self.accumulators = {}
[docs] def setup_images(self): pass
[docs] def compute_coarse_planes(self): raise NotImplementedError()
[docs] def binarize(self, image_name): logging.info('binarizing {0}'.format(image_name)) self.images[image_name][np.nonzero(self.images[image_name])] = 1
[docs] def apply_mask(self, image_name, mask_name, positive=True): logging.info('applying {0} mask to {1}'.format(mask_name, image_name)) mask = self.images[mask_name] if not positive: mask = np.logical_not(mask) mask = mask.astype(np.uint8) self.images[image_name] = np.multiply(self.images[image_name], mask)
[docs] def make_pixel_counter(self): fn = lambda x: np.sum(x) * 2 ** ( self.reduce_level + 1) # additional x2 <- is an area return functools.partial(iu.block_apply, out_shape=self.coarse_dims, dtype=np.float32, blocks=self.blocks, fn=fn)
[docs] def apply_pixel_counter(self, accumulator_name, image): self.accumulators[accumulator_name] = self.pixel_counter(image)
#==============================================================================
[docs]class SegmentationSubImage(SubImage): required_segmentations = [] def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing, segmentation_paths, *args, **kwargs): super(SegmentationSubImage, self).__init__( reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs) self.segmentation_paths = segmentation_paths if 'filter_bit' in kwargs and kwargs['filter_bit'] is not None: self.filter = 2 ** kwargs['filter_bit']
[docs] def setup_images(self): super(SegmentationSubImage, self).setup_images() self.get_segmentation()
[docs] def get_segmentation(self): for name in self.__class__.required_segmentations: self.read_segmentation_image(name) self.process_segmentation()
[docs] def process_segmentation(self): pass
[docs] def extract_signal_from_segmentation(self, segmentation_name='segmentation', signal_name='signal'): ''' Notes ----- Currently, the segmentation uses a series of codes to map 8-bit values onto meaningful classifications. The code for signal pixels is a 1 in the leftmost bit. In some cases, bit 5 indicates that the pixel was not removed in a posfiltering process. Optionally, this postfilter can be applied in gridding. ''' logging.info('extracting {0} mask'.format(signal_name)) self.images[signal_name] = np.right_shift(self.images[segmentation_name], 7) signal_count = np.count_nonzero(self.images[signal_name]) logging.info('{0} signal pixels were detected'.format(signal_count)) if hasattr(self, 'filter'): filter_mask = np.bitwise_and(self.images[segmentation_name], self.filter) self.images[signal_name][filter_mask == 0] = 0 filter_count = np.count_nonzero(self.images[signal_name]) logging.info('{0} / {1} pixels passed the signal filter'.format(filter_count, signal_count))
[docs] def extract_injection_from_segmentation(self, segmentation_name='segmentation', injection_name='injection'): ''' Notes ----- Currently, the segmentation uses a series of codes to map 8-bit values onto meaningful classifications. The code for signal pixels is a 1 in at least one of of the 5 rightmost bits. ''' logging.info('extracting {0} mask'.format(injection_name)) self.images[injection_name] = np.bitwise_and(self.images[segmentation_name], 31) self.images[injection_name][self.images[injection_name] > 0] = 1
[docs] def read_segmentation_image(self, segmentation_name='segmentation'): ''' Notes ----- We downsample in memory rather than using the jp2 pyramid because the segmentation is a label image. ''' path = self.segmentation_paths[segmentation_name] logging.info('loading {} from {}'.format(segmentation_name, path)) segmentation = iu.read_segmentation_image(path) logging.info('{} shape: {}'.format(segmentation_name, segmentation.shape)) if self.reduce_level > 0: logging.info('downsampling {0}'.format(segmentation_name)) segmentation = zoom(segmentation, 1.0 / 2**self.reduce_level, order=0) self.images[segmentation_name] = segmentation
#==============================================================================
[docs]class IntensitySubImage(SubImage): required_intensities = [] def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing, intensity_paths, *args, **kwargs): super(IntensitySubImage, self).__init__(reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs) self.intensity_paths = intensity_paths
[docs] def get_intensity(self): for name in self.__class__.required_intensities: info = self.intensity_paths[name] logging.info('loading {} intensities from {}'.format(name, info['path'])) self.images[name] = iu.read_intensity_image(info['path'], self.reduce_level, info['channel']) logging.info('loaded {} intensities to image of shape: {}'.format(name, self.images[name].shape))
[docs] def setup_images(self): super(IntensitySubImage, self).setup_images() self.get_intensity()
#==============================================================================
[docs]class PolygonSubImage(SubImage): required_polys = [] optional_polys = [] def __init__(self, reduce_level, in_dims, in_spacing, coarse_spacing, polygon_info, *args, **kwargs): super(PolygonSubImage, self).__init__( reduce_level, in_dims, in_spacing, coarse_spacing, *args, **kwargs) self.polygon_info = polygon_info
[docs] def setup_images(self): super(PolygonSubImage, self).setup_images() self.get_polygons()
[docs] def get_polygons(self): polygon_keys = [] polygon_keys.extend(self.__class__.optional_polys) polygon_keys.extend(self.__class__.required_polys) for key in polygon_keys: logging.info('rasterizing {0} polygon'.format(key)) points = self.polygon_info[key] self.images[key] = iu.rasterize_polygons(self.in_dims.astype(int)[::-1], [1.0 / 2**self.reduce_level, 1.0 / 2**self.reduce_level], points).T
#==============================================================================
[docs]def run_subimage(input_data): # TODO: not propagating the log level from the calling thread logging.getLogger('').setLevel(logging.INFO) index = input_data.pop('specimen_tissue_index') cls = input_data.pop('cls') logging.info('handling {0} at index {1}'.format(cls.__name__, index)) si = cls(**input_data) si.setup_images() si.compute_coarse_planes() return index, si.accumulators