更新 nameko 依赖版本;添加 base model;添加 flask_nameko

This commit is contained in:
BryantHe 2023-06-01 02:15:57 +08:00
parent e4442bbb73
commit 4211be09be
6 changed files with 329 additions and 2 deletions

97
chassis/db_model.py Normal file
View File

@ -0,0 +1,97 @@
from sqlalchemy import Column, DateTime, Boolean, func
from sqlalchemy.ext.declarative import declarative_base
from datetime import datetime
BaseModel = declarative_base()
class BaseCrud(BaseModel):
# 硬删除
def delete(self, session, commit=False):
session.delete(self)
if commit:
session.commit()
# 查
@classmethod
def get(cls, session, start=None, count=None, one=True, **kwargs):
if one:
return session.query(cls).filter().filter_by(**kwargs).first()
return session.query(cls).filter().filter_by(**kwargs).offset(start).limit(count).all()
# 增
@classmethod
def create(cls, session, **kwargs):
one = cls()
for key in kwargs.keys():
if hasattr(one, key):
setattr(one, key, kwargs[key])
session.add(one)
if kwargs.get("commit") is True:
session.commit()
return one
def update(self, session, **kwargs):
for key in kwargs.keys():
if hasattr(self, key):
setattr(self, key, kwargs[key])
session.add(self)
if kwargs.get("commit") is True:
session.commit()
return self
# 提供软删除及创建时间更新时间信息的crud model
class InfoCrud(BaseModel):
create_time = Column(DateTime(), default=func.now())
update_time = Column(DateTime(), default=func.now(), onupdate=func.now())
delete_time = Column(DateTime())
is_deleted = Column(Boolean, nullable=False, default=False)
# 软删除
def delete(self, session, commit=False):
self.delete_time = datetime.now()
self.is_deleted = True
session.add(self)
if commit:
session.commit()
# 硬删除
def hard_delete(self, session, commit=False):
session.delete(self)
if commit:
session.commit()
# 查
@classmethod
def get(cls, session, start=None, count=None, one=True, **kwargs):
if kwargs.get('is_deleted') is None:
kwargs['is_deleted'] = False
if one:
return session.query(cls).filter().filter_by(**kwargs).first()
return session.query(cls).filter().filter_by(**kwargs).offset(start).limit(count).all()
# 增
@classmethod
def create(cls, session, **kwargs):
one = cls()
for key in kwargs.keys():
if hasattr(one, key):
setattr(one, key, kwargs[key])
session.add(one)
if kwargs.get('commit') is True:
session.commit()
return one
def update(self, session, **kwargs):
for key in kwargs.keys():
if hasattr(self, key):
setattr(self, key, kwargs[key])
session.add(self)
if kwargs.get('commit') is True:
session.commit()
return self

View File

@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
from .errors import *
from .proxies import FlaskPooledClusterRpcProxy

View File

@ -0,0 +1,110 @@
from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from threading import Lock
from queue import Queue, Empty
from .errors import ClientUnavailableError
class Connection(object):
def __init__(self, connection):
self.connection = connection
self._created_at = datetime.now()
def is_stale(self, recycle_interval):
return (datetime.now() - self._created_at) > recycle_interval
def __getattr__(self, attr):
return getattr(self.connection, attr)
class ConnectionPool(object):
def __init__(
self, get_connection, initial_connections=2, max_connections=8,
recycle=None
):
"""
Create a new pool
:param func get_connection: The function that returns a connection
:param int initial_connections: The initial number of connection
objects to create
:param int max_connections: The maximum amount of connections
to create. These
connections will only be created on demand and will potentially be
destroyed once they have been returned via a call to
:meth:`release_connection`
constructor
"""
self._get_connection = get_connection
self._queue = Queue()
self._current_connections = 0
self._max_connections = max_connections
self._recycle = timedelta(seconds=recycle) if recycle else False
self._lock = Lock()
for x in range(initial_connections):
connection = self._make_connection()
self._queue.put(connection)
def _make_connection(self):
ret = Connection(self._get_connection())
self._current_connections += 1
return ret
def _delete_connection(self, connection):
del connection
self._current_connections -= 1
def _recycle_connection(self, connection):
self._lock.acquire()
self._delete_connection(connection)
connection = self._make_connection()
self._lock.release()
return connection
def _get_connection_from_queue(self, initial_timeout, next_timeout):
try:
return self._queue.get(True, initial_timeout)
except Empty:
try:
self._lock.acquire()
if self._current_connections == self._max_connections:
raise ClientUnavailableError("Too many connections in use")
cb = self._make_connection()
return cb
except ClientUnavailableError as ex:
try:
return self._queue.get(True, next_timeout)
except Empty:
raise ex
finally:
self._lock.release()
def get_connection(self, initial_timeout=0.05, next_timeout=1):
"""
Wait until a connection instance is available
:param float initial_timeout:
how long to wait initially for an existing connection to complete
:param float next_timeout:
if the pool could not obtain a connection during the
initial timeout, and we have allocated the maximum available
number of connections, wait this long until we can retrieve
another one
:return: A connection object
"""
connection = self._get_connection_from_queue(
initial_timeout, next_timeout)
if self._recycle and connection.is_stale(self._recycle):
connection = self._recycle_connection(connection)
return connection
def release_connection(self, cb):
"""
Return a Connection object to the pool
:param Connection cb: the connection to release
"""
self._queue.put(cb, True)

