Source code for allensdk.internal.api
from typing import Optional
import psycopg2
import psycopg2.extras
import pandas as pd
from allensdk import one, OneResultExpectedError
from allensdk.core.authentication import DbCredentials, credential_injector
[docs]class OneOrMoreResultExpectedError(RuntimeError):
pass
[docs]def psycopg2_select(query, database, host, port, username, password):
connection = psycopg2.connect(
host=host, port=port, dbname=database,
user=username, password=password,
cursor_factory=psycopg2.extras.RealDictCursor
)
cursor = connection.cursor()
try:
cursor.execute(query)
response = cursor.fetchall()
finally:
cursor.close()
connection.close()
return pd.DataFrame(response)
[docs]class PostgresQueryMixin(object):
def __init__(self, *, dbname, user, host, password, port):
self.dbname = dbname
self.user = user
self.host = host
self.password = password
self.port = port
[docs] def get_cursor(self):
return self.get_connection().cursor()
[docs] def get_connection(self):
return psycopg2.connect(dbname=self.dbname, user=self.user,
host=self.host, password=self.password,
port=self.port)
[docs] def fetchone(self, query, strict=True):
response = one(list(self.select(query).to_dict().values()))
if strict is True and (len(response) != 1 or response[0] is None):
raise OneResultExpectedError
return response[0]
[docs] def fetchall(self, query, strict=True):
response = self.select(query)
return [one(x) for x in response.values.flat]
[docs] def select(self, query):
return psycopg2_select(
query,
database=self.dbname,
host=self.host,
port=self.port,
username=self.user,
password=self.password
)
[docs] def select_one(self, query):
data = self.select(query).to_dict('record')
if len(data) == 1:
return data[0]
return {}
[docs]def db_connection_creator(credentials: Optional[DbCredentials] = None,
fallback_credentials: Optional[dict] = None,
) -> PostgresQueryMixin:
"""Create a db connection using credentials. If credentials are not
provided then use fallback credentials (which attempt to read from
shell environment variables).
Note: Must provide one of either 'credentials' or 'fallback_credentials'.
If both are provided, 'credentials' will take precedence.
Parameters
----------
credentials : Optional[DbCredentials], optional
User specified credentials, by default None
fallback_credentials : dict
Fallback credentials to use for creating the DB connection in the
case that no 'credentials' are provided, by default None.
Fallback credentials will attempt to get db connection info from
shell environment variables.
Some examples of environment variables that fallback credentials
will try to read from can be found in allensdk.core.auth_config.
Returns
-------
PostgresQueryMixin
A DB connection instance which can execute queries to the DB
specified by credentials or fallback_credentials.
Raises
------
RuntimeError
If neither 'credentials' nor 'fallback_credentials' were provided.
"""
if credentials:
db_conn = PostgresQueryMixin(
dbname=credentials.dbname, user=credentials.user,
host=credentials.host, port=credentials.port,
password=credentials.password)
elif fallback_credentials:
db_conn = (credential_injector(fallback_credentials)
(PostgresQueryMixin)())
else:
raise RuntimeError(
"Must provide either credentials or fallback credentials in "
"order to create a db connection!")
return db_conn