# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2019. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import numpy as np
from pathlib import Path
import logging
[docs]class ContinuousFile():
"""
Represents a continuous (.dat) file, and its associated timestamps
"""
def __init__(self, data_path, timestamps_path, total_num_channels=384, dtype=np.int16):
"""
data_path : str
Path to file containing LFP data. The file is expected to be a raw binary with channels as its fast axis and samples as its slow axis.
timestamps_path : str
Path to file containing timestamps for the associated LFP samples. The file is expected to be a .npy file.
total_num_channels : int, optional
Count of channels on this probe.
dtype : type, optional
The data array will be interpreted as containing samples of this type.
"""
self.data_path = data_path
self.timestamps_path = timestamps_path
self.total_num_channels = total_num_channels
self.dtype = dtype
[docs] def load(self, memmap=False, memmap_thresh = 10e9):
"""
Reads lfp data and timestamps from the filesystem
Parameters:
----------
memmap : bool, optional
If True, the returned data array will be a memory map of the file on disk. Default is True.
memmap_thresh : float, optional
Files above this size in bytes will be memory-mapped, regardless of memmap setting
Returns:
--------
lfp_raw : numpy.ndarray
Contains LFP data read directly off of disk. Dimensions are samples X channels.
timestamps : numpy.ndarray
1D array defining the times at which each LFP sample was taken.
"""
logging.info('loading timestamps from {}'.format(self.timestamps_path))
timestamps = np.load(self.timestamps_path, allow_pickle=False)
logging.info('done loading timestamps from {}. Count: {}'.format(self.timestamps_path, timestamps.size))
bytes_per_sample = self.dtype(0).nbytes
num_samples = timestamps.size * self.total_num_channels
expected_num_bytes = num_samples * bytes_per_sample
logging.info('calculated LFP filesize: {} bytes'.format(expected_num_bytes))
num_bytes = Path(self.data_path).stat().st_size
if not expected_num_bytes == num_bytes:
raise IOError('expected LFP data filesize to be {} bytes, but its size was {} bytes'.format(expected_num_bytes, num_bytes))
shape = (timestamps.size, self.total_num_channels)
logging.info('calculated LFP data shape: {}'.format(shape))
if memmap or num_bytes > memmap_thresh:
logging.info('memmaping LFP file at {}'.format(self.data_path))
lfp_raw = np.memmap(self.data_path, dtype=self.dtype, shape=shape, mode='r')
logging.info('done memmaping LFP file at {}'.format(self.data_path))
else:
with open(self.data_path, 'rb') as data_file:
logging.info('reading LFP file at {}'.format(self.data_path))
lfp_raw = np.frombuffer(data_file.read(), dtype=self.dtype)
logging.info('done reading LFP file at {}'.format(self.data_path))
lfp_raw = lfp_raw.reshape(shape)
return lfp_raw, timestamps
[docs] def get_lfp_channel_order(self):
"""
Returns the channel ordering for LFP data extracted from NPX files.
Parameters:
----------
None
Returns:
---------
channel_order : numpy.ndarray
Contains the actual channel ordering.
"""
remapping_pattern = np.array([0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17, 6, 18, 7, 19,
8, 20, 9, 21, 10, 22, 11, 23, 24, 36, 25, 37, 26, 38,
27, 39, 28, 40, 29, 41, 30, 42, 31, 43, 32, 44, 33, 45, 34, 46, 35, 47])
channel_order = np.concatenate([remapping_pattern + 48*i for i in range(0,8)])
return channel_order