Source code for asyncpg_simpleorm.connection_managers
import asyncpg
from .abstract import AsyncContextManagerABC
[docs]class ConnectionManager(AsyncContextManagerABC):
"""An async context manager that mimics the :func:`asyncpg.connect`
function, used with subclasses of :class:`AsyncModel`.
:param args: Passed to :func:`asyncpg.connect` function.
:param kwargs: Passed to :func:`asyncpg.connect` function.
"""
__slots__ = ('_args', '_kwargs', '_connection')
def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs
self._connection = None
async def __aenter__(self):
if self._connection is None:
self._connection = await asyncpg.connect(*self._args,
**self._kwargs)
return self._connection
async def __aexit__(self, exctype, excval, traceback):
pass
[docs]class PoolManager(AsyncContextManagerABC):
"""An async context manager that mimics the :func:`asyncpg.create_pool`
function, used with subclasses of :class:`AsyncModel`.
:param args: Passed to :func:`asyncpg.create_pool` function.
:param kwargs: Passed to :func:`asyncpg.create_pool` function.
"""
__slots__ = ('_args', '_kwargs', '_connection', '_pool')
def __init__(self, *args, **kwargs):
self._args = args
self._kwargs = kwargs
self._connection = None
self._pool = None
async def __aenter__(self):
if self._pool is None:
self._pool = await asyncpg.create_pool(*self._args,
**self._kwargs)
self._connection = await self._pool.acquire()
return self._connection
async def __aexit__(self, exctype, excval, traceback):
await self._pool.release(self._connection)
self._connection = None