Source code for allensdk.brain_observatory.ecephys.file_io.continuous_file

# Allen Institute Software License - This software license is the 2-clause BSD
# license plus a third clause that prohibits redistribution for commercial
# purposes without further permission.
#
# Copyright 2019. Allen Institute. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# 3. Redistributions for commercial purposes are not permitted without the
# Allen Institute's written permission.
# For purposes of this license, commercial purposes is the incorporation of the
# Allen Institute's software into anything for which you will charge fees or
# other compensation. Contact terms@alleninstitute.org for commercial licensing
# opportunities.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
import numpy as np
from pathlib import Path
import logging


[docs]class ContinuousFile(): """ Represents a continuous (.dat) file, and its associated timestamps """ def __init__(self, data_path, timestamps_path, total_num_channels=384, dtype=np.int16): """ data_path : str Path to file containing LFP data. The file is expected to be a raw binary with channels as its fast axis and samples as its slow axis. timestamps_path : str Path to file containing timestamps for the associated LFP samples. The file is expected to be a .npy file. total_num_channels : int, optional Count of channels on this probe. dtype : type, optional The data array will be interpreted as containing samples of this type. """ self.data_path = data_path self.timestamps_path = timestamps_path self.total_num_channels = total_num_channels self.dtype = dtype
[docs] def load(self, memmap=False, memmap_thresh = 10e9): """ Reads lfp data and timestamps from the filesystem Parameters: ---------- memmap : bool, optional If True, the returned data array will be a memory map of the file on disk. Default is True. memmap_thresh : float, optional Files above this size in bytes will be memory-mapped, regardless of memmap setting Returns: -------- lfp_raw : numpy.ndarray Contains LFP data read directly off of disk. Dimensions are samples X channels. timestamps : numpy.ndarray 1D array defining the times at which each LFP sample was taken. """ logging.info('loading timestamps from {}'.format(self.timestamps_path)) timestamps = np.load(self.timestamps_path, allow_pickle=False) logging.info('done loading timestamps from {}. Count: {}'.format(self.timestamps_path, timestamps.size)) bytes_per_sample = self.dtype(0).nbytes num_samples = timestamps.size * self.total_num_channels expected_num_bytes = num_samples * bytes_per_sample logging.info('calculated LFP filesize: {} bytes'.format(expected_num_bytes)) num_bytes = Path(self.data_path).stat().st_size if not expected_num_bytes == num_bytes: raise IOError('expected LFP data filesize to be {} bytes, but its size was {} bytes'.format(expected_num_bytes, num_bytes)) shape = (timestamps.size, self.total_num_channels) logging.info('calculated LFP data shape: {}'.format(shape)) if memmap or num_bytes > memmap_thresh: logging.info('memmaping LFP file at {}'.format(self.data_path)) lfp_raw = np.memmap(self.data_path, dtype=self.dtype, shape=shape, mode='r') logging.info('done memmaping LFP file at {}'.format(self.data_path)) else: with open(self.data_path, 'rb') as data_file: logging.info('reading LFP file at {}'.format(self.data_path)) lfp_raw = np.frombuffer(data_file.read(), dtype=self.dtype) logging.info('done reading LFP file at {}'.format(self.data_path)) lfp_raw = lfp_raw.reshape(shape) return lfp_raw, timestamps
[docs] def get_lfp_channel_order(self): """ Returns the channel ordering for LFP data extracted from NPX files. Parameters: ---------- None Returns: --------- channel_order : numpy.ndarray Contains the actual channel ordering. """ remapping_pattern = np.array([0, 12, 1, 13, 2, 14, 3, 15, 4, 16, 5, 17, 6, 18, 7, 19, 8, 20, 9, 21, 10, 22, 11, 23, 24, 36, 25, 37, 26, 38, 27, 39, 28, 40, 29, 41, 30, 42, 31, 43, 32, 44, 33, 45, 34, 46, 35, 47]) channel_order = np.concatenate([remapping_pattern + 48*i for i in range(0,8)]) return channel_order