import os, platform, re, logging
from allensdk.core.json_utilities import read_url_get
HDF5_FILE_TYPE_ID = 306905526
NWB_FILE_TYPE_ID = 475137571
NWB_UNCOMPRESSED_FILE_TYPE_ID = 478840678
NWB_DOWNLOAD_FILE_TYPE_ID = 481007198
METHOD_CONFIG_FILE_TYPE_ID = 324440685
MODEL_PARAMETERS_FILE_TYPE_ID = 329230374
BIOPHYS_MODEL_PARAMETERS_FILE_TYPE_ID = 329230374
[docs]def get_well_known_files_by_type(wkfs, wkf_type_id):
out = [ os.path.join( wkf['storage_directory'], wkf['filename'] )
for wkf in wkfs
if wkf.get('well_known_file_type_id',None) == wkf_type_id ]
if len(out) == 0:
raise IOError("Could not find well known files with type %d." % wkf_type_id)
return out
[docs]def get_well_known_file_by_type(wkfs, wkf_type_id):
out = get_well_known_files_by_type(wkfs, wkf_type_id)
nout = len(out)
if nout != 1:
raise IOError("Expected single well known file with type %d. Got %d." % (wkf_type_id, nout))
return out[0]
[docs]def get_well_known_files_by_name(wkfs, filename):
out = [ os.path.join( wkf['storage_directory'], wkf['filename'] )
for wkf in wkfs
if wkf['filename'] == filename ]
if len(out) == 0:
raise IOError("Could not find well known files with name %s." % filename)
return out
[docs]def get_well_known_file_by_name(wkfs, filename):
out = get_well_known_files_by_name(wkfs, filename)
nout = len(out)
if nout != 1:
raise IOError("Expected single well known file with name %s. Got %d." % (filename, nout))
return out[0]
[docs]def append_well_known_file(wkfs, path, wkf_type_id=None, content_type=None):
record = {
'filename': os.path.basename(path),
'storage_directory': os.path.dirname(path)
}
if wkf_type_id is not None:
record['well_known_file_type_id'] = wkf_type_id
if content_type is not None:
record['content_type'] = content_type
for wkf in wkfs:
if wkf['filename'] == record['filename']:
logging.debug("found existing well known file record for %s, updating", path)
wkf.update(record)
return
logging.debug("could not find existing well known file record for %s, appending", path)
wkfs.append(record)
def _connect(user="limsreader", host="limsdb2", database="lims2", password="limsro", port=5432):
import pg8000
conn = pg8000.connect(user=user, host=host, database=database, password=password, port=port)
return conn, conn.cursor()
def _select(cursor, query):
cursor.execute(query)
columns = [ d[0].decode("utf-8") for d in cursor.description ]
return [ dict(zip(columns, c)) for c in cursor.fetchall() ]
[docs]def select(cursor, query):
raise DeprecationWarning("lims_utilities.select is deprecated. Please use lims_utilities.query instead.")
[docs]def connect(user="limsreader", host="limsdb2", database="lims2", password="limsro", port=5432):
raise DeprecationWarning("lims_utilities.connect is deprecated. Please use lims_utilities.query instead.")
[docs]def query(query, user="limsreader", host="limsdb2", database="lims2", password="limsro", port=5432):
conn, cursor = _connect(user, host, database, password, port)
# Guard against non-ascii characters in query
query = ''.join([i if ord(i) < 128 else ' ' for i in query])
try:
results = _select(cursor, query)
finally:
cursor.close()
conn.close()
return results
[docs]def safe_system_path(file_name):
if platform.system() == "Windows":
return linux_to_windows(file_name)
else:
return convert_from_titan_linux(os.path.normpath(file_name))
[docs]def convert_from_titan_linux(file_name):
# Lookup table mapping project to program
project_to_program= {
"neuralcoding": "braintv",
'0378': "celltypes",
'conn': "celltypes",
'ctyconn': "celltypes",
'humancelltypes': "celltypes",
'mousecelltypes': "celltypes",
'shotconn': "celltypes",
'synapticphys': "celltypes",
'whbi': "celltypes",
'wijem': "celltypes"
}
# Tough intermediary state where we have old paths
# being translated to new paths
m = re.match('/projects/([^/]+)/vol1/(.*)', file_name)
if m:
newpath = os.path.normpath(os.path.join(
'/allen',
'programs',
project_to_program.get(m.group(1),'undefined'),
'production',
m.group(1),
m.group(2)
))
return newpath
return file_name
[docs]def linux_to_windows(file_name):
# Lookup table mapping project to program
project_to_program= {
"neuralcoding": "braintv",
'0378': "celltypes",
'conn': "celltypes",
'ctyconn': "celltypes",
'humancelltypes': "celltypes",
'mousecelltypes': "celltypes",
'shotconn': "celltypes",
'synapticphys': "celltypes",
'whbi': "celltypes",
'wijem': "celltypes"
}
# Simple case for new world order
m = re.match('/allen', file_name)
if m:
return "\\" + file_name.replace('/','\\')
# /data/ paths are being retained (for now)
# this will need to be extended to map directories to
# /allen/{programs,aibs}/workgroups/foo
m = re.match('/data/([^/]+)/(.*)', file_name)
if m:
return os.path.normpath(os.path.join('\\\\aibsdata', m.group(1), m.group(2)))
# Tough intermediary state where we have old paths
# being translated to new paths
m = re.match('/projects/([^/]+)/vol1/(.*)', file_name)
if m:
newpath = os.path.normpath(os.path.join(
'\\\\allen',
'programs',
project_to_program.get(m.group(1),'undefined'),
'production',
m.group(1),
m.group(2)
))
return newpath
# No matches found. Clean up and return path given to us
return os.path.normpath(file_name)