Source code for allensdk.internal.api

from typing import Optional

import psycopg2
import psycopg2.extras
import pandas as pd

from allensdk import one, OneResultExpectedError
from allensdk.core.authentication import DbCredentials, credential_injector


class OneOrMoreResultExpectedError(RuntimeError):
    """RuntimeError subclass named for queries that should yield >= 1 result.

    NOTE(review): nothing in the visible part of this module raises it;
    presumably it is raised by callers elsewhere -- confirm before relying
    on it.
    """
def psycopg2_select(query, database, host, port, username, password):
    """Run a query on a Postgres database and return the rows as a DataFrame.

    A new connection is opened for every call and closed before returning.

    Parameters
    ----------
    query :
        Query passed unchanged to ``cursor.execute``.
    database :
        Name of the database (forwarded to psycopg2 as ``dbname``).
    host :
        Database server host.
    port :
        Database server port.
    username :
        User to connect as (forwarded to psycopg2 as ``user``).
    password :
        Password for ``username``.

    Returns
    -------
    pandas.DataFrame
        Built from the result of ``cursor.fetchall()``. Rows are fetched
        with ``psycopg2.extras.RealDictCursor``.
    """
    connection = psycopg2.connect(
        host=host,
        port=port,
        dbname=database,
        user=username,
        password=password,
        cursor_factory=psycopg2.extras.RealDictCursor,
    )

    # Explicit try/finally rather than ``with connection``: psycopg2's
    # connection context manager ends the transaction but does not close
    # the connection. The outer ``finally`` also covers a failure while
    # creating the cursor, which previously leaked the connection.
    try:
        cursor = connection.cursor()
        try:
            cursor.execute(query)
            response = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        connection.close()

    return pd.DataFrame(response)
class PostgresQueryMixin:
    """Holds Postgres connection settings and offers small query helpers.

    All constructor arguments are keyword-only and are stored unchanged as
    attributes of the same name.

    Parameters
    ----------
    dbname :
        Name of the database.
    user :
        User to connect as.
    host :
        Database server host.
    password :
        Password for ``user``.
    port :
        Database server port.
    """

    def __init__(self, *, dbname, user, host, password, port):
        self.host = host
        self.port = port
        self.dbname = dbname
        self.user = user
        self.password = password

    def get_cursor(self):
        """Open a new connection via ``get_connection`` and return a cursor on it."""
        connection = self.get_connection()
        return connection.cursor()

    def get_connection(self):
        """Return a new ``psycopg2`` connection built from the stored settings."""
        connect_kwargs = {
            "dbname": self.dbname,
            "user": self.user,
            "host": self.host,
            "password": self.password,
            "port": self.port,
        }
        return psycopg2.connect(**connect_kwargs)

    def fetchone(self, query, strict=True):
        """Run ``query`` and return the entry stored under key ``0``.

        The DataFrame from ``select`` is converted with ``to_dict()`` and the
        list of its per-column mappings is handed to ``one``.
        NOTE(review): ``one`` is imported from ``allensdk``; presumably it
        unwraps a single-element collection -- confirm.

        Parameters
        ----------
        query :
            Query forwarded to ``select``.
        strict : bool
            When exactly ``True``, validate the unwrapped result (see Raises).

        Returns
        -------
        The value found at key ``0`` of the unwrapped result.

        Raises
        ------
        OneResultExpectedError
            If ``strict`` is ``True`` and the unwrapped result does not hold
            exactly one entry, or its entry at key ``0`` is ``None``.
        """
        per_column = list(self.select(query).to_dict().values())
        response = one(per_column)

        if strict is True:
            # Length is checked first so response[0] is only touched when
            # there is exactly one entry.
            is_single_value = len(response) == 1 and response[0] is not None
            if not is_single_value:
                raise OneResultExpectedError

        return response[0]

    def fetchall(self, query, strict=True):
        """Run ``query`` and return ``one(cell)`` for every cell of the result.

        Cells are visited in the order given by the flattened ``values``
        array of the DataFrame returned by ``select``.

        Parameters
        ----------
        query :
            Query forwarded to ``select``.
        strict : bool
            Accepted but not used by this method.

        Returns
        -------
        list
            One entry per cell of the query result.
        """
        frame = self.select(query)
        return list(map(one, frame.values.flat))

    def select(self, query):
        """Run ``query`` through ``psycopg2_select`` with the stored settings.

        Returns
        -------
        pandas.DataFrame
            Whatever ``psycopg2_select`` returns for the query.
        """
        return psycopg2_select(
            query,
            database=self.dbname,
            host=self.host,
            port=self.port,
            username=self.user,
            password=self.password,
        )

    def select_one(self, query):
        """Run ``query`` and return its only row as a dict.

        Returns
        -------
        dict
            The single record when the result has exactly one row; an empty
            dict when it has zero rows or more than one.
        """
        records = self.select(query).to_dict('records')
        return records[0] if len(records) == 1 else {}
def db_connection_creator(credentials: Optional[DbCredentials] = None,
                          fallback_credentials: Optional[dict] = None,
                          ) -> PostgresQueryMixin:
    """Build a database connection object from the supplied credentials.

    At least one of 'credentials' or 'fallback_credentials' must be given.
    When both are given, 'credentials' wins. Fallback credentials attempt to
    read the connection info from shell environment variables.

    Parameters
    ----------
    credentials : Optional[DbCredentials], optional
        User specified credentials, by default None. Its ``dbname``,
        ``user``, ``host``, ``port`` and ``password`` attributes are used.
    fallback_credentials : dict
        Credentials used when no 'credentials' are provided, by default
        None. They are applied through ``credential_injector``, which will
        try to get the db connection info from shell environment variables.
        Examples of the environment variables involved can be found in
        allensdk.core.auth_config.

    Returns
    -------
    PostgresQueryMixin
        A DB connection instance which can execute queries against the DB
        described by 'credentials' or 'fallback_credentials'.

    Raises
    ------
    RuntimeError
        If neither 'credentials' nor 'fallback_credentials' were provided.
    """
    # Explicit credentials take precedence over the fallback.
    if credentials:
        return PostgresQueryMixin(
            dbname=credentials.dbname,
            user=credentials.user,
            host=credentials.host,
            port=credentials.port,
            password=credentials.password,
        )

    if fallback_credentials:
        # The injector wraps the class so that it can be called without
        # arguments; the connection settings come from the fallback.
        injected_factory = credential_injector(fallback_credentials)(
            PostgresQueryMixin)
        return injected_factory()

    raise RuntimeError(
        "Must provide either credentials or fallback credentials in "
        "order to create a db connection!")