While working on:
[Solved] Integrating Celery tasks into a Flask app
I had already followed a tutorial to merge Celery into Flask:
import os

from flask import Flask, jsonify
from flask_restful import Resource
from celery import Celery

app = Flask(__name__)
# log and the app.config values used below are set up elsewhere in app.py
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)

#----------------------------------------
# Celery tasks
#----------------------------------------

@celeryApp.task()
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    global log

    log.info("deleteTmpAudioFile: filename=%s", filename)

    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # audioTmpFolder = "tmp/audio"
    log.info("audioTmpFolder=%s", audioTmpFolder)
    curFolderAbsPath = os.getcwd()  # '/Users/xxx/server'
    log.info("curFolderAbsPath=%s", curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    # '/Users/xxx/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        log.info("Ok to delete file %s", tempAudioFullname)
    else:
        log.warning("No need to remove for not exist file %s", tempAudioFullname)

class RobotQaAPI(Resource):

    def processResponse(self, respDict):
        # …
        # 2. use celery to delay delete tmp file
        delayTimeToDelete = app.config["CELERY_DELETE_TMP_AUDIO_FILE_DELAY"]
        deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
        log.info("Delay %s seconds to delete %s", delayTimeToDelete, tempFilename)
        # …
        log.info("respDict=%s", respDict)
        return jsonify(respDict)
Result:
The Celery worker shows:
[2018-05-14 11:22:19,624: ERROR/MainProcess] Received unregistered task of type 'Celery_RobotQA.deleteTmpAudioFile'.
The message has been ignored and discarded.
Did you remember to import the module containing this task?
Or maybe you're using relative imports?
Please see
http://docs.celeryq.org/en/latest/internals/protocol.html
for more information.
The full contents of the message body was:
b'[["cc06dc63-0be8-439c-b9aa-e8c61ea78639.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (119b)
Traceback (most recent call last):
  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
    strategy = strategies[type_]
KeyError: 'Celery_RobotQA.deleteTmpAudioFile'
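The KeyError at the end of the traceback suggests that the worker looks up the task name of each incoming message in a table of registered tasks. The sketch below is a simplified model of that lookup, not Celery's actual code: the `registered` dict and the `on_task_received` function are hypothetical stand-ins, and the registered name is the one the worker's task list reports later in this post.

```python
def delete_tmp_audio_file(filename):
    """Hypothetical stand-in for the real task body."""
    return "deleted " + filename

# Hypothetical registry (task name -> callable), as a worker might build it at startup.
registered = {"app.deleteTmpAudioFile": delete_tmp_audio_file}

def on_task_received(task_name, args):
    """Dispatch a message by task name; unknown names are reported, not executed."""
    try:
        handler = registered[task_name]
    except KeyError:
        return "unregistered task of type %r" % task_name
    return handler(*args)

# The sender asked for a name the worker never registered.
print(on_task_received("Celery_RobotQA.deleteTmpAudioFile", ["a.mp3"]))
# The name the worker does know is dispatched normally.
print(on_task_received("app.deleteTmpAudioFile", ["a.mp3"]))
```

In this model the function itself is present on the worker; only the name under which it is requested differs, which is the situation the rest of this post tracks down.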
By this point, the command that starts the Celery worker had been changed to:
➜ server celery -A app.celeryApp worker
[2018-05-14 11:20:56,282] INFO in app: app=<Flask ‘app’>
[2018-05-14 11:20:56,283] INFO in app: api=<flask_restful.Api object at 0x106eaaef0>
[2018-05-14 11:20:56,285] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x106eec198>
[2018-05-14 11:20:56,286] INFO in app: aiContext=<DialogueManager.Context object at 0x1077b0be0>
[2018-05-14 11:20:56,287] INFO in app: getBaiduTokenUrl=https://openapi.baidu.com/oauth/2.0/token?grant_type=client_credentials&client_id=SNjsggdYDNWtnlbKhxsPLcaz&client_secret=47xxxba
[2018-05-14 11:20:56,422] INFO in app: resp=<Response [200]>
[2018-05-14 11:20:56,423] INFO in app: respJson={‘access_token’: ‘24.95xxx313be.2592000.1xxx6.282335-1xxx83’, ‘session_key’: ‘9mxxxDsZwWUs4H0pYmw==’, ‘scope’: ‘public audio_voice_assistant_get audio_tts_post wise_adapt lebo_resource_base lightservice_public hetu_basic lightcms_map_poi kaidian_kaidian ApsMisTest_Test权限 vis-classify_flower lpq_开放 cop_helloScope ApsMis_fangdi_permission smartapp_snsapi_base’, ‘refresh_token’: ‘25.6xxx35-11192483’, ‘session_secret’: ’21daxxx88e’, ‘expires_in’: 2592000}
[2018-05-14 11:20:56,424] INFO in app: get baidu token resp: {‘access_token’: ‘24.95xxx483’, ‘session_key’: ‘9mzxxxmw==’, ‘scope’: ‘public audio_voice_assistant_get audio_tts_post wise_adapt lebo_resource_base lightservice_public hetu_basic lightcms_map_poi kaidian_kaidian ApsMisTest_Test权限 vis-classify_flower lpq_开放 cop_helloScope ApsMis_fangdi_permission smartapp_snsapi_base’, ‘refresh_token’: ‘25.6xxx35-11192483’, ‘session_secret’: ’21dxxx88e’, ‘expires_in’: 2592000}
[2018-05-14 11:20:56,424] INFO in app: audioTmpFolder=tmp/audio
[2018-05-14 11:20:56,424] INFO in app: curFolderAbsPath=/Users/crifan/dev/dev_root/xxxserver
[2018-05-14 11:20:56,425] INFO in app: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/xxx
[2018-05-14 11:20:56,425] INFO in app: gTempAudioFolder=/Users/crifan/dev/dev_root/xxx/server/tmp/audio
[2018-05-14 11:20:56,426] INFO in app: purePymongo=MongoClient(host=[‘x.x.x.x:p’], document_class=dict, tz_aware=False, connect=True, authsource=’gridfs’)
[2018-05-14 11:20:56,428] INFO in app: gridfsDb=Database(MongoClient(host=[‘x.x.x.x:[port’], document_class=dict, tz_aware=False, connect=True, authsource=’gridfs’), ‘gridfs’)
[2018-05-14 11:20:56,429] INFO in app: fsCollection=<gridfs.GridFS object at 0x107e7ddd8>
Earlier still, the command was:
celery -A celery_task worker
which produced the same error.
celery ERROR/MainProcess Received unregistered task of type
celery flask ERROR/MainProcess Received unregistered task of type
python – Celery Received unregistered task of type (run example) – Stack Overflow
It seems
I need to use:
--config=celeryconfig
where celeryconfig contains:
CELERY_IMPORTS = ("tasks",)
?
celery的worker遇到”Received unregistered task of type ‘tasks.add’”的错误 – 笑遍世界
Worth trying:
inspect registered?
celery inspect registered
Received unregistered task of type · Issue #3024 · celery/celery
Celery: Received unregistered task of type <AsyncResult: [hash]> · Issue #3255 · celery/celery
django Received unregistered task of type · Issue #3629 · celery/celery
cellery ImportError & AttributeError – 半天的半天 – 博客园
➜ server celery -A app.celeryApp worker inspect registered
[2018-05-14 11:37:54,359] INFO in app: app=<Flask ‘app’>
[2018-05-14 11:37:54,360] INFO in app: api=<flask_restful.Api object at 0x105863f28>
[2018-05-14 11:37:54,363] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x1058a51d0>
...
usage: celery worker [options]
celery: error: unrecognized arguments: inspect registered
Then I changed it to:
celery -A app.celeryApp inspect registered
Error: No nodes replied within time constraint.
Reference:
https://github.com/celery/celery/issues/3024
$ celery -A scan.tasks:app worker -P gevent
It seems a colon is used:
celery -A app:celeryApp inspect registered
Error: No nodes replied within time constraint.
https://stackoverflow.com/questions/9769496/celery-received-unregistered-task-of-type-run-example
Add:
--loglevel=DEBUG
to see whether a more detailed error log is printed:
celery -A app:celeryApp --loglevel=DEBUG inspect registered
usage: celery inspect [options] <command> [arg1 .. argN]
celery: error: unrecognized arguments: --loglevel=DEBUG
It turns out the debug log level can only be added to the worker command:
➜ server celery -A app:celeryApp worker --loglevel=DEBUG
[2018-05-14 11:45:08,254] INFO in app: app=<Flask ‘app’>
[2018-05-14 11:45:08,255] INFO in app: api=<flask_restful.Api object at 0x10bcc3ef0>
[2018-05-14 11:45:08,257] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x10bd05198>
…
[2018-05-14 11:45:08,541: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-05-14 11:45:08,546: DEBUG/MainProcess] | Worker: Building graph…
[2018-05-14 11:45:08,547: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}
[2018-05-14 11:45:08,577: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-05-14 11:45:08,578: DEBUG/MainProcess] | Consumer: Building graph…
[2018-05-14 11:45:08,614: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Heart, Gossip, Agent, event loop}
celery@licrifandeMacBook-Pro.local v4.1.0 (latentcall)
Darwin-17.5.0-x86_64-i386-64bit 2018-05-14 11:45:08
[config]
.> app: Celery_RobotQA:0x10bd05198
.> transport: redis://localhost:6379/0
.> results: disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.deleteTmpAudioFile
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
[2018-05-14 11:45:08,660: DEBUG/MainProcess] | Worker: Starting Hub
[2018-05-14 11:45:08,661: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:08,661: DEBUG/MainProcess] | Worker: Starting Pool
[2018-05-14 11:45:08,860: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:08,868: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-05-14 11:45:08,870: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-05-14 11:45:08,896: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-05-14 11:45:08,896: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:08,897: DEBUG/MainProcess] | Consumer: Starting Events
[2018-05-14 11:45:08,920: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:08,921: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-05-14 11:45:08,921: INFO/MainProcess] mingle: searching for neighbors
[2018-05-14 11:45:09,954: INFO/MainProcess] mingle: all alone
[2018-05-14 11:45:09,954: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:09,955: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-05-14 11:45:09,960: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:09,960: DEBUG/MainProcess] | Consumer: Starting Control
[2018-05-14 11:45:09,965: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:09,965: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-05-14 11:45:09,967: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:09,967: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-05-14 11:45:09,972: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:45:09,972: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-05-14 11:45:09,972: DEBUG/MainProcess] | Worker: Hub.register Pool…
[2018-05-14 11:45:09,973: INFO/MainProcess] celery@licrifandeMacBook-Pro.local ready.
[2018-05-14 11:45:09,973: DEBUG/MainProcess] basic.qos: prefetch_count->16
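Clearly, the task list contains:
[tasks]
. app.deleteTmpAudioFile
so my function is registered,
but not under the name I expected:
Celery_RobotQA.deleteTmpAudioFile
So I need to work out how the Celery name is determined here.
My guess is that it comes from:
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
where the name is:
Celery_RobotQA
As for the command that runs the Celery worker,
I had assumed it takes the variable name of the Celery instance, i.e. celeryApp,
so I used:
celery -A app:celeryApp worker
It looks like this should be changed to:
celery -A app:Celery_RobotQA worker --loglevel=DEBUG
I tried it, and it failed: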
Traceback (most recent call last):
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/app/utils.py", line 355, in find_app
sym = symbol_by_name(app, imp=imp)
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 506, in symbol_by_name
return imports.symbol_by_name(name, imp=imp)
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/kombu/utils/imports.py", line 61, in symbol_by_name
return getattr(module, cls_name) if cls_name else module
AttributeError: module 'app' has no attribute 'Celery_RobotQA'
Then I changed it to:
➜ server celery -A app.celeryApp:Celery_RobotQA worker --loglevel=DEBUG
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked
ModuleNotFoundError: No module named 'app.celeryApp'; 'app' is not a package
Next attempt:
➜ server celery -A app worker --loglevel=DEBUG
Traceback (most recent call last):
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/celery", line 11, in <module>
sys.exit(main())
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/__main__.py", line 14, in main
_main()
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/celery.py", line 326, in main
cmd.execute_from_commandline(argv)
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/celery.py", line 488, in execute_from_commandline
super(CeleryCommand, self).execute_from_commandline(argv)))
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 279, in execute_from_commandline
argv = self.setup_app_from_commandline(argv)
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 489, in setup_app_from_commandline
self._handle_user_preload_options(argv)
File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 494, in _handle_user_preload_options
user_preload = tuple(self.app.user_options['preload'] or ())
AttributeError: 'Flask' object has no attribute 'user_options'
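The three failed attempts above are consistent with the -A argument naming a Python module and an attribute inside it, not the Celery app's main name. The sketch below is a simplified model of that resolution, written from the tracebacks above rather than copied from Celery: `resolve_app`, `error_name` and the `demo_app` module (with placeholder strings instead of real Flask and Celery objects) are all hypothetical.

```python
import importlib
import sys
import types

# Hypothetical stand-in for app.py: one Flask-like and one Celery-like attribute.
demo = types.ModuleType("demo_app")
demo.app = "flask instance"
demo.celeryApp = "celery instance"
sys.modules["demo_app"] = demo

def resolve_app(spec):
    """Simplified model of turning an -A argument into a Python object."""
    if ":" in spec:
        # "module:attribute" form
        module_name, attr = spec.split(":", 1)
        return getattr(importlib.import_module(module_name), attr)
    module_name, _, attr = spec.rpartition(".")
    if module_name:
        # "module.attribute" form
        return getattr(importlib.import_module(module_name), attr)
    # Bare module name: assume the attribute called "app" is tried first.
    return getattr(importlib.import_module(spec), "app")

def error_name(spec):
    """Return the exception class name raised for spec, or None on success."""
    try:
        resolve_app(spec)
    except Exception as exc:
        return type(exc).__name__
    return None

print(resolve_app("demo_app.celeryApp"))                # the Celery stand-in
print(resolve_app("demo_app"))                          # the Flask stand-in
print(error_name("demo_app:Celery_RobotQA"))            # no such attribute
print(error_name("demo_app.celeryApp:Celery_RobotQA"))  # demo_app is not a package
```

Under this model, `-A app` hands the Flask instance to Celery, which would explain the complaint about a 'Flask' object, while `app.celeryApp` and `app:celeryApp` both reach the Celery instance.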
flask run celery
celery worker -A example.celery -l INFO
So back to:
celery worker -A app.celeryApp --loglevel=DEBUG
[2018-05-14 11:57:57,262: DEBUG/MainProcess] | Worker: Preparing bootsteps.
[2018-05-14 11:57:57,265: DEBUG/MainProcess] | Worker: Building graph…
[2018-05-14 11:57:57,265: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}
[2018-05-14 11:57:57,300: DEBUG/MainProcess] | Consumer: Preparing bootsteps.
[2018-05-14 11:57:57,301: DEBUG/MainProcess] | Consumer: Building graph…
[2018-05-14 11:57:57,326: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Heart, Agent, Gossip, event loop}
celery@licrifandeMacBook-Pro.local v4.1.0 (latentcall)
Darwin-17.5.0-x86_64-i386-64bit 2018-05-14 11:57:57
[config]
.> app: Celery_RobotQA:0x10958d198
.> transport: redis://localhost:6379/0
.> results: disabled://
.> concurrency: 4 (prefork)
.> task events: OFF (enable -E to monitor tasks in this worker)
[queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. app.deleteTmpAudioFile
. celery.accumulate
. celery.backend_cleanup
. celery.chain
. celery.chord
. celery.chord_unlock
. celery.chunks
. celery.group
. celery.map
. celery.starmap
[2018-05-14 11:57:57,367: DEBUG/MainProcess] | Worker: Starting Hub
[2018-05-14 11:57:57,368: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:57,368: DEBUG/MainProcess] | Worker: Starting Pool
[2018-05-14 11:57:57,586: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:57,588: DEBUG/MainProcess] | Worker: Starting Consumer
[2018-05-14 11:57:57,588: DEBUG/MainProcess] | Consumer: Starting Connection
[2018-05-14 11:57:57,626: INFO/MainProcess] Connected to redis://localhost:6379/0
[2018-05-14 11:57:57,626: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:57,626: DEBUG/MainProcess] | Consumer: Starting Events
[2018-05-14 11:57:57,650: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:57,650: DEBUG/MainProcess] | Consumer: Starting Mingle
[2018-05-14 11:57:57,650: INFO/MainProcess] mingle: searching for neighbors
[2018-05-14 11:57:58,679: INFO/MainProcess] mingle: all alone
[2018-05-14 11:57:58,680: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:58,680: DEBUG/MainProcess] | Consumer: Starting Tasks
[2018-05-14 11:57:58,687: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:58,687: DEBUG/MainProcess] | Consumer: Starting Control
[2018-05-14 11:57:58,693: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:58,693: DEBUG/MainProcess] | Consumer: Starting Heart
[2018-05-14 11:57:58,696: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:58,696: DEBUG/MainProcess] | Consumer: Starting Gossip
[2018-05-14 11:57:58,703: DEBUG/MainProcess] ^– substep ok
[2018-05-14 11:57:58,704: DEBUG/MainProcess] | Consumer: Starting event loop
[2018-05-14 11:57:58,704: DEBUG/MainProcess] | Worker: Hub.register Pool…
[2018-05-14 11:57:58,705: INFO/MainProcess] celery@licrifandeMacBook-Pro.local ready.
[2018-05-14 11:57:58,705: DEBUG/MainProcess] basic.qos: prefetch_count->16
But I don't know yet whether the asynchronous task can actually be invoked at run time.
python – Run Celery Worker from FLASK app – Stack Overflow
It seems you have to use:
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])
that is, the name passed to Celery must be app.name.
But then I saw:
Get Started with Celery and Flask – Scalingo
where a custom name is used as well.
Setting up Celery with Flask · Avi Aryan
uses:
app.import_name
I searched for:
flask app import_name
which led to:
API — Flask 1.0.2 documentation
“class flask.Flask(import_name,
If you are using a single module, __name__ is always the correct value. If you however are using a package, it's usually recommended to hardcode the name of your package there.
app = Flask('yourapplication')
app = Flask(__name__.split('.')[0])”
So I printed the values to check:
log.info("app.name=%s", app.name)
log.info("app.import_name=%s", app.import_name)
log.info("__name__=%s", __name__)
Output:
[2018-05-14 13:35:08,102] INFO in app: app.name=app
[2018-05-14 13:35:08,102] INFO in app: app.import_name=app
[2018-05-14 13:35:08,103] INFO in app: __name__=app
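All three values are "app" here because the worker imports app.py as a module. As far as I understand Flask's behavior, app.name stays "app" even if app.py is executed directly, because Flask then derives the name from the script's file name. The function below is a simplified model of that rule, not Flask's code: `flask_app_name` and its `main_file` parameter are hypothetical.

```python
import os

def flask_app_name(import_name, main_file=None):
    """Simplified model of how app.name is derived from import_name."""
    if import_name == "__main__":
        # Run as a script: fall back to the script's file name without extension.
        if main_file is None:
            return "__main__"
        return os.path.splitext(os.path.basename(main_file))[0]
    return import_name

print(flask_app_name("app"))                          # imported as a module
print(flask_app_name("__main__", "/path/to/app.py"))  # executed as a script
```

If this model holds, passing app.name to Celery gives the same string in the worker and in a Flask process started from app.py.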
The task list is still:
[tasks]
. app.deleteTmpAudioFile
Next I looked up how Celery is initialized:
Celery – Distributed Task Queue — Celery 4.1.0 documentation
API Reference — Celery 4.1.0 documentation
celery — Distributed processing — Celery 4.1.0 documentation
celery.app.task — Celery 4.1.0 documentation
“class celery.Celery(main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, autofinalize=True, namespace=None, strict_typing=True, **kwargs)[source]
Celery application.
Parameters:
main (str) – Name of the main module if running as __main__. This is used as the prefix for auto-generated task names.”
Then I searched for:
celery app main name
Celery task display name – Stack Overflow
Application — Celery 4.1.0 documentation
“>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>”
Here __main__ is the main name of this Celery app.
But my code looks fine in that respect.
Tasks — Celery 4.1.0 documentation
I suddenly noticed that the decorator I wrote earlier might be wrong:
@celeryApp.task()
def deleteTmpAudioFile(filename):
should perhaps be:
@celeryApp.task
def deleteTmpAudioFile(filename):
But first, some debugging.
Code:
# celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
log.info("app.name=%s", app.name)
log.info("app.import_name=%s", app.import_name)
log.info("__name__=%s", __name__)
celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celeryApp.conf.update(app.config)
log.info("celeryApp=%s", celeryApp)

@celeryApp.task()
# @celeryApp.task
def deleteTmpAudioFile(filename):
    …

log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)
log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)
log.info("celeryApp.tasks=%s", celeryApp.tasks)
Run:
celery worker -A app.celeryApp --loglevel=DEBUG
Output:
[2018-05-14 13:54:40,255] INFO in app: deleteTmpAudioFile=<@task: app.deleteTmpAudioFile of app at 0x11129f198>
[2018-05-14 13:54:40,272] INFO in app: deleteTmpAudioFile.name=app.deleteTmpAudioFile
[2018-05-14 13:54:40,276] INFO in app: celeryApp.tasks={‘app.deleteTmpAudioFile’: <@task: app.deleteTmpAudioFile of app at 0x11129f198>, ‘celery.chord_unlock’: <@task: celery.chord_unlock of app at 0x11129f198>, ‘celery.group’: <@task: celery.group of app at 0x11129f198>, ‘celery.map’: <@task: celery.map of app at 0x11129f198>, ‘celery.chain’: <@task: celery.chain of app at 0x11129f198>, ‘celery.backend_cleanup’: <@task: celery.backend_cleanup of app at 0x11129f198>, ‘celery.starmap’: <@task: celery.starmap of app at 0x11129f198>, ‘celery.chord’: <@task: celery.chord of app at 0x11129f198>, ‘celery.accumulate’: <@task: celery.accumulate of app at 0x11129f198>, ‘celery.chunks’: <@task: celery.chunks of app at 0x11129f198>}
Switching to:
# @celeryApp.task()
@celeryApp.task
def deleteTmpAudioFile(filename):
makes no difference.
Switching the initialization to:
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
# celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
still generates the same task name:
[2018-05-14 13:58:01,914] INFO in app: deleteTmpAudioFile=<@task: app.deleteTmpAudioFile of Celery_RobotQA at 0x10bcfa198>
[2018-05-14 13:58:01,928] INFO in app: deleteTmpAudioFile.name=app.deleteTmpAudioFile
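So it appears that:
[Summary]
According to:
http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-names
an auto-generated task name is based on:
the module the task is defined in
-> So however Celery is initialized, for:
@celeryApp.task
def deleteTmpAudioFile(filename):
if no name is given explicitly for the task, Celery uses
the name of the module that defines the task,
which here is:
the app in app.py
as the prefix, so in the worker the task name is always:
app.deleteTmpAudioFile
->
That is why, whether Celery is initialized with:
celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celeryApp = Celery(__name__, broker=app.config["CELERY_BROKER_URL"])
or with:
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
where:
app.config["CELERY_TASK_NAME"]
is
Celery_RobotQA
the worker's task list always shows:
. app.deleteTmpAudioFile
However, when Celery is initialized the latter way:
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
the call in the code:
deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
ends up asking for a task prefixed with:
the Celery main name = Celery_RobotQA
-> Celery_RobotQA.deleteTmpAudioFile
(This fits the documented role of main quoted earlier: it is the prefix used when the task's module runs as __main__, which is presumably the case in the process serving the Flask app.)
whereas the worker only has:
app.deleteTmpAudioFile
so the task cannot be found.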
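The naming rule can be sketched as a small function. This is a simplified model based on the documentation quoted earlier (main is "used as the prefix for auto-generated task names" when running as __main__), not Celery's actual code. `gen_task_name` here is a hypothetical helper, and the idea that the Flask process runs app.py as __main__ is an assumption; the post does not show how Flask was started.

```python
def gen_task_name(main, func_name, module_name):
    """Simplified model of automatic task naming.

    main:        first argument passed to Celery(...)
    func_name:   name of the decorated function
    module_name: __name__ of the module that defines the task
    """
    if module_name == "__main__" and main:
        # Defined in the script being run: the module part is replaced by main.
        return main + "." + func_name
    return module_name + "." + func_name

# Worker side: app.py is imported as module "app", so main is not used.
print(gen_task_name("Celery_RobotQA", "deleteTmpAudioFile", "app"))
# Sender side, assuming app.py runs as __main__: main becomes the prefix.
print(gen_task_name("Celery_RobotQA", "deleteTmpAudioFile", "__main__"))
# With main set to "app", both sides agree.
print(gen_task_name("app", "deleteTmpAudioFile", "app"))
print(gen_task_name("app", "deleteTmpAudioFile", "__main__"))
```

Under that assumption, the first two calls reproduce the mismatch between the worker's task list and the name in the error message, and the last two show why using app.name as the Celery main name removes it.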
There are two ways to fix this.
Either:
1. Use app.deleteTmpAudioFile consistently
Concretely:
celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
@celeryApp.task
def deleteTmpAudioFile(filename):
    …
deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
After the corresponding apply_async call, the log shows:
[2018-05-14 15:05:49,056: INFO/MainProcess] Received task: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] ETA:[2018-05-14 07:06:08.084889+00:00]
[2018-05-14 15:05:49,062: DEBUG/MainProcess] basic.qos: prefetch_count->17
Once the delay has elapsed, the task is executed:
[2018-05-14 15:06:08,089: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x106bc6b70> (args:(‘app.deleteTmpAudioFile’, ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, {‘lang’: ‘py’, ‘task’: ‘app.deleteTmpAudioFile’, ‘id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘eta’: ‘2018-05-14T07:06:08.084889+00:00’, ‘expires’: None, ‘group’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘parent_id’: None, ‘argsrepr’: "[‘51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3’]", ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen62154@licrifandeMacBook-Pro.local‘, ‘reply_to’: ‘513d99ab-dfe4-33d9-8c8f-f16a94fb2f99’, ‘correlation_id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘delivery_info’: {‘exchange’: ”, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}}, b'[["51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]’, ‘application/json’, ‘utf-8’) kwargs:{})
[2018-05-14 15:06:08,491: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-05-14 15:06:08,496: DEBUG/MainProcess] Task accepted: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] pid:62182
[2018-05-14 15:06:08,500: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,498] INFO in app: deleteTmpAudioFile: filename=51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3
[2018-05-14 15:06:08,498: INFO/ForkPoolWorker-1] deleteTmpAudioFile: filename=51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3
[2018-05-14 15:06:08,504: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,504] INFO in app: audioTmpFolder=tmp/audio
[2018-05-14 15:06:08,504: INFO/ForkPoolWorker-1] audioTmpFolder=tmp/audio
[2018-05-14 15:06:08,506: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,505] INFO in app: curFolderAbsPath=/Users/crifan/dev/xxx/server
[2018-05-14 15:06:08,505: INFO/ForkPoolWorker-1] curFolderAbsPath=/Users/crifan/dev/xxx/server
[2018-05-14 15:06:08,506: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,506] INFO in app: audioTmpFolderFullPath=/Users/crifan/xxx/server/tmp/audio
[2018-05-14 15:06:08,506: INFO/ForkPoolWorker-1] audioTmpFolderFullPath=/Users/crifan/xxx/server/tmp/audio
[2018-05-14 15:06:08,508: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,508] INFO in app: Ok to delete file /Users/crifan/devxxx/server/tmp/audio/51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3
[2018-05-14 15:06:08,508: INFO/ForkPoolWorker-1] Ok to delete file /Users/crifan/dev/xxx/server/tmp/audio/51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3
[2018-05-14 15:06:08,510: INFO/ForkPoolWorker-1] Task app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] succeeded in 0.013172306003980339s: None
The task name here is:
app.deleteTmpAudioFile
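The ETA in the "Received task" line is a UTC timestamp, and the worker ran the task at 15:06:08 local time, which is 07:06:08 UTC. A minimal sketch of how a countdown maps to such an ETA follows. The helper names, the send time and the 20-second delay are hypothetical values chosen so that the result matches the ETA in the log; the actual value of CELERY_DELETE_TMP_AUDIO_FILE_DELAY is not shown in this post.

```python
from datetime import datetime, timedelta, timezone

def eta_from_countdown(now_utc, countdown_seconds):
    """ETA is the send time plus the countdown, kept in UTC."""
    return now_utc + timedelta(seconds=countdown_seconds)

def is_due(now_utc, eta):
    """A task is held back until the current time reaches its ETA."""
    return now_utc >= eta

# Hypothetical send time and delay (not taken from the original config).
sent_at = datetime(2018, 5, 14, 7, 5, 48, 84889, tzinfo=timezone.utc)
eta = eta_from_countdown(sent_at, 20)

print(eta.isoformat())
print(is_due(sent_at, eta))  # still waiting right after sending
print(is_due(eta, eta))      # due once the ETA is reached
```

This also accounts for the gap between "Received task" and "Task accepted" in the log: the worker receives the message immediately but holds it until the ETA.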
Or:
2. After giving Celery a custom main name, manually prefix every task name with that main name
celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config["CELERY_BROKER_URL"])
# app.config["CELERY_TASK_NAME"] = Celery_RobotQA
@celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")
def deleteTmpAudioFile(filename):
    …
deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
The task name here is:
Celery_RobotQA.deleteTmpAudioFile
The drawback is that this is tedious to write.
The result when running:
[2018-05-14 15:12:59,850] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x1055f31d0>
[2018-05-14 15:13:00,036] INFO in app: deleteTmpAudioFile=<@task: Celery_RobotQA.deleteTmpAudioFile of Celery_RobotQA at 0x1055f31d0>
[2018-05-14 15:13:00,052] INFO in app: deleteTmpAudioFile.name=Celery_RobotQA.deleteTmpAudioFile
[2018-05-14 15:13:00,055] INFO in app: celeryApp.tasks={‘Celery_RobotQA.deleteTmpAudioFile’: <@task: Celery_RobotQA.deleteTmpAudioFile of Celery_RobotQA at 0x1055f31d0>, ‘celery.chord_unlock’: <@task: celery.chord_unlock of Celery_RobotQA at 0x1055f31d0>, ‘celery.group’: <@task: celery.group of Celery_RobotQA at 0x1055f31d0>, ‘celery.map’: <@task: celery.map of Celery_RobotQA at 0x1055f31d0>, ‘celery.chain’: <@task: celery.chain of Celery_RobotQA at 0x1055f31d0>, ‘celery.backend_cleanup’: <@task: celery.backend_cleanup of Celery_RobotQA at 0x1055f31d0>, ‘celery.starmap’: <@task: celery.starmap of Celery_RobotQA at 0x1055f31d0>, ‘celery.chord’: <@task: celery.chord of Celery_RobotQA at 0x1055f31d0>, ‘celery.accumulate’: <@task: celery.accumulate of Celery_RobotQA at 0x1055f31d0>, ‘celery.chunks’: <@task: celery.chunks of Celery_RobotQA at 0x1055f31d0>}
[tasks]
. Celery_RobotQA.deleteTmpAudioFile
After calling apply_async:
[2018-05-14 15:14:11,092: INFO/MainProcess] Received task: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] ETA:[2018-05-14 07:14:30.839388+00:00]
[2018-05-14 15:14:11,093: DEBUG/MainProcess] basic.qos: prefetch_count->17
[2018-05-14 15:14:30,846: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x106ef0b70> (args:(‘Celery_RobotQA.deleteTmpAudioFile’, ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, {‘lang’: ‘py’, ‘task’: ‘Celery_RobotQA.deleteTmpAudioFile’, ‘id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘eta’: ‘2018-05-14T07:14:30.839388+00:00’, ‘expires’: None, ‘group’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘parent_id’: None, ‘argsrepr’: "[‘384541cb-2132-4d60-a42f-f693593b7921.mp3’]", ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen62343@licrifandeMacBook-Pro.local‘, ‘reply_to’: ‘e33e7319-ad76-33a4-8c48-4ba2121138c0’, ‘correlation_id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘delivery_info’: {‘exchange’: ”, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}}, b'[["384541cb-2132-4d60-a42f-f693593b7921.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]’, ‘application/json’, ‘utf-8’) kwargs:{})
[2018-05-14 15:14:31,428: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-05-14 15:14:31,431: DEBUG/MainProcess] Task accepted: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] pid:62326
[2018-05-14 15:14:31,433: WARNING/ForkPoolWorker-1] [2018-05-14 15:14:31,431] INFO in app: deleteTmpAudioFile: filename=384541cb-2132-4d60-a42f-f693593b7921.mp3
...
[2018-05-14 15:14:31,446: INFO/ForkPoolWorker-1] Task Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] succeeded in 0.014525277016218752s: None
Note:
Whichever approach is used, the Celery worker is started with:
celery worker -A app.celeryApp --loglevel=DEBUG
Final conclusion:
For simplicity, use the first approach.
In Flask's app.py:
from flask import Flask
from celery import Celery
app = Flask(__name__)
celeryApp = Celery(app.name, broker=app.config["CELERY_BROKER_URL"])
celeryApp.conf.update(app.config)
@celeryApp.task
def deleteTmpAudioFile(filename):
    …
# call task
deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)
That is all.
Then make sure the Celery worker is running before the Flask app starts:
celery worker -A app.celeryApp
To see more information, such as the task list, which includes the full name of deleteTmpAudioFile:
app.deleteTmpAudioFile
add the debug log level:
celery worker -A app.celeryApp --loglevel=DEBUG
Then, after the code runs
deleteTmpAudioFile.apply_async
the Celery log shows:
Received task: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] ETA:[2018-05-14 07:06:08.084889+00:00]
Once the specified delay has elapsed, the task is picked up and executed:
TaskPool: Apply <function _fast_trace_task at 0x106bc6b70> (args:('app.deleteTmpAudioFile', '0d5f7bfa-3f40-452f-beb9-91cc3c52f495', …
[2018-05-14 15:06:08,491: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-05-14 15:06:08,496: DEBUG/MainProcess] Task accepted: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] pid:62182
…
The task now runs correctly from Flask.
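There does seem to be one drawback, though:
because Celery lives inside the Flask module, every start of the Celery worker also brings up the Flask app...
I'll leave it like this for now and optimize it when I have time.
When reposting, please credit: 在路上 » [Solved] Task fails to run after merging Celery tasks into Flask: ERROR/MainProcess Received unregistered task of type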