
[Unresolved] RQ in Flask cannot schedule and run multiple background jobs at the same time

Category: Flask. Author: crifan.

In a Flask app, I use RQ (through Flask-RQ2) to run background jobs. The existing code:

from datetime import datetime, timedelta

# rq, gLog and the BACKGROUND_* constants are defined elsewhere in the app

@rq.job
def updateTaskErrandorLocation():
    runningTaskIdListKey = genRunningTaskIdListKey()
    gLog.debug("runningTaskIdListKey=%s", runningTaskIdListKey)

# scheduled at module level, so this runs every time the module is imported
updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS), repeat=0)
gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)

@rq.job
def updateUserStatus():
    global gOnlineUserIdList
    # for debug
    gLog.debug("gOnlineUserIdList=%s", gOnlineUserIdList)
    curDatetime = datetime.now()
    gLog.debug("curDatetime=%s", curDatetime)

updateUserStatusJob = updateUserStatus.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS), repeat=0)
gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)

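The intent is that each RQ job gets scheduled to run every few seconds.

The symptom is that whichever RQ job is scheduled first in the file is the one that gets executed: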

[2016-12-19 17:36:17,135 DEBUG BackgroundWorker.py:84 <module>] updateTaskErrandorLocationJob=<Job 48147d2c-51bb-4622-8e66-585089caad89: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
[2016-12-19 17:36:17,137 DEBUG BackgroundWorker.py:148 <module>] updateUserStatusJob=<Job cd35db06-3bf9-4ea2-9650-c26652fe9452: runningfast.resources.BackgroundWorker.updateUserStatus()>
[2016-12-19 17:36:17,137 DEBUG BackgroundWorker.py:48 updateTaskErrandorLocation] runningTaskIdListKey=develop|staging|RunningTaskIdList
[2016-12-19 17:36:17,139 DEBUG BackgroundWorker.py:50 updateTaskErrandorLocation] type(runningTaskIdList)=<type ‘set’>, runningTaskIdList=set([])

And, with the two jobs in the opposite order:

[2016-12-19 17:32:06,774 DEBUG BackgroundWorker.py:102 <module>] updateUserStatusJob=<Job ad8eff3c-1be4-429d-8a54-d8d5f7f130b2: runningfast.resources.BackgroundWorker.updateUserStatus()>
[2016-12-19 17:32:06,775 DEBUG BackgroundWorker.py:145 <module>] updateTaskErrandorLocationJob=<Job 830334e6-199d-4a28-99ce-9aea020b6a88: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:47 updateUserStatus] gOnlineUserIdList=[]
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:50 updateUserStatus] curDatetime=2016-12-19 17:32:06.776366
[2016-12-19 17:32:06,776 DEBUG BackgroundWorker.py:87 updateUserStatus] after remove offline: gOnlineUserIdList=[]

But the two jobs never both get executed.

So I need to find out how RQ in Flask can schedule multiple background jobs at the same time.

flask rq schedule multiple job

flask rq2 schedule Repeated  job

GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)

I tried changing the value of repeat from 0 to None, but nothing seemed to change.

However, this is what the log shows:

--------------------------------------------------------------------------------
20:48:45 default: Job OK (8736f59d-63e3-4854-8409-715055e295a8)
20:48:45 Result is kept for 500 seconds
20:48:45
20:48:45 *** Listening on default…
20:48:45 default: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation() (9b6b8062-a7c5-4db2-897f-8a7bba3d7792)

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:113]:
redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u’default’>, pushConnResult=None

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:119]:
app=<Flask ‘runningfast.app’>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516b290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc16b7c210>, db=<SQLAlchemy engine=’mysql://runningfast:Jiandao123@localhost/runningfast_dev’>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516b6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ba50>

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:141]:
API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]:
updateTaskErrandorLocationJob=<Job a5df8b7c-1895-4339-a741-b0ac1ce148ea: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]:
updateUserStatusJob=<Job 02adbbb3-5081-494b-8ee6-7e7d557ceceb: runningfast.resources.BackgroundWorker.updateUserStatus()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:48]:
runningTaskIdListKey=develop|staging|RunningTaskIdList

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:50]:
type(runningTaskIdList)=<type ‘set’>, runningTaskIdList=set([])

--------------------------------------------------------------------------------

20:48:46 default: Job OK (9b6b8062-a7c5-4db2-897f-8a7bba3d7792)
20:48:46 Result is kept for 500 seconds
20:48:46
20:48:46 *** Listening on default…
20:48:46 default: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation() (ae3973d0-af0c-405d-b5e7-172c9f2f2386)

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:113]:
redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u’default’>, pushConnResult=None

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:119]:
app=<Flask ‘runningfast.app’>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516b290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc16b7c210>, db=<SQLAlchemy engine=’mysql://runningfast:Jiandao123@localhost/runningfast_dev’>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516b6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ba50>

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:141]:
API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]:
updateTaskErrandorLocationJob=<Job 3d9bc26c-e26f-4b91-9a17-dea5c78a8020: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]:
updateUserStatusJob=<Job 13c10fd2-7cec-4dfc-b066-7f5d62fa844c: runningfast.resources.BackgroundWorker.updateUserStatus()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:48]:
runningTaskIdListKey=develop|staging|RunningTaskIdList

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:50]:
type(runningTaskIdList)=<type ‘set’>, runningTaskIdList=set([])

--------------------------------------------------------------------------------

20:48:48 default: Job OK (ae3973d0-af0c-405d-b5e7-172c9f2f2386)
20:48:48 Result is kept for 500 seconds
20:48:48
20:48:48 *** Listening on default…
20:48:48 default: runningfast.resources.BackgroundWorker.updateUserStatus() (49151c04-987b-43f9-9926-01e64e2b6312)

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:113]:
redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u’default’>, pushConnResult=None

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:119]:
app=<Flask ‘runningfast.app’>, server_port=21085, api=<flask_restful.Api object at 0x7fcc1516c290>, redis_store=<flask_redis.FlaskRedis object at 0x7fcc168fb210>, db=<SQLAlchemy engine=’mysql://runningfast:Jiandao123@localhost/runningfast_dev’>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7fcc1516c6d0>, sockets=<flask_sockets.Sockets object at 0x7fcc1516ca50>

--------------------------------------------------------------------------------

DEBUG in app [./runningfast/app.py:141]:
API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:87]:
updateTaskErrandorLocationJob=<Job 35f043c2-7c25-4dac-841a-f9da2dfe9156: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:153]:
updateUserStatusJob=<Job 7c8531c9-c9f6-4c95-b8bf-fdae0ced6bff: runningfast.resources.BackgroundWorker.updateUserStatus()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:96]:
gOnlineUserIdList=[]

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:99]:
curDatetime=2016-12-19 20:48:49.635094

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:136]:
after remove offline: gOnlineUserIdList=[]

--------------------------------------------------------------------------------

20:48:49 default: Job OK (49151c04-987b-43f9-9926-01e64e2b6312)
20:48:49 Result is kept for 500 seconds
20:48:49
20:48:49 *** Listening on default…
20:48:49 default: runningfast.resources.BackgroundWorker.updateUserStatus() (e328deb9-e534-43c6-b397-3ad857b19306)

It looks like both jobs are in fact already running in the background, which is why the worker log contains both of these:

runningfast.resources.BackgroundWorker.updateUserStatus()

runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()

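However, the interval between runs is wrong for both jobs, updateUserStatus and updateTaskErrandorLocation: instead of the 10 seconds and 5 seconds I configured, they appear to run roughly every 1 to 3 seconds.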

Then I looked at my own code again:

updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS),
    #repeat=0)
    repeat=None)
updateUserStatusJob = updateUserStatus.schedule(
    timedelta(seconds=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS),
    #repeat=0)
    repeat=None)

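It seems I passed the wrong arguments. I should add the interval parameter described in:

GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)

The timedelta(xxx) argument only says how long to wait before the job runs; it is not the interval between runs.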
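To make the distinction concrete, here is a small pure-Python model of the two cases. It does not call RQ or rq-scheduler: `planned_run_times` is a hypothetical helper written only for this illustration, and a real scheduler adds its own polling delay, so actual run times will drift from these ideal values.

```python
from datetime import datetime, timedelta


def planned_run_times(first_run, interval_seconds=None, count=4):
    """Return up to `count` ideal run times in this simplified model.

    Without an interval the job is one-shot and runs only at first_run.
    With an interval it runs at first_run and then every interval_seconds.
    """
    if interval_seconds is None:
        return [first_run]
    step = timedelta(seconds=interval_seconds)
    return [first_run + i * step for i in range(count)]


# A fixed "now" keeps the example reproducible.
now = datetime(2016, 12, 19, 20, 48, 45)

# Case 1: only a delay (what passing timedelta(seconds=10) expresses).
delayed_once = planned_run_times(now + timedelta(seconds=10))

# Case 2: start now and repeat every 10 seconds (what interval=10 expresses).
every_10s = planned_run_times(now, interval_seconds=10)

print([t.strftime("%H:%M:%S") for t in delayed_once])
print([t.strftime("%H:%M:%S") for t in every_10s])
```

In this model the first list has a single entry, 10 seconds after scheduling, while the second list contains one entry every 10 seconds.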

So I changed the code to:

updateUserStatusJob = updateUserStatus.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)

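But the result still seems to be the same: the job runs about every 3 seconds.

-> My guess is that this code is executed again on every run: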

updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)
gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)
# schedule periodic job to update user status
updateUserStatusJob = updateUserStatus.schedule(
    datetime.utcnow(),
    interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
    #repeat=0)
    repeat=None)
gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)

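So a new job is created each time, and it runs as soon as the scheduler picks it up.

-> That is why the next run always comes one or a few seconds later.

-> So this code needs to move somewhere else, into a separate initialization step.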
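One way to avoid creating a new job on every import is to check whether the function is already scheduled before scheduling it again. The sketch below shows the idea with an in-memory stand-in. `FakeScheduler` and `schedule_once` are hypothetical names for this illustration, not Flask-RQ2 or rq-scheduler API; the real scheduler keeps its jobs in Redis.

```python
from datetime import datetime, timezone


class FakeScheduler:
    """In-memory stand-in for a scheduler (assumption: the real one uses Redis)."""

    def __init__(self):
        self.jobs = []  # list of (func_name, first_run, interval_seconds)

    def schedule(self, func_name, first_run, interval_seconds):
        self.jobs.append((func_name, first_run, interval_seconds))

    def scheduled_names(self):
        return {name for name, _, _ in self.jobs}


def schedule_once(scheduler, func_name, interval_seconds):
    """Schedule func_name only if no job for it exists; return True if added."""
    if func_name in scheduler.scheduled_names():
        return False
    scheduler.schedule(func_name, datetime.now(timezone.utc), interval_seconds)
    return True


scheduler = FakeScheduler()

# Simulate the module being imported three times:
# once by the web app and twice by worker job runs.
results = [schedule_once(scheduler, "updateUserStatus", 10) for _ in range(3)]

print(results)              # [True, False, False]
print(len(scheduler.jobs))  # 1
```

Only the first call adds a job; later imports find the existing entry and leave the schedule untouched, so the configured interval is not reset.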

So I wrapped the scheduling code in init functions:

def initUpdateTaskErrandorLocationJob():
    # schedule periodic job to update running task errandor location
    updateTaskErrandorLocationJob = updateTaskErrandorLocation.schedule(
        datetime.utcnow(),
        interval=BACKGROUND_UPDATE_RUNNING_TASK_ERRANDOR_LOCATION_INTERVAL_SECONDS,
        #repeat=0)
        repeat=None)
    gLog.debug("updateTaskErrandorLocationJob=%s", updateTaskErrandorLocationJob)

def initUpdateUserStatusJob():
    # schedule periodic job to update user status
    updateUserStatusJob = updateUserStatus.schedule(
        datetime.utcnow(),
        interval=BACKGROUND_UPDATE_USER_STATUS_INTERVAL_SECONDS,
        #repeat=0)
        repeat=None)
    gLog.debug("updateUserStatusJob=%s", updateUserStatusJob)

def initBackgroundWorker():
    initUpdateUserStatusJob()
    initUpdateTaskErrandorLocationJob()

and called them from /Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/app.py:

# from runningfast.resources.BackgroundWorker import *
from runningfast.resources.BackgroundWorker import initBackgroundWorker
from runningfast.resources.Image import ImageAPI, ImageUploadUrl
# Background worker
initBackgroundWorker()

The first initialization works fine:

[2016-12-19 21:32:56 +0000] [7652] [INFO] Booting worker with pid: 7652

--------------------------------------------------------------------------------

DEBUG in app [/root/RunningFast/staging/runningfast/app.py:113]:
redisConnection=Redis<ConnectionPool<Connection<host=localhost,port=6379,db=0>>>, queue=<Queue u’default’>, pushConnResult=None

--------------------------------------------------------------------------------

DEBUG in app [/root/RunningFast/staging/runningfast/app.py:119]:
app=<Flask ‘runningfast.app’>, server_port=21085, api=<flask_restful.Api object at 0x7f1b3b1cabd0>, redis_store=<flask_redis.FlaskRedis object at 0x7f1b3c3690d0>, db=<SQLAlchemy engine=’mysql://runningfast:Jiandao123@localhost/runningfast_dev’>, server_mode=staging, server_type=develop, rq=<flask_rq2.app.RQ object at 0x7f1b3b1cafd0>, sockets=<flask_sockets.Sockets object at 0x7f1b3b164390>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [/root/RunningFast/staging/runningfast/resources/BackgroundWorker.py:155]:
updateUserStatusJob=<Job 07d01286-af4e-42b8-bf89-b0e69af5eede: runningfast.resources.BackgroundWorker.updateUserStatus()>

--------------------------------------------------------------------------------

DEBUG in BackgroundWorker [/root/RunningFast/staging/runningfast/resources/BackgroundWorker.py:90]:
updateTaskErrandorLocationJob=<Job 5049efa4-a9eb-4cf8-8784-5988e139b9af: runningfast.resources.BackgroundWorker.updateTaskErrandorLocation()>

--------------------------------------------------------------------------------

DEBUG in app [/root/RunningFast/staging/runningfast/app.py:144]:
API_VERSION=1.0, API_URL_PREFIX=/runningfast/api/v1.0, OPEN_API_URL_PREFIX=/runningfast/api/v1.0/open

--------------------------------------------------------------------------------

But when the background worker then runs a job, the import fails:

/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/logs/rq_worker.log

Traceback (most recent call last):
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/worker.py", line 588, in perform_job
    rv = job.perform()
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/job.py", line 498, in perform
    self._result = self.func(*self.args, **self.kwargs)
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/job.py", line 206, in func
    return import_attribute(self.func_name)
  File "/root/Envs/RunningFast/lib/python2.7/site-packages/rq/utils.py", line 150, in import_attribute
    module = importlib.import_module(module_name)
  File "/usr/local/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "./runningfast/resources/BackgroundWorker.py", line 7, in <module>
    from runningfast.app import gLog
  File "./runningfast/app.py", line 132, in <module>
    from runningfast.resources.BackgroundWorker import initBackgroundWorker
ImportError: cannot import name initBackgroundWorker
21:33:13 Moving job to u'failed' queue
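The traceback is consistent with a circular import: the worker imports BackgroundWorker.py first, which imports app.py, which in turn asks for initBackgroundWorker before BackgroundWorker.py has finished defining it. The web app imports app.py first, so the same pair of modules loads without error there. The sketch below reproduces that asymmetry with two throwaway modules. `demo_app` and `demo_worker` are hypothetical stand-ins for app.py and BackgroundWorker.py, and the code is written for Python 3 (the project in this post runs Python 2.7, where the error text is shorter).

```python
import importlib
import os
import sys
import tempfile

# Stand-in for app.py: defines gLog, then imports from the worker module.
APP_SOURCE = (
    "gLog = 'logger'\n"
    "from demo_worker import initBackgroundWorker\n"
)
# Stand-in for BackgroundWorker.py: imports gLog before defining its function.
WORKER_SOURCE = (
    "from demo_app import gLog\n"
    "def initBackgroundWorker():\n"
    "    return 'scheduled'\n"
)
MODULE_NAMES = ("demo_app", "demo_worker")


def try_import_first(module_name):
    """Import module_name with a clean module cache; return the error text or None."""
    for name in MODULE_NAMES:
        sys.modules.pop(name, None)
    try:
        importlib.import_module(module_name)
        return None
    except ImportError as exc:
        return str(exc)


def run_demo():
    with tempfile.TemporaryDirectory() as tmp_dir:
        for name, source in zip(MODULE_NAMES, (APP_SOURCE, WORKER_SOURCE)):
            with open(os.path.join(tmp_dir, name + ".py"), "w") as handle:
                handle.write(source)
        sys.path.insert(0, tmp_dir)
        try:
            importlib.invalidate_caches()
            app_first = try_import_first("demo_app")        # like the web app
            worker_first = try_import_first("demo_worker")  # like the RQ worker
        finally:
            sys.path.remove(tmp_dir)
            for name in MODULE_NAMES:
                sys.modules.pop(name, None)
    return app_first, worker_first


app_first_error, worker_first_error = run_demo()
print("app imported first    ->", app_first_error)
print("worker imported first ->", worker_first_error)
```

Importing the app module first succeeds, while importing the worker module first raises an ImportError that names initBackgroundWorker. Common ways out are to move the shared objects (such as the logger) into a third module that imports neither of the two, or to defer one of the imports into a function body.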

flask ImportError: cannot import name

flask rq2 job ImportError: cannot import name

Later, following:

GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)

I added this to:

/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/app.py

scheduler = rq.get_scheduler()

and then this to:

/Users/crifan/dev/dev_root/daryun/Projects/RunningFast/sourcecode/RunningFast-Server/runningfast/resources/BackgroundWorker.py

curAllJobs = scheduler.get_jobs()
gLog.debug("curAllJobs=%s", curAllJobs)
initBackgroundWorker()

The printed job list is far too long, and there is no way to tell whether its entries belong to these two jobs:

--------------------------------------------------------------------------------
DEBUG in BackgroundWorker [./runningfast/resources/BackgroundWorker.py:163]:
curAllJobs=[Job(u’ff8f0a71-d93d-473d-bcb2-83459eec3e85′, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’05f153d0-ed71-477c-8d55-6f3e0df666b6′, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’186dc290-080c-4182-ba82-ee1be2944f5a’, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’3853d022-d5f7-4365-b3b4-fe92cd62f46c’, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’3bf599b0-4d20-418b-92e7-b6a4f64b675d’, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’462cdcf1-a7b7-49b2-89b2-6369999dc664′, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job(u’465c41f6-bdbc-4752-ba45-356464f37fbf’, enqueued_at=datetime.datetime(2016, 12, 20, 2, 5, 45)), Job
...
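The default printout only shows job ids, but rq's Job objects also carry a func_name attribute (it appears as self.func_name in the traceback above). Grouping by that attribute would show which entries belong to which function. A minimal sketch, using a namedtuple stand-in so it runs without Redis; `FakeJob`, `group_job_ids_by_function` and the ids below are made up for the illustration:

```python
from collections import defaultdict, namedtuple

# Stand-in for rq's Job; only the two attributes used here are modelled.
FakeJob = namedtuple("FakeJob", ["id", "func_name"])


def group_job_ids_by_function(jobs):
    """Map each function name to the list of job ids scheduled for it."""
    grouped = defaultdict(list)
    for job in jobs:
        grouped[job.func_name].append(job.id)
    return dict(grouped)


# Made-up ids; in the real app the list would come from scheduler.get_jobs().
jobs = [
    FakeJob("job-1", "runningfast.resources.BackgroundWorker.updateUserStatus"),
    FakeJob("job-2", "runningfast.resources.BackgroundWorker.updateTaskErrandorLocation"),
    FakeJob("job-3", "runningfast.resources.BackgroundWorker.updateUserStatus"),
]

grouped = group_job_ids_by_function(jobs)
for func_name, job_ids in sorted(grouped.items()):
    print(func_name, len(job_ids), job_ids)
```

A function that shows up with more than one id has been scheduled more than once, which would match the suspicion that every import creates a fresh job.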

GitHub – ui/rq-scheduler: A light library that adds job scheduling capabilities to RQ (Redis Queue)

That page mentions RQ job instances and job ids, but I could not find any explanation of them.

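Following:

GitHub – jezdez/Flask-RQ2: A Flask extension for RQ.

which leads to:

RQ: Documentation

I saw that the attribute is job.id.

RQ: Documentation

(in the part about creating a Queue) says that the job id can be set.

But a queue and a job are not the same thing, are they?

And with the decorator used here:

@rq.job

how do I set the job id?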

After a long search, I found this in the source code:

rq/job.py at master · nvie/rq · GitHub

# Job construction
@classmethod
def create(cls, func, args=None, kwargs=None, connection=None,
           result_ttl=None, ttl=None, status=None, description=None,
           depends_on=None, timeout=None, id=None, origin=None, meta=None):
    """Creates a new Job instance for the given function, arguments, and
    keyword arguments.
    """

It looks like the id could be set with:

@rq.job(id=xxx)

I also found the job's other attributes (parameters) there.

But this raises an error:

  File "./runningfast/resources/BackgroundWorker.py", line 46, in <module>
    @rq.job(id="updateTaskErrandorLocation")
TypeError: job() got an unexpected keyword argument 'id'

-> So it seems the job id cannot be set through the @rq.job decorator.
