折腾:
后,接着去把本地测试通过的celery的task集成到Flask的app中。
先去防止Flask的app名字和celery的app名字冲突,改名为celeryApp:
celery_task.py
<code># celery_task.py
from celery import Celery
import os
# celeryApp = Celery('tasks', broker='redis://localhost//')
celeryApp = Celery('tasks', broker='redis://localhost')
# @celeryApp.task()
# def add(x, y):
#    return x + y
@celeryApp.task()
def deleteTmpAudioFile(filename):
    """
        delete tmp audio file from filename
            eg: 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
    """
    # audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    print("deleteTmpAudioFile: filename=%s", filename)
    audioTmpFolder = "tmp/audio"
    print("audioTmpFolder=%s" % audioTmpFolder)
    curFolderAbsPath = os.getcwd() #'/Users/crifan/xx/server'
    print("curFolderAbsPath=%s" % curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    print("audioTmpFolderFullPath=%s" % audioTmpFolderFullPath)
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    #'/Users/crifan/xxxserver/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        print("Ok to delete file %s" % tempAudioFullname)
    else:
        print("No need to remove for not exist file %s" % tempAudioFullname)
</code>然后想办法合并到Flask的app中
celery in flask
此处celery是4.0了:
<code>➜ server celery --version 4.1.0 (latentcall) </code>
所以:
ask/flask-celery: Celery integration for Flask (SINCE CELERY 3.0 THIS IS NO LONGER NEEDED)
都不需要了。
在 Flask 中使用 Celery — using celery with flask 1.00 documentation
看起来是:
把celery的代码,都合并到Flask的app中了:
就不存在我此处担心的:
考虑如何把flask的app.py和celery_task.py如何合并呢
因为现在存在循环调用
flask的app.py要import celery_task
而celery_task要import flask的app
还不知道如何解决呢
基于 Celery 的后台任务 — Flask 0.10.1 文档
好像还要搞个:make_celery?好麻烦
Celery Background Tasks — Flask 1.0.2 documentation
Celery Based Background Tasks — Flask 0.12.4 documentation
“Celery和Flask一起使用并没有什么不和谐的地方,都可以不用定制的Flask扩展,按照网上随处可见的示例也很简单
然而,稍微上点规模的Flask应用都会使用Factory模式,即只有在创建Flask实例时,才会初始化各种扩展,这样可以动态的修改扩展程序的配置。比如你有一套线上部署的配置和一套本地开发测试的配置,希望通过不同的启动入口,就使用不同的配置。”
里面解释的很复杂,很深入。
暂时觉得没必要这么深入研究。
所以还是尽量简单点吧
抽空再去研究:
Flask的app的factory 工厂模式
算了,就是简单的,把celery的代码,合并到flask的app中吧
结果运行出错:
【已解决】合并Celery的task到Flask后任务运行出错:ERROR/MainProcess Received unregistered task of type
【总结】
最终是把celery集成到了flask中:
flask的app.py
<code>from flask import Flask from celery import Celery app = Flask(__name__) celeryApp = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) celeryApp.conf.update(app.config) @celeryApp.task def deleteTmpAudioFile(filename): 。。。 # call task deleteTmpAudioFile.apply_async([tempFilename], countdown=delayTimeToDelete) </code>
flask的config.py
<code># CELERY_TASK_NAME = "Celery_" + FLASK_APP_NAME # CELERY_BROKER_URL = "redis://localhost" CELERY_BROKER_URL = "redis://localhost:6379/0" # CELERY_RESULT_BACKEND = "redis://localhost:6379/0" # current not use result CELERY_DELETE_TMP_AUDIO_FILE_DELAY = 20 </code>
然后在Flask的app运行之前,确保先运行celery的worker
<code>celery worker -A app.celeryApp </code>
想要看到更新信息,比如tasks的列表,其中包含此处的deleteTmpAudioFile的全名:
app.deleteTmpAudioFile
可以加上debug参数:
<code>celery worker -A app.celeryApp --loglevel=DEBUG </code>
然后代码运行了
deleteTmpAudioFile.apply_async
后,celery的log会看到:
Received task: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] ETA:[2018-05-14 07:14:30.839388+00:00]
过了指定的延期时间后,会收到任务并执行:
<code>TaskPool: Apply <function _fast_trace_task at 0x106ef0b70> (args:('Celery_RobotQA.deleteTmpAudioFile', '9845ee03-35e5-4fd1-b641-f2f77fd47665',  ...
[2018-05-14 15:14:31,428: DEBUG/MainProcess] basic.qos: prefetch_count->16
[2018-05-14 15:14:31,431: DEBUG/MainProcess] Task accepted: Celery_RobotQA.deleteTmpAudioFile[9845ee03-35e5-4fd1-b641-f2f77fd47665] pid:62326
...
</code>即可正常在Flask中执行了。
但是此处貌似有个缺点:
celery放在了flask中,导致每次启动celery的worker时,同时启动了flask的app。。。
不过暂时就这么着吧,有空再优化。
转载请注明:在路上 » 【已解决】把celery的task集成到Flask的app中