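While working on:
【Unresolved】After deploying Flask to production, global variables cannot be shared across multiple instances and threads
I needed to find the most suitable way to implement a singleton in Python.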
Someone else had previously used:
python – Is there a simple, elegant way to define singletons? – Stack Overflow
and ended up with:
<code>class Singleton(type):
    """
    reference: https://stackoverflow.com/questions/31875/is-there-a-simple-elegant-way-to-define-singletons
    """
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(
                Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]
</code>I then dug into it further myself:
python singleton
python singleton decorator
The Singleton — Python 3 Patterns, Recipes and Idioms
Creating a singleton in Python – Stack Overflow
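For comparison with the metaclass version, the decorator style of singleton that turns up in these searches can be sketched roughly as follows. This is a minimal illustration, not code taken from any of the linked pages; `singleton` and `Config` are hypothetical names. Like the metaclass above, it does no locking, so it has the same thread-safety problem.

```python
def singleton(cls):
    """Class decorator that caches one instance per decorated class.

    Not thread safe: two threads can both see an empty cache and
    each create an instance.
    """
    instances = {}

    def get_instance(*args, **kwargs):
        # Create the instance on first use, then always return the cached one.
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]

    return get_instance


@singleton
class Config:  # hypothetical example class
    def __init__(self, name="default"):
        self.name = name


a = Config("first")
b = Config("second")  # arguments are ignored once the instance exists
print(a is b, b.name)  # True first
```

One trade-off: after decoration `Config` is a plain function, so `isinstance(obj, Config)` raises `TypeError`, whereas the metaclass approach keeps `Config` a real class.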
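Only from:
python – Why is this singleton implementation “not thread safe”? – Stack Overflow
did I learn that none of the singleton implementations above are thread safe
-> with multiple threads, they cannot guarantee that only one instance is ever created
-> so a thread-safe singleton is needed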
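To make the race concrete, here is a small sketch that runs the non-locking metaclass from the top of this post in several threads at once. `SlowInit` is a hypothetical class whose `__init__` sleeps to stand in for slow work such as requesting a token; the `threading.Barrier` just lines the threads up. In CPython this will usually report that `__init__` ran several times, because every thread passes the `cls not in cls._instances` check before the first one stores its instance; the exact count depends on scheduling.

```python
import threading
import time


class Singleton(type):
    """Same non-locking metaclass as at the top of this post."""
    _instances = {}

    def __call__(cls, *args, **kwargs):
        if cls not in cls._instances:
            cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


class SlowInit(metaclass=Singleton):  # hypothetical example class
    init_calls = 0
    counter_lock = threading.Lock()

    def __init__(self):
        # Count how many times __init__ runs, i.e. how many instances get built.
        with SlowInit.counter_lock:
            SlowInit.init_calls += 1
        # Stand-in for slow setup (e.g. an HTTP token request): while this
        # thread sleeps, the other threads also find cls not in cls._instances.
        time.sleep(0.5)


NUM_THREADS = 5
barrier = threading.Barrier(NUM_THREADS)


def worker():
    barrier.wait()  # release all threads at roughly the same moment
    SlowInit()


threads = [threading.Thread(target=worker) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("__init__ ran %d times" % SlowInit.init_calls)
```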
multithreading – Are Python instance variables thread-safe? – Stack Overflow
multithreading – What operations in Java are considered atomic? – Stack Overflow
What are some common uses for Python decorators? – Stack Overflow
The code I currently use:
common/ThreadSafeSingleton.py
<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)


def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))

        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper


# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>resources/tasks.py
<code>import requests

from conf.app import settings
from resources.extensions_celery import celery, celery_logger as log


@celery.task
# def celeryRefreshAzureSpeechToken():
def refreshAzureSpeechToken(isSyncToSingleton=True):
    """celery's task: refresh microsoft azure speech token key for later call tts/ASR api"""
    log.info("celeryRefreshAzureSpeechToken: celery=%s", celery)
    # global gMsTtsTokenSingleton
    updatedToken = None
    getMsTokenUrl = settings.MS_GET_TOKEN_URL
    reqHeaders = {
        "Ocp-Apim-Subscription-Key": settings.MS_TTS_SECRET_KEY
    }
    log.info("getMsTokenUrl=%s, reqHeaders=%s", getMsTokenUrl, reqHeaders)
    resp = requests.post(getMsTokenUrl, headers=reqHeaders)
    log.info("resp=%s", resp)
    if resp.ok:
        respTokenText = resp.text  # eyxxxxiJ9.xxx.xxx
        log.info("respTokenText=%s", respTokenText)
        # gMsToken = respTokenText
        updatedToken = respTokenText
    else:
        log.error("get ms tts token failed for: reason=%s, text=%s", resp.reason, resp.text)
    # log.info("after refresh: gMsToken=%s", gMsToken)
    log.info("updatedToken=%s", updatedToken)
    if isSyncToSingleton:
        log.info("Sync new token to global singleton for ms token: %s", updatedToken)
        # gMsTtsTokenSingleton.updateToken(updatedToken)
        msTtsTokenSingleton = MsAzureCognitiveToken()
        log.info("gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
        log.info("msTtsTokenSingleton=%s", msTtsTokenSingleton)
        msTtsTokenSingleton.updateToken(updatedToken)
    return updatedToken


@celery.on_after_configure.connect
def celerySetupPeriodicTasks(sender, **kwargs):
    log.info("celerySetupPeriodicTasks: celery=%s, sender=%s, kwargs=%s", celery, sender, kwargs)
    sender.add_periodic_task(settings.CELERY_REFRESH_MS_TOKEN_INTERVAL,
                             # celeryRefreshAzureSpeechToken.s(),
                             refreshAzureSpeechToken.s(),
                             name="refresh ms Azure token every less than 10 minutes")


################################################################################
# Global Class
################################################################################

from common.ThreadSafeSingleton import ThreadSafeSingleton


class MsAzureCognitiveToken(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Do not call refreshAzureSpeechToken(True) in __init__, otherwise it becomes a circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsAzureCognitiveToken __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: newToken=%s, curToken=%s", newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsAzureCognitiveToken()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# for debug singleton
testNewSingleton = MsAzureCognitiveToken()
log.info("new inited testNewSingleton=%s", testNewSingleton)
</code>Debugging confirms that it is a singleton:
a newly created instance is the same object in memory as the earlier one:

【Postscript】
Later, while debugging gunicorn locally on a Mac, both in PyCharm and in a terminal, with:
<code>/Users/crifan/.local/share/virtualenvs/xxx-SCpLPEyZ/bin/gunicorn -c conf/gunicorn/gunicorn_config.py app:app </code>
the output looked
the same as before, with similar errors:
<code>objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
objc[22770]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called. We cannot safely call it or ignore it in the fork() child process. Crashing instead. Set a breakpoint on objc_initializeAfterForkError to debug.
objc[22768]: +[__NSPlaceholderDate initialize] may have been in progress in another thread when fork() was called.
</code>

Or:

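multiple instances were running as well
-> so this singleton does not seem to be thread safe after all
-> different processes seem to create different singleton instances
-> I need to find time to debug it in the production environment on the server
And that was indeed confirmed:
the singletons initialized in different threads/processes are actually different objects: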

<code>[2018-08-29 16:48:11,866 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b3550>
[2018-08-29 16:48:12,016 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b24a8>
[2018-08-29 16:48:12,228 INFO tasks.py:114 <module>] inited gMsTtsTokenSingleton=<resources.tasks.MsTtsTokenSingleton object at 0x7f67199b56a0>
</code>
In other words, the approach above:
does give a singleton within a single thread,
but fails to give one across multiple threads/processes.
What are some common uses for Python decorators? – Stack Overflow
Single instance of class in Python – Stack Overflow
Writing Singletons in Python and extending them – Juejin
python singleton decorator // wangzhilong’s blog
python multiple thread singleton
multithreading – Python sharing class instance among threads – Stack Overflow
multithreading – Concurrent Singleton Class Python – Stack Overflow
multithreading – Thread-safe Singleton doesn’t work. Python – Stack Overflow
seems no different from the current approach
When Singleton meets multi-threading – AllenYoung – ITeye blog
this one is about Java, not the Python I need
Python Thread Safe Singleton Pattern | Works (Tips / Assets / Snippets)
Looked again; the problem persists:

Within this code:
<code>class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Do not call refreshAzureSpeechToken(True) in __init__, otherwise it becomes a circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsTtsTokenSingleton __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)
</code>the log from updateToken, for a single process:
<code>[2018-08-29 17:15:14,222 INFO tasks.py:104 updateToken] updateToken: self=<resources.tasks.MsTtsTokenSingleton object at 0x7fd806f694a8>, newToken=eyJhbxxxqx </code>

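looks normal:
within a single process, it is indeed a singleton.
But this caused token fetching to fail in the code:
in 4 of the 9 workers, the token was None: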

The reason for the None is:
<code>[2018-08-29 17:15:00,086 INFO tasks.py:57 refreshAzureSpeechToken] resp=<Response [429]>
[2018-08-29 17:15:00,087 ERROR tasks.py:65 refreshAzureSpeechToken] from MS_TTS_SECRET_KEY=dfxxx59f, get ms tts token failed for: reason=Too Many Requests, text={ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 1 seconds." }
</code>For 429, the definition I found is:
<code>"Response 429
application/json
{ "statusCode": 429, "message": "Rate limit is exceeded. Try again in 26 seconds." }"
</code>Although the problem can be worked around for now with:
<code>log.error("from MS_TTS_SECRET_KEY=%s, get ms tts token failed for: reason=%s, text=%s",
          settings.MS_TTS_SECRET_KEY, resp.reason, resp.text)
if resp.status_code == 429:
    # retry the celery task once more after a short delay
    tryOneMoreDelay = 5
    refreshAzureSpeechToken.apply_async([], countdown=tryOneMoreDelay)
</code>this is not a fundamental solution.
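As a slightly less blunt version of that workaround, the retry delay could be taken from the 429 body itself rather than fixed at 5 seconds. This is a hedged sketch that assumes the body keeps the format quoted above (`"Try again in N seconds."`); `retry_delay_from_429` is a hypothetical helper, not part of any Azure SDK.

```python
import json
import re

DEFAULT_RETRY_DELAY = 5  # seconds; same fallback as the workaround above


def retry_delay_from_429(body_text, default=DEFAULT_RETRY_DELAY):
    """Return the wait time suggested by a 429 response body, or `default`.

    Assumes a body like:
    { "statusCode": 429, "message": "Rate limit is exceeded. Try again in 26 seconds." }
    """
    try:
        data = json.loads(body_text)
        message = data.get("message", "") if isinstance(data, dict) else ""
    except (TypeError, ValueError):
        # Not JSON (or None): fall back to searching the raw text.
        message = body_text if isinstance(body_text, str) else ""
    match = re.search(r"Try again in (\d+) seconds?", str(message))
    return int(match.group(1)) if match else default
```

It would then be used as `refreshAzureSpeechToken.apply_async([], countdown=retry_delay_from_429(resp.text))`. This still only softens the symptom; the underlying cause is that every process fetches its own token.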
The fundamental fix is still to truly make sure
that it is a singleton under multithreading as well.
Singleton pattern for Python (single and multi-threaded) – mor krispil
Glossary — Python 3.3.7 documentation
Double-checked locking – Wikipedia
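The double-checked locking idea can be sketched in Python as below. It is an illustration under assumptions, not a drop-in replacement for ThreadSafeSingleton: `DCLSingleton`, `Token` and `Holder` are hypothetical names, and it relies on CPython dict reads and writes being atomic under the GIL. The fast path skips the lock once the instance exists; the slow path re-checks under the lock so only one thread runs `__init__`. It uses `threading.RLock` instead of a plain `Lock`: with a single module-wide non-reentrant lock, an `__init__` that directly or indirectly instantiates another singleton class in the same thread would block forever on the second acquire.

```python
import threading
import time


class DCLSingleton(type):
    """Singleton metaclass using double-checked locking (sketch)."""
    _instances = {}
    # Re-entrant: the same thread may acquire it again, e.g. when one
    # singleton's __init__ creates another singleton.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        # First check without the lock: the common, fast path.
        instance = DCLSingleton._instances.get(cls)
        if instance is None:
            with DCLSingleton._lock:
                # Second check under the lock: another thread may have
                # created the instance while this one was waiting.
                instance = DCLSingleton._instances.get(cls)
                if instance is None:
                    instance = super().__call__(*args, **kwargs)
                    # Publish only after __init__ has finished.
                    DCLSingleton._instances[cls] = instance
        return instance


class Token(metaclass=DCLSingleton):  # hypothetical example class
    init_calls = 0

    def __init__(self):
        Token.init_calls += 1  # runs under the lock, so no lost updates
        time.sleep(0.2)  # stand-in for a slow token fetch


class Holder(metaclass=DCLSingleton):  # hypothetical singleton that builds another
    def __init__(self):
        self.token = Token()  # nested creation: needs the re-entrant lock


NUM_THREADS = 5
barrier = threading.Barrier(NUM_THREADS)


def worker():
    barrier.wait()  # start all threads at roughly the same moment
    Holder()


threads = [threading.Thread(target=worker) for _ in range(NUM_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(Token.init_calls)           # 1
print(Holder().token is Token())  # True
```

Like any lock-based version, this only coordinates threads inside one process; it does nothing for separate processes.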
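So I set out to test for myself whether it is a singleton under multiple threads, and then debug and fix it:
【Solved】Testing a Singleton in Python with multiple threads
Could threads be the wrong thing to test? Should I use multiple processes?
So I tried again:
【Solved】Testing a Singleton in Python with multiple processes
But in the end:
after deploying to production with multiple gunicorn workers, it stopped working again: it was no longer a singleton, and the instances of the same class were all different.
So the problem turned into:
【Solved】How to share data or a single instance across multiple processes/workers in Flask with gunicorn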
【Summary】
Here I use the following code:
common/ThreadSafeSingleton.py
<code>import functools
import threading

thread_lock = threading.Lock()
print("ThreadSafeSingleton: thread_lock=%s" % thread_lock)


# refer: https://stackoverflow.com/questions/50566934/why-is-this-singleton-implementation-not-thread-safe
def synchronized(lock):
    """ Synchronization decorator """
    def wrapper(f):
        print("synchronized: wrapper: f=%s, lock=%s" % (f, lock))

        @functools.wraps(f)
        def inner_wrapper(*args, **kw):
            print("functools.wraps: args=%s, kw=%s" % (args, kw))
            with lock:
                return f(*args, **kw)
        print("inner_wrapper%s" % inner_wrapper)
        return inner_wrapper
    return wrapper


# class Singleton(type):
class ThreadSafeSingleton(type):
    _instances = {}

    @synchronized(thread_lock)
    def __call__(cls, *args, **kwargs):
        print("synchronized __call__: cls=%s, args=%s, kwargs=%s" % (cls, args, kwargs))
        print("cls._instances=%s" % cls._instances)
        if cls not in cls._instances:
            # cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
            cls._instances[cls] = super(ThreadSafeSingleton, cls).__call__(*args, **kwargs)
            print("after added _instances: cls._instances=%s" % cls._instances)
        return cls._instances[cls]
</code>It is then used elsewhere,
for example for logging:
common/FlaskLogSingleton.py
<code>import logging
from logging.handlers import RotatingFileHandler
from conf.app import settings
from common.ThreadSafeSingleton import ThreadSafeSingleton
# from sys import stdout


def init_logger(flask_settings, enableConsole=True):
    print("init_logger")
    flaskAppLogger = logging.getLogger(flask_settings.FLASK_APP_NAME)  # <Logger RobotQA (WARNING)>
    print("flaskAppLogger=%s" % flaskAppLogger)
    flaskAppLogger.setLevel(flask_settings.LOG_LEVEL_FILE)
    logFormatter = logging.Formatter(flask_settings.LOG_FORMAT)
    fileHandler = RotatingFileHandler(
        flask_settings.LOG_FILE_FILENAME,
        maxBytes=flask_settings.LOG_FILE_MAX_BYTES,
        backupCount=flask_settings.LOG_FILE_BACKUP_COUNT,
        encoding="UTF-8")
    fileHandler.setLevel(flask_settings.LOG_LEVEL_FILE)
    fileHandler.setFormatter(logFormatter)
    flaskAppLogger.addHandler(fileHandler)
    if enableConsole:
        # define a Handler which writes INFO messages or higher to the sys.stderr
        console = logging.StreamHandler()
        # console = logging.StreamHandler(stdout)
        console.setLevel(flask_settings.LOG_LEVEL_CONSOLE)
        # set a format which is simpler for console use
        formatter = logging.Formatter(
            # fmt=logFormatter)
            # fmt=logFormatter,
            fmt=flask_settings.LOG_FORMAT,
            datefmt=flask_settings.LOG_CONSOLE_DATA_FORMAT)
        # tell the handler to use this format
        console.setFormatter(formatter)
        flaskAppLogger.addHandler(console)
    print("init_logger: after init flaskAppLogger%s" % flaskAppLogger)
    return flaskAppLogger


class LoggerSingleton(metaclass=ThreadSafeSingleton):
    curLog = ""

    def __init__(self):
        self.curLog = init_logger(settings)
        # Note: during __init__, AVOID using log, otherwise it will deadlock
        # log.info("LoggerSingleton __init__: curLog=%s", self.curLog)
        print("LoggerSingleton __init__: curLog=%s" % self.curLog)


logSingleton = LoggerSingleton()
log = logSingleton.curLog
log.info("LoggerSingleton inited, logSingleton=%s", logSingleton)  # <factory.LoggerSingleton object at 0x10cbcafd0>
log.info("log=%s", log)  # <Logger RobotQA (DEBUG)>

# # debug for singleton log
# log2 = LoggerSingleton()
# print("log2=%s" % log2)
</code>And, for example, for the MS TTS token:
resources/tasks.py
<code>
class MsTtsTokenSingleton(metaclass=ThreadSafeSingleton):
    curToken = ""

    def __init__(self):
        # Do not call refreshAzureSpeechToken(True) in __init__, otherwise it becomes a circular call
        self.curToken = refreshAzureSpeechToken(False)
        log.info("MsTtsTokenSingleton __init__: curToken=%s", self.curToken)

    def updateToken(self, newToken=None):
        log.info("updateToken: self=%s, newToken=%s, curToken=%s", self, newToken, self.curToken)
        if newToken:
            self.curToken = newToken
        else:
            self.curToken = refreshAzureSpeechToken(False)
            # refreshAzureSpeechToken(True)
        log.info("after updateToken: curToken=%s", self.curToken)


gMsTtsTokenSingleton = MsTtsTokenSingleton()
log.info("inited gMsTtsTokenSingleton=%s", gMsTtsTokenSingleton)
log.info("gMsTtsTokenSingleton.curToken=%s", gMsTtsTokenSingleton.curToken)

# debug refreshAzureSpeechToken
# newToken = refreshAzureSpeechToken()
# log.info("newToken=%s", newToken)
</code>What I observed:
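When debugging locally (PyCharm on a Mac),
the singletons all work perfectly:
both the logger's log2 = LoggerSingleton()
and the MS TTS token's newToken = refreshAzureSpeechToken()
get the same instance as the original one
-> This already shows that, at least within a single process or a single thread, the singleton code above works
-> In addition, I also debugged the MsTtsTokenSingleton above locally with
multiple threads and multiple processes,
using this code: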
<code>log.info("========== test multiple thread singleton ==========")
import threading


def singleThreadDo():
    log.info("---------- singleThreadDo ----------")
    cur_thread = threading.current_thread()
    cur_thread_name = cur_thread.name
    curThreadTokenSingleton = MsTtsTokenSingleton()
    log.info("[%s] cur_thread=%s, curThreadTokenSingleton=%s", cur_thread_name, cur_thread, curThreadTokenSingleton)


max_thread_num = 5
thread_list = []
for idx in range(max_thread_num):
    cur_num = idx + 1
    each_thread_name = "T%s" % cur_num
    cur_thread = threading.Thread(target=singleThreadDo, name=each_thread_name)
    log.info("[%d] %s, %s", cur_num, each_thread_name, cur_thread)
    # Note: run() would execute singleThreadDo in the *current* thread;
    # start() is what actually launches a new thread
    cur_thread.start()
    thread_list.append(cur_thread)
for each_thread in thread_list:
    each_thread.join()

log.info("========== test multiple process singleton ==========")
import multiprocessing


def singleProcessDo(cur_num):
    log.info("---------- singleProcessDo ----------")
    cur_process = multiprocessing.current_process()
    curProcessTTokenSingleton = MsTtsTokenSingleton()
    log.info("curProcessTTokenSingleton=%s", curProcessTTokenSingleton)
    log.info("[%d] name=%s, pid=%s, process=%s", cur_num, cur_process.name, cur_process.pid, cur_process)


max_process_num = 5
process_list = []
for idx in range(max_process_num):
    cur_num = idx + 1
    each_process_name = "P%s" % cur_num
    cur_process = multiprocessing.Process(target=singleProcessDo, name=each_process_name, args=(cur_num, ))
    log.info("[%d] name=%s, process=%s", cur_num, each_process_name, cur_process)
    cur_process.start()
    process_list.append(cur_process)
for each_process in process_list:
    each_process.join()
</code>-> Conclusion:
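the singleton works here too: the singletons obtained in the multiple threads or processes are all the same instance
-> But here comes the problem:
after deploying to production, with multiple processes or threads,
regardless of whether the deployment is:
gunicorn with workers=9 and worker type sync -> multiple processes, 9 worker processes in total
or gunicorn with a single worker (workers=1) of type gevent -> a single process,
plus 2 extra processes: the celery worker and celery beat, managed by supervisor,
3 processes in total
the singleton above breaks: the initialized instances are all different.
And there is no good way to keep the 2 extra celery processes from breaking the singleton.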
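Why the local multi-process test looked fine while production did not can be illustrated with a small sketch. It assumes a POSIX system with the `fork` start method; `Counter` and `shared` are hypothetical stand-ins for MsTtsTokenSingleton and gMsTtsTokenSingleton. A forked child starts with a copy of the parent's memory, including any instance already created at import time, so asking for the singleton in the child returns that copy, often even with the same `id()`. It is still a separate object: changes made in one process never reach another. gunicorn workers and the supervisor-managed celery processes are separate processes as well, so each ends up with its own instance, and no in-process lock can change that.

```python
import multiprocessing


class Counter:  # hypothetical stand-in for MsTtsTokenSingleton
    def __init__(self):
        self.value = 0


# Created once at import time, like gMsTtsTokenSingleton in tasks.py.
shared = Counter()


def child(queue):
    # The forked child works on its own copy of `shared`.
    shared.value += 100
    queue.put((id(shared), shared.value))


# Explicit "fork" context (POSIX only): macOS defaults to "spawn" since
# Python 3.8, and "fork" does not re-import this module in the child.
ctx = multiprocessing.get_context("fork")
queue = ctx.Queue()
proc = ctx.Process(target=child, args=(queue,))
proc.start()
child_id, child_value = queue.get()  # read before join() to avoid blocking
proc.join()

print("child value:", child_value)    # 100
print("parent value:", shared.value)  # 0
print("same id() in both:", child_id == id(shared))
```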
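【Postscript】
Later I thought of what is probably a better approach:
for sharing data (both reading and modifying it) across multiple processes like this, it is best to use a general-purpose data-sharing mechanism such as Redis.
With Redis, you can implement functions such as getToken and setToken/updateToken that access Redis directly.
You could also use a more advanced Redis feature, key expiration: give the stored token an expiry time of, say, 9 minutes (since the MS TTS token expires after 10 minutes), and perhaps there is also some kind of callback that could then trigger updateToken to refresh the token.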
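A minimal sketch of that Redis idea, under stated assumptions: `client` is expected to behave like a redis-py `redis.Redis` object (only `get(name)` and `set(name, value, ex=seconds)` are used), `fetch_token` stands for something like `refreshAzureSpeechToken(False)`, and the key name is made up. Instead of the expiry callback wondered about above, it refreshes lazily: once the key has expired, the next reader fetches a new token and stores it with a 9-minute TTL for all processes. `InMemoryTTLStore` is only a single-process stand-in for trying it out without a Redis server.

```python
import time
from typing import Callable, Optional

TOKEN_KEY = "ms_tts_token"  # hypothetical key name
TOKEN_TTL_SECONDS = 9 * 60  # 9 minutes, below the 10-minute token lifetime


class SharedTokenStore:
    """Token cache kept in an external store shared by all processes."""

    def __init__(self, client, fetch_token: Callable[[], Optional[str]],
                 key: str = TOKEN_KEY, ttl: int = TOKEN_TTL_SECONDS):
        self._client = client            # assumed redis.Redis-like client
        self._fetch_token = fetch_token  # hypothetical: returns token or None
        self._key = key
        self._ttl = ttl

    def set_token(self, token: str) -> None:
        # `ex` makes the key expire automatically after `ttl` seconds.
        self._client.set(self._key, token, ex=self._ttl)

    def get_token(self) -> Optional[str]:
        token = self._client.get(self._key)
        if token is None:
            # Expired or never set: fetch lazily and share with every process.
            token = self._fetch_token()
            if token:
                self.set_token(token)
            return token
        # redis-py returns bytes unless decode_responses=True.
        return token.decode("utf-8") if isinstance(token, bytes) else token


class InMemoryTTLStore:
    """Single-process stand-in for redis.Redis, for local testing only."""

    def __init__(self):
        self._data = {}

    def set(self, name, value, ex=None):
        expires_at = time.monotonic() + ex if ex is not None else None
        self._data[name] = (value, expires_at)
        return True

    def get(self, name):
        item = self._data.get(name)
        if item is None:
            return None
        value, expires_at = item
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[name]  # behave like an expired Redis key
            return None
        return value
```

In the deployed app, `client` would be a real `redis.Redis(...)` connection. When the key expires, several processes may still fetch a token at the same moment; `set(..., nx=True)` or a short Redis lock could narrow that window.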
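Because of time constraints, and because, although the singleton is currently broken, token requests in all 3 processes return 200 and the business logic keeps running normally, I won't try this approach for now.
I'll try the Redis approach later, when I find time to optimize further.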