diff --git a/chassis/db_model.py b/chassis/db_model.py
new file mode 100644
index 0000000..1be0e10
--- /dev/null
+++ b/chassis/db_model.py
@@ -0,0 +1,99 @@
+from sqlalchemy import Column, DateTime, Boolean, func
+from sqlalchemy.ext.declarative import declarative_base
+from datetime import datetime
+
+BaseModel = declarative_base()
+
+
+class BaseCrud(BaseModel):
+    __abstract__ = True
+
+    # hard delete
+    def delete(self, session, commit=False):
+        session.delete(self)
+        if commit:
+            session.commit()
+
+    # read
+    @classmethod
+    def get(cls, session, start=None, count=None, one=True, **kwargs):
+        if one:
+            return session.query(cls).filter_by(**kwargs).first()
+        return session.query(cls).filter_by(**kwargs).offset(start).limit(count).all()
+
+    # create
+    @classmethod
+    def create(cls, session, **kwargs):
+        one = cls()
+        for key in kwargs.keys():
+            if hasattr(one, key):
+                setattr(one, key, kwargs[key])
+        session.add(one)
+        if kwargs.get("commit") is True:
+            session.commit()
+        return one
+
+    def update(self, session, **kwargs):
+        for key in kwargs.keys():
+            if hasattr(self, key):
+                setattr(self, key, kwargs[key])
+        session.add(self)
+        if kwargs.get("commit") is True:
+            session.commit()
+        return self
+
+
+# CRUD model that adds soft delete plus create/update timestamp columns
+
+
+class InfoCrud(BaseModel):
+    __abstract__ = True
+
+    create_time = Column(DateTime(), default=func.now())
+    update_time = Column(DateTime(), default=func.now(), onupdate=func.now())
+    delete_time = Column(DateTime())
+    is_deleted = Column(Boolean, nullable=False, default=False)
+
+    # soft delete
+    def delete(self, session, commit=False):
+        self.delete_time = datetime.now()
+        self.is_deleted = True
+        session.add(self)
+        if commit:
+            session.commit()
+
+    # hard delete
+    def hard_delete(self, session, commit=False):
+        session.delete(self)
+        if commit:
+            session.commit()
+
+    # read (excludes soft-deleted rows unless is_deleted is passed explicitly)
+    @classmethod
+    def get(cls, session, start=None, count=None, one=True, **kwargs):
+        if kwargs.get('is_deleted') is None:
+            kwargs['is_deleted'] = False
+        if one:
+            return session.query(cls).filter_by(**kwargs).first()
+        return session.query(cls).filter_by(**kwargs).offset(start).limit(count).all()
+
+    # create
+    @classmethod
+    def create(cls, session, **kwargs):
+        one = cls()
+        for key in kwargs.keys():
+            if hasattr(one, key):
+                setattr(one, key, kwargs[key])
+        session.add(one)
+        if kwargs.get('commit') is True:
+            session.commit()
+        return one
+
+    def update(self, session, **kwargs):
+        for key in kwargs.keys():
+            if hasattr(self, key):
+                setattr(self, key, kwargs[key])
+        session.add(self)
+        if kwargs.get('commit') is True:
+            session.commit()
+        return self
\ No newline at end of file
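
Usage sketch (illustrative, not part of the diff): how a concrete model builds on InfoCrud and how its soft-delete behaviour surfaces through get()/delete(). The User model, its columns and the in-memory SQLite engine are assumptions made for this example; only BaseModel and InfoCrud come from chassis/db_model.py above.

# Assumed example model; only BaseModel/InfoCrud come from chassis.db_model.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import sessionmaker

from chassis.db_model import BaseModel, InfoCrud


class User(InfoCrud):
    __tablename__ = 'user'

    id = Column(Integer, primary_key=True)
    username = Column(String(24))


engine = create_engine('sqlite://')
BaseModel.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

user = User.create(session, username='alice', commit=True)
user.delete(session, commit=True)                    # soft delete: sets is_deleted and delete_time
assert User.get(session, username='alice') is None   # get() hides soft-deleted rows by default
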
diff --git a/chassis/flask_nameko/__init__.py b/chassis/flask_nameko/__init__.py
new file mode 100644
index 0000000..0d4d6f8
--- /dev/null
+++ b/chassis/flask_nameko/__init__.py
@@ -0,0 +1,4 @@
+# -*- coding: utf-8 -*-
+
+from .errors import *
+from .proxies import FlaskPooledClusterRpcProxy
\ No newline at end of file
diff --git a/chassis/flask_nameko/connection_pool.py b/chassis/flask_nameko/connection_pool.py
new file mode 100644
index 0000000..203f1a3
--- /dev/null
+++ b/chassis/flask_nameko/connection_pool.py
@@ -0,0 +1,110 @@
+from __future__ import absolute_import, unicode_literals
+
+from datetime import datetime, timedelta
+from threading import Lock
+
+from queue import Queue, Empty
+
+from .errors import ClientUnavailableError
+
+
+class Connection(object):
+    def __init__(self, connection):
+        self.connection = connection
+        self._created_at = datetime.now()
+
+    def is_stale(self, recycle_interval):
+        return (datetime.now() - self._created_at) > recycle_interval
+
+    def __getattr__(self, attr):
+        return getattr(self.connection, attr)
+
+
+class ConnectionPool(object):
+    def __init__(
+        self, get_connection, initial_connections=2, max_connections=8,
+        recycle=None
+    ):
+        """
+        Create a new pool
+        :param func get_connection: The function that returns a connection
+        :param int initial_connections: The initial number of connection
+            objects to create
+        :param int max_connections: The maximum number of connections
+            to create. These connections will only be created on demand
+            and will potentially be destroyed once they have been
+            returned via a call to :meth:`release_connection`
+        :param int recycle: Replace connections that are older than
+            this many seconds when they are next retrieved
+        """
+        self._get_connection = get_connection
+        self._queue = Queue()
+        self._current_connections = 0
+        self._max_connections = max_connections
+        self._recycle = timedelta(seconds=recycle) if recycle else False
+        self._lock = Lock()
+
+        for x in range(initial_connections):
+            connection = self._make_connection()
+            self._queue.put(connection)
+
+    def _make_connection(self):
+        ret = Connection(self._get_connection())
+        self._current_connections += 1
+        return ret
+
+    def _delete_connection(self, connection):
+        del connection
+        self._current_connections -= 1
+
+    def _recycle_connection(self, connection):
+        self._lock.acquire()
+        self._delete_connection(connection)
+        connection = self._make_connection()
+        self._lock.release()
+        return connection
+
+    def _get_connection_from_queue(self, initial_timeout, next_timeout):
+        try:
+            return self._queue.get(True, initial_timeout)
+        except Empty:
+            try:
+                self._lock.acquire()
+                if self._current_connections == self._max_connections:
+                    raise ClientUnavailableError("Too many connections in use")
+                cb = self._make_connection()
+                return cb
+            except ClientUnavailableError as ex:
+                try:
+                    return self._queue.get(True, next_timeout)
+                except Empty:
+                    raise ex
+            finally:
+                self._lock.release()
+
+    def get_connection(self, initial_timeout=0.05, next_timeout=1):
+        """
+        Wait until a connection instance is available
+        :param float initial_timeout:
+            how long to wait initially for an existing connection to become available
+        :param float next_timeout:
+            if the pool could not obtain a connection during the
+            initial timeout, and we have allocated the maximum available
+            number of connections, wait this long until we can retrieve
+            another one
+        :return: A connection object
+        """
+        connection = self._get_connection_from_queue(
+            initial_timeout, next_timeout)
+
+        if self._recycle and connection.is_stale(self._recycle):
+            connection = self._recycle_connection(connection)
+
+        return connection
+
+    def release_connection(self, cb):
+        """
+        Return a Connection object to the pool
+        :param Connection cb: the connection to release
+        """
+        self._queue.put(cb, True)
\ No newline at end of file
diff --git a/chassis/flask_nameko/errors.py b/chassis/flask_nameko/errors.py
new file mode 100644
index 0000000..4aeb573
--- /dev/null
+++ b/chassis/flask_nameko/errors.py
@@ -0,0 +1,10 @@
+class BadConfigurationError(Exception):
+    pass
+
+
+class ClientUnavailableError(Exception):
+    pass
+
+
+class ClusterNotConfiguredError(Exception):
+    pass
\ No newline at end of file
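
Usage sketch (illustrative, not part of the diff): ConnectionPool only needs a zero-argument factory, so it can be exercised without a broker. The fake factory below is an assumption for this example; in proxies.py the factory is the nameko cluster client.

# Assumed stand-in factory; the pool itself comes from the new module above.
import itertools

from chassis.flask_nameko.connection_pool import ConnectionPool

_ids = itertools.count()


def make_fake_connection():
    # Any object will do: Connection wraps it and forwards attribute access.
    return ('fake-connection', next(_ids))


pool = ConnectionPool(make_fake_connection, initial_connections=2,
                      max_connections=4, recycle=3600)

conn = pool.get_connection()       # pooled Connection, created on demand up to max_connections
try:
    print(conn.connection)         # ('fake-connection', 0)
finally:
    pool.release_connection(conn)  # return it so other callers do not block or hit the cap
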
diff --git a/chassis/flask_nameko/proxies.py b/chassis/flask_nameko/proxies.py
new file mode 100644
index 0000000..0ea2a69
--- /dev/null
+++ b/chassis/flask_nameko/proxies.py
@@ -0,0 +1,107 @@
+from __future__ import absolute_import, unicode_literals
+
+import re
+
+from flask import g
+from nameko.standalone.rpc import ClusterRpcClient
+
+from .connection_pool import ConnectionPool
+from .errors import (
+    BadConfigurationError,
+    ClusterNotConfiguredError
+)
+
+
+class PooledClusterRpcProxy(object):
+
+    _pool = None
+    _config = None
+
+    def __init__(self, config=None):
+        if config:
+            self.configure(config)
+
+    def configure(self, config):
+        if not config.get('AMQP_URI'):
+            raise BadConfigurationError(
+                "Please provide a valid configuration.")
+
+        self._config = config
+        self._pool = ConnectionPool(
+            self._get_nameko_connection,
+            initial_connections=config.get('INITIAL_CONNECTIONS', 2),
+            max_connections=config.get('MAX_CONNECTIONS', 8),
+            recycle=config.get('POOL_RECYCLE')
+        )
+
+    def _get_nameko_connection(self):
+        proxy = ClusterRpcClient(
+            self._config,
+            timeout=self._config.get('RPC_TIMEOUT', None)
+        )
+        return proxy.start()
+
+    def get_connection(self):
+        if not self._pool:
+            raise ClusterNotConfiguredError(
+                "Please configure your cluster before requesting a connection.")
+        return self._pool.get_connection()
+
+    def release_connection(self, connection):
+        return self._pool.release_connection(connection)
+
+
+class LazyServiceProxy(object):
+    def __init__(self, get_connection, service):
+        self.get_connection = get_connection
+        self.service = service
+
+    def __getattr__(self, name):
+        return getattr(getattr(self.get_connection(), self.service), name)
+
+
+class FlaskPooledClusterRpcProxy(PooledClusterRpcProxy):
+    def __init__(self, app=None, connect_on_method_call=True):
+        self._connect_on_method_call = connect_on_method_call
+        if app:
+            self.init_app(app)
+
+    def init_app(self, app):
+        config = dict()
+        for key, val in app.config.items():
+            match = re.match(r"NAMEKO_(?P<name>.*)", key)
+            if match:
+                config[match.group('name')] = val
+        self.configure(config)
+
+        self._connect_on_method_call = config.get(
+            'CONNECT_ON_METHOD_CALL',
+            self._connect_on_method_call
+        )
+
+        app.teardown_appcontext(self._teardown_nameko_connection)
+
+    def get_connection(self):
+        connection = getattr(g, '_nameko_connection', None)
+        if not connection:
+            connection = super(
+                FlaskPooledClusterRpcProxy, self).get_connection()
+            g._nameko_connection = connection
+        return connection
+
+    def _teardown_nameko_connection(self, exception):
+        connection = getattr(g, '_nameko_connection', None)
+        if connection is not None:
+            self.release_connection(connection)
+
+    def _get_service(self, service):
+        if self._connect_on_method_call:
+            return LazyServiceProxy(lambda: self.get_connection(), service)
+        else:
+            return getattr(self.get_connection(), service)
+
+    def __getattr__(self, name):
+        return self._get_service(name)
+
+    def __getitem__(self, name):
+        return self._get_service(name)
\ No newline at end of file
diff --git a/setup.py b/setup.py
index 088e24e..f3f3bcc 100644
--- a/setup.py
+++ b/setup.py
@@ -52,8 +52,7 @@ setup(
         'shortuuid==1.0.11'
     ],
     extras_require={
-        'nameko': ['nameko==2.14.1',
-                   'nameko-tracer==1.3.0',
+        'nameko': ['nameko==3.0.0rc11',
                    'nameko-sentry==1.0.0'],
         'apiflask': ['apiflask>=1.3.1', 'gevent>=22.10.2', 'gunicorn==20.1.0'],
         'rocketry': ['rocketry==2.4.0'],
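
Usage sketch (illustrative, not part of the diff): wiring FlaskPooledClusterRpcProxy from proxies.py into a Flask app. The broker URI, the mail service, its send method and the /notify route are assumptions; note that the pool opens its initial connections eagerly, so a reachable AMQP broker is needed when the app is created.

# Assumed service/method names and broker URI; only FlaskPooledClusterRpcProxy comes from the diff.
from flask import Flask

from chassis.flask_nameko import FlaskPooledClusterRpcProxy

rpc = FlaskPooledClusterRpcProxy()


def create_app():
    app = Flask(__name__)
    app.config.update(
        NAMEKO_AMQP_URI='pyamqp://guest:guest@localhost',  # becomes AMQP_URI once init_app strips the prefix
        NAMEKO_INITIAL_CONNECTIONS=2,
        NAMEKO_MAX_CONNECTIONS=8,
        NAMEKO_POOL_RECYCLE=3600,
    )
    rpc.init_app(app)

    @app.route('/notify')
    def notify():
        # First attribute access lazily pulls a pooled cluster connection,
        # binds it to flask.g, and releases it on appcontext teardown.
        return {'sent': rpc.mail.send(subject='hello')}

    return app
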