# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2019. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import logging
import numpy as np
from scipy.signal import decimate, butter, filtfilt
logger = logging.getLogger(__name__)
[docs]def select_channels(total_channels,
surface_channel,
surface_padding,
start_channel_offset,
channel_stride,
channel_order,
noisy_channels=np.array([]),
remove_noisy_channels=False,
reference_channels=np.array([]),
remove_references=False):
"""
Selects a subset of channels for spatial downsampling
Parameters:
----------
total_channels : int
Number of channels in the original data file
surface_channel : int
Index of channel at brain surface
surface_padding : int
Number of channels above surface to save
start_channel_offset : int
First channel to save
channel_stride : int
Number of channels to skip in output
channel_order : np.ndarray
Actual order of LFP channels (needed to account for the bug in NPX extraction)
noisy_channels : numpy.ndarray
Array indicating noisy channels
remove_noisy_channels : bool
Flag to remove noisy channels
reference_channels : numpy.ndarray
Array indicating refence channels
remove_references : bool
Flag to remove reference channels
Returns:
--------
selected_channels : numpy.ndarray
Indices of channels to select (relative to non-remapped data)
actual_channel_numbers : numpy.ndarray
Actual probe channels in subsampled data
"""
assert surface_channel <= total_channels
max_channel = np.min([total_channels, surface_channel + surface_padding])
selected_channels = channel_order[start_channel_offset:max_channel:channel_stride]
actual_channel_numbers = np.arange(total_channels)
actual_channel_numbers = actual_channel_numbers[start_channel_offset:max_channel:channel_stride]
if remove_references or remove_noisy_channels:
# TODO: Is there a case that reference/noisy channels won't be removed? If not then we should remove flags and
# just check if the arrays are empty
logger.info("Before:")
logger.info(actual_channel_numbers)
# create mask to filter out reference channels
mask = (not remove_references) or np.isin(actual_channel_numbers, reference_channels, assume_unique=True,
invert=True)
# mask to remove noisy channels
mask &= (not remove_noisy_channels) or np.isin(actual_channel_numbers, noisy_channels, assume_unique=True,
invert=True)
actual_channel_numbers = actual_channel_numbers[mask]
selected_channels = selected_channels[mask]
logger.info("After:")
logger.info(actual_channel_numbers)
return selected_channels, actual_channel_numbers
[docs]def subsample_timestamps(timestamps, subsampling_factor):
"""
Subsamples an array of timestamps
Parameters:
----------
timestamps : numpy.ndarray
1D array of timestamp values
downsampling_factor : int
Factor by which to subsample the timestamps
Returns:
timestamps_sub : numpy.ndarray
New 1D array of timestamps
"""
return timestamps[::subsampling_factor]
[docs]def subsample_lfp(lfp_raw, selected_channels, subsampling_factor):
"""
Subsamples LFP data
Parameters:
----------
lfp_raw : numpy.ndarray
2D array of LFP values (time x channels)
selected_channels : numpy.ndarray
Indices of channels to select (spatial subsampling)
downsampling_factor : int
Factor by which to subsample in time
Returns:
lfp_subsampled : numpy.ndarray
New 2D array of LFP values
"""
num_samples = len(lfp_raw[::subsampling_factor, 0]) # np.round(lfp_raw.shape[0] / subsampling_factor).astype('int')
num_channels = selected_channels.size
lfp_subsampled = np.zeros((num_samples, num_channels), dtype='int16')
for new_ch, old_ch in enumerate(selected_channels):
tmp = decimate(lfp_raw[:, old_ch], subsampling_factor, ftype='iir', zero_phase=True)
assert(len(tmp) == num_samples)
lfp_subsampled[:, new_ch] = tmp.astype('int16')
return lfp_subsampled
[docs]def remove_lfp_offset(lfp, sampling_frequency, cutoff_frequency, filter_order):
"""
High-pass filters LFP data to remove offset
Parameters:
----------
lfp : numpy.ndarray
2D array of LFP values (time x channels)
sampling_frequency : float
Sampling frequency in Hz
cutoff_frequency : float
Cutoff frequency for highpass filter
filter_order : int
Butterworth filter order
Returns:
lfp_filtered : numpy.ndarray
New 2D array of LFP values
"""
lfp_filtered = np.zeros(lfp.shape, dtype='int16')
b, a = butter(filter_order, cutoff_frequency / (sampling_frequency/2), btype='high')
for ch in range(lfp.shape[1]):
tmp = filtfilt(b, a, lfp[:, ch])
lfp_filtered[:, ch] = tmp.astype('int16')
return lfp_filtered
[docs]def remove_lfp_noise(lfp, surface_channel, channel_numbers, channel_max=384, channel_limit=380):
"""
Subtract mean of channels out of brain to remove noise
Parameters:
----------
lfp : numpy.ndarray
2D array of LFP values (time x channels)
surface_channel : int
Surface channel (relative to original probe)
channel_numbers : numpy.ndarray
Channel numbers in 'lfp' array (relative to original probe)
Returns:
lfp_noise_removed : numpy.ndarray
New 2D array of LFP values
"""
lfp_noise_removed = np.zeros(lfp.shape, dtype='int16')
surface_channel = channel_limit if surface_channel >= channel_max else surface_channel
channel_selection = np.where(channel_numbers > surface_channel)[0]
median_signal_out_of_brain = np.median(lfp[:, channel_selection], 1)
for ch in range(lfp.shape[1]):
tmp = lfp[:, ch] - median_signal_out_of_brain
lfp_noise_removed[:, ch] = tmp.astype('int16')
return lfp_noise_removed