
[Solved] After merging Celery tasks into Flask, the task fails with: ERROR/MainProcess Received unregistered task of type

Category: celery; author: crifan

While working on:

[Solved] Integrating Celery tasks into a Flask app

I had already followed a tutorial to merge Celery into Flask:

import os

from flask import Flask

from celery import Celery

app = Flask(__name__)

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

celeryApp.conf.update(app.config)

log.info("celeryApp=%s", celeryApp)

# ----------------------------------------

# Celery tasks

# ----------------------------------------

@celeryApp.task()

def deleteTmpAudioFile(filename):

    """

        delete tmp audio file from filename

            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3

    """

    global log

    log.info("deleteTmpAudioFile: filename=%s", filename)

    audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]

    # audioTmpFolder = "tmp/audio"

    log.info("audioTmpFolder=%s", audioTmpFolder)

    curFolderAbsPath = os.getcwd() # '/Usersxxx/server'

    log.info("curFolderAbsPath=%s", curFolderAbsPath)

    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)

    log.info("audioTmpFolderFullPath=%s", audioTmpFolderFullPath)

    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)

    # '/Users/xxx/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'

    if os.path.isfile(tempAudioFullname):

        os.remove(tempAudioFullname)

        log.info("Ok to delete file %s", tempAudioFullname)

    else:

        log.warning("No need to remove for not exist file %s", tempAudioFullname)

class RobotQaAPI(Resource):

    def processResponse(self, respDict):

        …

            # 2. use celery to delay delete tmp file

            delayTimeToDelete = app.config["CELERY_DELETE_TMP_AUDIO_FILE_DELAY"]

            deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)

            log.info("Delay %s seconds to delete %s", delayTimeToDelete, tempFilename)

            …

        log.info("respDict=%s", respDict)

        return jsonify(respDict)
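Separately from the Celery wiring, the file-removal logic inside deleteTmpAudioFile can be checked on its own. The following is only a minimal sketch using the standard library: the function name delete_tmp_file, the logger name and the throwaway directory are assumptions made for illustration, not part of the original app.py.

```python
import logging
import os
import tempfile

log = logging.getLogger("tmp_audio_cleanup")  # hypothetical logger name


def delete_tmp_file(folder, filename):
    """Remove folder/filename if it is a regular file; return True when deleted."""
    full_path = os.path.join(folder, filename)
    if os.path.isfile(full_path):
        os.remove(full_path)
        log.info("Ok to delete file %s", full_path)
        return True
    log.warning("No need to remove for not exist file %s", full_path)
    return False


# Try it against a throwaway directory instead of the real tmp/audio folder.
with tempfile.TemporaryDirectory() as tmp_dir:
    sample = os.path.join(tmp_dir, "sample.mp3")
    open(sample, "wb").close()
    first_call = delete_tmp_file(tmp_dir, "sample.mp3")   # file exists, gets removed
    second_call = delete_tmp_file(tmp_dir, "sample.mp3")  # file is already gone
```

Taking the folder as a parameter, instead of reading app.config and os.getcwd() inside the function, is a design choice of this sketch that keeps the logic testable without Flask or Celery.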

Result:

The Celery worker shows:

[2018-05-14 11:22:19,624: ERROR/MainProcess] Received unregistered task of type 'Celery_RobotQA.deleteTmpAudioFile'.

The message has been ignored and discarded.

Did you remember to import the module containing this task?

Or maybe you’re using relative imports?

Please see

http://docs.celeryq.org/en/latest/internals/protocol.html

for more information.

The full contents of the message body was:

b'[["cc06dc63-0be8-439c-b9aa-e8c61ea78639.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (119b)

Traceback (most recent call last):

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received

    strategy = strategies[type_]

KeyError: 'Celery_RobotQA.deleteTmpAudioFile'

By this point the command used to run the Celery worker had been changed to:

➜  server celery -A app.celeryApp worker

[2018-05-14 11:20:56,282] INFO in app: app=<Flask ‘app’>

[2018-05-14 11:20:56,283] INFO in app: api=<flask_restful.Api object at 0x106eaaef0>

[2018-05-14 11:20:56,285] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x106eec198>

[2018-05-14 11:20:56,286] INFO in app: aiContext=<DialogueManager.Context object at 0x1077b0be0>

[2018-05-14 11:20:56,287] INFO in app: getBaiduTokenUrl=https://openapi.baidu.com/oauth/2.0/token?grant_type=client_credentials&client_id=SNjsggdYDNWtnlbKhxsPLcaz&client_secret=47xxxba

[2018-05-14 11:20:56,422] INFO in app: resp=<Response [200]>

[2018-05-14 11:20:56,423] INFO in app: respJson={‘access_token’: ‘24.95xxx313be.2592000.1xxx6.282335-1xxx83’, ‘session_key’: ‘9mxxxDsZwWUs4H0pYmw==’, ‘scope’: ‘public audio_voice_assistant_get audio_tts_post wise_adapt lebo_resource_base lightservice_public hetu_basic lightcms_map_poi kaidian_kaidian ApsMisTest_Test权限 vis-classify_flower lpq_开放 cop_helloScope ApsMis_fangdi_permission smartapp_snsapi_base’, ‘refresh_token’: ‘25.6xxx35-11192483’, ‘session_secret’: ’21daxxx88e’, ‘expires_in’: 2592000}

[2018-05-14 11:20:56,424] INFO in app: get baidu token resp: {‘access_token’: ‘24.95xxx483’, ‘session_key’: ‘9mzxxxmw==’, ‘scope’: ‘public audio_voice_assistant_get audio_tts_post wise_adapt lebo_resource_base lightservice_public hetu_basic lightcms_map_poi kaidian_kaidian ApsMisTest_Test权限 vis-classify_flower lpq_开放 cop_helloScope ApsMis_fangdi_permission smartapp_snsapi_base’, ‘refresh_token’: ‘25.6xxx35-11192483’, ‘session_secret’: ’21dxxx88e’, ‘expires_in’: 2592000}

[2018-05-14 11:20:56,424] INFO in app: audioTmpFolder=tmp/audio

[2018-05-14 11:20:56,424] INFO in app: curFolderAbsPath=/Users/crifan/dev/dev_root/xxxserver

[2018-05-14 11:20:56,425] INFO in app: audioTmpFolderFullPath=/Users/crifan/dev/dev_root/xxx

[2018-05-14 11:20:56,425] INFO in app: gTempAudioFolder=/Users/crifan/dev/dev_root/xxx/server/tmp/audio

[2018-05-14 11:20:56,426] INFO in app: purePymongo=MongoClient(host=[‘x.x.x.x:p’], document_class=dict, tz_aware=False, connect=True, authsource=’gridfs’)

[2018-05-14 11:20:56,428] INFO in app: gridfsDb=Database(MongoClient(host=[‘x.x.x.x:[port’], document_class=dict, tz_aware=False, connect=True, authsource=’gridfs’), ‘gridfs’)

[2018-05-14 11:20:56,429] INFO in app: fsCollection=<gridfs.GridFS object at 0x107e7ddd8>

Earlier still, it was:

celery -A celery_task  worker

which produced the same error. I searched for:

celery ERROR/MainProcess Received unregistered task of type

celery flask ERROR/MainProcess Received unregistered task of type

python – Celery Received unregistered task of type (run example) – Stack Overflow

It seems one needs to pass:

--config=celeryconfig

where celeryconfig contains:

CELERY_IMPORTS = ("tasks",)

celery worker hits the error "Received unregistered task of type 'tasks.add'" – 笑遍世界

Let me try inspect registered:

celery inspect registered

Received unregistered task of type · Issue #3024 · celery/celery

Celery: Received unregistered task of type <AsyncResult: [hash]> · Issue #3255 · celery/celery

django Received unregistered task of type · Issue #3629 · celery/celery

cellery ImportError & AttributeError – 半天的半天 – 博客园

➜  server celery -A app.celeryApp worker inspect registered

[2018-05-14 11:37:54,359] INFO in app: app=<Flask ‘app’>

[2018-05-14 11:37:54,360] INFO in app: api=<flask_restful.Api object at 0x105863f28>

[2018-05-14 11:37:54,363] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x1058a51d0>

...

usage: celery worker [options]

celery: error: unrecognized arguments: inspect registered

Then I changed it to:

celery -A app.celeryApp inspect registered

Error: No nodes replied within time constraint.

Referring to:

https://github.com/celery/celery/issues/3024

$ celery -A scan.tasks:app worker -P gevent

it seems a colon should be used:

celery -A app:celeryApp inspect registered

Error: No nodes replied within time constraint.

https://stackoverflow.com/questions/9769496/celery-received-unregistered-task-of-type-run-example

Add:

--loglevel=DEBUG

to see whether more detailed error logs are printed:

celery -A app:celeryApp --loglevel=DEBUG inspect registered

usage: celery inspect [options]  <command> [arg1 .. argN]

celery: error: unrecognized arguments: --loglevel=DEBUG

It turns out the debug log level can only be added to the worker command:

➜  server celery -A app:celeryApp worker --loglevel=DEBUG

[2018-05-14 11:45:08,254] INFO in app: app=<Flask ‘app’>

[2018-05-14 11:45:08,255] INFO in app: api=<flask_restful.Api object at 0x10bcc3ef0>

[2018-05-14 11:45:08,257] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x10bd05198>

[2018-05-14 11:45:08,541: DEBUG/MainProcess] | Worker: Preparing bootsteps.

[2018-05-14 11:45:08,546: DEBUG/MainProcess] | Worker: Building graph…

[2018-05-14 11:45:08,547: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}

[2018-05-14 11:45:08,577: DEBUG/MainProcess] | Consumer: Preparing bootsteps.

[2018-05-14 11:45:08,578: DEBUG/MainProcess] | Consumer: Building graph…

[2018-05-14 11:45:08,614: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Heart, Gossip, Agent, event loop}

celery@licrifandeMacBook-Pro.local v4.1.0 (latentcall)

Darwin-17.5.0-x86_64-i386-64bit 2018-05-14 11:45:08

[config]

.> app:         Celery_RobotQA:0x10bd05198

.> transport:   redis://localhost:6379/0

.> results:     disabled://

.> concurrency: 4 (prefork)

.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]

.> celery           exchange=celery(direct) key=celery

[tasks]

  . app.deleteTmpAudioFile

  . celery.accumulate

  . celery.backend_cleanup

  . celery.chain

  . celery.chord

  . celery.chord_unlock

  . celery.chunks

  . celery.group

  . celery.map

  . celery.starmap

[2018-05-14 11:45:08,660: DEBUG/MainProcess] | Worker: Starting Hub

[2018-05-14 11:45:08,661: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:08,661: DEBUG/MainProcess] | Worker: Starting Pool

[2018-05-14 11:45:08,860: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:08,868: DEBUG/MainProcess] | Worker: Starting Consumer

[2018-05-14 11:45:08,870: DEBUG/MainProcess] | Consumer: Starting Connection

[2018-05-14 11:45:08,896: INFO/MainProcess] Connected to redis://localhost:6379/0

[2018-05-14 11:45:08,896: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:08,897: DEBUG/MainProcess] | Consumer: Starting Events

[2018-05-14 11:45:08,920: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:08,921: DEBUG/MainProcess] | Consumer: Starting Mingle

[2018-05-14 11:45:08,921: INFO/MainProcess] mingle: searching for neighbors

[2018-05-14 11:45:09,954: INFO/MainProcess] mingle: all alone

[2018-05-14 11:45:09,954: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:09,955: DEBUG/MainProcess] | Consumer: Starting Tasks

[2018-05-14 11:45:09,960: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:09,960: DEBUG/MainProcess] | Consumer: Starting Control

[2018-05-14 11:45:09,965: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:09,965: DEBUG/MainProcess] | Consumer: Starting Heart

[2018-05-14 11:45:09,967: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:09,967: DEBUG/MainProcess] | Consumer: Starting Gossip

[2018-05-14 11:45:09,972: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:45:09,972: DEBUG/MainProcess] | Consumer: Starting event loop

[2018-05-14 11:45:09,972: DEBUG/MainProcess] | Worker: Hub.register Pool…

[2018-05-14 11:45:09,973: INFO/MainProcess] celery@licrifandeMacBook-Pro.local ready.

[2018-05-14 11:45:09,973: DEBUG/MainProcess] basic.qos: prefetch_count->16

 

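Clearly, the task list here shows:

[tasks]

  . app.deleteTmpAudioFile

so my function is registered, but not under the name I expected:

Celery_RobotQA.deleteTmpAudioFile

So I needed to figure out how Celery's name is determined here.

My guess was that it comes from:

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

where the name is:

Celery_RobotQA

As for the command that starts the Celery worker, I had assumed it takes the variable name of the Celery instance, i.e. celeryApp, so I used:

celery -A app:celeryApp worker

Perhaps it should instead be:

celery -A app:Celery_RobotQA worker --loglevel=DEBUG

I tried that, and it failed: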

Traceback (most recent call last):

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/app/utils.py", line 355, in find_app

    sym = symbol_by_name(app, imp=imp)

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 506, in symbol_by_name

    return imports.symbol_by_name(name, imp=imp)

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/kombu/utils/imports.py", line 61, in symbol_by_name

    return getattr(module, cls_name) if cls_name else module

AttributeError: module ‘app’ has no attribute ‘Celery_RobotQA’
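The AttributeError above is consistent with -A taking a Python import path plus a variable name (celeryApp), not the name string passed to Celery(...). The following is a simplified model of that kind of lookup, not the actual code in kombu; the function resolve_app_path is hypothetical, and os.path:join stands in for app:celeryApp so that the sketch runs without the project.

```python
import importlib


def resolve_app_path(path):
    """Resolve 'module:attr' or 'module.attr' to a Python object (simplified)."""
    if ":" in path:
        module_name, attr = path.split(":", 1)
    else:
        module_name, _, attr = path.rpartition(".")
    module = importlib.import_module(module_name)
    return getattr(module, attr)  # AttributeError if the variable does not exist


# Standard-library stand-ins for "app:celeryApp" and "app:Celery_RobotQA".
found = resolve_app_path("os.path:join")
try:
    resolve_app_path("os.path:Celery_RobotQA")
    error = None
except AttributeError as exc:
    error = exc  # same failure mode as "module 'app' has no attribute 'Celery_RobotQA'"
```

Under this model, Celery_RobotQA can never be resolved, because it is a string stored inside the Celery object rather than a name defined in app.py.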

Then I changed it to:

➜  server celery -A app.celeryApp:Celery_RobotQA worker --loglevel=DEBUG

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/importlib/__init__.py", line 126, in import_module

    return _bootstrap._gcd_import(name[level:], package, level)

  File "<frozen importlib._bootstrap>", line 994, in _gcd_import

  File "<frozen importlib._bootstrap>", line 971, in _find_and_load

  File "<frozen importlib._bootstrap>", line 950, in _find_and_load_unlocked

ModuleNotFoundError: No module named ‘app.celeryApp’; ‘app’ is not a package

Then:

➜  server celery -A app worker --loglevel=DEBUG

Traceback (most recent call last):

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/celery", line 11, in <module>

    sys.exit(main())

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/__main__.py", line 14, in main

    _main()

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/celery.py", line 326, in main

    cmd.execute_from_commandline(argv)

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/celery.py", line 488, in execute_from_commandline

    super(CeleryCommand, self).execute_from_commandline(argv)))

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 279, in execute_from_commandline

    argv = self.setup_app_from_commandline(argv)

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 489, in setup_app_from_commandline

    self._handle_user_preload_options(argv)

  File "/Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages/celery/bin/base.py", line 494, in _handle_user_preload_options

    user_preload = tuple(self.app.user_options[‘preload’] or ())

AttributeError: ‘Flask’ object has no attribute ‘user_options’

flask  run celery

Using Celery in a Flask project – 李林克斯

celery worker -A example.celery -l INFO

So back to:

celery worker -A app.celeryApp --loglevel=DEBUG

[2018-05-14 11:57:57,262: DEBUG/MainProcess] | Worker: Preparing bootsteps.

[2018-05-14 11:57:57,265: DEBUG/MainProcess] | Worker: Building graph…

[2018-05-14 11:57:57,265: DEBUG/MainProcess] | Worker: New boot order: {Beat, Timer, Hub, Pool, Autoscaler, StateDB, Consumer}

[2018-05-14 11:57:57,300: DEBUG/MainProcess] | Consumer: Preparing bootsteps.

[2018-05-14 11:57:57,301: DEBUG/MainProcess] | Consumer: Building graph…

[2018-05-14 11:57:57,326: DEBUG/MainProcess] | Consumer: New boot order: {Connection, Events, Mingle, Tasks, Control, Heart, Agent, Gossip, event loop}

celery@licrifandeMacBook-Pro.local v4.1.0 (latentcall)

Darwin-17.5.0-x86_64-i386-64bit 2018-05-14 11:57:57

[config]

.> app:         Celery_RobotQA:0x10958d198

.> transport:   redis://localhost:6379/0

.> results:     disabled://

.> concurrency: 4 (prefork)

.> task events: OFF (enable -E to monitor tasks in this worker)

[queues]

.> celery           exchange=celery(direct) key=celery

[tasks]

  . app.deleteTmpAudioFile

  . celery.accumulate

  . celery.backend_cleanup

  . celery.chain

  . celery.chord

  . celery.chord_unlock

  . celery.chunks

  . celery.group

  . celery.map

  . celery.starmap

[2018-05-14 11:57:57,367: DEBUG/MainProcess] | Worker: Starting Hub

[2018-05-14 11:57:57,368: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:57,368: DEBUG/MainProcess] | Worker: Starting Pool

[2018-05-14 11:57:57,586: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:57,588: DEBUG/MainProcess] | Worker: Starting Consumer

[2018-05-14 11:57:57,588: DEBUG/MainProcess] | Consumer: Starting Connection

[2018-05-14 11:57:57,626: INFO/MainProcess] Connected to redis://localhost:6379/0

[2018-05-14 11:57:57,626: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:57,626: DEBUG/MainProcess] | Consumer: Starting Events

[2018-05-14 11:57:57,650: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:57,650: DEBUG/MainProcess] | Consumer: Starting Mingle

[2018-05-14 11:57:57,650: INFO/MainProcess] mingle: searching for neighbors

[2018-05-14 11:57:58,679: INFO/MainProcess] mingle: all alone

[2018-05-14 11:57:58,680: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:58,680: DEBUG/MainProcess] | Consumer: Starting Tasks

[2018-05-14 11:57:58,687: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:58,687: DEBUG/MainProcess] | Consumer: Starting Control

[2018-05-14 11:57:58,693: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:58,693: DEBUG/MainProcess] | Consumer: Starting Heart

[2018-05-14 11:57:58,696: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:58,696: DEBUG/MainProcess] | Consumer: Starting Gossip

[2018-05-14 11:57:58,703: DEBUG/MainProcess] ^– substep ok

[2018-05-14 11:57:58,704: DEBUG/MainProcess] | Consumer: Starting event loop

[2018-05-14 11:57:58,704: DEBUG/MainProcess] | Worker: Hub.register Pool…

[2018-05-14 11:57:58,705: INFO/MainProcess] celery@licrifandeMacBook-Pro.local ready.

[2018-05-14 11:57:58,705: DEBUG/MainProcess] basic.qos: prefetch_count->16

But I was not sure whether the asynchronous task could actually be invoked at run time.

python – Run Celery Worker from FLASK app – Stack Overflow

It seems one must use:

celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

that is, the name passed to Celery has to be app.name.

But according to:

Get Started with Celery and Flask – Scalingo

a custom name is allowed as well.

Setting up Celery with Flask · Avi Aryan

uses:

app.import_name

I searched for:

flask app  import_name

and found:

API — Flask 1.0.2 documentation

"class flask.Flask(import_name,

If you are using a single module, __name__ is always the correct value. If you however are using a package, it's usually recommended to hardcode the name of your package there.

app = Flask('yourapplication')

app = Flask(__name__.split('.')[0])"

So I printed them to check:

log.info("app.name=%s", app.name)

log.info("app.import_name=%s", app.import_name)

log.info("__name__=%s", __name__)

Output:

[2018-05-14 13:35:08,102] INFO in app: app.name=app

[2018-05-14 13:35:08,102] INFO in app: app.import_name=app

[2018-05-14 13:35:08,103] INFO in app: __name__=app

And the task list is still:

[tasks]

  . app.deleteTmpAudioFile

I then looked up how Celery is initialized:

Celery – Distributed Task Queue — Celery 4.1.0 documentation

API Reference — Celery 4.1.0 documentation

celery — Distributed processing — Celery 4.1.0 documentation

celery.app.task — Celery 4.1.0 documentation

“class celery.Celery(main=None, loader=None, backend=None, amqp=None, events=None, log=None, control=None, set_as_current=True, tasks=None, broker=None, include=None, changes=None, config_source=None, fixups=None, task_cls=None, autofinalize=True, namespace=None, strict_typing=True, **kwargs)[source]

Celery application.

Parameters:

main (str) – Name of the main module if running as __main__. This is used as the prefix for auto-generated task names.”

Then I searched for:

celery app main name

Celery task display name – Stack Overflow

Application — Celery 4.1.0 documentation

">>> from celery import Celery

>>> app = Celery()

>>> app

<Celery __main__:0x100469fd0>"

The __main__ here is the main name of this Celery app.

But nothing looked wrong on my side in that respect.

Tasks — Celery 4.1.0 documentation

Then I noticed that the decorator I had written might be wrong:

@celeryApp.task()

def deleteTmpAudioFile(filename):

and perhaps should be:

@celeryApp.task

def deleteTmpAudioFile(filename):

But first, some debugging.

Code:

# celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

log.info("app.name=%s", app.name)

log.info("app.import_name=%s", app.import_name)

log.info("__name__=%s", __name__)

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

celeryApp.conf.update(app.config)

log.info("celeryApp=%s", celeryApp)

@celeryApp.task()

# @celeryApp.task

def deleteTmpAudioFile(filename):

   …

log.info("deleteTmpAudioFile=%s", deleteTmpAudioFile)

log.info("deleteTmpAudioFile.name=%s", deleteTmpAudioFile.name)

log.info("celeryApp.tasks=%s", celeryApp.tasks)

Run:

celery worker -A app.celeryApp --loglevel=DEBUG

Output:

[2018-05-14 13:54:40,255] INFO in app: deleteTmpAudioFile=<@task: app.deleteTmpAudioFile of app at 0x11129f198>

[2018-05-14 13:54:40,272] INFO in app: deleteTmpAudioFile.name=app.deleteTmpAudioFile

[2018-05-14 13:54:40,276] INFO in app: celeryApp.tasks={‘app.deleteTmpAudioFile’: <@task: app.deleteTmpAudioFile of app at 0x11129f198>, ‘celery.chord_unlock’: <@task: celery.chord_unlock of app at 0x11129f198>, ‘celery.group’: <@task: celery.group of app at 0x11129f198>, ‘celery.map’: <@task: celery.map of app at 0x11129f198>, ‘celery.chain’: <@task: celery.chain of app at 0x11129f198>, ‘celery.backend_cleanup’: <@task: celery.backend_cleanup of app at 0x11129f198>, ‘celery.starmap’: <@task: celery.starmap of app at 0x11129f198>, ‘celery.chord’: <@task: celery.chord of app at 0x11129f198>, ‘celery.accumulate’: <@task: celery.accumulate of app at 0x11129f198>, ‘celery.chunks’: <@task: celery.chunks of app at 0x11129f198>}

Switching to:

# @celeryApp.task()

@celeryApp.task

def deleteTmpAudioFile(filename):

gives the same result.

And switching to:

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

# celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

still produces the same task name:

[2018-05-14 13:58:01,914] INFO in app: deleteTmpAudioFile=<@task: app.deleteTmpAudioFile of Celery_RobotQA at 0x10bcfa198>

[2018-05-14 13:58:01,928] INFO in app: deleteTmpAudioFile.name=app.deleteTmpAudioFile

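So it appears that:

[Summary]

According to the "the module the task is defined in" rule at:

http://docs.celeryproject.org/en/latest/userguide/tasks.html#task-names

no matter how the Celery app is initialized, for:

@celeryApp.task

def deleteTmpAudioFile(filename):

if the task's name is not specified explicitly, Celery uses the name of the module where the task is defined (here: app, from app.py) as the prefix. So in the worker, which imports app.py as the module app, the task name is always: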

app.deleteTmpAudioFile
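The Celery documentation quoted earlier describes main as the "Name of the main module if running as __main__" and says it is "used as the prefix for auto-generated task names". A minimal model of that rule is sketched below. It is not Celery's own code: auto_task_name is a hypothetical helper, and the caller-side case assumes that the Flask process starts app.py directly as a script, which this post does not show.

```python
def auto_task_name(main, module_name, func_name):
    """Simplified model of Celery's default task naming (not the library's code)."""
    # When the defining module is the script being run, the app's main name
    # is substituted for "__main__".
    if module_name == "__main__" and main:
        return f"{main}.{func_name}"
    return f"{module_name}.{func_name}"


# Worker side: `celery worker -A app.celeryApp` imports app.py as module "app".
worker_name = auto_task_name("Celery_RobotQA", "app", "deleteTmpAudioFile")

# Assumed caller side: app.py started directly, so its module is "__main__".
caller_name = auto_task_name("Celery_RobotQA", "__main__", "deleteTmpAudioFile")
```

With main set to app.name (that is, "app"), both sides of this model produce app.deleteTmpAudioFile, which would explain why that choice makes the names agree.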

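So, whether the Celery app is created with:

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

celeryApp = Celery(__name__, broker=app.config['CELERY_BROKER_URL'])

or with:

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

where app.config["CELERY_TASK_NAME"] is:

Celery_RobotQA

the worker's [tasks] list always shows:

. app.deleteTmpAudioFile

But if Celery is initialized the latter way:

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

then when the Flask code calls:

deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)

the message it sends asks for a task prefixed with Celery's main name (main = Celery_RobotQA), presumably because the Flask process runs app.py as __main__, in which case main replaces the module name as the prefix:

-> Celery_RobotQA.deleteTmpAudioFile

while the worker has only registered:

app.deleteTmpAudioFile

so the task cannot be found.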
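The mismatch can be pictured as a plain dictionary lookup, which is what the strategies[type_] line in the traceback suggests. The following toy dispatcher is only an illustration: the registry contents, the on_task_received function and the returned strings are made up for this sketch and are not Celery's consumer code.

```python
# Names the worker registered at startup (taken from the [tasks] list in the log).
registered = {"app.deleteTmpAudioFile": lambda filename: f"deleted {filename}"}


def on_task_received(task_name, args):
    """Toy dispatcher: look up the incoming task name in the worker's registry."""
    try:
        handler = registered[task_name]
    except KeyError:
        # The worker has no task under this name, so the message is discarded.
        return f"Received unregistered task of type {task_name!r}."
    return handler(*args)


mismatch = on_task_received("Celery_RobotQA.deleteTmpAudioFile", ["a.mp3"])
match = on_task_received("app.deleteTmpAudioFile", ["a.mp3"])
```

Only an exact name match reaches the handler, so the sender and the worker have to agree on the full dotted name.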

There are two possible fixes.

Either:

1. Use app.deleteTmpAudioFile everywhere

Concretely:

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

@celeryApp.task

def deleteTmpAudioFile(filename):

    ...

deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)

After apply_async is called, the relevant log is:

[2018-05-14 15:05:49,056: INFO/MainProcess] Received task: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495]  ETA:[2018-05-14 07:06:08.084889+00:00]

[2018-05-14 15:05:49,062: DEBUG/MainProcess] basic.qos: prefetch_count->17

Then, once the delay has elapsed, the task is executed:

[2018-05-14 15:06:08,089: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x106bc6b70> (args:(‘app.deleteTmpAudioFile’, ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, {‘lang’: ‘py’, ‘task’: ‘app.deleteTmpAudioFile’, ‘id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘eta’: ‘2018-05-14T07:06:08.084889+00:00’, ‘expires’: None, ‘group’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘parent_id’: None, ‘argsrepr’: "[‘51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3’]", ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen62154@licrifandeMacBook-Pro.local‘, ‘reply_to’: ‘513d99ab-dfe4-33d9-8c8f-f16a94fb2f99’, ‘correlation_id’: ‘0d5f7bfa-3f40-452f-beb9-91cc3c52f495’, ‘delivery_info’: {‘exchange’: ”, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}}, b'[["51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]’, ‘application/json’, ‘utf-8’) kwargs:{})

[2018-05-14 15:06:08,491: DEBUG/MainProcess] basic.qos: prefetch_count->16

[2018-05-14 15:06:08,496: DEBUG/MainProcess] Task accepted: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] pid:62182

[2018-05-14 15:06:08,500: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,498] INFO in app: deleteTmpAudioFile: filename=51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3

[2018-05-14 15:06:08,498: INFO/ForkPoolWorker-1] deleteTmpAudioFile: filename=51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3

[2018-05-14 15:06:08,504: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,504] INFO in app: audioTmpFolder=tmp/audio

[2018-05-14 15:06:08,504: INFO/ForkPoolWorker-1] audioTmpFolder=tmp/audio

[2018-05-14 15:06:08,506: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,505] INFO in app: curFolderAbsPath=/Users/crifan/dev/xxx/server

[2018-05-14 15:06:08,505: INFO/ForkPoolWorker-1] curFolderAbsPath=/Users/crifan/dev/xxx/server

[2018-05-14 15:06:08,506: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,506] INFO in app: audioTmpFolderFullPath=/Users/crifan/xxx/server/tmp/audio

[2018-05-14 15:06:08,506: INFO/ForkPoolWorker-1] audioTmpFolderFullPath=/Users/crifan/xxx/server/tmp/audio

[2018-05-14 15:06:08,508: WARNING/ForkPoolWorker-1] [2018-05-14 15:06:08,508] INFO in app: Ok to delete file /Users/crifan/devxxx/server/tmp/audio/51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3

[2018-05-14 15:06:08,508: INFO/ForkPoolWorker-1] Ok to delete file /Users/crifan/dev/xxx/server/tmp/audio/51206d31-4a78-4fd1-bb5f-43002276ae6e.mp3

[2018-05-14 15:06:08,510: INFO/ForkPoolWorker-1] Task app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] succeeded in 0.013172306003980339s: None

Here the task is:

app.deleteTmpAudioFile

Or:

2. Keep the custom Celery main name, and manually give every task a name prefixed with that main name

celeryApp = Celery(app.config["CELERY_TASK_NAME"], broker=app.config['CELERY_BROKER_URL'])

# app.config["CELERY_TASK_NAME"] = Celery_RobotQA

@celeryApp.task(name=app.config["CELERY_TASK_NAME"] + ".deleteTmpAudioFile")

def deleteTmpAudioFile(filename):

    ...

deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)

Here the task is:

Celery_RobotQA.deleteTmpAudioFile

The drawback is that every task has to spell out its name by hand, which is tedious.
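If the second approach were kept, a small helper could reduce the repetition. This is only a sketch: TASK_NAME_PREFIX and prefixed_task_name are hypothetical names, and the decorator usage is left as a comment because it needs the celeryApp object from app.py.

```python
TASK_NAME_PREFIX = "Celery_RobotQA"  # value of app.config["CELERY_TASK_NAME"] in this post


def prefixed_task_name(func_name, prefix=TASK_NAME_PREFIX):
    """Build '<prefix>.<func_name>' for use as the explicit name= of a task."""
    return f"{prefix}.{func_name}"


# Intended use (requires celeryApp from app.py, so shown as a comment only):
#   @celeryApp.task(name=prefixed_task_name("deleteTmpAudioFile"))
#   def deleteTmpAudioFile(filename): ...
explicit_name = prefixed_task_name("deleteTmpAudioFile")
```

The function name still has to be typed twice per task, so this only removes the string concatenation, not the duplication itself.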

The result when running:

[2018-05-14 15:12:59,850] INFO in app: celeryApp=<Celery Celery_RobotQA at 0x1055f31d0>

[2018-05-14 15:13:00,036] INFO in app: deleteTmpAudioFile=<@task: Celery_RobotQA.deleteTmpAudioFile of Celery_RobotQA at 0x1055f31d0>

[2018-05-14 15:13:00,052] INFO in app: deleteTmpAudioFile.name=Celery_RobotQA.deleteTmpAudioFile

[2018-05-14 15:13:00,055] INFO in app: celeryApp.tasks={‘Celery_RobotQA.deleteTmpAudioFile’: <@task: Celery_RobotQA.deleteTmpAudioFile of Celery_RobotQA at 0x1055f31d0>, ‘celery.chord_unlock’: <@task: celery.chord_unlock of Celery_RobotQA at 0x1055f31d0>, ‘celery.group’: <@task: celery.group of Celery_RobotQA at 0x1055f31d0>, ‘celery.map’: <@task: celery.map of Celery_RobotQA at 0x1055f31d0>, ‘celery.chain’: <@task: celery.chain of Celery_RobotQA at 0x1055f31d0>, ‘celery.backend_cleanup’: <@task: celery.backend_cleanup of Celery_RobotQA at 0x1055f31d0>, ‘celery.starmap’: <@task: celery.starmap of Celery_RobotQA at 0x1055f31d0>, ‘celery.chord’: <@task: celery.chord of Celery_RobotQA at 0x1055f31d0>, ‘celery.accumulate’: <@task: celery.accumulate of Celery_RobotQA at 0x1055f31d0>, ‘celery.chunks’: <@task: celery.chunks of Celery_RobotQA at 0x1055f31d0>}

[tasks]

  . Celery_RobotQA.deleteTmpAudioFile

After calling apply_async:

[2018-05-14 15:14:11,092: INFO/MainProcess] Received task: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665]  ETA:[2018-05-14 07:14:30.839388+00:00]

[2018-05-14 15:14:11,093: DEBUG/MainProcess] basic.qos: prefetch_count->17

[2018-05-14 15:14:30,846: DEBUG/MainProcess] TaskPool: Apply <function _fast_trace_task at 0x106ef0b70> (args:(‘Celery_RobotQA.deleteTmpAudioFile’, ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, {‘lang’: ‘py’, ‘task’: ‘Celery_RobotQA.deleteTmpAudioFile’, ‘id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘eta’: ‘2018-05-14T07:14:30.839388+00:00’, ‘expires’: None, ‘group’: None, ‘retries’: 0, ‘timelimit’: [None, None], ‘root_id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘parent_id’: None, ‘argsrepr’: "[‘384541cb-2132-4d60-a42f-f693593b7921.mp3’]", ‘kwargsrepr’: ‘{}’, ‘origin’: ‘gen62343@licrifandeMacBook-Pro.local‘, ‘reply_to’: ‘e33e7319-ad76-33a4-8c48-4ba2121138c0’, ‘correlation_id’: ‘9845ee03-35e5-4fd1-b641-f2f77fd47665’, ‘delivery_info’: {‘exchange’: ”, ‘routing_key’: ‘celery’, ‘priority’: 0, ‘redelivered’: None}}, b'[["384541cb-2132-4d60-a42f-f693593b7921.mp3"], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]’, ‘application/json’, ‘utf-8’) kwargs:{})

[2018-05-14 15:14:31,428: DEBUG/MainProcess] basic.qos: prefetch_count->16

[2018-05-14 15:14:31,431: DEBUG/MainProcess] Task accepted: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] pid:62326

[2018-05-14 15:14:31,433: WARNING/ForkPoolWorker-1] [2018-05-14 15:14:31,431] INFO in app: deleteTmpAudioFile: filename=384541cb-2132-4d60-a42f-f693593b7921.mp3

...

[2018-05-14 15:14:31,446: INFO/ForkPoolWorker-1] Task Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] succeeded in 0.014525277016218752s: None

Note:

With either approach, the Celery worker is started with:

celery worker -A app.celeryApp --loglevel=DEBUG

Final conclusion:

For simplicity, use the first approach.

In Flask's app.py:

from flask import Flask

from celery import Celery

app = Flask(__name__)

celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL'])

celeryApp.conf.update(app.config)

@celeryApp.task

def deleteTmpAudioFile(filename):

    ...

# call task

deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete)

That is all.

Then, before starting the Flask app, make sure the Celery worker is already running:

celery worker -A app.celeryApp

To see more information, such as the task list, which includes the full name of deleteTmpAudioFile:

app.deleteTmpAudioFile

add the debug log level:

celery worker -A app.celeryApp --loglevel=DEBUG

Then, after the code calls

deleteTmpAudioFile.apply_async

the Celery log shows:

Received task: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495]  ETA:[2018-05-14 07:06:08.084889+00:00]

Once the specified delay has passed, the task is picked up and executed:

TaskPool: Apply <function _fast_trace_task at 0x106bc6b70> (args:('app.deleteTmpAudioFile', '0d5f7bfa-3f40-452f-beb9-91cc3c52f495',  …

[2018-05-14 15:06:08,491: DEBUG/MainProcess] basic.qos: prefetch_count->16

[2018-05-14 15:06:08,496: DEBUG/MainProcess] Task accepted: app.deleteTmpAudioFile[0d5f7bfa-3f40-452f-beb9-91cc3c52f495] pid:62182

and the task now runs correctly from Flask.

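There seems to be one drawback, though:

Because Celery lives inside the Flask module, every start of the Celery worker also starts the Flask app...

I will leave it this way for now and optimize it later when I have time.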
