fix: flask_nameko add timeout error catch

This commit is contained in:
BryantHe 2023-10-26 15:06:27 +08:00
parent 5df773c958
commit a5b6535451
3 changed files with 27 additions and 10 deletions

View File

@ -5,7 +5,7 @@ from threading import Lock
from queue import Queue, Empty from queue import Queue, Empty
from .errors import ClientUnavailableError from .errors import ClientUnavailableError, ClientConnectionTimeoutError
class Connection(object): class Connection(object):
@ -49,20 +49,28 @@ class ConnectionPool(object):
self._queue.put(connection) self._queue.put(connection)
def _make_connection(self): def _make_connection(self):
ret = Connection(self._get_connection()) try:
self._current_connections += 1 connection = self._get_connection()
return ret ret = Connection(connection)
self._current_connections += 1
return ret
except Exception as e:
raise ClientConnectionTimeoutError()
def _delete_connection(self, connection): def _delete_connection(self, connection):
del connection del connection
self._current_connections -= 1 self._current_connections -= 1
def _recycle_connection(self, connection): def _recycle_connection(self, connection):
self._lock.acquire() try:
self._delete_connection(connection) self._lock.acquire()
connection = self._make_connection() self._delete_connection(connection)
self._lock.release() connection = self._make_connection()
return connection return connection
except ClientConnectionTimeoutError as e:
pass
finally:
self._lock.release()
def _get_connection_from_queue(self, initial_timeout, next_timeout): def _get_connection_from_queue(self, initial_timeout, next_timeout):
try: try:
@ -79,6 +87,8 @@ class ConnectionPool(object):
return self._queue.get(True, next_timeout) return self._queue.get(True, next_timeout)
except Empty: except Empty:
raise ex raise ex
except ClientConnectionTimeoutError as e:
raise ClientUnavailableError("making connection but timeout")
finally: finally:
self._lock.release() self._lock.release()
@ -98,7 +108,9 @@ class ConnectionPool(object):
initial_timeout, next_timeout) initial_timeout, next_timeout)
if self._recycle and connection.is_stale(self._recycle): if self._recycle and connection.is_stale(self._recycle):
connection = self._recycle_connection(connection) recycled_connection = self._recycle_connection(connection)
if recycled_connection:
connection = recycled_connection
return connection return connection

View File

@ -7,4 +7,8 @@ class ClientUnavailableError(Exception):
class ClusterNotConfiguredError(Exception): class ClusterNotConfiguredError(Exception):
pass
class ClientConnectionTimeoutError(Exception):
pass pass

View File

@ -36,6 +36,7 @@ class PooledClusterRpcProxy(object):
def _get_nameko_connection(self): def _get_nameko_connection(self):
proxy = ClusterRpcClient( proxy = ClusterRpcClient(
timeout=6,
uri=self._config.get('uri') uri=self._config.get('uri')
) )
return proxy.start() return proxy.start()