import os
import platform
import socket

import psutil
from apiflask import APIBlueprint
from flask import session

from app import rpc
from app.api.v1.exception.api import ImageUploadError, UserInfoError, ImageNotFound, DietCreationError
from app.api.v1.schema.api import ImageIn, ImagePreSignUrlOut, ImagePreSignUrlIn, DietImageIn, ProfilingIn
from app.util.auth import login_required

api = APIBlueprint('api', __name__)

# The same literal is passed to the storage RPC as the bucket name, the app
# name and the second positional argument of get_presign_url.
_BODYRECORD = 'bodyrecord'

# Maps the numeric `category` field of the diet upload form to the label
# that is stored with the diet record.
_DIET_CATEGORIES = {
    0: '早餐',
    1: '午餐',
    2: '晚餐',
    3: '零食',
    4: '下午茶',
    5: '夜宵',
}

_MIB = 1024 * 1024
_GIB = 1024 * 1024 * 1024


def _upload_image(image, folder):
    """Upload *image* through the storage RPC and return the storage record.

    The stored file name is ``<user_id>_<original filename>``.

    Raises:
        ImageUploadError: if the RPC call fails, or if the returned record
            does not report the status ``'UPLOADED'``.
    """
    try:
        # Format the id into the string instead of `user_id + '_...'`: the
        # concatenation raised TypeError whenever user_id was not a str.
        file_name = f"{session['user_id']}_{image.filename}"
        # NOTE(review): the client-supplied filename is used as-is; confirm
        # the storage service sanitises it.
        record = rpc.storage.upload(
            file_name=file_name,
            file_binary=image.read(),
            bucket=_BODYRECORD,
            folder=folder,
            app=_BODYRECORD,
        )
    except Exception as e:
        raise ImageUploadError(extra_data={'error_docs': str(e)}) from e

    if record.get('status') != 'UPLOADED':
        raise ImageUploadError()
    return record


def _presign_url(image_id):
    """Return a pre-signed URL for *image_id* from the storage RPC.

    Raises:
        ImageNotFound: if the RPC call fails.
    """
    try:
        return rpc.storage.get_presign_url(image_id, _BODYRECORD, bucket=_BODYRECORD)
    except Exception as e:
        raise ImageNotFound(extra_data={'error_docs': str(e)}) from e


@api.post('/images')
@api.doc(summary='上传头像图片', description='上传头像图片')
@api.input(ImageIn, location='files')
@api.output(ImagePreSignUrlOut)
@login_required
def upload_avatar_image(files_data):
    """Upload an avatar image, attach it to the current user, return its URL.

    Raises:
        ImageUploadError: if the upload fails or is not reported as uploaded.
        UserInfoError: if updating the user's avatar id fails.
        ImageNotFound: if the pre-signed URL cannot be obtained.
    """
    record = _upload_image(files_data['image'], folder='avatar')

    try:
        rpc.admin.set_user_info(session['user_id'], avatar_id=record['_id'])
    except Exception as e:
        raise UserInfoError(extra_data={'error_docs': str(e)}) from e

    return {'image_presign_url': _presign_url(record['_id'])}


@api.post('/images/diet')
@api.doc(summary='上传食物图片', description='上传食物图片')
@api.input(DietImageIn, location='files')
@api.output(ImagePreSignUrlOut)
@login_required
def upload_diet_image(files_data):
    """Upload a food image, create a diet record for it, return its URL.

    Raises:
        ImageUploadError: if the upload fails or is not reported as uploaded.
        DietCreationError: if creating the diet record fails.
        ImageNotFound: if the pre-signed URL cannot be obtained.
    """
    # Food category. Resolved before uploading so an unknown category fails
    # before anything is stored.
    # NOTE(review): presumably DietImageIn restricts `category` to 0-5;
    # otherwise this lookup raises KeyError - confirm against the schema.
    category = _DIET_CATEGORIES[files_data['category']]

    record = _upload_image(files_data['image'], folder='diet')

    # Image uploaded successfully; run the business logic.
    try:
        rpc.diet.add(
            user_id=session['user_id'],
            category=category,
            diet_image_id=record['_id'],
        )
    except Exception as e:
        raise DietCreationError(extra_data={'error_docs': str(e)}) from e

    return {'image_presign_url': _presign_url(record['_id'])}


@api.get('/images/presign_url')
@api.input(ImagePreSignUrlIn, location='query')
@api.output(ImagePreSignUrlOut)
@api.doc(summary='获取图片预签名链接', description='获取图片预签名链接')
def get_image_presign_url(query_data):
    """Return a pre-signed URL for the image id given in the query string.

    `expire_time` defaults to 3600 when the query does not provide it.

    Raises:
        ImageNotFound: if the pre-signed URL cannot be obtained.
    """
    try:
        presign_url = rpc.storage.get_presign_url(
            query_data['avatar_id'],
            _BODYRECORD,
            bucket=_BODYRECORD,
            expire_time=query_data.get('expire_time', 3600),
        )
    except Exception as e:
        raise ImageNotFound(extra_data={'error_docs': str(e)}) from e
    return {'image_presign_url': presign_url}


# Performance-test endpoint: forwards the `token` query parameter (or the
# placeholder 'xxxxxx' when absent) to rpc.admin.identify and returns the
# RPC result unchanged.
# Documented with comments rather than a docstring: @api.doc sets only
# summary and description here, and a docstring may feed other generated
# OpenAPI fields.
@api.get('/profiling')
@api.input(ProfilingIn, location='query')
@api.doc(summary='此接口用于性能测试', description='此接口用于性能测试')
def profiling(query_data):
    return rpc.admin.identify(query_data.get('token', 'xxxxxx'))


# Diagnostic endpoint: returns a plain-text report with the host name, OS,
# CPU count/usage, memory and root-disk usage of the serving machine.
# Documented with comments rather than a docstring because this route has no
# @api.doc, so a docstring could change its generated OpenAPI summary.
@api.get('/hello')
def hello():
    # HOSTNAME is not always exported to the process environment; fall back
    # to the socket host name instead of failing with KeyError.
    hostname = os.environ.get('HOSTNAME') or socket.gethostname()

    # Read each psutil snapshot once so total/used come from the same sample.
    memory = psutil.virtual_memory()
    disk = psutil.disk_usage("/")

    report_lines = [
        '',
        '',
        '您好,PZX 大人!',
        '我是您的专属接口提供服务器:',
        f'{hostname} 进程 {socket.gethostname()}',
        '',
        '我的机器参数如下',
        f'操作系统:{platform.system()}',
        f'CPU 核数:{psutil.cpu_count()},目前 CPU 占用率: {psutil.cpu_percent()}',
        f'总内存 {round(memory.total / _MIB, 2)} MB,使用中内存:{round(memory.used / _MIB, 2)} MB',
        f'磁盘总空间 {round(disk.total / _GIB, 2)} G,磁盘使用情况:{round(disk.used / _GIB, 2)} G',
        '',
        '',
    ]
    return '\n'.join(report_lines)