View File

@ -0,0 +1,10 @@
class BadConfigurationError(Exception):
pass
class ClientUnavailableError(Exception):
pass
class ClusterNotConfiguredError(Exception):
pass

View File

@ -0,0 +1,107 @@
from __future__ import absolute_import, unicode_literals
import re
from flask import g
from nameko.standalone.rpc import ClusterRpcClient
from .connection_pool import ConnectionPool
from .errors import (
BadConfigurationError,
ClusterNotConfiguredError
)
class PooledClusterRpcProxy(object):
_pool = None
_config = None
def __init__(self, config=None):
if config:
self.configure(config)
def configure(self, config):
if not config.get('AMQP_URI'):
raise BadConfigurationError(
"Please provide a valid configuration.")
self._config = config
self._pool = ConnectionPool(
self._get_nameko_connection,
initial_connections=config.get('INITIAL_CONNECTIONS', 2),
max_connections=config.get('MAX_CONNECTIONS', 8),
recycle=config.get('POOL_RECYCLE')
)
def _get_nameko_connection(self):
proxy = ClusterRpcClient(
self._config,
timeout=self._config.get('RPC_TIMEOUT', None)
)
return proxy.start()
def get_connection(self):
if not self._pool:
raise ClusterNotConfiguredError(
"Please configure your cluster beore requesting a connection.")
return self._pool.get_connection()
def release_connection(self, connection):
return self._pool.release_connection(connection)
class LazyServiceProxy(object):
def __init__(self, get_connection, service):
self.get_connection = get_connection
self.service = service
def __getattr__(self, name):
return getattr(getattr(self.get_connection(), self.service), name)
class FlaskPooledClusterRpcProxy(PooledClusterRpcProxy):
def __init__(self, app=None, connect_on_method_call=True):
self._connect_on_method_call = connect_on_method_call
if app:
self.init_app(app)
def init_app(self, app):
config = dict()
for key, val in app.config.items():
match = re.match(r"NAMEKO\_(?P<name>.*)", key)
if match:
config[match.group('name')] = val
self.configure(config)
self._connect_on_method_call = config.get(
'NAMEKO_CONNECT_ON_METHOD_CALL',
self._connect_on_method_call
)
app.teardown_appcontext(self._teardown_nameko_connection)
def get_connection(self):
connection = getattr(g, '_nameko_connection', None)
if not connection:
connection = super(
FlaskPooledClusterRpcProxy, self).get_connection()
g._nameko_connection = connection
return connection
def _teardown_nameko_connection(self, exception):
connection = getattr(g, '_nameko_connection', None)
if connection is not None:
self.release_connection(connection)
def _get_service(self, service):
if self._connect_on_method_call:
return LazyServiceProxy(lambda: self.get_connection(), service)
else:
return getattr(self.get_connection(), service)
def __getattr__(self, name):
return self._get_service(name)
def __getitem__(self, name):
return self._get_service(name)

View File

@ -52,8 +52,7 @@ setup(
'shortuuid==1.0.11'
],
extras_require={
'nameko': ['nameko==2.14.1',
'nameko-tracer==1.3.0',
'nameko': ['nameko==3.0.0rc11',
'nameko-sentry==1.0.0'],
'apiflask': ['apiflask>=1.3.1', 'gevent>=22.10.2', 'gunicorn==20.1.0'],
'rocketry': ['rocketry==2.4.0'],