线程安全
线程安全
IPython中演示,python命令行、pycharm都不能演示出效果
import threading
def worker():
for x in range(1000):
print("{} is running.".format(threading.current_thread().name))
for x in range(1, 5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
看代码,应该是一行行打印,但是很多字符串打在了一起,为什么
说明,print函数被打断了,被线程切换打断了。print函数分两步,第一步打印字符串,第二步换行,就在这之间,发生了线程的切换
这说明print函数是线程不安全的
线程安全
线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的
上例中,本以为print应该是打印文本之后紧跟着一个换行的,但是有时候确实好几个文本在一起,后面跟上换行,而且发生这种情况的时机不确定,所以,print函数不是线程安全函数
如果是这样,多线程编程的时候,print输出日志,不能保证一个输出一定后面立即换行了
不让print打印换行
import threading
def worker():
for x in range(100):
print("{} is running.\n".format(threading.current_thread().name), end='')
for x in range(1, 5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
字符串是不可变的类型,它可以作为一个整体不可分割输出。end=''就不在让print输出换行了
使用logging
标准库里面的logging模块,日志处理模块,线程安全的,生产环境代码都是使用logging
import threading
import logging
def worker():
for x in range(100):
logging.warning("{} is running".format(threading.current_thread().name))
for x in range(1, 5):
name = "worker{}".format(x)
t = threading.Thread(name=name, target=worker)
t.start()
daemon线程和non-daemon线程
注意:这里的daemon不是linux中的守护进程
进程靠线程执行代码,至少有一个主线程,其它线程是工作线程
主线程是第一个启动的线程
父线程:如果线程A中启动了一个线程B,A就是B的父线程
子线程:B就是A的子线程
Python中,构造线程的时候,可以设置daemon属性,这个属性必须在start方法前设置好
# 源码Thread的__init__方法中
if daemon is not None:
self._daemonic = daemon # 用户设定bool值
else:
self._daemonic = current_thread().daemon
self._ident = None
线程daemon属性,如果设定就是用户的设置,否则就取当前线程的daemon值
主线程是non-daemon线程,即daemon = False
import time
import threading
def foo():
time.sleep(5)
for i in range(20):
print(i)
# 主线程是non-daemon线程
t = threading.Thread(target=foo, daemon=False)
t.start()
print("Main Thread Exiting")
发现线程t依然执行,主线程已经执行完,但是一直等着线程t
修改为t = threading.Thread(target=foo, daemon=True)
试一试
程序立即结束了,根本没有等线程
名称 | 含义 |
---|---|
daemon属性 | 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常 |
isDaemon() | 是否是daemon线程 |
setDaemon | 设置为daemon线程,必须在start方法之前设置 |
总结
线程具有一个daemon属性,可以显示设置为True或False,也可以不设置,则取默认值None
如果不设置daemon,就取当前线程的daemon来设置它
主线程是non-daemon线程,即daemon = False
从主线程创建的所有线程的不设置daemon属性,则默认都是daemon = false,也就是non-daemon线程
python程序在没有活着的non-daemon线程运行时退出,也就是剩下的只能是daemon线程,主线程才能退出,否则主线程就只能等待
import time
import threading
def bar():
time.sleep(10)
print('bar')
def foo():
for i in range(20):
print(i)
t = threading.Thread(target=bar, daemon=False)
t.start()
t = threading.Thread(target=foo, daemon=True)
t.start()
print('Main Thread Exiting')
上例中,会不会输出bar这个字符串,如果没有,如何修改才能打印出来
time.sleep(2)
print('Main Thread Exiting')
上例说明,如果non-daemon线程的时候,主线程退出时,也不会杀掉所有daemon线程,知道所有non-daemon线程全部结束,如果还有daemon线程,主线程需要退出,会结束所有daemon线程,退出
join方法
先看一个简单的例子,看看效果
import time
import threading
def foo(n):
for i in range(n):
print(i)
time.sleep(1)
t1 = threading.Thread(target=foo, args=(10,), daemon=True)
t1.start()
t1.join() # 设置join,取消join对比一下
print("Main Thread Exiting")
使用了join方法后,daemon线程执行完了,主线程才退出了
join(timeout=None),是线程的标准方法之一
一个线程中调用另一个线程的join方法,调用者将被阻塞,直到被调用线程终止
一个线程可以join多次
timeout参数指定调用者等待多久,没有设置超时,就一直等到调用线程结束
调用谁的join方法,就是join谁,就要等谁
daemon线程应用场景
简单来说就是,本来并没有daemon thread,为了简化程序员的工作,让他们不用去记录和管理那些后台线程,创造了一个daemon thread的概念。这个概念唯一的作用就是,当你把一个线程设置为daemon,它会随主线程的退出而退出
主要应用场景有:
1、后台任务,如发送心跳包、监控,这种场景最多
2、主线程工作才有用的线程。如主线程中维护这公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适
3、随时可以被终止的线程
如果在non-daemon线程A中,对另一个daemon线程B使用了join方法,这个线程B设置成daemon就没有什么意义了,因为non-daemon线程A总要等待B
如果在一个daemon线程C中,对另一个damon线程D使用了join方法,只能说明C要等待D,主线程退出,C和D不管是否结束,也不管它们谁等谁,都要被杀掉
举例
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print("t1's daemon={}".format(threading.current_thread().isDaemon))
t2 = threading.Thread(target=bar)
t2.start()
print("t2's daemon={}".format(t2.isDaemon()))
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
time.sleep(3)
print("Main Thread Exiting")
上例,只要主线程要退出,2个工作线程都结束。
可以使用join,让线程结束不了,怎么做
import time
import threading
def bar():
while True:
time.sleep(1)
print('bar')
def foo():
print("t1's daemon={}".format(threading.current_thread().isDaemon))
t2 = threading.Thread(target=bar)
t2.start()
print("t2's daemon={}".format(t2.isDaemon()))
t2.join()
t1 = threading.Thread(target=foo, daemon=True)
t1.start()
t1.join()
time.sleep(3)
print("Main Thread Exiting")
threading.local类
import threading
import time
# 局部变量变现
def worker():
x = 0
for i in range(100):
time.sleep(0.0001)
x += 1
print(threading.current_thread(), x)
for i in range(10):
threading.Thread(target=worker).start()
上例使用多线程,每个线程完成不同的计算任务。x是局部变量
能否改造成使用全局变量完成
import threading
import time
class A:
def __init__(self):
self.x = 0
# 全局对象
global_data = A()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
上例虽然使用了全局对象,但是线程之间互相干扰,导致了错误的结果
能不能使用全局对象,还能保持每个线程使用不同的数据呢
python提供threading.local
类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储的数据其他线程看不见
import threading
import time
# 全局对象
global_data = threading.local()
def worker():
global_data.x = 0
for i in range(100):
time.sleep(0.0001)
global_data.x += 1
print(threading.current_thread(), global_data.x)
for i in range(10):
threading.Thread(target=worker).start()
结果显示和使用局部变量的效果一样
再看threading.local的例子
import threading
X = 'abc'
ctx = threading.local() # 注意这个对象所处的线程
ctx.x = 123
print(ctx, type(ctx), ctx.x)
def worker():
print(x)
print(ctx)
print(ctx.x)
print('working')
worker() # 普通函数调用
print()
threading.Thread(target=worker).start() # 另起一个线程
从运行结果来看,另起一个线程打印ctx.x出错了
AttributeError: '_thread._local' object has no attribute 'x'
但是,ctx打印没有出错,说明看到ctx,但是看到ctx,但是ctx中的x看不到,这个x不能跨线程
threading.local类构建了一个大字典,其元素是每一线程实例的地址为key和线程对象引用线程单独的字典的映射,如下:
{ id(Thread) -> (ref(Thread), thread-local dict)}
通过threading.local实例就可在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全
定时器Timer/延迟执行
threading.Timer继承自Thread,这个类用来定义多久执行一个函数
class threading.Timer(interval, function, args=None, kwargs=None)
start方法执行之后,Timer对象会处于等待状态,等待了interval之后,开始执行function函数的。如果在执行函数之前的等待阶段,使用了cancel方法,就会跳过执行函数结束
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.setName('w1')
t.start() # 启动线程
print(threading.enumerate())
t.cancel() # 取消,可以1注释这一句看看如何定时执行
time.sleep(1)
print(threading.enumerate())
如果线程中worker函数已经开始执行,cancel就没有任何效果了
总结
Timer是线程Thread的子类,就是线程类,具有线程的能力和特征
它的实例是能够延时执行目标函数的线程,在真正执行目标函数之前,都可以cancel它提前cancel
import threading
import logging
import time
FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
logging.info('in worker')
time.sleep(2)
t = threading.Timer(5, worker)
t.setName('w1')
t.cancel() # 提前取消
t.start() # 启动线程
print(threading.enumerate())
time.sleep(1)
print(threading.enumerate())