
[Solved] Delayed task execution with Celery on a local Mac

celery crifan 2056 views 0 comments

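While working on:

[Solved] How to save temporary files in Flask with a configurable expiry time

I first needed to get delayed task execution working with Celery (backed by Redis) in a Flask app running locally on my Mac.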

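Further references:

Calling Tasks — Celery 4.1.0 documentation

Celery, a powerful tool for asynchronous tasks | FunHacks

and the API documentation:

celery.app.task — Celery 4.1.0 documentation

Also see:

Using Celery with Flask — using celery with flask 1.00 documentation

With these, I could start writing the code.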

python celery redis

First Steps with Celery — Celery 4.1.0 documentation

Using Redis — Celery 4.1.0 documentation

Getting started with Celery and Redis – OneAPM blog

A brief introduction to using Celery – Jason’s blog

Install it:

<code>➜  server pipenv install "celery[redis]"
Installing celery[redis]…
Looking in indexes: https://pypi.python.org/simple
Collecting celery[redis]
  Downloading https://files.pythonhosted.org/packages/22/9b/88ef5cc7edf5d43215f383ae0a2b1cdeb33f5f07886386c7e4691b2eba0c/celery-4.1.0-py2.py3-none-any.whl (400kB)
Requirement already satisfied: pytz>dev in /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/lib/python3.6/site-packages (from celery[redis]) (2018.4)
Collecting kombu<5.0,>=4.0.2 (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/62/a4/5d16954803224a1e451713293c2a028614099f5538cf626e1fdd7b438c86/kombu-4.1.0-py2.py3-none-any.whl (181kB)
Collecting billiard<3.6.0,>=3.5.0.2 (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/82/55/76f4e786141b7174926cdffa7a155aeea316b729118fb48ec548f3c6754f/billiard-3.5.0.3-py3-none-any.whl (89kB)
Collecting redis>=2.10.5; extra == "redis" (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/3b/f6/7a76333cf0b9251ecf49efff635015171843d9b977e4ffcf59f9c4428052/redis-2.10.6-py2.py3-none-any.whl (64kB)
Collecting amqp<3.0,>=2.1.4 (from kombu<5.0,>=4.0.2->celery[redis])
  Downloading https://files.pythonhosted.org/packages/88/4a/8c45a882d842678963516ebd9cf584a4ded51af719234c3b696c2e884c60/amqp-2.2.2-py2.py3-none-any.whl (48kB)
Collecting vine>=1.1.3 (from amqp<3.0,>=2.1.4->kombu<5.0,>=4.0.2->celery[redis])
  Downloading https://files.pythonhosted.org/packages/10/50/5b1ebe42843c19f35edb15022ecae339fbec6db5b241a7a13c924dabf2a3/vine-1.1.4-py2.py3-none-any.whl
Installing collected packages: vine, amqp, kombu, billiard, redis, celery
Successfully installed amqp-2.2.2 billiard-3.5.0.3 celery-4.1.0 kombu-4.1.0 redis-2.10.6 vine-1.1.4

Adding celery[redis] to Pipfile's [packages]…
Pipfile.lock (b165df) out of date, updating to (132fcc)…
Locking [dev-packages] dependencies…
Locking [packages] dependencies…
Updated Pipfile.lock (132fcc)!
Installing dependencies from Pipfile.lock (132fcc)…
  🐍   ▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉▉ 29/29 — 00
➜  server pipenv graph
celery==4.1.0
  - billiard [required: >=3.5.0.2,<3.6.0, installed: 3.5.0.3]
  - kombu [required: <5.0,>=4.0.2, installed: 4.1.0]
    - amqp [required: >=2.1.4,<3.0, installed: 2.2.2]
      - vine [required: >=1.1.3, installed: 1.1.4]
  - pytz [required: >dev, installed: 2018.4]
Flask-Cors==3.0.4
  - Flask [required: >=0.9, installed: 1.0.2]
    - click [required: >=5.1, installed: 6.7]
    - itsdangerous [required: >=0.24, installed: 0.24]
    - Jinja2 [required: >=2.10, installed: 2.10]
      - MarkupSafe [required: >=0.23, installed: 1.0]
    - Werkzeug [required: >=0.14, installed: 0.14.1]
  - Six [required: Any, installed: 1.11.0]
Flask-PyMongo==0.5.1
  - Flask [required: >=0.8, installed: 1.0.2]
    - click [required: >=5.1, installed: 6.7]
    - itsdangerous [required: >=0.24, installed: 0.24]
    - Jinja2 [required: >=2.10, installed: 2.10]
      - MarkupSafe [required: >=0.23, installed: 1.0]
    - Werkzeug [required: >=0.14, installed: 0.14.1]
  - PyMongo [required: >=2.5, installed: 3.6.1]
Flask-RESTful==0.3.6
  - aniso8601 [required: >=0.82, installed: 3.0.0]
  - Flask [required: >=0.8, installed: 1.0.2]
    - click [required: >=5.1, installed: 6.7]
    - itsdangerous [required: >=0.24, installed: 0.24]
    - Jinja2 [required: >=2.10, installed: 2.10]
      - MarkupSafe [required: >=0.23, installed: 1.0]
    - Werkzeug [required: >=0.14, installed: 0.14.1]
  - pytz [required: Any, installed: 2018.4]
  - six [required: >=1.3.0, installed: 1.11.0]
gunicorn==19.8.1
openpyxl==2.5.3
  - et-xmlfile [required: Any, installed: 1.0.1]
  - jdcal [required: Any, installed: 1.4]
PyMySQL==0.8.1
redis==2.10.6
requests==2.18.4
  - certifi [required: >=2017.4.17, installed: 2018.4.16]
  - chardet [required: >=3.0.2,<3.1.0, installed: 3.0.4]
  - idna [required: >=2.5,<2.7, installed: 2.6]
  - urllib3 [required: >=1.21.1,<1.23, installed: 1.22]
</code>

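As the output shows, both Celery and the redis Python client package were installed.

Next I tried running it, but it failed:

[Solved] Error running Celery: consumer Cannot connect to redis Error 61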
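
On macOS, errno 61 means "connection refused", which usually indicates that nothing is listening on the Redis port yet. Below is a minimal sketch of a check to run before starting the worker, using only the standard library. The function name `can_connect` is made up for illustration, and the host and port are assumed to be the Redis defaults (localhost, 6379).

```python
import socket


def can_connect(host="localhost", port=6379, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # create_connection raises OSError (e.g. ConnectionRefusedError)
        # when nothing accepts connections on that address.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("Redis reachable:", can_connect())
```

If this reports that Redis is not reachable, starting `redis-server` first is the likely fix. The check only verifies that the port accepts connections, not that the listener is actually Redis.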

Then I went on to test delayed task execution.

First, a check that the demo from the official docs works:

<code>➜  ~ /Users/crifan/dev/dev_root/company/naturling/projects/robotDemo/server
➜  server ll
total 176
-rw-r--r--  1 crifan  staff   303B  5 11 17:22 Pipfile
-rw-r--r--  1 crifan  staff    12K  5 11 17:24 Pipfile.lock
-rw-r--r--  1 crifan  staff   2.3K  5 10 15:22 README.md
drwxr-xr-x  3 crifan  staff    96B  4 24 15:12 __pycache__
drwxr-xr-x  7 crifan  staff   224B  5 10 15:15 ai
-rw-r--r--  1 crifan  staff    28K  5 11 16:56 app.py
-rw-r--r--  1 crifan  staff   191B  5 11 17:39 celery_task.py
-rw-r--r--  1 crifan  staff   1.3K  5 11 14:30 config.py
-rw-r--r--  1 crifan  staff    92B  5 11 17:49 dump.rdb
-rw-r--r--  1 crifan  staff   2.0K  4 24 16:36 gunicorn_config.py
drwxr-xr-x  8 crifan  staff   256B  4 26 14:56 logs
-rw-r--r--  1 crifan  staff    10K  4 24 17:00 supervisord_local.conf
-rw-r--r--  1 crifan  staff   592B  4 24 16:07 supervisord_server.conf
drwxr-xr-x  4 crifan  staff   128B  5 11 16:52 tmp
➜  server which python
/usr/bin/python
➜  server pipenv shell
Spawning environment shell (/bin/zsh). Use 'exit' to leave.
. /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/activate
➜  server . /Users/crifan/.local/share/virtualenvs/server-9an_1rEM/bin/activate
➜  server python
Python 3.6.4 (default, Mar 22 2018, 13:54:22)
[GCC 4.2.1 Compatible Apple LLVM 9.0.0 (clang-900.0.39.2)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> from celery_task import add
>>> add.delay(4, 5)
<AsyncResult: 9c8caf7a-e571-4f74-9049-cf5076b9a6ee>
>>>
</code>

The next attempt failed with an error:

[Solved] Celery error when passing arguments to a task's apply_async: TypeError takes 1 positional argument but 40 were given
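
The details of that fix are in the linked post. The error message and the working call (which has a trailing comma in `("...mp3",)`) suggest a likely cause: the file name was passed as a plain string instead of a one-element tuple. The sketch below reproduces the effect in plain Python, assuming Celery unpacks `args` positionally. `delete_tmp_audio_file` and `try_call` are stand-ins invented for this illustration, not Celery APIs.

```python
def delete_tmp_audio_file(filename):
    """Stand-in for the task body; it just returns what it received."""
    return filename


filename = "98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3"  # 40 characters long

wrong_args = (filename)   # parentheses alone do not make a tuple: still a str
right_args = (filename,)  # the trailing comma makes a one-element tuple


def try_call(args):
    """Call the stand-in with unpacked args and report any TypeError as text."""
    try:
        return delete_tmp_audio_file(*args)
    except TypeError as exc:
        return "TypeError: %s" % exc


if __name__ == "__main__":
    print(try_call(wrong_args))  # the string is unpacked into 40 single characters
    print(try_call(right_args))  # the file name arrives as one argument
```

Unpacking the 40-character string yields 40 positional arguments, which matches the number in the error message.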

After fixing that, the task did run after a 10-second delay.

In one terminal:

<code>>>> deleteTmpAudioFile.apply_async(("98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3",), countdown=10)
<AsyncResult: bd89951c-2078-49af-b8ea-8c9d8d5b121a>
</code>

In the other terminal, where the Celery worker is running:

<code>[2018-05-11 18:33:18,769: WARNING/ForkPoolWorker-2] deleteTmpAudioFile: filename=%s
[2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
[2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] audioTmpFolder=tmp/audio
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] curFolderAbsPath=/Users/crifan/dev/xx/server
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] audioTmpFolderFullPath=/Users/crifan/dev/xx/robotDemo/server/tmp/audio
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] No need to remove for not exist file /Users/crifan/dev/dev_root/xx/server/tmp/audio/98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
</code>

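Note:

An earlier test run had already deleted the file, which is why this run reports that it does not exist.

At this point, both the logic and the test code confirm that the file deletion runs after a 10-second delay.

The remaining step is to integrate this into the Flask code.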

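[Summary]

At this point, Celery + Redis is set up locally on the Mac to run an asynchronous task that deletes temporary files for the Flask app.

The steps are:

Install Celery, here with Redis as the broker: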

<code>pipenv install "celery[redis]"
</code>

Before running Celery, Redis itself has to be installed and running.

Install Redis on the Mac:

<code>brew install redis
</code>

Then start the Redis server:

<code>redis-server
</code>

Then write the Celery tasks and save them as:

celery_task.py

<code>
# celery_task.py
import os

from celery import Celery

# Redis on localhost is used as the message broker.
app = Celery('tasks', broker='redis://localhost')


@app.task()
def add(x, y):
    return x + y


@app.task()
def deleteTmpAudioFile(filename):
    # audioTmpFolder = app.config["AUDIO_TEMP_FOLDER"]
    # The original call was print("...filename=%s", filename), which logged
    # a literal "%s" followed by the file name on a separate line.
    print("deleteTmpAudioFile: filename=%s" % filename)
    audioTmpFolder = "tmp/audio"
    print("audioTmpFolder=%s" % audioTmpFolder)
    # Directory the worker was started from, e.g. '/Users/xx/server'
    curFolderAbsPath = os.getcwd()
    print("curFolderAbsPath=%s" % curFolderAbsPath)
    audioTmpFolderFullPath = os.path.join(curFolderAbsPath, audioTmpFolder)
    print("audioTmpFolderFullPath=%s" % audioTmpFolderFullPath)
    # e.g. '/Users/crifan/xx/server/tmp/audio/2aba73d1-f8d0-4302-9dd3-d1dbfad44458.mp3'
    tempAudioFullname = os.path.join(audioTmpFolderFullPath, filename)
    if os.path.isfile(tempAudioFullname):
        os.remove(tempAudioFullname)
        print("Ok to delete file %s" % tempAudioFullname)
    else:
        print("No need to remove for not exist file %s" % tempAudioFullname)
</code>
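
Because the task builds its path from `os.getcwd()`, which file it deletes depends on the directory the worker was started from. One possible variation, sketched here with only the standard library, takes the folder as an explicit argument and returns whether a file was removed, which also makes the logic easy to test without a broker. The name `delete_tmp_file` and its `base_dir` parameter are hypothetical and not part of the original code.

```python
import os
import tempfile


def delete_tmp_file(filename, base_dir):
    """Delete base_dir/filename if it exists; return True if a file was removed."""
    full_path = os.path.join(base_dir, filename)
    if os.path.isfile(full_path):
        os.remove(full_path)
        return True
    return False


if __name__ == "__main__":
    # Demonstrate both branches inside a throwaway directory.
    with tempfile.TemporaryDirectory() as tmp_dir:
        open(os.path.join(tmp_dir, "example.mp3"), "wb").close()
        print(delete_tmp_file("example.mp3", tmp_dir))  # True: file existed
        print(delete_tmp_file("example.mp3", tmp_dir))  # False: already gone
```

A Celery task could then be a thin wrapper that calls this function with a configured folder.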

Then start the Celery worker:

<code>celery -A celery_task worker
</code>

After that, the task can be called.

Here it is called from an interactive Python session in a terminal:

<code>python
>>> import celery_task
>>> from celery_task import deleteTmpAudioFile
>>> deleteTmpAudioFile.apply_async(("98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3",), countdown=10)
<AsyncResult: bd89951c-2078-49af-b8ea-8c9d8d5b121a>
</code>

Once the specified 10-second delay has elapsed, deleteTmpAudioFile in celery_task is invoked.
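
According to the Celery documentation, `countdown` is a shortcut for setting an ETA a given number of seconds in the future, and the task runs at some point after that time, not necessarily at that exact instant. The sketch below shows the equivalent absolute time computed with the standard library. `eta_from_countdown` is a helper invented for this illustration and is not part of Celery.

```python
from datetime import datetime, timedelta, timezone


def eta_from_countdown(countdown_seconds, now=None):
    """Return the absolute UTC time that lies countdown_seconds after now."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=countdown_seconds)


# With the task from celery_task.py, these two calls should schedule
# the same work (shown as comments so this block runs without a broker):
#   deleteTmpAudioFile.apply_async((filename,), countdown=10)
#   deleteTmpAudioFile.apply_async((filename,), eta=eta_from_countdown(10))

if __name__ == "__main__":
    print("Task would be due at:", eta_from_countdown(10).isoformat())
```

Passing an explicit `eta` can be handy when the deletion time is derived from something else, such as a stored expiry timestamp for the temporary file.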

The Celery worker terminal then shows the log:

<code>[2018-05-11 18:33:18,769: WARNING/ForkPoolWorker-2] deleteTmpAudioFile: filename=%s
[2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] 98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
[2018-05-11 18:33:18,771: WARNING/ForkPoolWorker-2] audioTmpFolder=tmp/audio
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] curFolderAbsPath=/Users/crifan/dev/dev_root/xx/server
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] audioTmpFolderFullPath=/Users/crifan/dev/xx/server/tmp/audio
[2018-05-11 18:33:18,772: WARNING/ForkPoolWorker-2] No need to remove for not exist file /Users/crifan/dev/dev_root/xx/tmp/audio/98fc7c46-7aa0-4dd7-aa9d-89fdf516abd6.mp3
</code>

That is all it takes.
