Commit a1da9e43 by 曾维佳

feat: add some new fast-api of flask.

parents
.gitignore
.git*
venv
.run
# 忽略所有 .pyc 文件
*.pyc
# 忽略所有 .o 文件
*.o
# 忽略所有 .so 文件
*.so
# 忽略所有 .log 文件
*.log
# 忽略 .idea 目录
.idea/
# 忽略所有的 __init__.py 文件
**/__init__.py
# 忽略 tmp 目录
tmp/
FROM harbor.airqualitychina.cn:2229/public/python:3.8.10-centos7.8
#ENV ENVIRONMENT prod
ENV PATH=$PATH:/data/deploy:/data/deploy/bin
WORKDIR /data/deploy
RUN mkdir -p /data/deploy/tmp/log
COPY requirements.txt ./
RUN pip3 install --upgrade pip -i https://pypi.tuna.tsinghua.edu.cn/simple \
&& pip3 install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
RUN rm -f /etc/yum.repos.d/*repo \
&& curl https://mirrors.aliyun.com/repo/Centos-7.repo -o /etc/yum.repos.d/Centos-7.repo 2>/dev/null
RUN yum -y install crontabs
ADD cron /etc/cron.d/cron_docker
RUN chmod 0644 /etc/cron.d/cron_docker
RUN crontab /etc/cron.d/cron_docker
RUN yum install glibc-common -y
RUN localedef -c -f UTF-8 -i zh_CN zh_CN.utf8
ENV LANG zh_CN.UTF-8
ENV LC_ALL zh_CN.UTF-8
COPY . .
RUN chmod +x -R /data/deploy/bin
EXPOSE 9091
CMD [ "/bin/bash", "docker_run.sh" ]
# 子系统脚手架
## 关于运行环境的说明
1. 普通项目可以直接使用yard-base中的Dockerfile,有特殊需求的项目请自行修改Dockerfile中的镜像信息;
2. 开发环境推荐使用PyCharm作为IDE,运行项目如果报找不到src的错误,请在src上单击右键,选择设置为代码根目录即可
3. 如果没有使用IDE,可以把src目录添加到环境变量中(如果只是运行,可以忽略此步骤,直接按第4步执行)
mac 添加环境变量 vi ~/bash profile 然后把src的绝对路径追加到PATH中即可;
Windows: 同理,把src的绝对路径添加PATH中;
4. 快速运行脚手架
- 安装Python3.8
- 执行git clone ssh://git@10.10.8.64:9922/root/yard-base.git
- cd yard-base
- python src/runserver.py
## 概览
### 从api-framework到yard-base
第一版脚手架称为api-framework,功能如其名,意为用于api开发的框架;
第二版脚手架称为yard-base,yard即庭院,base即基础。
从api-framework到yard-base主要做了如下变更。
- 重新整理目录结构,主要目录限定为bin、src、config、test、tmp;
- 原来的Python 3.7.5 升级为Python 3.8.12;
原因:经常会用到pandas库,而pandas的某些功能对Python3.8以下支持不友好,而且Python3.8对协程支持更友好。
- 原来的Flask==1.1.2升级为Flask==2.0.2
- 把部分功能性代码从各个包中的``__init__.py``中迁移出来;
- 添加自动注册路由和蓝图的功能;
- 修改Docker镜像,新镜像基于官方python3.8-alpine3.15制作,镜像大小由原来的1.03G变为0.32G
- 镜像内置脚手架所需环境,在没有特殊需求的情况下,完成镜像构建仅需3秒
- 优化数据库驱动型能,生成环境只采用mysqlclient,不再使用pymysql
- 去掉framework中的DatetimeUtil.py,推荐使用Arrow,安装:pip install -U arrow
### 目录结构
![image-20211216145105862](doc/image/Sipaster_image_structure.png)
### 生产环境、测试环境和开发环境的控制
prod:生产环境
dev:开发环境
test:测试环境,用于单元测试、预发测试。
![image-20211216145414880](doc/image/Snipaste_prod_environment.png)
![image-20211216145609747](doc/image/Snipaste_dev_environment.png)
![image-20211216145813226](doc/image/Snipaste_pytest_unit_test_environment.png)
### requirement说明
requirements-dev.txt 是开发环境的依赖,特点是安装顺利。
requirements.txt 是生产环境的依赖,特点是性能高。
### 开发服务器的运行方式
```shell
git clone ssh://git@10.10.8.64:9922/root/yard-default.git
pip install -r requirements-dev.txt
cd yard-default/src
python runserver.py
```
### Docker镜像说明
Dockerfile中所使用的镜像是harbor.airqualitychina.cn:2229/public/python3.8:flask-yard,330.88 MB python3.8:flask-yard镜像已经预装以下库:
- Flask==2.0.2
- Flask-RESTful==0.3.9
- Flask-SQLAlchemy==2.5.1
- Flask-Cors==3.0.10
- mysqlclient==2.1.0
- gevent==21.12.0
- greenlet==1.1.2
- gunicorn==20.1.0
- PyYAML==6.0
- demjson==2.2.4
- jsonschema==4.2.1
- requests==2.26.0
- pytest==6.2.5
- pandas==1.4.1
镜像构建速度测试:仅需3秒左右。
### 生产环境运行方式
生产环境使用Docker运行,数据库仅使用mysqlclient,不再支持pymysql,pymysql仅在开发环境中使用。
### 在Docker容器中,对gunicorn进行管理
```sh
/data/deploy/bin # ls
gunicorn.pid restart_gunicron start_gunicron stop_gunicron
/data/deploy/bin #
```
start_gunicron: 启动gunicorn
stop_gunicron: 关闭gunicorn
restart_gunicron:重启gunicorn
### 如何执行单元测试
在项目根目录下,执行pytest即可。
![执行单元测试](doc/image/Snipaste_pytest_unit_test.png)
## 快速上手
### 如何处理前端的get请求
假如前端发送的get请求的方式有如下两种:
1)通过旧的json={}的形式:
http://some-domain:9920/demo/json={"city":"beijing"}的get请求
2)把json放到body中
URL:http://some-domain:9920/demo
格式:application/json
请求方式:GET
body:
```json
{
"city":"beijing"
}
```
后端代码:
```python
class Demo(AbstractApi):
def handle_get_request(self):
return self.get_params()
```
当前端发送请求时,将看到如下响应信息:
```json
{
"code": 2000,
"msg": "访问成功",
"result": {
"city": "beijing"
}
}
```
@echo off
:loop
REM Run the python.py script
echo Running the Python script...
git pull
git push gitee
if %errorlevel% neq 0 (
echo Error occurred while running the Python script.
) else (
echo pull executed successfully.
)
REM Wait for 1 minute
echo Waiting 1 minute before running the script again...
timeout /t 60 > nul
goto loop
\ No newline at end of file
"""
自动生成接口文档
"""
import importlib
import inspect
import os
import re
from docxtpl import DocxTemplate
from werkzeug.utils import find_modules
from framework.interface.abstract_api import AbstractApi
from framework.the_path import ThePath
def get_friendly_type(type):
types = {
'int': '整数',
'str': '字符串',
'float': '小数',
'decimal': '小数',
}
return types.get(type, '字符串')
def _build_class_info_item(method, rule_data:dict, urls, cls_name, cls, title):
params = []
for k, param in rule_data.items():
param['name'] = k
param['_type'] = get_friendly_type(param['type'])
param['required'] = '必须' if param['required'] else ''
params.append(param)
return {'url': urls, 'cls_name': cls_name, 'cls': cls, 'title': title, 'method': method, 'params': params}
def _is_function_override(func):
return not str(func.__module__).endswith('abstract_api')
def get_class_info():
class_info = []
modules = find_modules('api.app', recursive=True)
for mod in modules:
package_name = mod.split('.')[2]
module = importlib.import_module(mod)
cls_list = inspect.getmembers(module, inspect.isclass)
for cls_name, cls in cls_list:
if cls != AbstractApi and issubclass(cls, AbstractApi):
if not cls.url_names():
slug = re.compile(r'([a-z]|\d)([A-Z])').sub(r'\1-\2', cls_name).lower()
urls = f"/{package_name}/{slug}"
else:
slugs = cls.url_names()
urls = [f"/{package_name}/{slug}" for slug in slugs]
title = cls.__doc__.strip()
if _is_function_override(cls.handle_get_request):
class_info.append(_build_class_info_item('GET', cls.get_param_rules, urls, cls_name, cls, title))
if _is_function_override(cls.handle_post_request):
class_info.append(_build_class_info_item('POST', cls.get_param_rules, urls, cls_name, cls, title))
return class_info
def api_doc(project: str):
# FIXME 同一个接口有多个method,只能生成一个method的文档
template_file = os.path.join(ThePath.doc(), 'tpl/api_template.docx')
tpl = DocxTemplate(template_file)
tpl.render({'project': project, 'items': get_class_info()})
save_file_path = os.path.join(ThePath.tmp('doc'), f'{project}.docx')
tpl.save(save_file_path)
if __name__ == '__main__':
api_doc('Yard-base演示项目')
#!/usr/bin/env sh
for i in `cat /data/deploy/bin/gunicorn.pid`;do
kill -HUP $i
done
\ No newline at end of file
#!/usr/bin/env sh
gunicorn --config=/data/deploy/config/api/prod/gunicorn_config.py wsgi:app
echo 'Started successfully!'
sleep infinity
\ No newline at end of file
#!/usr/bin/env sh
for i in `cat /data/deploy/bin/gunicorn.pid`;do
kill -9 $i
done
rm -f /data/deploy/bin/gunicorn.pid
\ No newline at end of file
"""
APP 配置
"""
import urllib
from sqlalchemy.pool import QueuePool
from framework.the_path import ThePath
ENV = 'development'
HOST = '0.0.0.0'
#HOST = ''
PORT = 9091
DEBUG = True
SECRET_KEY = b'e\xa8n\xcf\xc3\xc9\xd9Y\xb1\xdd\xe4B\x95Xh\xeb'
SEND_FILE_MAX_AGE_DEFAULT = 43200
import platform
if platform.system() == 'Linux':
SQLALCHEMY_DATABASE_URI = 'mysql://electricity_data:EF2zUl1GHss3yqay@192.168.195.202:3317/electricity_data',
SQLALCHEMY_BINDS = {
'elec_db_2' : 'mysql+pymysql://electricity_data:EF2zUl1GHss3yqay@192.168.195.201:3317/electricity_data',
}
else:
SQLALCHEMY_DATABASE_URI = 'mysql://electricity_data:EF2zUl1GHss3yqay@10.10.31.166:33176/electricity_data'
SQLALCHEMY_BINDS = {
'elec_db_2': 'mysql+pymysql://electricity_data:EF2zUl1GHss3yqay@10.10.31.166:33176/electricity_data',
}
SQLALCHEMY_ECHO = True
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_ENGINE_OPTIONS = {
'connect_args': {
},
'echo_pool': True,
'poolclass': QueuePool,
'pool_pre_ping': True,
'pool_size': 20,
'pool_recycle': 300,
'pool_timeout': 5
}
_API_NICE_RESP=False
REDIS_HOST = 'electricity-service.redis.hotgrid.cn'
REDIS_PORT = 6379
REDIS_PWD = 'phWOgP05ymg01GaR'
REDIS_DB = 2
"""
APP 配置
"""
import urllib
from sqlalchemy.pool import QueuePool
from framework.the_path import ThePath
ENV = 'development'
HOST = '0.0.0.0'
PORT = 9091
DEBUG = True
# 新项目使用 python -c 'import os; print(os.urandom(16))' 生成SECRET_KEY
SECRET_KEY = b'e\xa8n\xcf\xc3\xc9\xd9Y\xb1\xdd\xe4B\x95Xh\xeb'
SEND_FILE_MAX_AGE_DEFAULT = 43200
# SQLALCHEMY_DATABASE_URI = 'mysql://electricity:X1w4G3Hdl7VCNqhs@36.110.47.24:3317/electricity_data_test'
SQLALCHEMY_DATABASE_URI = 'mysql://electricity_api_service:GJlfh7&#jg@mmservice-05.mysql.hotgrid.cn:3306/electricity_data'
import platform
if platform.system() == 'Linux':
SQLALCHEMY_BINDS = {
'elec_db_2' : 'mysql://electricity_api_service:GJlfh7&#jg@mmservice-05.mysql.hotgrid.cn:3306/electricity_data',
}
else:
SQLALCHEMY_BINDS = {
'elec_db_2': 'mysql://electricity_data:EF2zUl1GHss3yqay@10.20.7.227:33176/electricity_data',
}
SQLALCHEMY_ECHO = True
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_ENGINE_OPTIONS = {
'connect_args': {
# 解决 sqlite3 线程报错问题
# 'check_same_thread':False
},
'echo_pool': True,
'poolclass': QueuePool,
'pool_pre_ping': True,
'pool_size': 20,
'pool_recycle': 300,
'pool_timeout': 5
}
_API_NICE_RESP=False
REDIS_HOST = 'electricity-service.redis.hotgrid.cn'
REDIS_PORT = 6379
REDIS_PWD = 'phWOgP05ymg01GaR'
REDIS_DB = 2
"""
APP 配置
"""
import platform
import urllib
from sqlalchemy.pool import QueuePool
ENV = 'production'
HOST = '0.0.0.0'
PORT = 9091
DEBUG = False
# 新项目使用 python -c 'import os; print(os.urandom(16))' 生成SECRET_KEY
SECRET_KEY = b'e\xa8n\xcf\xc3\xc9\xd9Y\xb1\xdd\xe4B\x95Xh\xeb'
SEND_FILE_MAX_AGE_DEFAULT = 43200
# 默认使用用电内网1的数据库,流水数据使用下面elec_db_2
if platform.system() == 'Linux':
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://electricity_data:EF2zUl1GHss3yqay@192.168.195.201:3317/electricity_data'
SQLALCHEMY_BINDS = {
'elec_db_2' : 'mysql+pymysql://electricity_data:EF2zUl1GHss3yqay@192.168.195.201:3317/electricity_data',
}
else:
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://electricity_data:EF2zUl1GHss3yqay@10.20.7.227:33176/electricity_data'
SQLALCHEMY_BINDS = {
'elec_db_2': 'mysql://electricity_data:EF2zUl1GHss3yqay@10.20.7.227:33176/electricity_data',
}
SQLALCHEMY_ECHO = False
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_ENGINE_OPTIONS = {
'echo_pool': True,
'poolclass': QueuePool,
'pool_pre_ping': True,
'pool_size': 20,
'pool_recycle': 300,
'pool_timeout': 5
}
_API_NICE_RESP = False
REDIS_HOST = 'electricity-service.redis.hotgrid.cn'
REDIS_PORT = 6379
REDIS_PWD = 'phWOgP05ymg01GaR'
REDIS_DB = 2
"""
https://docs.gunicorn.org/en/stable/
"""
bind = '0.0.0.0:9091'
backlog = 512
chdir = '/data/deploy/src/'
timeout = 150
worker_class = 'gevent'
workers = 4
threads = 2
loglevel = 'info'
pidfile = "/data/deploy/bin/gunicorn.pid"
access_log_format = '%(t)s %(p)s %(h)s "%(r)s" %(s)s %(L)s %(b)s %(f)s" "%(a)s"'
accesslog = "/data/deploy/tmp/log/gunicorn_access.log"
errorlog = "/data/deploy/tmp/log/gunicorn_error.log"
daemon = True
\ No newline at end of file
"""
APP 配置
"""
import urllib
from sqlalchemy.pool import QueuePool
from framework.the_path import ThePath
ENV = 'development'
HOST = '0.0.0.0'
PORT = 9091
DEBUG = True
# 新项目使用 python -c 'import os; print(os.urandom(16))' 生成SECRET_KEY
SECRET_KEY = b'e\xa8n\xcf\xc3\xc9\xd9Y\xb1\xdd\xe4B\x95Xh\xeb'
SEND_FILE_MAX_AGE_DEFAULT = 43200
# 数据库配置
# SQLALCHEMY_DATABASE_URI = 'mysql://electricity:X1w4G3Hdl7VCNqhs@36.110.47.24:3317/electricity_data_test'
SQLALCHEMY_DATABASE_URI = 'mysql://electricity_api_service:GJlfh7&#jg@mmservice-05.mysql.hotgrid.cn:3306/electricity_data'
SQLALCHEMY_BINDS = {
'olympic_db': f'mysql://olympic:{urllib.parse.quote("olympic_ysrd@2021")}@rm-2zem57smz83811158bo.mysql.rds.aliyuncs.com:3306/olympic_db',
'ysrd_y': 'mysql://weiyanjie:ysrd_PASSW0RD_2023@ysrd_y_master.mysql.internal.airqualitychina.cn:3317/ysrd_y',
'elec_db_2': 'mysql://electricity_api_service:GJlfh7&#jg@mmservice-05.mysql.hotgrid.cn:3306/electricity_data',
}
SQLALCHEMY_ECHO = True
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_RECORD_QUERIES = True
SQLALCHEMY_ENGINE_OPTIONS = {
'connect_args': {
# 解决 sqlite3 线程报错问题
# 'check_same_thread':False
},
'echo_pool': True,
'poolclass': QueuePool,
'pool_pre_ping': True,
'pool_size': 20,
'pool_recycle': 300,
'pool_timeout': 5
}
_API_NICE_RESP=False
REDIS_HOST = 'electricity-service.redis.hotgrid.cn'
REDIS_PORT = 6379
REDIS_PWD = 'phWOgP05ymg01GaR'
REDIS_DB = 2
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
import time
import platform
import pandas as pd
class ClickHouseHelper:
def __init__(self):
if platform.system()=='Linux':
self.host = "192.168.253.195"
else:
self.host = "10.10.31.166"
self.port = 8123
self.user = "default"
self.password = "Ysrd2024!"
self.db = "electrcity_data"
self.engine = None
self.Session = None
self._create_engine()
def _create_engine(self):
# 使用 clickhouse+http:// 作为连接前缀
connection_string = f"clickhouse+http://{self.user}:{self.password}@{self.host}:{self.port}/{self.db}"
# 配置连接池
pool_size = 10 # 设置最大连接数量为10
self.engine = create_engine(
connection_string,
pool_size=pool_size,
max_overflow=3000, # 设置最大溢出数量为0,即不允许超出 pool_size 的连接
pool_timeout=30, # 设置连接池超时时间为30秒
pool_recycle=-1 # 设置连接回收时间,-1 表示不回收
)
self.Session = sessionmaker(bind=self.engine)
def get_session(self):
# 获取数据库会话
return self.Session()
def _df_res_to_db(self, df, table_name):
df.to_sql(
name=table_name,
con=self.engine.connect(),
index=False,
chunksize=2000,
if_exists='append')
def execute_select(self, sql):
time_start = time.time()
with self.engine.connect() as conn:
# 使用 text() 函数将 SQL 语句转换为可执行对象
sql_text = text(sql)
result = conn.execute(sql_text)
data = []
for row in result:
# 使用 row._mapping 将 RowProxy 转换为字典
data.append(dict(row._mapping))
return data
def execute_edit(self, sql):
with self.engine.connect() as conn:
sql_text = text(sql)
result = conn.execute(sql_text)
conn.commit()
return result.rowcount
version: 1
formatters:
common:
format: "[%(asctime)s] %(levelname)s -- %(filename)s %(funcName)s thread:%(thread)d [line:%(lineno)d]-> %(message)s"
handlers:
console:
class: logging.StreamHandler
level: DEBUG
formatter: common
stream: ext://sys.stdout
timed_rotating_file:
class: logging.handlers.TimedRotatingFileHandler
level: DEBUG
formatter: common
filename: /data/deploy/tmp/log/api.log
root:
level: DEBUG
handlers: [ console, timed_rotating_file ]
\ No newline at end of file
import pandas as pd
from sqlalchemy import create_engine
import datetime
import sys
import os
import threading
sys.path.append(os.path.abspath(os.path.join(sys.path[0], '..')))
from src.framework.the_path import ThePath
def run():
# engine = create_engine('sqlite:///:memory:')
engine_sanguo = create_engine(f'sqlite:///{ThePath.demo()}/sanguo.db')
engine_shuihu = create_engine(f'sqlite:///{ThePath.demo()}/shuihu.db')
df1 = pd.DataFrame([
{
'id': ';',
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '张飞'
},
{
'id': 2,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '张辽'
},
{
'id': 3,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '刘备'
},
{
'id': 4,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '关羽'
},
])
df2 = pd.DataFrame([
{
'id': 1,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '关圣'
},
{
'id': 1,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '卢俊义'
},
{
'id': 1,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '武松'
},
{
'id': 1,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': '鲁智深'
}
]
)
df1.to_sql(name=f't_sanguo', con=engine_sanguo, if_exists='replace')
df2.to_sql(name=f't_shuihu', con=engine_shuihu, if_exists='replace')
result = pd.read_sql('select * from t_sanguo', con=engine_sanguo.connect())
print(result)
result = pd.read_sql('select * from t_shuihu', con=engine_shuihu.connect())
print(result)
def test_threading():
def test(id):
df = pd.DataFrame([
{
'id': id,
'created_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'updated_at': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'name': str(id),
},
])
engine = create_engine("mysql+pymysql://yard_base_test:AEWTC^&fhj13@10.10.40.220:3306/for_test")
with engine.begin() as connection:
df.to_sql(name=f't_user_for_test', con=connection, if_exists='append')
for i in range(50):
t = threading.Thread(target=test, args=(i,))
t.start()
if __name__ == '__main__':
run()
# test_threading()
File added
File added
### 目的
### 目的
> 开发者在构建系统时会用到很多库,有很多选择与组合,为了统一、便于协同工作,减少沟通成本,特此创建本框架
1. 轻松的创建api接口
2. 轻松的创建定时任务
3. 针对实际情况进行底层封装,在一定层度上保证编码的最佳实践
4. 保证在一般情况下,通过此框架构建的系统能稳定运行,较少出错
### 目标用户
1. 有Flask基础知识
2. 会使用SQL查询数据
3. 会用Python编写业务逻辑
### 不需要
1. 不需要Flask的高级知识
2. 大部分情况下不需要构建路由
3. 不需要SQLAlchemy的高级知识
4. 不需要定时任务框架(例如APScheduler、XXL-Job)的高级知识
### 兼容
框架无法解决一切问题
1. 兼容需要使用Flask高级功能的情况
2. 兼容需要使用SQLAlchemy高级功能的情况
3. 兼容需要使用定时任务框架高级功能的情况
\ No newline at end of file
#!/usr/bin/env sh
gunicorn --config=/data/deploy/config/api/prod/gunicorn_config.py wsgi:app
echo 'Started successfully!'
sleep infinity
aniso8601==9.0.1
appdirs==1.4.4
xlsxwriter==3.2.0
async-timeout==4.0.2
atomicwrites==1.4.1
attrs==22.1.0
certifi==2022.6.15
charset-normalizer==2.0.12
click==8.1.3
colorama==0.4.5
cycler==0.11.0
docxcompose==1.3.5
docxtpl==0.16.3
et-xmlfile==1.1.0
Flask==2.1.3
Flask-Cors==3.0.10
Flask-RESTful==0.3.9
Flask-SQLAlchemy==2.5.1
greenlet==1.1.3
idna==3.3
importlib-metadata==4.12.0
importlib-resources==5.9.0
iniconfig==1.1.1
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.2.1
kiwisolver==1.4.4
lxml==4.9.1
MarkupSafe==2.1.1
marshmallow==3.15.0
matplotlib==3.4.2
numpy==1.21.6
openpyxl==3.0.10
packaging==21.3
pandas==1.3.5
Pillow==9.4.0
pluggy==1.0.0
prettytable==3.6.0
py==1.11.0
pyecharts==2.0.2
pyecharts-snapshot==0.2.0
pyee==8.2.2
PyMySQL==1.0.2
pyparsing==3.0.9
pyppeteer==1.0.2
pyrsistent==0.18.1
pytest==6.2.5
python-dateutil==2.8.2
python-docx==0.8.11
pytz==2022.2.1
PyYAML==6.0
requests==2.28.0
simplejson==3.18.4
six==1.16.0
snapshot-phantomjs==0.0.3
SQLAlchemy==1.4.32
toml==0.10.2
tqdm==4.65.0
typing_extensions>=4.6.1
urllib3==1.26.12
wcwidth==0.2.6
websockets==10.4
Werkzeug==2.0.2
zipp==3.8.1
DingtalkChatbot==1.5.3
sqlacodegen
tablib
redis==3.2.0
scikit-learn==1.3.2
retry
aniso8601==9.0.1
appdirs==1.4.4
async-timeout==4.0.2
atomicwrites==1.4.1
attrs==22.1.0
xlsxwriter==3.2.0
certifi==2022.6.15
charset-normalizer==2.0.12
click==8.1.3
colorama==0.4.5
cycler==0.11.0
docxcompose==1.3.5
docxtpl==0.16.3
et-xmlfile==1.1.0
Flask==2.1.3
Flask-Cors==3.0.10
Flask-RESTful==0.3.9
Flask-SQLAlchemy==2.5.1
greenlet==1.1.3
idna==3.3
importlib-metadata==4.12.0
importlib-resources==5.9.0
iniconfig==1.1.1
itsdangerous==2.1.2
Jinja2==3.1.2
jsonschema==4.2.1
kiwisolver==1.4.4
lxml==4.9.1
MarkupSafe==2.1.1
marshmallow==3.15.0
matplotlib==3.7.5
seaborn==0.12.2
numpy==1.21.6
openpyxl==3.0.10
jieba
packaging==21.3
pandas==1.3.5
Pillow==9.4.0
pluggy==1.0.0
prettytable==3.6.0
py==1.11.0
pyecharts==2.0.2
pyecharts-snapshot==0.2.0
pyee==8.2.2
PyMySQL==1.0.2
pyparsing==3.0.9
pyppeteer==1.0.2
pyrsistent==0.18.1
pytest==6.2.5
python-dateutil==2.8.2
python-docx==0.8.11
pytz==2022.2.1
PyYAML==6.0
requests==2.28.0
simplejson==3.18.4
six==1.16.0
snapshot-phantomjs==0.0.3
SQLAlchemy==1.4.47
toml==0.10.2
tqdm==4.65.0
typing_extensions>=4.6.1
urllib3==1.26.12
wcwidth==0.2.6
websockets==10.4
Werkzeug==2.0.2
zipp==3.8.1
DingtalkChatbot==1.5.3
sqlacodegen
tablib
redis==3.2.0
tsmoothie==1.0.5
arrow==1.3.0
scikit-learn==1.3.2
retry
asynch==0.2.4
clickhouse-sqlalchemy==0.2.7
clickhouse-driver==0.2.9
geopandas==0.9.0
shapely==2.0.7
\ No newline at end of file
# -*- coding:utf-8 -*-
# @Author: Gaiting
# @Time: 2023/5/20 16:18
# @Function:
from framework.util.sql_db import SqlDb
from app.data_model.user_table import ElecUser
def auth(func):
def wrapper(self, userid, *args, **kwargs): #--》改了这里!
db_session = SqlDb().orm_session()
emp = db_session.query(ElecUser).filter(ElecUser.userid == userid).first()
try:
if emp.userid: pass
except Exception as e:
return -1
return func(self, userid, *args, **kwargs)
return wrapper
\ No newline at end of file
# coding: utf-8
from sqlalchemy import Column, DateTime, String, text
from sqlalchemy.dialects.mysql import INTEGER, TINYINT
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
class ElecWarnClueAssignRecord(Base):
__tablename__ = 'elec_warn_clue_assign_records'
__table_args__ = {'comment': '报警线索下发记录'}
clue_code = Column(String(255), nullable=False)
from_user = Column(String(255), nullable=False)
created_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP"))
is_deleted = Column(TINYINT(4), server_default=text("'0'"))
id = Column(INTEGER(11), primary_key=True)
from_adm = Column(INTEGER(11), comment='发送该线索的管理机构id')
to_adm = Column(INTEGER(11), comment='接收该线索的管理机构id')
description = Column(String(1024), server_default=text(""))
# coding: utf-8
from sqlalchemy import Column, DateTime, String, text
from sqlalchemy.dialects.mysql import TINYINT
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
class ElecWarnClueList(Base):
__tablename__ = 'elec_warn_clue_list'
__table_args__ = {'comment': '报警线索表'}
code = Column(String(255), primary_key=True)
is_deleted = Column(TINYINT(4))
is_closed = Column(TINYINT(4))
created_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP"))
closed_time = Column(DateTime)
from_user = Column(String(255), nullable=False)
# coding: utf-8
from sqlalchemy import Column, DateTime, String, text
from sqlalchemy.dialects.mysql import INTEGER, TINYINT
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
metadata = Base.metadata
# sqlacodegen mysql://electricity_api_service:'GJlfh7&#jg'@mmservice-05.mysql.hotgrid.cn:3306/electricity_data?charset=utf8 --outfile=user_table.py --tables elec_user
class ElecUser(Base):
__tablename__ = 'elec_user'
__table_args__ = {'comment': '用电平台用户表'}
userid = Column(String(255), primary_key=True)
user_name = Column(String(255), comment='用户名')
password = Column(String(255), comment='密码')
role_level = Column(TINYINT(4), comment='角色')
adm = Column(String(255), comment='用户归属的平台名称')
is_deleted = Column(TINYINT(4), server_default=text("'0'"))
created_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP"))
updated_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP"))
adm_id = Column(INTEGER(11), nullable=False, comment='用户所属管理机构id')
phone = Column(String(11), nullable=False, comment='手机号')
is_send = Column(INTEGER(1), nullable=False, comment='是否发送短信进行验证码验证,0:不用,1:发送')
# -*- coding:utf-8 -*-
# @Time : 2022/8/26 16:05
import hashlib
import re
import json
import pandas as pd
import jieba
from framework.interface.abstract_api import AbstractApi
from marshmallow import Schema, fields
class EmergencyFileList(AbstractApi):
class GetSchema(Schema):
pass
def handle_get_request(self):
search_sql_data = f"""
select
IFNULL(ep.start_time,"未绑定") as plan_time,
IFNULL(ecc.created_at,'--') as 'create_time',
IFNULL(ep.plan_name,"未绑定") as plan_name,
plan_id as plan_id,
file_name,
count(*) as couonts_ents,
IFNULL(ep.plan_type,"未绑定") as plan_type
from elec_control_center ecc
left join elec_plan ep on ep.id = ecc.plan_id and ep.is_deleted=0
where file_name is not null
group by file_name
order by ecc.created_at desc;
;
"""
data_df = pd.DataFrame(self.sql_db.list(search_sql_data,"elec_db_2"))
return data_df.to_dict(orient='records')
\ No newline at end of file
# -*- coding:utf-8 -*-
# @Time : 2023/7/25 16:00
# @Author : Fushuyue
# @Owner : YSRD (Insights Value)
import datetime
from dateutil.rrule import rrule, MONTHLY
from dateutil.relativedelta import relativedelta
def get_between_datetime(beginDate, endDate, type, format_str="%Y-%m-%d %H:%M:%S"):
dateList = []
beginDate = datetime.datetime.strptime(beginDate, "%Y-%m-%d %H:%M:%S")
endDate = datetime.datetime.strptime(endDate, "%Y-%m-%d %H:%M:%S")
if type == 1: # 分钟
while beginDate <= endDate:
dateStr = beginDate.strftime(format_str)
dateList.append(dateStr)
beginDate += relativedelta(minutes=15)
elif type == 2: # 小时
while beginDate <= endDate:
dateStr = beginDate.strftime(format_str)
dateList.append(dateStr)
beginDate += relativedelta(hours=1)
elif type == 3: # 日
while beginDate <= endDate:
dateStr = beginDate.strftime(format_str)
dateList.append(dateStr)
beginDate += relativedelta(days=1)
elif type == 4: # 月
beginDate = datetime.datetime(beginDate.year, beginDate.month, 1)
while beginDate <= endDate:
dateStr = beginDate.strftime(format_str)
dateList.append(dateStr)
beginDate += relativedelta(months=1)
elif type == 5: # 季度
quarters = list(rrule(MONTHLY, dtstart=beginDate, until=endDate, bymonth=(1, 4, 7, 10)))
return [i.strftime(format_str) for i in quarters]
# return [f'{i.year}年-第{(i.month - 1) // 3 + 1}季度' for i in quarters]
elif type == 6: # 年
beginDate = datetime.datetime(beginDate.year, 1, 1)
while beginDate <= endDate:
dateStr = beginDate.strftime(format_str)
dateList.append(dateStr)
beginDate += relativedelta(years=1)
return dateList
def get_format_str(time_type):
if time_type == 1: # 刻钟
return "%m-%d %H:%M"
elif time_type == 2: # 小时
return '%m-%d %H'
elif time_type == 3: # 日
return '%Y-%m-%d'
elif time_type in (4, 5): # 月 or 季度
return '%Y-%m'
elif time_type == 6: # 年
return '%Y'
\ No newline at end of file
# -*- coding:utf-8 -*-
# @Time : 2023/7/5 9:37
# @Author : Fushuyue
# @Owner : YSRD (Insights Value)
import pandas as pd
def calc_total_duration(warn_df):
days, hours, minutes, max_duration, max_duration_data = 0, 0, 0, 0, None
def split_duration(x):
nonlocal days, hours, minutes, max_duration, max_duration_data
x_duration_minutes = 0
duration = x['duration']
if '天' in duration:
x_list = duration.split('天')
days += int(x_list[0])
x_duration_minutes += int(x_list[0]) * 24 * 60
duration = x_list[1]
if '小时' in duration:
x_list = duration.split('小时')
hours += int(x_list[0])
x_duration_minutes += int(x_list[0]) * 60
duration = x_list[1]
if '分' in duration:
x_list = duration.split('分')
minutes += int(x_list[0])
x_duration_minutes += int(x_list[0])
# 找出持续时长最大的一条报警
if x_duration_minutes > max_duration:
max_duration_data = x.to_dict()
max_duration = x_duration_minutes
warn_df.apply(func=split_duration, axis=1)
hours += minutes // 60
minutes = minutes % 60
days += hours // 24
hours = hours % 24
duration = f'{minutes}分'
if hours:
duration = f'{hours}小时' + duration
if days:
duration = f'{days}天' + duration
return duration, max_duration_data
def format_data(warn_list):
if not warn_list:
return list()
# 分组统计每个点位的:报警类型、最高报警级别、报警总时长、报警次数
warn_level_map = {1: '一级', 2: '二级', 3: '三级'}
res = list()
for key, val_df in pd.DataFrame(warn_list).groupby(
by=["ent_id", "dev_id", "dev_name", "warn_type_id", "warn_type_name"]):
total_duration, max_duration_data = calc_total_duration(val_df) # 计算报警总持续时长和持续时长最大的一条报警
res.append({
'ent_id': 0 if not key[0] else int(key[0]),
'dev_id': 0 if not key[1] else int(key[1]),
'dev_name': str(key[2]), # 报警设备
'warn_type_id': 0 if not key[3] else int(key[3]), # 报警类型
'warn_type_name': str(key[4]), # 报警类型
'warn_count': len(val_df), # 报警次数
'highest_level': warn_level_map[int(val_df['warn_level'].min())] if val_df['warn_level'].min() else 0, # 报警最高等级(一级为最高)
'total_duration': total_duration,
'warn_code': [str(i) for i in val_df['warn_code'].values],
'start_time': str(val_df['warn_time'].min()), # 最早开始报警时间
'end_time': str(val_df['end_time'].max()), # 最晚结束报警时间
'warn_data': sorted(val_df.to_dict(orient='records'), key=lambda x: x['warn_time'], reverse=True),
'max_duration_data': max_duration_data # 最大持续时长的一条报警信息
})
return res
# -*- coding:utf-8 -*-
# @Author: Gaiting
# @Time: 2022/9/12 17:40
# @Function:
from flask import request
import requests
from functools import wraps
from numpy import random
def fake_api(is_list=False):
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
suffix=args[1] if len(args)>=2 else ""
fakeBaseUrl = "http://rap2api.taobao.org/app/mock/306541"
url = fakeBaseUrl + request.path + str(suffix)
rep = requests.request(method=request.method, url=url)
if is_list:
return rep.json().get('data')
else:
# 自定义修改动态返回值
if 'diy' in kwargs.keys() and kwargs['diy'] == True:
res = diy_response(kwargs, rep)
return res
return rep.json()
return rep.json()
return wrapper
return decorator
def diy_response(kwargs, rep):
if 'diy' in kwargs.keys() and kwargs['diy'] == True and kwargs['type'] == 'chart':
chart_data = {}
name = f'{kwargs["name"]}_chart_data'
value = list(map(lambda x: round(x * 100, 2), list(random.rand(24))))
chart_data[name] = {
'time': kwargs['time_list'],
'data': value
}
res = rep.json()
res.update(chart_data)
return res
elif 'diy' in kwargs.keys() and kwargs['diy'] == True and kwargs['type'] == 'location':
value = list(map(lambda x: round(x * 100, 2), list(random.rand(24))))
res = kwargs['data']
for i in res:
i['value'] = value[res.index(i)]
return res
import json
import requests
import xml.dom.minidom as xmldom
import os
import uuid
import zipfile
import shutil
from openpyxl import load_workbook
def isfile_exist(file_path):
if not os.path.isfile(file_path):
print("It's not a file or no such file exist ! %s" % file_path)
return False
else:
return True
# 复制并修改指定目录下的文件类型名,将excel后缀名修改为.zip
def copy_change_file_name(file_path, new_type='.zip'):
if not isfile_exist(file_path):
return ''
extend = os.path.splitext(file_path)[1] # 获取文件拓展名
if extend != '.xlsx' and extend != '.xls':
print("It's not a excel file! %s" % file_path)
return False
file_name = os.path.basename(file_path) # 获取文件名
new_name = str(file_name.split('.')[0]) + new_type # 新的文件名,命名为:xxx.zip
dir_path = os.path.dirname(file_path) # 获取文件所在目录
new_path = os.path.join(dir_path, new_name) # 新的文件路径
if os.path.exists(new_path):
os.remove(new_path)
return shutil.copyfile(file_path, new_path) # 返回新的文件路径,压缩包
# 解压文件
def unzip_file(zipfile_path):
if not isfile_exist(zipfile_path):
return False
if os.path.splitext(zipfile_path)[1] != '.zip':
print("It's not a zip file! %s" % zipfile_path)
return False
file_zip = zipfile.ZipFile(zipfile_path, 'r')
file_name = os.path.basename(zipfile_path) # 获取文件名
zipdir = os.path.join(os.path.dirname(zipfile_path), str(file_name.split('.')[0])) # 获取文件所在目录
for files in file_zip.namelist():
file_zip.extract(files, zipdir) # 解压到指定文件目录
file_zip.close()
return zipdir
# 读取解压后的文件夹,打印图片路径
def read_img(unzip_file_path):
img_dict = dict()
pic_dir = 'xl' + os.sep + 'media' # excel变成压缩包后,再解压,图片在media目录
pic_path = os.path.join(unzip_file_path, pic_dir) # 解压后图片所在绝对目录
file_list = os.listdir(pic_path) # 找到图片目录下的所有图片名称
for file in file_list:
# img_url = os.path.join(pic_path, file) # 获取图片本地完整路径
img_url = file_upload(img_path=os.path.join(pic_path, file)) # 将图片保存到服务服务器上
img_dict[file] = img_url
return img_dict
# 保存数据到服务器
def file_upload(img_path):
data = {'subsystem_code': 'electricity-service', 'subsystem_api': 'elec-picture'}
file_name = str(uuid.uuid4()) + '.' + img_path.split('.')[-1]
try:
with open(img_path, 'rb') as f:
r = requests.post(url="https://api-file-manage.airqualitychina.cn:9998/api/hold-filename-upload",
data=data,
files=[('files', (file_name, f.read()))])
file_url = "https://f.hotgrid.cn/" + json.loads(r.content)['result'][0]['url']
print(file_url)
except Exception as e:
print(e)
file_url = ''
return file_url
def get_img_pos_info(unzip_file_path, img_dict, drawing_idx):
"""解析xml 文件,获取图片在excel表格中的索引位置信息"""
# 1. 查询rid与图片的关联关系
xml_rel_dir = 'xl' + os.sep + 'drawings' + os.sep + '_rels' + os.sep + f'drawing{drawing_idx}.xml.rels'
xml_rel_path = os.path.join(unzip_file_path, xml_rel_dir)
img_info = parse_xml_rel(xml_rel_path)
# 2. 解析xml 文件, 返回图片索引位置信息
xml_dir = 'xl' + os.sep + 'drawings' + os.sep + f'drawing{drawing_idx}.xml'
xml_path = os.path.join(unzip_file_path, xml_dir)
image_info_dict = parse_xml(xml_path, img_dict, img_info)
return image_info_dict
# 解析xml关联文件并获取对应图片位置
def parse_xml_rel(file_name):
# 得到文档对象
image_info = dict()
dom_obj = xmldom.parse(file_name)
# 得到元素对象
element = dom_obj.documentElement
relationship = element.getElementsByTagName("Relationship")
for rel in relationship:
embed = rel.getAttribute('Id')
img = rel.getAttribute('Target').replace('../media/', '')
image_info[embed] = img
return image_info
# 解析xml文件并获取对应图片位置
def parse_xml(file_name, img_dict, img_info):
# 得到文档对象
new_image_info = dict()
dom_obj = xmldom.parse(file_name)
# 得到元素对象
element = dom_obj.documentElement
sub_twoCellAnchor = element.getElementsByTagName("xdr:twoCellAnchor")
if len(sub_twoCellAnchor) == 0:
sub_twoCellAnchor = element.getElementsByTagName("xdr:oneCellAnchor")
for anchor in sub_twoCellAnchor:
# xdr_from = anchor.getElementsByTagName('xdr:from')[0]
try:
col = anchor.getElementsByTagName('xdr:col')[0].firstChild.nodeValue # 获取标签间的数据
row = anchor.getElementsByTagName('xdr:row')[0].firstChild.nodeValue
embed = anchor.getElementsByTagName('a:blip')[0].getAttribute('r:embed') # 获取属性
# col = xdr_from.childNodes[0].firstChild.data # 获取标签间的数据
# row = xdr_from.childNodes[2].firstChild.data
# embed = \
# anchor.getElementsByTagName('xdr:pic')[0].getElementsByTagName('xdr:blipFill')[0].getElementsByTagName(
# 'a:blip')[0].getAttribute('r:embed') # 获取属性
if (int(row) + 1, int(col) + 1) in new_image_info:
new_image_info[(int(row) + 1, int(col) + 1)] += ',' + img_dict.get(img_info.get(embed, ''), '')
else:
new_image_info[(int(row) + 1, int(col) + 1)] = img_dict.get(img_info.get(embed, ''), '')
except Exception as e:
print(e)
return new_image_info
def parse_image(file):
# 先将文件保存到本地
file_path = os.path.join(os.getcwd(), file.filename)
file.save(file_path)
# 0. 获取压缩包文件路径
zip_file_path = copy_change_file_name(file_path)
if zip_file_path != '':
# 1. 获取解压文件路径
unzip_file_path = unzip_file(zip_file_path)
if isinstance(unzip_file_path, str):
# 2. 获取解压目录下所有的图片信息
img_dict = read_img(unzip_file_path)
import pandas as pd
df = pd.DataFrame(list(img_dict))
# 3. 加载excel文件
book = load_workbook(file_path)
drawing_idx = 1 # 用于累计drawing(i).xml.rels的数量
# 4. 将每个sheet中的图片替换为图片的url
for sheet_name in book.sheetnames:
sheet = book[sheet_name]
# 判断sheet中是否有图片,若有再替换
if hasattr(sheet, '_images') and sheet._images:
img_info_dict = get_img_pos_info(unzip_file_path, img_dict, drawing_idx=drawing_idx)
for key, val in img_info_dict.items():
sheet.cell(row=key[0], column=key[1], value=val)
drawing_idx += 1
# 删除旧的excel文件,保存替换图片url后新的excel文件
if os.path.exists(file_path):
os.remove(file_path)
book.save(file_path)
# 删除解压路径
if os.path.exists(unzip_file_path):
shutil.rmtree(unzip_file_path)
# 删除压缩包
if os.path.exists(zip_file_path):
os.remove(zip_file_path)
return file_path
if __name__ == '__main__':
# res = parse_image(
# file_path='C:\\Users\\admin\\Desktop\\reproject\\electricity-api-service\\src\\app\\v1\\file_upload\\顺义区用电企业数据问题(3).xlsx')
print(os.getcwd())
file_path = r'C:\Users\admin\Desktop\reproject\electricity-api-service\src\用电项目后台数据库录入用表.xlsx'
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
from framework.the_path import ThePath
from framework.environment import Environment
class ConfigLoader:
def __init__(self, env=None):
# env的优先级,参数 > Dockerfile中的ENV
# self.env = os.getenv('ENVIRONMENT', 'prod') if not env else env
self.env = Environment.env() if not env else env
# 统一从配置文件里读,摒弃获取环境变量和start_up直接赋予参数的方式的方式?
self._data = {
'log': ThePath.config(['default', 'log.yml']),
'flask': ThePath.config(['api', self.env, 'flask.py'])
}
def load(self):
env_log_path = ThePath.config(['api', self.env, 'log.yml'])
if os.path.exists(env_log_path):
self._data['log'] = env_log_path
env_flask_path = ThePath.config(['api', self.env, 'flask.py'])
if os.path.exists(env_flask_path):
self._data['flask'] = env_flask_path
@property
def data(self):
return self._data
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import importlib
import inspect
import os
import re
from flask import Flask, Blueprint
from flask_restful import Api
from framework.interface.abstract_api import AbstractApi
from framework.the_path import ThePath
class RegisterRouter:
def __get_default_url_name(self, cls_name):
p = re.compile(r'([a-z]|\d)([A-Z])')
return re.sub(p, r'\1-\2', cls_name).lower()
def __get_package_cls_data(self, file_name, package_name):
pkg_cls_data = []
package_name = package_name.replace('\\', '/')
if file_name.endswith('.py') and not file_name.startswith('__init__'):
module = importlib.import_module(f"{package_name.replace('/','.')}.{file_name[:-3]}")
cls_list = inspect.getmembers(module, inspect.isclass)
for cls_name, cls in cls_list:
if cls != AbstractApi and issubclass(cls, AbstractApi):
pkg_cls_data.append({'pkg_name': package_name, 'cls_name': cls_name, 'cls': cls})
return pkg_cls_data
def __register_api(self, cls_datum, restful_api: Api):
cls = cls_datum.get('cls')
url_names = getattr(cls, 'url_names')()
if url_names:
if isinstance(url_names, str):
restful_api.add_resource(cls, f"/{url_names}")
else:
restful_api.add_resource(cls, *[f"/{url_name}" for url_name in url_names])
else:
restful_api.add_resource(cls, f"/{self.__get_default_url_name(cls_datum.get('cls_name'))}")
def create_init_file(self, path):
init_file = os.path.join(path, '__init__.py')
if not os.path.exists(init_file):
with open(os.path.join(path, '__init__.py'), 'a') as f:
f.write('')
def register_router(self, app: Flask):
"""
@Time : 2022-05-24 12:00:00
@Author : Feihong Wang
@Owner : YSRD (Insights Value)
曹科在运行pytest时,
如果app下某个文件夹中没有__init__.py文件
Blueprint(blueprint_name, package_name)会报错:
TypeError: expected str, bytes or os.PathLike object, not NoneType
而执行runserver文件时不会如此
梳理一遍原因:
Blueprint中获取root_path的关键代码:
mod = sys.modules.get(import_name)
if mod is not None and hasattr(mod, "__file__"):
return os.path.dirname(os.path.abspath(mod.__file__))
if hasattr(loader, "get_filename"):
filepath = loader.get_filename(import_name)
原因是,pytest的每个测试文件都会通过contest.py执行create_app('test')
所以会执行多次register_router函数
进而多次执行这个函数:Blueprint(blueprint_name, package_name),
查看 Blueprint 源码,发现初始化时需要 运算root_path变量
第一次 Blueprint 检测到 package_name 未被import
sys.modules.get(package_name)返回None,
最终Blueprint.root_path 会得到loader.get_filename(import_name),且不报错
接下来执行 self.__get_package_cls_data(file, package_name),模块被导入
sys.modules.get(package_name)得到这种类型的数据:
模块文件夹有__init__.py文件会获得这样的root_path:
<module 'app.demo' from '/Users/wangfeihong/Desktop/yard-base/src/app/demo/__init__.py'>
如果文件夹中没有__init__.py会获得这样的root_path:
<module 'app.vtestt.action' (namespace)>,
这种由于没有__init__.py文件也不会被self.__get_package_cls_data当成模块引入
第二次再执行 Blueprint(blueprint_name, package_name),
此时 package_name 已经被导入,mod = sys.modules.get(import_name)!=None,
root_path 为 os.path.dirname(os.path.abspath(mod.__file__))
由于 <module 'app.vtestt.action' (namespace)>(命名空间的模块,和普通模块的区别应该就是没有__init__.py)
这种类型没有__file__属性,所以报错 TypeError: expected str, bytes or os.PathLike object, not NoneType
两种解决方法
1. Blueprint(blueprint_name, package_name, root_path=''),暂时不知道root_path有什么用,不过程序运行正常
2. 检测文件夹有没有__init__.py文件,如果没有则新建
"""
app_path = ThePath.app()
src_path = ThePath.src()
for dir_path, dir_names, files in os.walk(app_path):
package_name = dir_path.replace(src_path, '').replace('\\', '.').replace('/', '.')[1:]
blueprint_name = dir_path.replace(app_path, '').replace('\\', '/')
if '__pycache__' in blueprint_name or blueprint_name == '':
continue
self.create_init_file(dir_path)
blueprint = Blueprint(blueprint_name, package_name)
restful_api = Api(blueprint)
for file in files:
pkg_cls_data = self.__get_package_cls_data(file, package_name)
if len(pkg_cls_data) == 0:
continue
for cls_datum in pkg_cls_data:
self.__register_api(cls_datum, restful_api)
if len(restful_api.endpoints) > 0:
app.register_blueprint(blueprint, url_prefix=f"/{blueprint_name}")
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
class MimeType:
@staticmethod
def images():
return (
MimeType.IMAGE_BMP,
MimeType.IMAGE_JPG,
MimeType.IMAGE_GIF,
MimeType.IMAGE_TIFF,
MimeType.IMAGE_PNG
)
IMAGE_BMP = 'image/bmp'
IMAGE_JPG = 'image/jpeg'
IMAGE_GIF = 'image/gif'
IMAGE_TIFF = 'image/tiff'
IMAGE_PNG = 'image/png'
AUDIO_MP4 = 'audio/mp4'
AUDIO_MPEG = 'audio/mpeg'
AUDIO_OGG = 'audio/ogg'
TEXT_CSV = 'text/csv'
VIDEO_MP4 = 'video/mp4'
VIDEO_OGG = 'video/ogg'
VIDEO_QUICKTIME = 'video/quicktime'
APPLICATION_PDF = 'application/pdf'
APPLICATION_GZIP = 'application/gzip'
APPLICATION_MSWORD = 'application/msword'
APPLICATION_ZIP = 'application/zip'
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from enum import Enum
class RequestMethod(Enum):
GET = 'get',
POST = 'post'
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from enum import Enum, unique
@unique
class RespCode(Enum):
"""
状态码常量类
系统错误: 1000
正常:2000
通用异常:3000
数据异常:4000
"""
INNER_ERROR = 1001
PLEASE_TRY_LATER = 1002
OK = 2000
TOKEN_INVALID = 3001
TOKEN_EXPIRED = 3002
PARAM_LACK = 3003
PARAM_WRONG_FORMAT = 3004
UNAUTHORIZED = 3005
POST_REPEAT = 3007
REQUEST_METHOD_INVALID = 3008
INVALID_PARAM = 3009
NOT_SUPPORT = 3010
NOT_FOUND_DATA = 3011
INVALID_OPERATION = 3012
INVALID_JSON_FORMAT = 3013
DATA_ERROR = 4001
NOT_FOUND_USER = 4004
def desc(self):
return _RespCodeDesc.desc_dict.get(self)
class _RespCodeDesc:
desc_dict = {
RespCode.INNER_ERROR: '内部错误',
RespCode.PLEASE_TRY_LATER: '操作失败,请稍后重试',
RespCode.OK: "访问成功",
RespCode.TOKEN_INVALID: '非法的token',
RespCode.TOKEN_EXPIRED: 'token已失效',
RespCode.PARAM_LACK: '缺失参数',
RespCode.PARAM_WRONG_FORMAT: '参数格式不正确',
RespCode.UNAUTHORIZED: '未授权',
RespCode.POST_REPEAT: '重复提交',
RespCode.REQUEST_METHOD_INVALID: '非法的请求方式',
RespCode.INVALID_PARAM: '非法的请求参数',
RespCode.NOT_SUPPORT: '不支持该操作',
RespCode.NOT_FOUND_DATA: '找不到数据',
RespCode.INVALID_OPERATION: '无效操作',
RespCode.INVALID_JSON_FORMAT: '错误的JSON格式',
RespCode.DATA_ERROR: '数据错误',
RespCode.NOT_FOUND_USER: 'user_code 未知'
}
# @Time : 2022/7/6 10:02
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import logging
def deprecated(func, after_version=None, help_msg=''):
"""
用于提醒开发人员,使用的函数将会被废弃
Args:
func:
after_version: 在某个版本后废弃
help_msg: 用于告诉开发人员替代的方法
Returns:
"""
def wrap_func(*args, **kwargs):
if after_version:
logging.warning(f"函数{func.__name__}将会在版本{after_version}后废弃,请谨慎使用!{help_msg}")
else:
logging.warning(f"函数{func.__name__}未来将会废弃,请谨慎使用!{help_msg}")
return func(*args, **kwargs)
return wrap_func
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import threading
def synchronized(func):
func.__lock__ = threading.Lock()
def lock_func(*args, **kwargs):
with func.__lock__:
return func(*args, **kwargs)
return lock_func
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
import yaml
from framework.the_path import ThePath
from framework.decorator.synchronized import synchronized
class Environment(object):
@synchronized
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_instance'):
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self.env = os.getenv('ENVIRONMENT', 'prod')
# with open(ThePath.config(['default', 'env.yml'])) as f:
# self._config = yaml.load(f,Loader=yaml.FullLoader)
@staticmethod
def is_prod() -> bool:
return Environment().env == 'prod'
@staticmethod
def is_pre() -> bool:
return Environment().env == 'pre'
@staticmethod
def is_dev() -> bool:
return Environment().env == 'dev'
@staticmethod
def env() -> str:
return Environment().env
# class Environment(object):
#
# @synchronized
# def __new__(cls, *args, **kwargs):
# if not hasattr(cls, '_instance'):
# cls._instance = super().__new__(cls)
# return cls._instance
#
# def __init__(self):
# with open(ThePath.config(['default', 'env.yml'])) as f:
# self._config = yaml.load(f,Loader=yaml.FullLoader)
#
# @staticmethod
# def is_prod() -> bool:
# return Environment()._config.get('env') == 'prod'
#
# @staticmethod
# def is_pre() -> bool:
# return Environment()._config.get('env') == 'pre'
#
# @staticmethod
# def is_dev() -> bool:
# return Environment()._config.get('env') == 'dev'
#
# @staticmethod
# def env() -> str:
# return Environment()._config.get('env')
# print(Environment.is_prod())
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
class UnimplementedException(Exception):
"""
未实现异常
"""
pass
\ No newline at end of file
import os
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from flask import Response, g
from framework.interface.abstract_after_request import AbstractAfterRequest
class ClearInterceptor(AbstractAfterRequest):
@staticmethod
def priority():
return 10
def handle(self, resp: Response) -> Response:
if hasattr(g, 'upload_tmp_files'):
for item in g.upload_tmp_files:
tmp_file_path = item.get('tmp_file_path')
if item.get('clear_after_request', False) and os.path.exists(tmp_file_path):
os.remove(tmp_file_path)
return resp
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import importlib
import inspect
import os
from framework.interface.abstract_after_request import AbstractAfterRequest
from framework.interface.abstract_before_request import AbstractBeforeRequest
from framework.the_path import ThePath
class InterceptorLoader:
INTERCEPTOR_PKG_NAME = 'interceptor'
@staticmethod
def __get_file_instance_list(file_name, package_name):
before_list = []
after_list = []
if file_name.endswith('.py') and not file_name.startswith('__init__'):
module = importlib.import_module(f"{package_name}.{file_name[:-3]}")
cls_list = inspect.getmembers(module, inspect.isclass)
for cls_name, cls in cls_list:
if cls != AbstractBeforeRequest and issubclass(cls, AbstractBeforeRequest):
before_list.append(cls())
if cls != AbstractAfterRequest and issubclass(cls, AbstractAfterRequest):
after_list.append(cls())
return before_list, after_list
@staticmethod
def load_interceptors():
src_path = ThePath.src()
before_list = []
after_list = []
for dir_path, dir_names, files in os.walk(src_path):
dir_path = dir_path.replace('\\','/')
if InterceptorLoader.INTERCEPTOR_PKG_NAME in dir_names:
pkg_name = f"{dir_path.replace(src_path, '').replace('/', '.')[1:]}.{InterceptorLoader.INTERCEPTOR_PKG_NAME}"
for file in os.listdir(os.path.join(dir_path, InterceptorLoader.INTERCEPTOR_PKG_NAME)):
in_file_before_list, in_file_after_list = InterceptorLoader.__get_file_instance_list(file, pkg_name)
before_list = before_list + in_file_before_list
after_list = after_list + in_file_after_list
return sorted(before_list, key=lambda x: x.priority(), reverse=True), sorted(after_list, key=lambda x: x.priority(), reverse=True)
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import json
from flask import request, g
from framework.interface.abstract_before_request import AbstractBeforeRequest
class ParamInterceptor(AbstractBeforeRequest):
@staticmethod
def priority():
return 10
def handle(self):
# ParamInterceptor.__fill_params_to_g()
pass
@staticmethod
def __fill_params_to_g():
params = {}
for key, value in request.args.items():
if key == 'json':
data = json.loads(value)
if isinstance(data, dict):
params.update(data)
else:
params['error'] = '不是合法的json'
raise ValueError('不是合法的json')
else:
params[key] = ParamInterceptor.__decode_if_json(value)
# request.json: body中的raw,格式为application/json的数据
if request.json:
params.update(request.json)
# request.form: body中的form-data数据
if request.form:
params.update(request.form)
g.params = params
@staticmethod
def __decode_if_json(value: str):
try:
data = json.loads(value)
return data
except ValueError:
return value
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import abc
from abc import ABCMeta
from flask import Response
class AbstractAfterRequest(metaclass=ABCMeta):
@abc.abstractmethod
def handle(self, resp: Response) -> Response:
return resp
@staticmethod
def priority():
"""
执行优先级,从大到小执行
@return:
"""
return 0
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
import re
import json
import logging
from flask import g, Response, request, current_app
import redis
from flask_restful import Resource
from jsonschema.exceptions import ValidationError
from framework.constant.request_method import RequestMethod
from framework.constant.resp_code import RespCode
from framework.decorator.deprecated import deprecated
from framework.interface.abstract_view_model import AbstractViewModel
from framework.util.api_util import ApiUtil
from framework.util.request_json_encoder import RequestJsonEncoder
from framework.util.resp_util import RespUtil
from framework.util.sql_db import SqlDb
from framework.vo.resp_result import RespResult
from werkzeug import exceptions
import werkzeug
from marshmallow import Schema, fields, ValidationError
class AbstractApi(Resource):
"""
API抽象类,所有api都应继承此类
"""
# 定义参数规则
class GetSchema(Schema):
pass
class PostSchema(Schema):
pass
def redis_conn(self):
return redis.Redis(
host=current_app.config['REDIS_HOST'],
port=current_app.config['REDIS_PORT'],
password=current_app.config['REDIS_PWD'],
db=current_app.config['REDIS_DB'])
def get_domain(self):
domain_map = {
'pre': 'https://electricity-api-service-pre.airqualitychina.cn',
'test': 'http://electricity-api-service.test.hotgrid.cn',
# 'test': 'https://electricity-api-service-test.airqualitychina.cn', # 已弃用
'prod': 'http://electricity-api-service.bjmemc.hotgrid.cn'
}
if os.getenv('ENV'):
domain = domain_map[os.getenv('ENV')]
else:
from flask import request
domain = request.host_url # 本地运行
return domain
def __init__(self, *args, **kwargs):
self.sql_db = SqlDb()
self.redis = self.redis_conn()
self._logger = logging.getLogger(__name__)
self.domain = self.get_domain() # 获取当前环境的域名
super().__init__(*args, **kwargs)
def decode_params(self, schema):
"""
接收请求参数, 兼容?json={...} 和 key=value 格式
1、 请求参数中有json字段
(1)schema中有json字段,则接收json字段给schema进行验证
(2)schema中没有json字段,则接收json中的字段给schema进行验证
2、请求参数中没有json字段
(1)接收所有请求参数给schema进行验证
"""
if request.method == "GET":
if 'json' in request.values:
if 'json' in schema._declared_fields.keys():
params = request.values
else:
params = request.args.get("json")
params = re.sub('\'', '\"', params)
params = json.loads(params)
else:
params = dict(request.values)
else:
if request.json:
params = request.get_json()
else:
params = dict(request.values)
return params
def __handle_and_check_params(self):
if request.method == 'GET':
schema = self.GetSchema
else:
schema = self.PostSchema
params = self.decode_params(schema)
# params = request.values
# params = {}
# try:
# params = request.json
# # 以json为格式的请求
# except:
# if request.values:
# params = request.values
# if params == None:
# """
# 这里因为在k8s环境上
# g.params = schema().load(params)无法处理params=None的情况
# 会报错 '_schema': ['Invalid input type.']
# 目前不知道是哪个包的版本问题,所以暂时用这个方法解决
# """
# params = {}
try:
g.params = schema().load(params)
except ValidationError as error:
self._logger.warning(str(error))
return RespUtil.invalid_param(str(error))
@staticmethod
def url_names():
return None
def get(self):
return self.__handle_and_resp(RequestMethod.GET)
def post(self):
return self.__handle_and_resp(RequestMethod.POST)
def __handle_and_resp(self, method: RequestMethod):
# 先对请求进行参数校验,非法则返回,后续添加报错msg
check_params_result = self.__handle_and_check_params()
if isinstance(check_params_result, RespResult):
return check_params_result.build()
try:
if method == RequestMethod.POST:
if current_app.config['_API_NICE_RESP']:
result = self.nice_handle_post_req()
else:
result = self.handle_post_request()
if result is None:
result = {}
else:
if current_app.config['_API_NICE_RESP']:
result = self.nice_handle_get_req()
else:
result = self.handle_get_request()
if isinstance(result, RespResult):
return result.build()
if isinstance(result, Response):
return result
if isinstance(result, werkzeug.wrappers.response.Response):
return result
return RespUtil.ok(json.loads(json.dumps(result, cls=RequestJsonEncoder))).build()
except ValidationError as e:
self._logger.error(RespCode.INVALID_JSON_FORMAT.desc(), exc_info=True)
return RespUtil.invalid_json_format(e.args[0]).build()
except Exception as e:
self._logger.error("内部异常", exc_info=True)
return RespUtil.inner_error(e.args[0]).build()
@deprecated
def handle_get_request(self):
return RespUtil.request_method_invalid()
@deprecated
def handle_post_request(self):
return RespUtil.request_method_invalid()
def nice_handle_get_req(self) -> AbstractViewModel:
log = f'{self.__class__.__name__}类nice_handle_get_req方法未重写'
logging.error(log)
raise Exception(log)
def nice_handle_post_req(self) -> AbstractViewModel:
log = f'{self.__class__.__name__}类nice_handle_post_req方法未重写'
logging.error(log)
raise Exception(f'{self.__class__.__name__}类nice_handle_post_req方法未重写')
@staticmethod
def get_params():
if hasattr(g, 'params'):
return g.params
return {}
@staticmethod
def get_param(key, default=None):
return AbstractApi.get_params().get(key, default)
@staticmethod
def tmp_save_req_files(allowed_mimetypes=None, clear_after_request=True):
return ApiUtil.tmp_save_req_files(allowed_mimetypes, clear_after_request)
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import abc
from abc import ABCMeta
class AbstractBeforeRequest(metaclass=ABCMeta):
@abc.abstractmethod
def handle(self):
pass
@staticmethod
def priority():
"""
执行优先级,从大到小执行
@return:
"""
return 0
# @Time : 2022/7/6 10:32
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import logging
from abc import ABCMeta
from flask import current_app
from framework.exception import UnimplementedException
class AbstractViewModel(metaclass=ABCMeta):
"""
视图模型
"""
def __init__(self, raw_result):
self._logger = logging.getLogger(__name__)
self.raw_result = raw_result
self.result = self.nice_result()
def nice_result(self):
if current_app.config['_API_NICE_RESP']:
return self.process(self.raw_result)
self._logger.warning(f"未实现process方法,view model层不起作用!")
return self.raw_result
def process(self, raw_result):
"""
之类应重写该方法以返回良好的结果
Args:
raw_result:
Returns:
"""
self._logger.error("需实现__process()函数以对返回给前端的结果进行处理!!!")
raise UnimplementedException
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
from abc import ABCMeta
from collections.abc import Iterable
class ThePath(metaclass=ABCMeta):
@staticmethod
def root():
return os.path.dirname(os.path.dirname(os.path.dirname(__file__))).replace('\\', '/')
@staticmethod
def src():
return os.path.join(ThePath.root(), 'src').replace('\\', '/')
@staticmethod
def test():
return os.path.join(ThePath.root(), 'test').replace('\\', '/')
@staticmethod
def app():
return os.path.join(ThePath.src(), 'app').replace('\\', '/')
@staticmethod
def bin():
return os.path.join(ThePath.root(), 'bin').replace('\\', '/')
@staticmethod
def demo():
return os.path.join(ThePath.root(), 'demo').replace('\\', '/')
@staticmethod
def config(path=None):
config_path = os.path.join(ThePath.root(), 'config')
if path:
if isinstance(path, str):
return os.path.join(config_path, path)
elif isinstance(path, Iterable):
for p in path:
config_path = os.path.join(config_path, p)
return config_path
return config_path.replace('\\', '/')
@staticmethod
def tmp(path=None):
root = ThePath.root()
tmp_path = os.path.join(root, 'tmp')
if path:
result = os.path.join(tmp_path, path)
else:
result = tmp_path
if not os.path.exists(result):
os.makedirs(result)
return result.replace('\\', '/')
@staticmethod
def doc():
return os.path.join(ThePath.root(), 'doc').replace('\\', '/')
@staticmethod
def font(name):
return os.path.join(ThePath.root(), f'font/{name}').replace('\\', '/')
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
import uuid
from flask import request, g
from framework.the_path import ThePath
class ApiUtil:
@staticmethod
def tmp_save_req_files(allowed_mimetypes=None, clear_after_request=True):
"""
把本次请求中上传的文件存到临时目录中
@param allowed_mimetypes:
允许上传的mimetype,可在api/framework/constant/mimetype.py中找到定义好的值,如果为None,则所有类型的文件都将被保存到临时目录中
@param clear_after_request:
是否在请求结束后自动删除临时文件,True表示删除
@return:
返回一个列表,列表中以dict的方式返回每个上传文件的:
1. 原始名称(filename)
2. 临时文件名(tmp_filename)
3. 临时文件路径(tmp_file_path)
4. 文件大小(size)
"""
results = []
if not request.files:
return results
file_path = ThePath.tmp('post_files')
file_storage_list = request.files
for key in file_storage_list.keys():
for file_storage in file_storage_list.getlist(key):
filename = file_storage.filename
mimetype = file_storage.mimetype
if allowed_mimetypes and mimetype not in allowed_mimetypes:
continue
tmp_filename = str(uuid.uuid4())
tmp_file_path = os.path.join(file_path, tmp_filename).replace('\\', '/')
file_storage.save(tmp_file_path)
results.append({
'filename': filename,
'tmp_filename': tmp_filename,
'tmp_file_path': tmp_file_path,
'size': os.path.getsize(tmp_file_path),
'clear_after_request': clear_after_request
})
g.upload_tmp_files = results
return results
@staticmethod
def get_raw_file_storage_list(allowed_mimetypes=None):
"""
获取本次请求中上传文件的原始FileStorage列表
@param allowed_mimetypes: 允许上传的mimetype,可在api/framework/constant/mimetype.py中找到定义好的值
@return: 上传文件的原始FileStorage列表
"""
results = []
if not request.files:
return results
file_storage_list = request.files
for key in file_storage_list.keys():
for file_storage in file_storage_list.getlist(key):
mimetype = file_storage.mimetype
if allowed_mimetypes and mimetype not in allowed_mimetypes:
continue
results.append(file_storage)
return results
@staticmethod
def keep_decimals(line, fields,number=2):
print(line)
for i in fields:
line[i] = round(line[i], number)
return line
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import base64
import hashlib
import hmac
import logging
from urllib import parse
import time
import requests
from configparser import ConfigParser
class DingTalkUtil:
"""
发送钉钉机器人消息
"""
def __init__(self, config_file_path):
self._logger = logging.getLogger(__name__)
cfg = ConfigParser()
cfg.read(config_file_path)
self.cfg = cfg
def send_msg(self, msg, mobiles: list = None, section='basic', is_at_all=0):
if mobiles is None:
mobiles = str(self.cfg.get(section, 'mobiles')).split(',')
timestamp = str(round(time.time() * 1000))
at = {"isAtAll": is_at_all}
if isinstance(mobiles, list) and len(mobiles) > 0:
at['atMobiles'] = mobiles
params = {
"msgtype": "text", "text": {"content": msg}, "at": at
}
url = f"{self.cfg.get(section, 'url')}&timestamp={timestamp}&sign={self.__generate_sign(timestamp, section)}"
resp = requests.post(url=url, json=params)
self._logger.info(f"Send msg successfully, resp: {resp.status_code}, {resp.text}")
def __generate_sign(self, timestamp, section):
secret = self.cfg.get(section, 'secret')
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, secret)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = parse.quote_plus(base64.b64encode(hmac_code))
return sign
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import json
from datetime import date, datetime
from decimal import Decimal
# import demjson
from sqlalchemy.ext.declarative import DeclarativeMeta
from framework.vo.view_object import ViewObject
class RequestJsonEncoder(json.JSONEncoder):
def default(self, obj):
# sl加对model对象的转换
if isinstance(obj.__class__, DeclarativeMeta):
fields = {}
for field in [x for x in dir(obj) if
not x.startswith('_') and x != 'metadata' and x != 'query' and x != 'query_class']:
data = obj.__getattribute__(field)
try:
if data is None:
fields[field] = ''
elif isinstance(data, ViewObject):
# json.dumps(data,cls=ComplexEncoder)
fields[field] = json.dumps(data, ensure_ascii=False, cls=RequestJsonEncoder)
# fields[field] =json.dumps(data, default=lambda obj: obj.__dict__)
else:
fields[field] = data
except TypeError:
fields[field] = data
return fields
# sl加对普通对象的转换
# 因为demjson的安装经常会失败,默认关闭demjson功能,有需要的子系统可以自行开启
# elif isinstance(obj, ViewObject):
# return demjson.decode(json.dumps(obj, ensure_ascii=False, default=lambda obj1: obj1.__dict__))
elif isinstance(obj, datetime):
return obj.strftime('%Y-%m-%d %H:%M:%S')
elif isinstance(obj, date):
return obj.strftime('%Y-%m-%d')
elif isinstance(obj, Decimal):
return str(obj)
else:
return json.JSONEncoder.default(self, obj)
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from framework.vo.resp_result import RespResult
from framework.constant.resp_code import RespCode
class RespUtil(object):
@staticmethod
def ok(data=None) -> RespResult:
return RespResult(RespCode.OK, {} if data is None else data)
@staticmethod
def inner_error(msg='') -> RespResult:
return RespResult(RespCode.INNER_ERROR, msg)
@staticmethod
def please_try_later() -> RespResult:
return RespResult(RespCode.PLEASE_TRY_LATER, {})
@staticmethod
def cannot_find_microdev_type_code() -> RespResult:
return RespResult(RespCode.CANNOT_FIND_MICRODEV_TYPE_CODE, {})
@staticmethod
def microdev_not_in_work_order() -> RespResult:
return RespResult(RespCode.MICRODEV_NOT_IN_WORK_ORDER, {})
@staticmethod
def microdev_unauthorize_offline() -> RespResult:
return RespResult(RespCode.MICRODEV_UNAUTHORIZE_OFFLINE, {})
@staticmethod
def devsite_not_bind_microdev() -> RespResult:
return RespResult(RespCode.DEVSITE_NOT_BIND_MICRODEV, {})
@staticmethod
def post_repeat() -> RespResult:
return RespResult(RespCode.POST_REPEAT, {})
@staticmethod
def request_method_invalid() -> RespResult:
return RespResult(RespCode.REQUEST_METHOD_INVALID, {})
@staticmethod
def param_lack(msg='') -> RespResult:
return RespResult(RespCode.PARAM_LACK, msg)
@staticmethod
def unauthorized() -> RespResult:
return RespResult(RespCode.UNAUTHORIZED, {})
@staticmethod
def not_found_user() -> RespResult:
return RespResult(RespCode.NOT_FOUND_USER, {})
@staticmethod
def invalid_param(msg='') -> RespResult:
return RespResult(RespCode.INVALID_PARAM, msg)
@staticmethod
def data_error(msg='') -> RespResult:
return RespResult(RespCode.DATA_ERROR, msg)
@staticmethod
def not_support(msg='') -> RespResult:
return RespResult(RespCode.NOT_SUPPORT, msg)
@staticmethod
def not_found_data(msg='') -> RespResult:
return RespResult(RespCode.NOT_FOUND_DATA, msg)
@staticmethod
def invalid_operation(msg='') -> RespResult:
return RespResult(RespCode.INVALID_OPERATION, msg)
@staticmethod
def invalid_json_format(msg='') -> RespResult:
return RespResult(RespCode.INVALID_JSON_FORMAT, msg)
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import logging
from flask import current_app
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import pandas as pd
from framework.decorator.synchronized import synchronized
import socket
class SqlDb:
# 获取中心地区的IP地址 用于识别中心 自动转中心数据库IP连接
@staticmethod
def get_host_ip():
try:
# 创建一个 socket 对象
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
# 不需要建立连接,只是为了让操作系统产生一个网络流量,从而获取本机IP
s.connect(("8.8.8.8", 80))
ip = s.getsockname()[0]
finally:
s.close()
return ip
@synchronized
def __new__(cls, *args, **kwargs):
if not hasattr(cls, '_instance'):
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
self._db = current_app.config['_SQLALCHEMY_INSTANCE']
self._logger = logging.getLogger(__name__)
def get_engine(self,bind=None):
'''
bind设为None,返回的是默认的数据库 app.config['SQLALCHEMY_DATABASE_URI'],否则为app.config['SQLALCHEMY_BINDS']里,
'''
return self._db.get_engine(app=current_app,bind="elec_db_2")
def __good_sql(self, sql):
if isinstance(sql, str):
return text(sql)
return sql
def __build_in_condition(self, param_prefix, values):
param_names = []
params = {}
index = 0
for value in values:
param_name = f"{param_prefix}_{index}"
param_names.append(f":{param_name}")
params[param_name] = value
index = index + 1
return ','.join(param_names), params
def __execute(self, connection, sql, params):
if params and isinstance(params, dict):
for key, value in params.copy().items():
if isinstance(sql, str) and (isinstance(value, list) or isinstance(value, tuple)):
stmt, stmt_params = self.__build_in_condition(key, value)
sql = sql.replace(f":{key}", f"({stmt})")
params.update(stmt_params)
return connection.execute(self.__good_sql(sql), params)
def list(self, sql, params=None, bind=None):
with self.get_engine(bind).begin() as connection:
result = self.__execute(connection, sql, params)
return [x._asdict() for x in result.all()]
def list2(self, sql, params=None, bind=None):
with self.get_engine(bind).begin() as connection:
result = self.__execute(connection, sql, params)
data_list = [x._asdict() for x in result.all()]
output = {}
keydict = {}
# 处理所有可能的键值对
for key_group in output.keys():
if isinstance(output[key_group], dict):
for key in output[key_group]:
new_key = output[key_group][key]
new_value = output.get('maintance_value', {}).get(key, None)
keydict[new_key] = new_value
# 将结果转换为不同key存放的list形式
combined_dict = {}
for item in data_list:
for key, value in item.items():
if key not in combined_dict:
combined_dict[key] = []
combined_dict[key].append(value)
return combined_dict
def value(self, sql, params=None, bind=None):
with self.get_engine(bind).begin() as connection:
result = self.__execute(connection, sql, params)
return result.scalar()
def orm_session(self):
my_engine = self.get_engine()
Session = sessionmaker(bind=my_engine)
db_session = Session()
return db_session
def first(self, sql, params=None, bind=None):
with self.get_engine(bind).begin() as connection:
result = self.__execute(connection, sql, params)
data = result.first()
if data is None:
return None
return data._asdict()
def __process_value(self, value):
if value is None:
return 'null'
if isinstance(value, str):
value = value.replace("'", "''")
return f"'{value}'"
return value
def __build_sql_for_insert(self, table_name, col_names, col_values, replace):
col_str = ','.join([f"`{x}`" for x in col_names])
if not (col_names and col_values):
self._logger.debug(f"col_names({col_names})和col_values({col_values})不能为空,请检查!")
return False
sql = f"{'replace' if replace else 'insert'} into {table_name} ({col_str}) values"
if isinstance(col_values[0], tuple):
sql = sql + ','.join([f"({','.join(self.__process_value(x) for x in item)})" for item in col_values])
else:
sql = sql + f"({','.join(self.__process_value(x) for x in col_values)})"
return sql
def insert(self, table_name, data, method='append', bind=None):
"""
从原有的拼接sql修改为用pandas,可以方式sql注入
Args:
table_name:表名
data:需要插入的数据,list类型
method: append,replace,fail
Returns:
如果插入成功返回True,否则返回错误信息
"""
if not isinstance(data, list):
self._logger.warning(f"data应为list时,略过此元素")
try:
df = pd.DataFrame(data)
print(df)
with self.get_engine(bind).begin() as connection:
df.to_sql(name=table_name, con=self.get_engine(bind), if_exists=method, index=False)
except Exception as e:
return e
return True
def update(self, table_name: str, data: dict, where: str, params=None, bind=None):
"""
更新数据
Args:
table_name:表名
data:需要更新的数据,字典类型
where:更新条件,严禁拼接参数构造条件字符串,使用占位符或者占位名
params:参数
Returns:
如果更新成功,返回True,否则返回False
"""
if not where:
self._logger.error("where必须非空,更新数据要传递条件")
return False
sql = f"update {table_name} set "
sql = sql + ",".join([f"`{key}`={self.__process_value(value)}" for key,value in data.items()])
sql = f"{sql} where {where}"
self._logger.debug(f"update sql: {sql}")
with self.get_engine(bind).begin() as connection:
if params is None:
params = {}
result = connection.execute(text(sql), **params)
return result.rowcount > 0
def delete(self, table_name: str, where: str, params=None, bind=None):
if not where:
self._logger.error("where必须非空,删除数据要传递条件")
sql = f"delete from {table_name} where {where}"
self._logger.debug(f"delete sql: {sql}")
with self.get_engine(bind).begin() as connection:
if params is None:
params = {}
result = connection.execute(text(sql), **params)
return result.rowcount > 0
def query(self, sql, params=None, bind=None):
with self.get_engine(bind).begin() as connection:
self.__execute(connection, sql, params)
def insert_else(self, table_name, data, bind=None):
if data:
columns = ','.join([f"`{key}`" for key in data.keys()])
values = ','.join([f"{self.__process_value(value)}" for value in data.values()])
sql = f"insert into {table_name}({columns}) values({values})"
self._logger.debug(f"insert sql: {sql}")
with self.get_engine(bind).begin() as connection:
result = connection.execute(text(sql))
return result.lastrowid
def bulk_insert(self, table_name, data, bind=None):
values = ', '.join([f"({', '.join(str(self.__process_value(value)) for value in row.values())})" for row in data])
columns = ', '.join(data[0].keys())
sql = f"REPLACE INTO {table_name}({columns}) VALUES {values}"
self._logger.debug(f"bulk insert sql: {sql}")
with self.get_engine(bind).begin() as connection:
result = connection.execute(text(sql))
return result.rowcount > 0
from framework.constant.resp_code import RespCode
class RespResult(object):
def __init__(self, resp_code: RespCode, result: object):
self._resp_code = resp_code
self._result = result
def build(self):
return {
'code': self._resp_code.value,
'msg': self._resp_code.desc(),
'result': self._result
}
class ViewObject(object):
pass
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import logging.config
import os
import traceback
import yaml
from flask import Flask, Response
from flask_cors import CORS
from flask_sqlalchemy import SQLAlchemy
from framework.bootstrap.config_loader import ConfigLoader
from framework.bootstrap.register_router import RegisterRouter
from framework.interceptor.interceptor_loader import InterceptorLoader
from framework.the_path import ThePath
def _init_interceptor(app: Flask):
"""
注册拦截器
@param app:
@return:
"""
before_list, after_list = InterceptorLoader.load_interceptors()
@app.before_request
def before_request():
for instance in before_list:
try:
instance.handle()
except Exception as e:
logging.error(f"执行before拦截器失败:\n{instance.__class__},捕获异常:\n{traceback.format_exc()}")
return str(e)
@app.after_request
def after_request(resp: Response):
for instance in after_list:
try:
resp = instance.handle(resp)
except Exception:
logging.error(f"执行after拦截器失败:\n{instance.__class__},捕获异常:\n{traceback.format_exc()}")
return resp
def _set_logging(log_config_path):
# 查找log文件夹,如果不存在则自动创建
ThePath.tmp('log')
with open(log_config_path) as f:
config = yaml.load(f, Loader=yaml.FullLoader)
config['handlers']['timed_rotating_file']['filename'] = os.path.join(ThePath.tmp('log'), 'api.log')
logging.config.dictConfig(config)
def _init(env=None):
config_loader = ConfigLoader(env)
config_loader.load()
_set_logging(config_loader.data.get('log'))
app = Flask(__name__)
app.config.from_pyfile(config_loader.data.get('flask'))
_init_interceptor(app)
RegisterRouter().register_router(app)
CORS(app, supports_credentials=True)
sql_alchemy = SQLAlchemy(app)
app.config['_SQLALCHEMY_INSTANCE'] = sql_alchemy
print('env:', config_loader.env)
return app
create_app = _init
def startup(env):
app = _init(env)
print(app.url_map)
app.run(host=app.config['HOST'], port=app.config['PORT'])
if __name__ == '__main__':
startup('dev')
from framework.interface.abstract_view_model import AbstractViewModel
class Add_Test_Key_Model(AbstractViewModel):
"""
向返回值中加一个名为test的key
"""
def process(self, raw_result):
if not isinstance(raw_result, dict):
raise Exception('返回值类型不为字典!')
raw_result['test'] = '该返回值由Add_Test_Key_Model生成'
return raw_result
import os
from runserver import create_app
# app = create_app('prod') if os.getenv('ENV') != 'pre' and os.getenv('ENV') != None else create_app('pre')
app = create_app(os.getenv('ENV')) # 目前有:正式环境 prod;预发环境 pre;测试环境 test
# @Time : 2022-03-03 17:00:00
# @Author : Sun Wen Quan
# @Owner : YSRD (Insights Value)
# content of conftest.py
import pytest
from framework.util.sql_db import SqlDb
from runserver import create_app
@pytest.fixture
def client():
app = create_app('test')
with app.test_client() as client:
with app.app_context():
db = SqlDb()
yield client
# @Time : 2022-03-31 11:00:00
# @Author : Wang Fei hong
# @Owner : YSRD (Insights Value)
from runserver import create_app
class TestApiFrameWorkApi:
def test_hander_get(self, client):
response = client.get('/demo/api-frame-work-api?json={"id":"1","name":"test"}')
assert response.json['code'] == 2000
assert response.json['result']['params'] == {"id":"1","name":"test"}
def test_hander_post(self, client):
response = client.post('/demo/api-frame-work-api')
assert response.status_code == 200
assert response.json['code'] == 2000
assert response.json['msg'] == '访问成功'
def test_missing_required_parameter(self, client):
response = client.get('/demo/api-frame-work-api')
assert response.json['code'] == 3009
def test_invalid_parameter(self, client):
response = client.get('/demo/api-frame-work-api', data={'project_name': "hello"})
assert response.json['code'] == 3009
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Sun Wen Quan
# @Owner : YSRD (Insights Value)
import requests
import datetime
class TestHelloSqldb:
def test_hello_db(self, client):
response = client.get('/demo/hello-sqldb')
assert response.json['code'] == 2000
def test_hello_db_insert(self, client):
data = {"id": 123, "name": "test1", "created_at": ["testtttt", "2022-03-23 17:57:23"],
"updated_at": "2022-03-23 17:57:23", "op": "add"}
response = client.post('/demo/hello-sqldb', data=data)
assert response.json['code'] == 2000
assert response.json['result']['result'] == True
def test_hello_db_update(self, client):
data = {"id": 123, "name": "t1", "op": "update"}
response = client.post('/demo/hello-sqldb', data=data)
assert response.json['code'] == 2000
assert response.json['result']['result'] == True
def test_hello_db_delete(self, client):
data = {"id": 123, "op": "delete"}
response = client.post('/demo/hello-sqldb', data=data)
assert response.json['code'] == 2000
assert response.json['result']['result'] == True
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Sun Wen Quan
# @Owner : YSRD (Insights Value)
class TestHelloWorld:
def test_hander_get(self, client):
response = client.get('/demo/hello-world?user_id=1&user_name=&user_email=87909@qq.com&user_age=1&user_age=1')
assert response.json['code'] == 2000
assert response.json['result']['params'].get('user_id') == 1
def test_hander_post(self, client):
response = client.post('/demo/hello-world')
assert response.status_code == 200
assert response.json['code'] == 2000
assert response.json['msg'] == '访问成功'
def test_missing_required_parameter(self, client):
response = client.get('/demo/hello-world')
assert response.json['code'] == 3009
def test_invalid_parameter(self, client):
response = client.get('/demo/hello-world', data={'project_name': "hello"})
assert response.json['code'] == 3009
\ No newline at end of file
# @Time : 2022-03-03 17:00:00
# @Author : Sun Wen Quan
# @Owner : YSRD (Insights Value)
import os
from framework.the_path import ThePath
class TestUploadTmpFile:
def test_upload_tmp_file(self, client):
filename = os.path.join(ThePath.test(), 'data/test_upload_file.txt')
data = {'file': open(filename, 'rb')}
response = client.post('/demo/upload-tmp-file', data=data)
assert response.json.get('code') == 2000
tmp_file_path = response.json.get('result')['file_result'][0]['tmp_file_path']
assert os.path.isfile(tmp_file_path)
def test_upload_tmp_file_clean(self, client):
filename = os.path.join(ThePath.test(), 'data/test_upload_file.txt')
print(filename)
data = {'file': open(filename, 'rb')}
response = client.post('/demo/upload-tmp-file-clean', data=data)
print(response.data)
assert response.json.get('code') == 2000
try:
tmp_file_path = response.json.get('result')['file_result'][0]['tmp_file_path']
assert not os.path.isfile(tmp_file_path), '文件没有被自动删除'
except IndexError:
assert False, '上传文件失败!'
except KeyError:
assert False, '响应数据数据不规范!'
else:
assert 'tmp/post_files' in tmp_file_path, '响应信息中tmp_file_path的格式有误!'
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from framework.bootstrap.config_loader import ConfigLoader
class TestConfigLoader:
@staticmethod
def log_config_verification(env):
config_loader = ConfigLoader(env)
config_loader.load()
return f'config/default/log.yml' in config_loader.data['log'].replace('\\', '/')
@staticmethod
def flask_config_verification(env):
config_loader = ConfigLoader(env)
config_loader.load()
return f'api/{env}/flask.py' in config_loader.data['flask'].replace('\\', '/')
def test_config_by_args(self):
assert self.flask_config_verification('prod')
assert self.flask_config_verification('test')
assert self.flask_config_verification('dev')
assert self.log_config_verification('prod')
assert self.log_config_verification('test')
assert self.log_config_verification('dev')
def test_config_by_env(self):
"""保证部署的时候使用的默认环境是生产环境"""
config_loader = ConfigLoader()
config_loader.load()
assert f'api/prod/flask.py' in config_loader.data['flask'].replace('\\', '/')
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from framework.constant.resp_code import RespCode
class TestRespCode:
def test1(self):
assert RespCode.OK.value == 2000
assert RespCode.OK.desc() == '访问成功'
assert RespCode.INVALID_JSON_FORMAT.desc() == '错误的JSON格式'
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
from framework.interceptor.clear_interceptor import ClearInterceptor
from framework.interceptor.interceptor_loader import InterceptorLoader
from framework.interceptor.param_interceptor import ParamInterceptor
class TestInterceptorLoader:
def test_load_interceptors(self):
before_list, after_list = InterceptorLoader.load_interceptors()
before_class_list = (obj.__class__ for obj in before_list)
after_class_list = (obj.__class__ for obj in after_list)
assert ParamInterceptor in before_class_list
assert ClearInterceptor in after_class_list
# @Time : 2022-03-03 17:00:00
# @Author : Gavin Jiang
# @Owner : YSRD (Insights Value)
import os
import pytest
from framework.the_path import ThePath
@pytest.fixture(scope='module')
def project_session_start():
base_path = os.path.dirname(os.path.abspath(__file__))
print('base_path is :', base_path)
class TestThePath:
def test_root(self):
root_path = ThePath.root()
children = os.listdir(root_path)
assert {'bin', 'config', 'src', 'test'}.issubset(children)
def test_src(self):
src_path = ThePath.src()
children = os.listdir(src_path)
assert {'app', 'component', 'framework'}.issubset(children)
def test_test(self):
test_path = ThePath.test()
children = os.listdir(test_path)
assert {'data', 'conftest.py', 'test_create_app.py','__init__.py'}.issubset(children)
# @Time : 2022-03-03 17:00:00
# @Author : Sun Wen Quan
# @Owner : YSRD (Insights Value)
def test_no_error():
try:
from runserver import create_app
app = create_app
msg = 'ok'
except ValueError as e:
msg = e
assert msg == 'ok'
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment