折腾:
【未解决】用蓝图和工厂模式去优化现有Flask项目代码结构
期间,需要把之前直接写在app.py中的Celery初始化代码:
celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)

#----------------------------------------
# Celery tasks
#----------------------------------------


# Alternatives tried: @celeryApp.task() and
# @celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
@celeryApp.task
def deleteTmpAudioFile(filename):
    """Delete one temporary audio file from the audio temp folder.

    :param filename: bare file name inside AUDIO_TEMP_FOLDER,
        e.g. 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # `log` is only read here, so no `global log` declaration is needed.
    log.info("deleteTmpAudioFile: filename=%s", filename)
    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]  # e.g. "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    # Resolved against the process working directory, e.g.
    # '/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    curFolderAbsPath = os.getcwd()
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    # EAFP: an isfile() check followed by remove() races with anything else
    # deleting the same file in between (e.g. a duplicated task delivery).
    try:
        os.remove(tempAudioFullname)
    except FileNotFoundError:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
    else:
        log.info("Ok to delete file %s", tempAudioFullname)


@celeryApp.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()


@celeryApp.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic Azure token refresh once Celery is configured."""
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
其中,celery还调用了其他一些函数,比如上面的refreshAzureSpeechToken
也要想办法,如何更好的合并进去
flask celery factory
Celery and the Flask Application Factory Pattern – miguelgrinberg.com
from celery import Celery
from config import config, Config

# The broker must be known when Celery is created, before any Flask app exists.
celery = Celery(__name__, broker=Config.CELERY_BROKER_URL)


def create_app(config_name):
    # ...
    # Copy the Flask config onto the already-created Celery instance.
    celery.conf.update(app.config)
    # ...
    return app
还需要额外导入config,感觉也不是很完美
而且此处的__name__,不一定和之后创建出来的app.name一致
在Celery中使用Flask的上下文
目前写成:
resources/celery_task.py
import os

from app import app, celery, log
from resources.tts import refreshAzureSpeechToken

#----------------------------------------
# Celery tasks
#----------------------------------------


# Alternatives tried: @celery.task() and
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
@celery.task
def deleteTmpAudioFile(filename):
    """Delete one temporary audio file from the audio temp folder.

    :param filename: bare file name inside AUDIO_TEMP_FOLDER,
        e.g. 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    log.info("deleteTmpAudioFile: filename=%s", filename)
    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]  # e.g. "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    # Resolved against the process working directory, e.g.
    # '/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server'
    curFolderAbsPath = os.getcwd()
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    # EAFP: an isfile() check followed by remove() races with anything else
    # deleting the same file in between (e.g. a duplicated task delivery).
    try:
        os.remove(tempAudioFullname)
    except FileNotFoundError:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
    else:
        log.info("Ok to delete file %s", tempAudioFullname)


@celery.task
def celeryRefreshAzureSpeechToken():
    """celery's task: refreshAzureSpeechToken"""
    refreshAzureSpeechToken()


@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    """Register the periodic Azure token refresh once Celery is configured."""
    log.info("celerySetupPeriodicTasks: sender=%s", sender)
    sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
                             celeryRefreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")
app.py
from factory import celery
app = create_app(settings)
log.debug("celery=%s", celery)factory.py
from celery import Celery
celery = Celery()
print("celery=%" % celery)
def create_app(config_object):
app = Flask(__name__)
CORS(app)
# app.config.from_object('config.DevelopmentConfig')
# # app.config.from_object('config.ProductionConfig')
app.config.from_object(config_object)
create_extensions(app)
def create_extensions(app):
global log, mongo, fsCollection, api, celery
log = create_log(app)
...
celery = create_celery(app)
log.info("celery=%s", celery)
return app待后续调试,看看是否正常运行。
【总结】
此处,目前对于Celery改为工厂模式初始化,代码是:
import os
# from app import celery
# from app import app, log
# from flask import current_app as app
# from factory import celery
from flask import g
from resources.tts import refreshAzureSpeechToken
app = g.app
log = g.log
celery = g.celery
#----------------------------------------
# Celery tasks
#----------------------------------------
# @celery.task()
@celery.task
# @celery.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
"""
delete tmp audio file from filename
eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
"""
# log = app.logger
with celery.app.app_context():
log.info("deleteTmpAudioFile: filename=%s", filename)
audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
# audioTmpFolder = "tmp/audio"
...
@celery.task
def celeryRefreshAzureSpeechToken():
"""celery's task: refreshAzureSpeechToken"""
with celery.app.app_context():
refreshAzureSpeechToken()
@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
with celery.app.app_context():
# log = app.logger
log.info("celerySetupPeriodicTasks: sender=%s", sender)
sender.add_periodic_task(app.config["CELERY_REFRESH_MS_TOKEN_INTERVAL"],
celeryRefreshAzureSpeechToken.s(),
name="refresh ms Azure token every less than 10 minutes")而app.py是:
from conf.app import settings
from factory import create_app

app = create_app(settings)
log = app.logger

if __name__ == "__main__":
    app.run(
        host=app.config["FLASK_HOST"],
        port=app.config["FLASK_PORT"],
        debug=app.config["DEBUG"],
        # Disable the reloader so the app (and its extensions) is built only once.
        use_reloader=False,
    )
factory.py
import os
from flask import Flask
...
from celery import Celery
from flask import g
################################################################################
# Global Function
################################################################################
def create_app(config_object):
# global log
# app = Flask(__name__) #<Flask 'factory'>
app = Flask(config_object.FLASK_APP_NAME) #<Flask 'RobotQA'>
CORS(app)
# app.config.from_object('config.DevelopmentConfig')
# # app.config.from_object('config.ProductionConfig')
app.config.from_object(config_object)
with app.app_context():
g.app = app
log = create_log(app)
g.log = log
...
register_extensions(app)
return app
def register_extensions(app):
# global log, mongo, fsCollection, api, celery
log = g.log
...
celery = create_celery(app)
g.celery = celery
log.info("celery=%s", celery)
...
return app
...
def create_celery(app):
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
...目前,至少对于Flask的app的初始化是可以的,可以运行的。不会出现循环导入和全局变量的问题。
【后记】
后来去真正本地调试运行时,尤其是本地运行celery的work时,出错了:
【已解决】Flask的Celery改为工厂模式后本地调试worker出错:RuntimeError: Working outside of application context
所以之前的写法,还是不行。
最终是:
想办法解决了循环导入,以及全局变量的问题,期间用了很多折中的做法:
先说celery的库:
抛弃了之前的Flask-Celery-Helper,只用最原始的Celery
# from flask_celery import Celery
from celery import Celery
-》因为它要求我们在配置中加上CELERY_IMPORTS:
# for Flask-Celery-Helper
# CELERY_IMPORTS = ('tasks.deleteTmpAudioFile', 'tasks.celeryRefreshAzureSpeechToken', )
# CELERY_IMPORTS = ('resources.tasks.deleteTmpAudioFile', 'resources.tasks.celeryRefreshAzureSpeechToken', )
-》会导致找不到task
-》所以果断放弃。
再说每个文件的代码和其中的改动:
文件结构是:
➜  xxxRobotDemoServer git:(master) ✗ tree
.
...
├── app.py
...
├── conf
│   ├── __init__.py
│   ├── __pycache__
│   │   └── __init__.cpython-36.pyc
│   ├── app
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   │   ├── __init__.cpython-36.pyc
│   │   │   └── settings.cpython-36.pyc
│   │   ├── development
│   │   │   └── __init__.py
│   │   ├── production
│   │   │   └── __init__.py
│   │   └── settings.py
...
├── factory.py
├── resources
│   ├── __init__.py
│   ├── asr.py
│   ├── extensions_celery.py
│   ├── files.py
│   ├── qa.py
│   ├── tasks.py
│   └── tts.py
...
详细解释每个文件:
(1)resources/extensions_celery.py
celery的初始化中:只是初始化celery,而不去做其他事情
# from flask_celery import Celery
from conf.app import settings
from celery import Celery
from celery.utils.log import get_task_logger
# celery = Celery()
celery = Celery(settings.FLASK_APP_NAME, broker=settings.CELERY_BROKER_URL)
celery_logger = get_task_logger(__name__)
print("in extensions_celery: celery=%s" % celery)(2)resources/tasks.py
背景是:
之前celery中依赖app和log
-》那是因为task中:
- 配置用到app.config[“xxx”] -》 后来改为settings.xxx了-》单独引入settings不会产生循环导入
- log是task中需要打印输出 -》 希望尽量和flask的app的log用同一个log
这样会导致:
A:始终无法很好地解决:如何再去得到app
因为,即使最开始运行app时可以方便地得到app,但是运行celery worker -A时,还是无法得到app
而如果再去调用create_app时,就又会导致app初始化时,引入flask-restful时,引用到别的模块,比如:
resources/tts.py
而其中由于用到celery的异步task:deleteTmpAudioFile,所以要导入
from resources.tasks import deleteTmpAudioFile
从而在tasks中又多触发了一次app的初始化
-》变成了:
Flask的app的初始化运行,初始化了2次,这是无法接受的
B:无法方便的获取(app的)log
最后的折中的解决办法是:
celery的task中:去除之前依赖的flask的app和log
-》
- 把app.config[“xxx”] 改为settings.xxx了
- -》而单独引入settings不会产生循环导入
- 用另外的办法去获得log
- 此处用的是从resources/extensions_celery.py获取celery的logger
完整代码:
import os

from conf.app import settings

# Deliberately NOT imported here: the Flask app, flask.g and
# factory.create_app / create_celery_app. Each of these was tried. Reading
# flask.g fails outside an application context, and calling create_app()
# again builds a second Flask app when `celery worker -A` loads this module.
# Config therefore comes from `settings`, and logging from the Celery task logger.
from resources.extensions_celery import celery, celery_logger as log

print("create_celery_app return: celery=%s, log=%s" % (celery, log))
#----------------------------------------
# Celery tasks
#----------------------------------------


# Alternatives tried: @celery.task() and
# @celery.task(name=settings.CELERY_TASK_NAME + ".deleteTmpAudioFile")
@celery.task
def deleteTmpAudioFile(filename):
    """Celery task: delete one temporary audio file.

    Uses only `settings` and the Celery task logger (no Flask app / flask.g),
    so it runs unchanged inside `celery worker -A resources.tasks.celery`.

    :param filename: bare file name inside settings.AUDIO_TEMP_FOLDER,
        e.g. 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    log.info("deleteTmpAudioFile: celery=%s, filename=%s", celery, filename)
    audioTmpFolder = settings.AUDIO_TEMP_FOLDER  # e.g. "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    # Resolved against the worker's current working directory. Start the
    # worker from the same directory as the Flask app so both agree.
    curFolderAbsPath = os.getcwd()
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    # EAFP: an isfile() check followed by remove() races with anything else
    # deleting the same file in between (e.g. a duplicated task delivery).
    try:
        os.remove(tempAudioFullname)
    except FileNotFoundError:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)
    else:
        log.info("Ok to delete file %s", tempAudioFullname)
@celery.task
def celeryRefreshAzureSpeechToken():
    """Celery task wrapper around resources.tts.refreshAzureSpeechToken."""
    # Lazy %-args: formatting only happens if the record is actually emitted.
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
    # Imported inside the task to avoid the tasks <-> tts circular import
    # (resources/tts.py imports deleteTmpAudioFile from this module).
    from resources.tts import refreshAzureSpeechToken
    refreshAzureSpeechToken()
@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
log.info("celerySetupPeriodicTasks: celery=%s, sender=%s" % (celery, sender))
# with celery.app.app_context():
# log = app.logger
# log = celery.app.logger
# app = celery.app
# log = celery.log
# # print("celery.log=%s" % celery.log)
# # print("log=%s" % log)
# print("sender=%s" % sender)
# log.info("celerySetupPeriodicTasks: sender=%s", sender)
# print("celerySetupPeriodicTasks: log is usable")
sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
celeryRefreshAzureSpeechToken.s(),
name="refresh ms Azure token every less than 10 minutes")(3)factory.py
而factory中没有全局的变量celery(和其他全局变量)
celery的初始化,都放在create_celery_app
并且create_celery_app中,在最开始app初始化时是传入app的
而在celery worker -A时,则只是从resources/tasks.py得到celery,而不会再去新建flask的app
import os
from flask import Flask
import logging
from logging.handlers import RotatingFileHandler
# from flask_pymongo import PyMongo
from gridfs import GridFS
from pymongo import MongoClient
from flask_restful import Api
from flask_cors import CORS
from conf.app import settings
from celery import Celery
# from flask_celery import Celery
# from resources.extensions_celery import celery
from flask import g
################################################################################
# Global Variables
################################################################################
# # log = logging.getLogger() #<RootLogger root (WARNING)>
# log = None
# print("log=%s" % log)
#
# # mongo = MongoClient() # MongoClient(host=['localhost:27017'], document_class=dict, tz_aware=False, connect=True)
# mongo = None
# print("mongo=%s" % mongo)
# fsCollection = None #None
# print("fsCollection=%s" % fsCollection)
#
# celery = Celery() #<Celery __main__ at 0x1068d8b38>
# celery = None
# print("celery=%s" % celery)
################################################################################
# Global Function
################################################################################
def create_app(config_object, init_extensions=True):
    """Build and configure the Flask app.

    :param config_object: settings object; must provide FLASK_APP_NAME plus
        everything that app.config and the extensions read.
    :param init_extensions: when False, skip mongo/celery/REST registration
        (create_celery_app() uses this to get a bare, configured app).
    :return: the Flask app
    """
    # Named after the project (<Flask 'RobotQA'>), not __name__ (<Flask 'factory'>).
    app = Flask(config_object.FLASK_APP_NAME)
    CORS(app)
    app.config.from_object(config_object)

    # Publish app/log on flask.g inside an app context. Modules imported
    # during register_extensions() (resources.*) read them at import time.
    with app.app_context():
        g.app = app
        g.log = log = create_log(app)
        log.debug("after load from object: app.config=%s", app.config)
        log.debug(
            'app.config["DEBUG"]=%s, app.config["MONGODB_HOST"]=%s, app.config["FILE_URL_HOST"]=%s',
            app.config["DEBUG"],
            app.config["MONGODB_HOST"],
            app.config["FILE_URL_HOST"],
        )
        if init_extensions:
            register_extensions(app)
            log.info("flask app extensions init completed")

    return app
def register_extensions(app):
    """Create mongo, GridFS, Celery and the REST API and publish each on flask.g.

    Must run inside an app context in which g.log is already set (create_app
    does this).

    :return: the same *app*
    """
    log = g.log

    g.mongo = mongo = create_mongo(app)
    log.info("mongo=%s", mongo)
    log.debug("mongoServerInfo=%s", mongo.server_info())

    g.fsCollection = fsCollection = create_gridfs_fs_collection(mongo)
    log.info("fsCollection=%s", fsCollection)

    g.celery = celery = create_celery_app(app)
    log.info("celery=%s", celery)

    api = create_rest_api(app)
    log.debug("api=%s", api)
    g.api = api

    return app
def create_rest_api(app):
    """Register every REST resource on a new flask_restful Api bound to *app*."""
    # Imported here rather than at module top: resources.* read flask.g at
    # import time, which only works inside the context opened by create_app().
    from resources.qa import RobotQaAPI
    from resources.asr import RobotAsrAPI
    from resources.files import GridfsAPI, TmpAudioAPI

    # (resource class, URL rules, endpoint name)
    routes = (
        (RobotQaAPI, ('/qa',), 'qa'),
        (RobotAsrAPI, ('/asr/language/<string:language>',), 'asr'),
        (GridfsAPI, ('/files/<fileId>', '/files/<fileId>/<fileName>'), 'gridfs'),
        (TmpAudioAPI, ('/tmp/audio/<filename>',), 'TmpAudio'),
    )
    rest_api = Api()
    for resource, urls, endpoint in routes:
        rest_api.add_resource(resource, *urls, endpoint=endpoint)
    rest_api.init_app(app)
    return rest_api
def create_log(app):
    """Attach a rotating UTF-8 file handler to app.logger and return it.

    Reads LOG_FORMAT, LOG_FILE_FILENAME, LOG_FILE_MAX_BYTES and
    LOG_FILE_BACKUP_COUNT from app.config. Safe to call repeatedly for the
    same logger: the file handler is only attached the first time.

    :return: app.logger, set to DEBUG level
    """
    print("create_log: before init log: app.logger=%s" % app.logger)
    log = app.logger
    logFilePath = os.path.abspath(app.config['LOG_FILE_FILENAME'])
    # Since Flask 1.0, app.logger is logging.getLogger(app.name). Every
    # create_app() call (e.g. create_celery_app() building a second app)
    # therefore gets the same logger, and adding a handler each time would
    # write every record N times.
    alreadyAttached = any(
        isinstance(h, RotatingFileHandler) and h.baseFilename == logFilePath
        for h in log.handlers
    )
    if not alreadyAttached:
        fileHandler = RotatingFileHandler(
            app.config['LOG_FILE_FILENAME'],
            maxBytes=app.config["LOG_FILE_MAX_BYTES"],
            backupCount=app.config["LOG_FILE_BACKUP_COUNT"],
            encoding="UTF-8")
        fileHandler.setLevel(logging.DEBUG)
        fileHandler.setFormatter(logging.Formatter(app.config["LOG_FORMAT"]))
        log.addHandler(fileHandler)
    # Note: should NOT add a StreamHandler here, otherwise debug logs are duplicated
    log.setLevel(logging.DEBUG)
    log.info("app=%s", app)
    print("create_log: after init log: app.logger=%s" % app.logger)
    return log
def create_mongo(app):
    """Create a MongoClient from settings.MONGODB_*.

    Uses the most specific option set available, checked in order:
    authSource, then username+password, then port, then host, then library
    defaults. *app* is kept for signature symmetry with the other create_*
    helpers but is unused; values are read from `settings` directly.
    """
    if settings.MONGODB_AUTH_SOURCE:
        options = dict(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
            authSource=settings.MONGODB_AUTH_SOURCE,
        )
    elif settings.MONGODB_USERNAME and settings.MONGODB_PASSWORD:
        options = dict(
            host=settings.MONGODB_HOST,
            port=int(settings.MONGODB_PORT),
            username=settings.MONGODB_USERNAME,
            password=settings.MONGODB_PASSWORD,
        )
    elif settings.MONGODB_PORT:
        options = dict(host=settings.MONGODB_HOST, port=int(settings.MONGODB_PORT))
    elif settings.MONGODB_HOST:
        options = dict(host=settings.MONGODB_HOST)
    else:
        options = {}
    return MongoClient(**options)
def create_gridfs_fs_collection(mongo_db):
    """Return a GridFS store on the ``gridfs`` database (pure PyMongo).

    :param mongo_db: the MongoClient returned by create_mongo() (despite the
        parameter name, this is a client, not a database)
    """
    return GridFS(mongo_db.gridfs)
def create_celery_app(app=None):
print("create_celery_app: app=%s" % app)
app = app or create_app(settings, init_extensions=False)
app_import_name = app.import_name
# app_name = app.name
# celery_app_name = app_name
celery_app_name = app_import_name
celery = Celery(celery_app_name, broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
# celery.log = app.logger
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
# g.log.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
app.logger.info("in celery ContextTask __call__: args=%s, kwargs=%s", args, kwargs)
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
# celery.init_app(app)
print("init celery ok")
return celery(4)app.py
app中只是调用create_app
而不去初始化celery了
import os
from conf.app import settings
from factory import create_app
# from factory import log
# from factory import create_celery_app
################################################################################
# Global Definitions
################################################################################
################################################################################
# Global Variables
################################################################################
################################################################################
# Global Function
################################################################################
################################################################################
# Global Init App
################################################################################
print("in flask app: settings=%s" % (settings))
app = create_app(settings)
app.app_context().push()
# register_extensions(app)
log = app.logger
log.debug("app=%s", app)
log.debug("log=%s", log)
log.debug("settings.FLASK_ENV=%s", settings.FLASK_ENV)
log.debug("settings.DEBUG=%s, settings.MONGODB_HOST=%s, settings.FILE_URL_HOST=%s",
settings.DEBUG, settings.MONGODB_HOST, settings.FILE_URL_HOST)
# celery = None
# with app.app_context():
# celery = create_celery_app(app)
# print("celery=%s" % celery)
if __name__ == "__main__":
app.run(
host=settings.FLASK_HOST,
port=settings.FLASK_PORT,
debug=settings.DEBUG,
use_reloader=False
)(5)其他模块用flask的g获取要的全局变量
其他模块中,为了获取最初app初始化后的app,log,mongo
通过factory初始化时加上with app.app_context(),
以及其他每个模块中用from flask import g
再去用log=g.log, app=g.app等去获取自己要的值
resources/files.py
from flask import g

from resources.tts import gTempAudioFolder

# Read from flask.g at import time: this module has to be imported inside the
# app context opened by create_app() (create_rest_api() does that).
log = g.log
mongo = g.mongo
fsCollection = g.fsCollection


class GridfsAPI(Resource):
    """REST resource serving files stored in GridFS."""

    def get(self, fileId, fileName=None):
        log.info("fileId=%s, file_name=%s", fileId, fileName)
        ...
resources/qa.py
from flask import g

# Module-level globals taken from flask.g (set up by create_app()).
app = g.app
log = g.log
fsCollection = g.fsCollection
resources/tts.py
from flask import g

# Module-level globals taken from flask.g (set up by create_app()).
app = g.app
log = g.log
这样每个模块中,都可以用上全局的log,全局的app了。
然后PyCharm去debug初始化时,就正常了:

接着再去Mac的终端中去运行celery的worker
celery worker -A resources.tasks.celery --loglevel=DEBUG
然后就可以有正常输出了:
➜ xxxRobotDemoServer git:(master) ✗ celery worker -A resources.tasks.celery --loglevel=DEBUG
cur_flask_environ=None
FLASK_ENV=development
cur_dir=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app
env_folder=development
dotenv_path=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/conf/app/development/.env
dotenv_load_ok=True
After load .env: DEBUG=True, MONGODB_HOST=xxx, FILE_URL_HOST=127.0.0.1
in extensions_celery: celery=<Celery RobotQA at 0x104277e80>
create_celery_app return: celery=<Celery RobotQA at 0x104277e80>, log=<Logger resources.extensions_celery (WARNING)>
[2018-08-26 11:11:45,796: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-08-26 11:11:45,801: DEBUG/MainProcess] | Worker: Building graph...
[2018-08-26 11:11:45,802: DEBUG/MainProcess] | Worker: New boot order: {Timer, Hub, Pool, Autoscaler, Beat, StateDB, Consumer}
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-08-26 11:11:45,831: DEBUG/MainProcess] | Consumer: Building graph...
[2018-08-26 11:11:45,876: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Heart, Mingle, Tasks, Control, Gossip, Agent, event loop}
celery@licrifandeMacBook-Pro.local
v4.2.1 (windowlicker)
Darwin-17.7.0-x86_64-i386-64bit-PE 2018-08-26 11:11:45
[config]
.> app: RobotQA:0x104277e80
.> transport:
redis://localhost:6379/0
.> results: disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
. resources.tasks.celeryRefreshAzureSpeechToken
. resources.tasks.deleteTmpAudioFile
[2018-08-26 11:11:45,921: DEBUG/MainProcess] | Worker: Starting Hub
[2018-08-26 11:11:45,922: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:45,922: DEBUG/MainProcess] | Worker: Starting Pool
[2018-08-26 11:11:46,059: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,060: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-08-26 11:11:46,061: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-08-26 11:11:46,098: INFO/MainProcess] Connected to
redis://localhost:6379/0
[2018-08-26 11:11:46,098: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,098: DEBUG/MainProcess] | Consumer: Starting Events
[2018-08-26 11:11:46,114: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,114: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-08-26 11:11:46,117: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:46,119: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-08-26 11:11:46,119: INFO/MainProcess] mingle: searching for neighbors
[2018-08-26 11:11:47,155: INFO/MainProcess] mingle: all alone
[2018-08-26 11:11:47,156: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,156: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-08-26 11:11:47,161: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,161: DEBUG/MainProcess] | Consumer: Starting Control
[2018-08-26 11:11:47,167: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,167: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-08-26 11:11:47,172: DEBUG/MainProcess] ^-- substep ok
[2018-08-26 11:11:47,172: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-08-26 11:11:47,173: DEBUG/MainProcess] | Worker: Hub.register Pool...
[2018-08-26 11:11:47,173: INFO/MainProcess]
celery@licrifandeMacBook-Pro.local
ready.
[2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16

其中的task是:
[tasks]
  . celery.accumulate
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap
  . resources.tasks.celeryRefreshAzureSpeechToken
  . resources.tasks.deleteTmpAudioFile
这些task和Flask中的task是一致的,Flask和worker就可以互相识别,Flask就可以调用task了。
后续调用时task就可以正常运行了:
[2018-08-26 11:11:47,174: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:28,913: INFO/MainProcess] Received task: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] ETA:[2018-08-26 03:14:38.521681+00:00]
[2018-08-26 11:14:28,915: DEBUG/MainProcess] basic.qos: prefetch_count->17
[2018-08-26 11:14:38,527: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x10433c840> (args:('resources.tasks.deleteTmpAudioFile', '6ea6c103-7b58-4d38-ad54-06666b0ebb59', {'lang': 'py', 'task': 'resources.tasks.deleteTmpAudioFile', 'id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'shadow': None, 'eta': '2018-08-26T03:14:38.521681+00:00', 'expires': None, 'group': None, 'retries': 0, 'timelimit': [None, None], 'root_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'parent_id': None, 'argsrepr': "['42667515-281a-4cab-bfac-5d2c23c25a34.mp3']", 'kwargsrepr': '{}', 'origin': '
gen47559@licrifandeMacBook-Pro.local
', 'reply_to': '10c57fe8-23c6-3e20-bea6-0576fa07fe57', 'correlation_id': '6ea6c103-7b58-4d38-ad54-06666b0ebb59', 'delivery_info': {'exchange': '', 'routing_key': 'celery', 'priority': 0, 'redelivered': None}}, b'[["42667515-281a-4cab-bfac-5d2c23c25a34.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]', 'application/json', 'utf-8') kwargs:{})
[2018-08-26 11:14:38,564: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-08-26 11:14:38,626: DEBUG/MainProcess] Task accepted: resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] pid:47617
[2018-08-26 11:14:38,628: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: deleteTmpAudioFile: celery=<Celery RobotQA at 0x104277e80>, filename=42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,632: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolder=tmp/audio
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: curFolderAbsPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer
[2018-08-26 11:14:38,634: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio
[2018-08-26 11:14:38,635: INFO/ForkPoolWorker-2] resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59]: Ok to delete file /Users/crifan/dev/dev_root/company/xxx/projects/robotDemo/server/xxxRobotDemoServer/tmp/audio/42667515-281a-4cab-bfac-5d2c23c25a34.mp3
[2018-08-26 11:14:38,638: INFO/ForkPoolWorker-2] Task resources.tasks.deleteTmpAudioFile[6ea6c103-7b58-4d38-ad54-06666b0ebb59] succeeded in 0.012065329006873071s: None
再加上celery beat一起调试后发现:必须确保celery的task中不去调用别的模块的函数,才能正常运行。
否则celery的task一旦调用别的模块中的东西,则别的模块中的flask的g,就会报错:
raise RuntimeError(_app_ctx_err_msg)
RuntimeError: Working outside of application context.
经过调整后的代码:
resources/tasks.py
@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken():
"""celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
log.info("celeryRefreshAzureSpeechToken: celery=%s" % celery)
# with celery.app.app_context():
# from resources.tts import refreshAzureSpeechToken
# refreshAzureSpeechToken()
# global gMsToken
# log = app.logger
# log.info("refreshAzureSpeechToken: gMsToken=%s", gMsToken)
# log.info("refreshAzureSpeechToken")
getMsTokenUrl = settings.MS_GET_TOKEN_URL
reqHeaders = {
"Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
}
log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
resp = requests.post(getMsTokenUrl, headers=reqHeaders)
log.info("resp=%s", resp)
respTokenText = resp.text # eyxxxxiJ9.xxx.xxx
log.info("respTokenText=%s", respTokenText)
# gMsToken = respTokenText
updatedToken = respTokenText
# # for debug
# gMsToken = "eyJ0eXAiOxxxxnez"
# log.info("after refresh: gMsToken=%s", gMsToken)
return updatedToken避免了之前的:
from resources.tts import refreshAzureSpeechToken
refreshAzureSpeechToken()
的报错:
ImportError: cannot import name ‘refreshAzureSpeechToken’
而
resources/tts.py
中的,对于:
from common.util import generateUUID
必须要先把项目根目录加入sys.path:
import os
import sys

# Make the project root importable (e.g. `from common.util import ...`) even
# when this module is loaded by a celery worker/beat process.
resources_dir = os.path.dirname(__file__)
project_root_dir = os.path.abspath(os.path.join(resources_dir, ".."))
if project_root_dir not in sys.path:
    sys.path.append(project_root_dir)
否则会出现无法导入
以及tts内的token的代码也去更新了:
from resources.tasks import refreshAzureSpeechToken


def getAzureSpeechToken():
    """Get a Microsoft Azure speech service token and store it in the gMsToken global."""
    # log = app.logger
    global gMsToken
    # Calls the Celery task function directly (in-process), not via .delay().
    gMsToken = refreshAzureSpeechToken()
    ...

# Excerpt from the TTS call path; the enclosing function and its `if` are elided.
# NOTE(review): the enclosing function presumably also needs `global gMsToken`
# for the assignment below to update the module-level token — confirm.
    else:
        # if errNo == BAIDU_ERR_TOKEN_INVALID:
        if errNo == settings.MS_ERR_UNAUTHORIZED:
            log.warning("Token invalid -> retry one for refresh token")
            # refreshBaiduToken()
            gMsToken = refreshAzureSpeechToken()
            # isOk, audioBinData, errNo, errMsg = baiduText2Audio(unicodeText)
            isOk, audioBinData, errNo, errMsg = msTTS(unicodeText)
心得:
其实,此处能成功把Celery转换为工厂模式,核心的几个点是:
基本上明白了:
Celery的初始化,不一定非要是传入Flask的app
其实别人的,类似于
# extensions.py
from flask_celery import Celery
celery = Celery()
# application.py
from flask import Flask
from extensions import celery
def create_app():
app = Flask(__name__)
app.config['CELERY_IMPORTS'] = ('tasks.add_together', )
app.config['CELERY_BROKER_URL'] = 'redis://localhost'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost'
celery.init_app(app)
return app或:
from celery import Celery
from flask import Flask

from app.config import BaseConfig

# Create Celery up front with the broker taken from the config class.
celery = Celery(__name__, broker=BaseConfig.CELERY_BROKER_URL)


def create_app():
    app = Flask(__name__)
    # ....
    celery.conf.update(app.config)  # update celery's configuration from the Flask config
    # ...
    return app
核心在于:
最开始初始化Celery时,一定要传入broker参数,指定好broker
之后再去在create_app中初始化app后,再去更新配置:
celery.conf.update(app.config)
或
celery.init_app(app)
而Celery其实本身和Flask的app,并没有什么本质的关联。
主要是flask中的配置参数:
conf/app/settings.py
CELERY_BROKER_URL = "redis://localhost:6379/0"
# CELERY_RESULT_BACKEND = "redis://localhost:6379/0"  # currently not used: no task results needed

# for periodic celery tasks
CELERY_TIMEZONE = "Asia/Shanghai"
CELERY_ENABLE_UTC = True
起到了效果。
所以此处改为:
celery初始化,从settings传入celery相关的配置:
resources/extensions_celery.py
# Same app name + broker as the Flask side, so both sides talk to the same Celery app.
celery = Celery(
    settings.FLASK_APP_NAME,
    broker=settings.CELERY_BROKER_URL,
)
就可以了。
然后别的地方,去导入,引用celery的时候
-》感觉会重新触发Celery的初始化,导致和(一般做法中)Flask的app初始化时创建的celery不是同一个
-》其实是一样的
-》主要就是上面的Celery,传入了name和broker是一致的,就可以了
-》celery worker -A 中就可以了
celery worker -A resources.tasks.celery --loglevel=DEBUG
task中用到的celery,就不再需要和Flask的app关联在一起了。
其次是,celery的task中,不能依赖导入别的模块
-》否则运行celery的worker或beat时,再去导入别的模块,就会出现各种问题
比如模块路径问题导致无法导入
比如flask的g无法正常识别
等等。
最终的上述的celery的工厂模式的代码,去运行了flask的app后,再分别去运行worker和beat:
pipenv shell
celery worker -A resources.tasks.celery --loglevel=DEBUG
和:
pipenv shell
celery beat -A resources.tasks.celery -s runtime/celerybeat-schedule --loglevel=DEBUG
终于可以正常执行celery的异步函数和定期任务了。
转载请注明:在路上 » 【已解决】Flask中如何用工厂模式初始化Celery