Source code for aiothrift.pool

"""
The connection pool implementation is heavily borrowed from `aioredis`
"""
import asyncio
import collections
import sys

from .connection import create_connection
from .log import logger
from .util import async_task
from .errors import PoolClosedError

PY_35 = sys.version_info >= (3, 5)


@asyncio.coroutine
def create_pool(service, address=('127.0.0.1', 6000), *, minsize=1, maxsize=10,
                loop=None, timeout=None):
    """Create a thrift connection pool.

    This function is a :ref:`coroutine <coroutine>`.

    :param service: service object defined by thrift file
    :param address: (host, port) tuple, default is ('127.0.0.1', 6000)
    :param minsize: minimal thrift connection, default is 1
    :param maxsize: maximal thrift connection, default is 10
    :param loop: targeting :class:`eventloop <asyncio.AbstractEventLoop>`
    :param timeout: default timeout for each connection, default is None
    :return: :class:`ThriftPool` instance
    :raises: whatever the initial fill raises; the pool is closed before
        the error is propagated
    """
    pool = ThriftPool(service, address, minsize=minsize, maxsize=maxsize,
                      loop=loop, timeout=timeout)
    try:
        # Pre-open connections up to ``minsize`` only.
        yield from pool.fill_free(override_min=False)
    except BaseException:
        # BaseException rather than Exception: since Python 3.8
        # asyncio.CancelledError no longer derives from Exception, and a
        # cancelled create_pool() must not leak already opened connections.
        pool.close()
        raise
    return pool
class ThriftPool:
    """Thrift connection pool.

    Free connections are kept in a deque bounded by ``maxsize``; connections
    handed out by :meth:`acquire` are tracked in a set until they are given
    back with :meth:`release`.
    """

    def __init__(self, service, address, *, minsize, maxsize, loop=None, timeout=None):
        """Set up an empty pool; no connection is opened here.

        :param service: service object passed through to ``create_connection``
        :param address: address passed through to ``create_connection``
        :param minsize: int >= 0, number of connections :meth:`fill_free`
            keeps available
        :param maxsize: int > 0, upper bound of ``size``
        :param loop: event loop, defaults to ``asyncio.get_event_loop()``
        :param timeout: timeout passed through to ``create_connection``
        """
        assert isinstance(minsize, int) and minsize >= 0, (
            "minsize must be int >= 0", minsize, type(minsize))
        assert maxsize is not None, "Arbitrary pool size is disallowed."
        assert isinstance(maxsize, int) and maxsize > 0, (
            "maxsize must be int > 0", maxsize, type(maxsize))
        assert minsize <= maxsize, (
            "Invalid pool min/max sizes", minsize, maxsize)
        if loop is None:
            loop = asyncio.get_event_loop()
        self._address = address
        self.minsize = minsize
        self.maxsize = maxsize
        self._loop = loop
        # free connections, oldest first
        self._pool = collections.deque(maxlen=maxsize)
        # connections currently handed out by acquire()
        self._used = set()
        # number of connections being created right now
        self._acquiring = 0
        # waiters in acquire() sleep on this condition until release() notifies
        self._cond = asyncio.Condition(loop=loop)
        self._service = service
        self._timeout = timeout
        self.closed = False
        # pending notification tasks scheduled by release()
        self._release_tasks = set()

    @property
    def size(self):
        """Current connection total num, acquiring connection num is counted"""
        return self.freesize + len(self._used) + self._acquiring

    @property
    def freesize(self):
        """Current number of free connections."""
        return len(self._pool)

    @asyncio.coroutine
    def clear(self):
        """Clear pool connections.

        Close and remove all free connections. Connections currently in use
        are left untouched.
        """
        while self._pool:
            conn = self._pool.popleft()
            conn.close()

    def close(self):
        """Close all free and in-progress connections and mark pool as closed.
        """
        self.closed = True
        conn_num = 0
        while self._pool:
            conn = self._pool.popleft()
            conn.close()
            conn_num += 1
        # Used connections stay registered in ``_used`` so that their owners
        # can still call release(); release() drops closed connections.
        for conn in self._used:
            conn.close()
            conn_num += 1
        logger.debug("Closed %d connections", conn_num)

    @asyncio.coroutine
    def wait_closed(self):
        """Wait for the notification tasks scheduled by :meth:`release`."""
        for task in self._release_tasks:
            # shield() keeps the notification task alive if the waiter is
            # cancelled
            yield from asyncio.shield(task, loop=self._loop)

    @asyncio.coroutine
    def acquire(self):
        """Acquires a connection from free pool.

        Creates new connection if needed. When the pool is exhausted the
        caller waits until a connection is released.

        :return: a connection that is now registered as used
        :raises PoolClosedError: if the pool is closed, including when it
            gets closed while the caller is waiting for a connection
        """
        if self.closed:
            raise PoolClosedError('Pool is closed')
        with (yield from self._cond):
            while True:
                # Re-checked on every iteration: the pool may have been
                # closed while we waited for the lock or for a release, and a
                # closed pool must not open and hand out new connections.
                if self.closed:
                    raise PoolClosedError('Pool is closed')
                yield from self.fill_free(override_min=True)
                # new connection has been added to the pool
                if self.freesize:
                    conn = self._pool.popleft()
                    assert not conn.closed, conn
                    assert conn not in self._used, (conn, self._used)
                    # each acquire would move a conn from `self._pool` to `self._used`
                    self._used.add(conn)
                    return conn
                else:
                    # wait when no available connection
                    yield from self._cond.wait()

    def release(self, conn):
        """Returns used connection back into pool.

        A connection that is already closed is dropped instead of being put
        back. In both cases one waiter in :meth:`acquire` is notified.

        :param conn: a connection previously returned by :meth:`acquire`
        """
        assert conn in self._used, 'Invalid connection, maybe from other pool'
        self._used.remove(conn)
        if not conn.closed:
            assert self.freesize < self.maxsize, 'max connection size should not exceed'
            self._pool.append(conn)
        if not self._loop.is_closed():
            # forget finished notification tasks before scheduling a new one
            self._release_tasks = {
                task for task in self._release_tasks if not task.done()}
            future = async_task(self._notify_conn_returned(), loop=self._loop)
            self._release_tasks.add(future)

    def _drop_closed(self):
        """Remove every closed connection from the free pool.

        Each free connection is popped from the left exactly once and
        re-appended on the right when it is still open, so all connections
        are inspected and the open ones keep their relative order.
        """
        for _ in range(self.freesize):
            conn = self._pool.popleft()
            if not conn.closed:
                self._pool.append(conn)

    @asyncio.coroutine
    def fill_free(self, *, override_min):
        """Make sure at least `self.minsize` connections exist in the pool.

        If `override_min` is True and no free connection is available after
        that, one more connection is opened as long as `self.size` stays
        below `self.maxsize`.

        :param override_min: allow growing the pool beyond `self.minsize`
        """
        # drop closed connections first, in case that the user closed the connection manually
        self._drop_closed()
        while self.size < self.minsize:
            # counted in ``size`` while the connection is being created
            self._acquiring += 1
            try:
                conn = yield from self._create_new_connection()
                self._pool.append(conn)
            finally:
                self._acquiring -= 1
                # free connections may have been closed while we were waiting
                self._drop_closed()
        if self.freesize:
            return
        if override_min:
            # when self.size >= minsize and no available connection
            while not self._pool and self.size < self.maxsize:
                self._acquiring += 1
                try:
                    conn = yield from self._create_new_connection()
                    self._pool.append(conn)
                finally:
                    self._acquiring -= 1

    def _create_new_connection(self):
        """Return the result of ``create_connection`` for this pool's target."""
        return create_connection(self._service, self._address,
                                 loop=self._loop, timeout=self._timeout)

    @asyncio.coroutine
    def _notify_conn_returned(self):
        """Wake up one coroutine waiting in :meth:`acquire`."""
        with (yield from self._cond):
            self._cond.notify()

    def __enter__(self):
        raise RuntimeError(
            "'yield from' should be used as a context manager expression")

    def __exit__(self, *args):
        pass  # pragma: nocover

    def __iter__(self):
        # this method is needed to allow `yield`ing from pool
        conn = yield from self.acquire()
        return _ConnectionContextManager(self, conn)

    if PY_35:
        def __await__(self):
            # To make `with await pool` work
            conn = yield from self.acquire()
            return _ConnectionContextManager(self, conn)

        def get(self):
            """Return async context manager for working with connection::

                async with pool.get() as conn:
                    ...  # use conn; it is released when the block exits
            """
            return _AsyncConnectionContextManager(self)
class _ConnectionContextManager:
    """Context manager wrapping a connection that is already acquired.

    Entering returns the connection, leaving hands it back to the pool
    through ``pool.release``.
    """

    __slots__ = ('_pool', '_conn')

    def __init__(self, pool, conn):
        self._pool = pool
        self._conn = conn

    def __enter__(self):
        return self._conn

    def __exit__(self, exc_type, exc_value, tb):
        # Drop our references before releasing, so they are cleared even
        # when release() raises.
        pool, conn = self._pool, self._conn
        self._pool = self._conn = None
        pool.release(conn)


if PY_35:
    class _AsyncConnectionContextManager:
        """Async context manager acquiring a connection on entry.

        The connection is obtained with ``pool.acquire`` in ``__aenter__``
        and handed back through ``pool.release`` in ``__aexit__``.
        """

        __slots__ = ('_pool', '_conn')

        def __init__(self, pool):
            self._pool = pool
            self._conn = None

        @asyncio.coroutine
        def __aenter__(self):
            conn = yield from self._pool.acquire()
            self._conn = conn
            return conn

        @asyncio.coroutine
        def __aexit__(self, exc_type, exc_value, tb):
            # Drop our references before releasing, so they are cleared even
            # when release() raises.
            pool, conn = self._pool, self._conn
            self._pool = self._conn = None
            pool.release(conn)