Código fonte para asgard.db.session

import sqlalchemy
from aiopg.sa import create_engine


class _EngineWrapper:
    """Resolve an awaitable engine on first use and cache the result.

    The awaitable given to the constructor is awaited by the first
    successful call to ``engine()``; later calls return the stored value.
    """

    def __init__(self, coro_engine):
        # Awaitable that yields the engine; not awaited here.
        self._coro_engine = coro_engine
        # Becomes True once the awaitable has been resolved successfully.
        self._connected = False

    async def engine(self):
        """Return the engine, awaiting the stored awaitable if needed."""
        if self._connected:
            return self._engine

        self._engine = await self._coro_engine
        self._connected = True
        return self._engine


class AsgardDBConnection:
    """Chainable query helper bound to one acquired database connection.

    ``query``, ``filter`` and ``join`` build up a SQLAlchemy ``select`` and
    return ``self`` so calls can be chained; ``all``, ``one`` and ``exists``
    execute the accumulated statement through ``conn``.
    """

    def __init__(self, engine, conn, session):
        self.engine = engine
        self.conn = conn
        self.session = session
        # No statement exists until query() has been called.
        self._query = None

    def query(self, *args) -> "AsgardDBConnection":
        """Start a new ``select`` over *args* and return ``self``.

        Declarative model classes are replaced by their ``__table__``;
        every other argument is handed to ``sqlalchemy.select`` unchanged.
        """
        declarative_meta = sqlalchemy.ext.declarative.api.DeclarativeMeta
        # isinstance (rather than an exact type match) also covers models
        # whose metaclass derives from DeclarativeMeta.
        selectables = [
            item.__table__ if isinstance(item, declarative_meta) else item
            for item in args
        ]
        self._query = sqlalchemy.select(selectables)
        return self

    def filter(self, *args) -> "AsgardDBConnection":
        """Apply *args* as WHERE criteria to the current statement.

        ``query()`` must have been called first; otherwise there is no
        statement to refine.
        """
        self._query = self._query.where(*args)
        return self

    def join(self, join_clause) -> "AsgardDBConnection":
        """Make the current statement select from *join_clause*."""
        self._query = self._query.select_from(join_clause)
        return self

    def begin(self):
        """Return whatever ``conn.begin()`` returns."""
        return self.conn.begin()

    async def execute(self, *args, **kwargs):
        """Forward all arguments to ``conn.execute`` and return its result."""
        return await self.conn.execute(*args, **kwargs)

    def release(self):
        """Hand ``conn`` back to ``engine``.

        NOTE(review): the return value of ``engine.release`` is discarded;
        if it is an awaitable, callers cannot wait on it -- confirm.
        """
        self.engine.release(self.conn)

    async def all(self):
        """Execute the current statement and return every fetched row."""
        result = await self.execute(self._query)
        return await result.fetchall()

    async def one(self):
        """Execute the current statement and return its single row.

        Raises:
            sqlalchemy.orm.exc.MultipleResultsFound: more than one row matched.
            sqlalchemy.orm.exc.NoResultFound: no row matched.
        """
        # limit() returns a new statement instead of modifying the receiver,
        # so the limited statement is the one that must be executed. Two rows
        # are enough to tell "exactly one" apart from "several".
        limited = self._query.limit(2)
        result = await self.execute(limited)
        rows = list(await result.fetchall())
        if len(rows) > 1:
            raise sqlalchemy.orm.exc.MultipleResultsFound
        if not rows:
            raise sqlalchemy.orm.exc.NoResultFound
        return rows[0]

    async def exists(self) -> bool:
        """Return True when the current statement matches at least one row."""
        # One row is enough to answer the question; execute the limited
        # statement returned by limit() rather than the unlimited original.
        limited = self._query.limit(1)
        result = await self.execute(limited)
        return len(await result.fetchall()) > 0
class Session:
    """Async context manager that hands out ``AsgardDBConnection`` objects.

    Entering the context acquires a connection from the wrapped engine;
    leaving it passes that connection to the engine's ``release``.
    """

    def __init__(self, engine_wrapper):
        self._engine_wrapper = engine_wrapper

    async def engine(self):
        """Return the engine resolved by the engine wrapper."""
        resolved = await self._engine_wrapper.engine()
        return resolved

    async def connection(self):
        """Acquire a connection and wrap it in an ``AsgardDBConnection``.

        The acquired connection is also stored on ``self.conn`` (replacing
        any previously stored one) so that ``__aexit__`` can release it.
        """
        db_engine = await self._engine_wrapper.engine()
        acquired = await db_engine._acquire()
        self.conn = acquired
        return AsgardDBConnection(db_engine, acquired, session=self)

    async def __aenter__(self) -> AsgardDBConnection:
        wrapped = await self.connection()
        return wrapped

    async def __aexit__(self, exc_type, exc, tb):
        # Returns None, so an exception raised inside the block propagates.
        db_engine = await self._engine_wrapper.engine()
        db_engine.release(self.conn)
class _SessionMaker:
    """Callable factory that produces ``Session`` objects.

    Every session created by one maker shares the single ``_EngineWrapper``
    built in ``__init__``.
    """

    def __init__(self, dsn, *args, **kwargs):
        # Extra positional arguments are accepted but not used; keyword
        # arguments are forwarded to create_engine together with the DSN.
        self._connected = False
        self._dsn = dsn
        pending_engine = create_engine(dsn=dsn, **kwargs)
        self._engine_wrapper = _EngineWrapper(pending_engine)

    def __call__(self) -> Session:
        """Create a new ``Session`` backed by the shared engine wrapper."""
        return Session(self._engine_wrapper)

    async def engine(self):
        """Return the engine resolved by the shared engine wrapper."""
        resolved = await self._engine_wrapper.engine()
        return resolved