Source code for allensdk.internal.api

import psycopg2
import psycopg2.extras
import pandas as pd

from allensdk import one, OneResultExpectedError
from allensdk.core.authentication import credential_injector

[docs]class OneOrMoreResultExpectedError(RuntimeError): pass
[docs]def psycopg2_select(query, database, host, port, username, password): connection = psycopg2.connect( host=host, port=port, dbname=database, user=username, password=password, cursor_factory=psycopg2.extras.RealDictCursor ) cursor = connection.cursor() try: cursor.execute(query) response = cursor.fetchall() finally: cursor.close() connection.close() return pd.DataFrame(response)
[docs]class PostgresQueryMixin(object): def __init__(self, *, dbname, user, host, password, port): self.dbname = dbname self.user = user self.host = host self.password = password self.port = port
[docs] def get_cursor(self): return self.get_connection().cursor()
[docs] def get_connection(self): return psycopg2.connect(dbname=self.dbname, user=self.user, host=self.host, password=self.password, port=self.port)
[docs] def fetchone(self, query, strict=True): response = one(list(self.select(query).to_dict().values())) if strict is True and (len(response) != 1 or response[0] is None): raise OneResultExpectedError return response[0]
[docs] def fetchall(self, query, strict=True): response = self.select(query) return [one(x) for x in response.values.flat]
[docs] def select(self, query): return psycopg2_select(query, database=self.dbname, host=self.host, port=self.port, username=self.user, password=self.password )
[docs] def select_one(self, query): data = self.select(query).to_dict('record') if len(data) == 1: return data[0] return {}