Concepts

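Thread synchronization means coordinating threads so that, while one thread is working on some data, other threads cannot access that data until the first thread has finished its operation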

Operating systems implement this with different mechanisms, including critical sections, mutexes, semaphores, and events

Event

An Event is the simplest mechanism for communication between threads. It holds an internal flag, and threads coordinate by setting the flag to True or False

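Name                 Meaning
set()                Set the flag to True
clear()              Set the flag to False
is_set()             Return whether the flag is True
wait(timeout=None)   Block until the flag is True, for at most timeout seconds; None waits indefinitely. Returns True if the flag was set, False if the wait timed out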
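The four methods can be tried in a single thread. The following is a minimal sketch; the variable names are illustrative only and the 0.1 second timeout is an arbitrary choice.

```python
from threading import Event

flag = Event()
initial = flag.is_set()        # a new Event starts with the flag set to False
timed_out = flag.wait(0.1)     # nobody calls set(), so wait() gives up and returns False
flag.set()
after_set = flag.is_set()      # the flag is now True
immediate = flag.wait(0.1)     # flag already True: returns True without waiting
flag.clear()
after_clear = flag.is_set()    # back to False

print(initial, timed_out, after_set, immediate, after_clear)
```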

Requirement:
The boss hires a worker to make cups and waits for the worker until 10 cups have been made

import logging
import time
from threading import Event, Thread

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

def boss(event: Event):
    logging.info("I'm boss, waiting for U.")
    # block until the worker sets the flag
    event.wait()
    logging.info("Good Job.")


def worker(event: Event, count=10):
    logging.info("I'm working for U.")
    cups = []
    while True:
        logging.info("make 1")
        time.sleep(0.5)
        cups.append(1)
        if len(cups) >= count:
            # notify the waiting boss
            event.set()
            break
    logging.info('I finished my job. cups={}'.format(cups))

event = Event()
w = Thread(target=worker, args=(event,))
b = Thread(target=boss, args=(event,))
w.start()
b.start()

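Summary
All parties share the flag of the same Event object.
Whoever calls wait blocks until the flag becomes True, or until the timeout expires and wait returns False. Any number of threads may wait on the same Event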
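The point that the number of waiters is not limited can be checked with a short sketch. The names start_signal, waiter, and woken are made up for this illustration.

```python
from threading import Event, Thread

start_signal = Event()
woken = []  # list.append is atomic in CPython, which is enough for this demo

def waiter(name: str) -> None:
    # wait() returns True once the flag is set, False if the timeout expires
    if start_signal.wait(timeout=5):
        woken.append(name)

threads = [Thread(target=waiter, args=("waiter-{}".format(i),)) for i in range(3)]
for t in threads:
    t.start()

start_signal.set()  # a single set() releases every thread waiting on the Event
for t in threads:
    t.join()

print(sorted(woken))
```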

Using wait

from threading import Event, Thread
import logging

logging.basicConfig(level=logging.INFO)

def do(event: Event, interval: int):
    while not event.wait(interval):  # usable as a loop condition: returns True or False
        logging.info('do sth.')

e = Event()
Thread(target=do, args=(e, 3)).start()

e.wait(10)  # time.sleep(10) would also work here
e.set()
print('main exit')

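Event.wait is often a better choice than time.sleep. Both block without consuming CPU, but wait returns as soon as another thread calls set(), so the waiting thread can react immediately instead of sleeping through the whole interval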
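A rough way to see the difference is to give the loop a long interval and then set the Event early. In this sketch the 10 second interval and the 0.2 second delay are arbitrary, and periodic is a hypothetical helper.

```python
import time
from threading import Event, Thread

stop = Event()

def periodic(interval: float) -> None:
    # wait() acts as an interruptible sleep: the loop ends as soon as stop is set
    while not stop.wait(interval):
        pass

t = Thread(target=periodic, args=(10,))
begin = time.monotonic()
t.start()
time.sleep(0.2)
stop.set()   # wakes the thread at once; a thread inside time.sleep(10) could not be woken early
t.join()
elapsed = time.monotonic() - begin

print('stopped after {:.2f}s'.format(elapsed))
```

The thread finishes long before the 10 second interval has passed, which is what makes wait convenient for stoppable background loops.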

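Event exercise

Implement a Timer, a thread that runs a function after a delay
Use it to compute add(x, y) after a delay

Approach
What parameters does the Timer constructor take?
How does start launch a thread that runs the function?
How does cancel stop a task that has not run yet?

Implementation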

from threading import Event, Thread
import datetime
import logging
logging.basicConfig(level=logging.INFO)

def add(x: int, y: int):
    logging.info(x + y)

class Timer:
    def __init__(self, interval, fn, *args, **kwargs) -> None:
        self.interval = interval
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.event = Event()

    def start(self):
        Thread(target=self._run).start()

    def cancel(self):
        self.event.set()

    def _run(self):
        start = datetime.datetime.now()
        logging.info('waiting')

        self.event.wait(self.interval)
        if not self.event.is_set():
            self.fn(*self.args, **self.kwargs)
        delta = (datetime.datetime.now() - start).total_seconds()
        logging.info('finished{}'.format(delta))
        self.event.set()

t = Timer(10, add, 4, 50)
t.start()
e = Event()
e.wait(4)
# t.cancel()
print('=====')

Alternatively, use the return value of wait directly

from threading import Event, Thread
import datetime
import logging
logging.basicConfig(level=logging.INFO)

def add(x: int, y: int):
    logging.info(x + y)

class Timer:
    def __init__(self, interval, fn, *args, **kwargs) -> None:
        self.interval = interval
        self.fn = fn
        self.args = args
        self.kwargs = kwargs
        self.event = Event()

    def start(self):
        Thread(target=self._run).start()

    def cancel(self):
        self.event.set()

    def _run(self):
        start = datetime.datetime.now()
        logging.info('waiting')

        
        if not self.event.wait(self.interval):
            self.fn(*self.args, **self.kwargs)
        delta = (datetime.datetime.now() - start).total_seconds()
        logging.info('finished{}'.format(delta))
        self.event.set()

Lock

A lock can be used wherever threads compete for a shared resource. It guarantees that only one user at a time has full use of that resource

Requirement
An order calls for 1000 cups; organize 10 workers to produce them

from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

cups = []

def worker(count=10):
    logging.info("I'm working for U.")
    while len(cups) < count:
        time.sleep(0.0001)  # make the thread switching visible
        cups.append(1)
    logging.info('I finished. cups={}'.format(len(cups)))


for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

# Output
2022-04-09 19:43:08,181 Thread-1 (worker) 123145359519744 I'm working for U.
2022-04-09 19:43:08,181 Thread-2 (worker) 123145376309248 I'm working for U.
2022-04-09 19:43:08,181 Thread-3 (worker) 123145393098752 I'm working for U.
2022-04-09 19:43:08,182 Thread-4 (worker) 123145409888256 I'm working for U.
2022-04-09 19:43:08,182 Thread-5 (worker) 123145426677760 I'm working for U.
2022-04-09 19:43:08,183 Thread-6 (worker) 123145443467264 I'm working for U.
2022-04-09 19:43:08,183 Thread-7 (worker) 123145460256768 I'm working for U.
2022-04-09 19:43:08,184 Thread-8 (worker) 123145477046272 I'm working for U.
2022-04-09 19:43:08,184 Thread-9 (worker) 123145493835776 I'm working for U.
2022-04-09 19:43:08,185 Thread-10 (worker) 123145510625280 I'm working for U.
2022-04-09 19:43:08,199 Thread-5 (worker) 123145426677760 I finished. cups=1000
2022-04-09 19:43:08,199 Thread-1 (worker) 123145359519744 I finished. cups=1001
2022-04-09 19:43:08,199 Thread-3 (worker) 123145393098752 I finished. cups=1002
2022-04-09 19:43:08,199 Thread-6 (worker) 123145443467264 I finished. cups=1003
2022-04-09 19:43:08,199 Thread-8 (worker) 123145477046272 I finished. cups=1004
2022-04-09 19:43:08,199 Thread-2 (worker) 123145376309248 I finished. cups=1005
2022-04-09 19:43:08,199 Thread-4 (worker) 123145409888256 I finished. cups=1006
2022-04-09 19:43:08,199 Thread-7 (worker) 123145460256768 I finished. cups=1007
2022-04-09 19:43:08,199 Thread-10 (worker) 123145510625280 I finished. cups=1008
2022-04-09 19:43:08,199 Thread-9 (worker) 123145493835776 I finished. cups=1009

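The output shows that thread scheduling invalidated the check: a worker passes the len(cups) < count test, is switched out, and appends after other workers have already filled the order, so extra cups are produced. How can this be fixed? Use a lock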

Lock
Once a thread has acquired a lock, every other thread that tries to acquire it blocks until the lock is released

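Name                                 Meaning
acquire(blocking=True, timeout=-1)   Blocks by default; a timeout can be set for a blocking call. A timeout must not be given for a non-blocking call. Returns True if the lock was acquired, otherwise False
release()                            Releases the lock; may be called from any thread. A locked lock is reset to unlocked. Calling release on an unlocked lock raises RuntimeError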
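The behaviour described in the table can be observed from a single thread. This is a small sketch; the variable names are illustrative and the 0.1 second timeout is arbitrary.

```python
from threading import Lock

lock = Lock()
first = lock.acquire()                  # True: the lock was free
second = lock.acquire(blocking=False)   # False: already held, returns immediately
third = lock.acquire(timeout=0.1)       # False: gives up after the timeout
lock.release()

try:
    lock.release()                      # releasing a lock that is not locked
    release_error = None
except RuntimeError as exc:
    release_error = exc

try:
    lock.acquire(blocking=False, timeout=1)  # a timeout is rejected for non-blocking calls
    timeout_error = None
except ValueError as exc:
    timeout_error = exc

print(first, second, third, type(release_error).__name__, type(timeout_error).__name__)
```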

The previous example, implemented with a lock

from threading import Thread, Lock
import logging
import time

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

cups = []
lock = Lock()

def worker(count=10):
    logging.info("I'm working for U.")
    flag = False
    while True:
        lock.acquire()  # acquire the lock

        if len(cups) >= count:
            flag = True
  
        time.sleep(0.0001)  # make the thread switching visible
        if not flag:
            cups.append(1)
        lock.release() 
        if flag:
            break
    logging.info('I finished. cups={}'.format(len(cups)))


for _ in range(10):
    Thread(target=worker, args=(1000,)).start()

The lock preserves data integrity, but performance drops considerably

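In the example above, the flag and the final if flag: break make sure that release is always called before the loop exits. Breaking out while still holding the lock would cause a deadlock: the thread that acquired the lock would never release it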

A counter class that can increment and decrement

import threading
from threading import Thread, Lock
import time

class Counter:
    def __init__(self) -> None:
        self._val = 0

    @property
    def value(self):
        return self._val
    
    def inc(self):
        self._val += 1

    def dec(self):
        self._val -= 1
    

def run(c: Counter, count=100):
    for _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10  # number of threads
c2 = 10
for i in range(c1):
    Thread(target=run, args=(c, c2)).start()

print(c.value)

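Try c1 = 10, 100, 1000
Try c2 = 10, 100, 1000

self._val += 1 and self._val -= 1 are not atomic: a thread can be interrupted between reading the value and writing it back
A lock is needed. How should it be added?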
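One way to see why the statement can be interrupted is to look at the bytecode CPython generates for it. The sketch below uses the standard dis module; the exact opcode names vary between Python versions, so it only shows that the attribute is loaded and stored in separate steps.

```python
import dis

class Counter:
    def __init__(self) -> None:
        self._val = 0

    def inc(self) -> None:
        self._val += 1

# Collect the opcode names for inc(); the read and the write are separate instructions
ops = [ins.opname for ins in dis.get_instructions(Counter.inc)]
print(ops)
```

A thread switch between the load and the store lets another thread update _val in the meantime, and that update is then overwritten.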

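Locking and unlocking

In general, every acquire needs a matching release. But code runs between acquiring and releasing, and that code may raise an exception. If it does, the lock is never released; the current thread may be terminated by the exception, and every other thread waiting for the lock is deadlocked

Common ways to acquire and release a lock:

  1. Use a try...finally statement to guarantee the release
  2. Use the with statement; lock objects support the context manager protocol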
The Counter class, reworked:

import threading
from threading import Thread, Lock
import time

class Counter:
    def __init__(self) -> None:
        self._val = 0
        self._lock = Lock()

    @property
    def value(self):
        with self._lock:
            return self._val
    
    def inc(self):
        self._lock.acquire()  # acquire before try, so finally only runs once the lock is held
        try:
            self._val += 1
        finally:
            self._lock.release()

    def dec(self):
        with self._lock:
            self._val -= 1
    

def run(c: Counter, count=100):
    for _ in range(count):
        for i in range(-50, 50):
            if i < 0:
                c.dec()
            else:
                c.inc()


c = Counter()
c1 = 10  # number of threads
c2 = 10
for i in range(c1):
    Thread(target=run, args=(c, c2)).start()

while True:
    time.sleep(1)
    if threading.active_count() == 1:
        print(threading.enumerate())
        print(c.value)
        break
    else:
        print(threading.enumerate())
 

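When to use locks

Locks are appropriate when a shared resource is both accessed and modified, that is, when the same resource is read and written

Is a lock needed if every thread only reads the shared resource?
No. In that case the shared resource can be treated as immutable: every read returns the same value, so no lock is required

Notes on using locks:

  • Use locks sparingly, only when necessary. Once a resource is locked, multithreaded access to it becomes serial: threads either queue up or compete for it
    • For example, cars drive in parallel on a highway, but at the provincial border only one toll gate is open; past the gate they can use all lanes again. If the cars pass the gate one by one in an orderly queue, efficiency is about the same with or without a lock, but once they start competing, a lock is needed to force them through one at a time
  • Hold a lock for as short a time as possible, and release it as soon as it is no longer needed
  • Always avoid deadlock

Without locks the program is fast, but the result is wrong
With locks the program is slower, but the result is correct
So the question is whether to accept wrong results for the sake of speed, or to let the computer spend the time and get correct results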

Using a non-blocking lock

import threading
import logging
import time

FORMAT = '%(asctime)-15s\t[%(threadName)s, %(thread)d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(tasks):
    for task in tasks:
        time.sleep(0.001)
        if task.lock.acquire(False):  # returns True if the lock was acquired
            logging.info('{} {} begin to start'.format(threading.current_thread(), task.name))
            # release the lock at an appropriate time; left unreleased here for demonstration
        else:
            logging.info('{} {} is working'.format(threading.current_thread(), task.name))


class Task:
    def __init__(self, name) -> None:
        self.name = name
        self.lock = threading.Lock()

# create 10 tasks
tasks = [Task('task-{}'.format(x)) for x in range(10)]

# start 5 threads
for i in range(5):
    threading.Thread(target=worker, name='worker-{}'.format(i), args=(tasks, )).start()

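Reentrant lock RLock

A reentrant lock is tied to the thread that holds it
Thread A can acquire a reentrant lock it already holds any number of times without blocking. It must eventually call release as many times as it called acquire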

import threading
import time

lock = threading.RLock()
print(lock.acquire())
print('----------')
print(lock.acquire(blocking=False))
print(lock.acquire())
print(lock.acquire(timeout=3.55))
print(lock.acquire(blocking=False))
# print(lock.acquire(blocking=False, timeout=10))  # raises ValueError
lock.release()
lock.release()
lock.release()
lock.release()
print('main thread {}'.format(threading.current_thread().ident))
print("lock in main thread {}".format(lock))  # note the owner and count shown for the lock
lock.release()
# lock.release()
print('==========')
print()

print(lock.acquire(blocking=False))  # acquired once
# threading.Timer(3, lambda x:x.release(), args=(lock,)).start()  # release from another thread raises an exception
lock.release()
print('~~~~~~~~~~')
print()

# multithreaded test
print(lock.acquire())
def sub(l):
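    print('{}: {}'.format(threading.current_thread(), l.acquire()))  # blocks until the main thread has fully released the lock
    print('{}: {}'.format(threading.current_thread(), l.acquire(False)))  # reentrant acquire in the same thread, does not block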
    print('lock in sub thread {}'.format(lock))
    l.release()
    print('sub 1')
    l.release()
    print('sub 2')
    # l.release()
    
threading.Timer(2, sub, args=(lock,)).start()  # pass in the same lock object
print('++++++++++')
print()

print(lock.acquire())
lock.release()
time.sleep(5)
print('main thread releases the lock')
lock.release()

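A reentrant lock is tied to a thread: once a thread has acquired it, the same thread can keep acquiring it without blocking. Until the lock has been fully released, any other thread that tries to acquire it blocks, and it stays blocked until the holding thread has released the lock as many times as it acquired it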
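A typical use is a method that holds the lock and calls another method that takes the same lock. The sketch below uses a hypothetical Account class that is not part of the examples above; with a plain Lock the nested acquire would block forever.

```python
import threading

class Account:
    """Hypothetical class used only to illustrate reentrancy."""

    def __init__(self, balance: int) -> None:
        self._lock = threading.RLock()
        self.balance = balance

    def withdraw(self, amount: int) -> None:
        with self._lock:
            self.balance -= amount

    def withdraw_with_fee(self, amount: int, fee: int) -> None:
        with self._lock:           # first acquire by this thread
            self.withdraw(amount)  # acquires the same RLock again without blocking
            self.withdraw(fee)

account = Account(100)
account.withdraw_with_fee(30, 1)
print(account.balance)  # 69
```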

Condition

The constructor is Condition(lock=None). A Lock or RLock object can be passed in; by default a new RLock is created

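Name                 Meaning
acquire(*args)       Acquire the underlying lock
wait(timeout=None)   Wait until notified or until the timeout expires
notify(n=1)          Wake up at most n waiting threads; does nothing if no thread is waiting
notify_all()         Wake up all waiting threads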

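Condition is used in the producer-consumer model to solve the problem of matching producer and consumer speeds
First, an example in which the consumer consumes faster than the producer produces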

from threading import Thread, Event
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## For demonstration only; thread safety is not considered

class Dispatcher:
    def __init__(self) -> None:
        self.data = None
        self.event = Event()  # the event is only a convenience here, not part of the logic

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            logging.info(data)
            self.data = data
            self.event.wait(1)
        self.event.set()
    
    def consume(self):
        while not self.event.is_set():
            data = self.data
            logging.info("received {}".format(data))
            self.data = None
            self.event.wait(0.5)

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

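In this example the consumer polls: it wastes a lot of time repeatedly checking whether data is available. Could this be replaced by a notification mechanism, where the consumer is notified when data is ready?
Use a Condition object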

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## For demonstration only; thread safety is not considered

class Dispatcher:
    def __init__(self) -> None:
        self.data = None
        self.event = Event()  # the event is only a convenience here, not part of the logic
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)  # simulate the production rate
        self.event.set()
    
    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()  # block until notified
                logging.info("received {}".format(self.data))
                self.data = None
            self.event.wait(0.5)  # simulate the consumption rate

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
c = Thread(target=d.consume, name='consumer')
c.start()
p.start()

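In the example above the consumer waits for data, and the producer notifies it when data is ready, so the consumer does not have to keep checking
How should this change for one producer and multiple consumers?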

from threading import Thread, Event, Condition
import logging
import random

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

## For demonstration only; thread safety is not considered

class Dispatcher:
    def __init__(self) -> None:
        self.data = None
        self.event = Event()  # the event is only a convenience here, not part of the logic
        self.cond = Condition()

    def produce(self, total):
        for _ in range(total):
            data = random.randint(0, 100)
            with self.cond:
                logging.info(data)
                self.data = data
                self.cond.notify_all()
            self.event.wait(1)  # simulate the production rate
        self.event.set()
    
    def consume(self):
        while not self.event.is_set():
            with self.cond:
                self.cond.wait()  # block until notified
                logging.info("received {}".format(self.data))
                self.data = None
            self.event.wait(0.5)  # simulate the consumption rate

d = Dispatcher()
p = Thread(target=d.produce, args=(10,), name='producer')
for i in range(5):
    c = Thread(target=d.consume, name='consumer-{}'.format(i))
    c.start()
p.start()

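self.cond.notify_all()  # send the notification
Change it to
self.cond.notify(n=2)
and see what happens.
With notify_all, each message is delivered to every waiting consumer, which is effectively a broadcast pattern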

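Note: the program above is not thread-safe and its logic has many flaws (for example, a consumer that starts waiting after the last notification blocks forever), but it is a good aid to understanding Condition and the producer-consumer model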

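Condition summary

Condition is used in the producer-consumer model to solve the problem of matching producer and consumer speeds
It uses a notification mechanism, which is very efficient

Usage
A Condition must be acquired before use and released afterwards, because it uses a lock internally (an RLock by default); the best way is the with statement
The consumer calls wait to wait for a notification
The producer prepares the data and notifies consumers with notify or notify_all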
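The usage pattern can be sketched in a form that avoids missed notifications by waiting on a predicate with Condition.wait_for. This is one possible arrangement rather than a rewrite of the Dispatcher above; queue, state, and received are names invented for the sketch.

```python
from collections import deque
from threading import Condition, Thread

cond = Condition()
queue = deque()
state = {"done": False}
received = []

def producer(total: int) -> None:
    for i in range(total):
        with cond:
            queue.append(i)
            cond.notify()        # wake one waiting consumer
    with cond:
        state["done"] = True
        cond.notify_all()        # wake every consumer so it can exit

def consumer() -> None:
    while True:
        with cond:
            # wait_for re-checks the predicate after each wake-up and before the
            # first wait, so data queued before the consumer arrived is not missed
            cond.wait_for(lambda: queue or state["done"])
            if not queue:        # nothing left and the producer has finished
                return
            received.append(queue.popleft())

consumers = [Thread(target=consumer) for _ in range(2)]
p = Thread(target=producer, args=(20,))
for t in consumers:
    t.start()
p.start()
p.join()
for t in consumers:
    t.join()

print(sorted(received))
```

Because the data and the done flag are only touched while the condition is held, every item is consumed exactly once and all consumers terminate.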

Barrier

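The term is sometimes translated as "fence" (栏栅); "barrier" (屏障) is the better rendering. Picture a roadblock or a boom gate
Added in Python 3.2

Name                                          Meaning
Barrier(parties, action=None, timeout=None)   Create a Barrier for the given number of parties. timeout is the default used by wait when it is called without one
n_waiting                                     Number of threads currently waiting at the barrier
parties                                       Number of parties, that is, how many threads must be waiting before the barrier opens
wait(timeout=None)                            Wait to pass the barrier. Returns an integer from 0 to parties-1, different for each thread. If wait is given a timeout and the timeout expires, the barrier enters the broken state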

Barrier example

import threading
import logging

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

 
def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(3)

for x in range(3):  # try changing this to 4, 5, 6
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

logging.info('started')

# Output
2022-04-09 22:31:48,929 worker-0 123145335320576 waiting for 0 threads.
2022-04-09 22:31:48,930 worker-1 123145352110080 waiting for 1 threads.
2022-04-09 22:31:48,930 worker-2 123145368899584 waiting for 2 threads.
2022-04-09 22:31:48,930 worker-2 123145368899584 after barrier 2
2022-04-09 22:31:48,930 worker-0 123145335320576 after barrier 0
2022-04-09 22:31:48,930 MainThread 4405802496 started
2022-04-09 22:31:48,930 worker-1 123145352110080 after barrier 1

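The output shows:
All threads rush to the barrier and wait there. Once the number of waiting threads reaches parties, the barrier opens and all of them stop waiting and continue
When further threads call wait, the barrier again holds them until parties threads have arrived
For example, in a horse race the gate opens once all the horses are in position; the horses for the next race then arrive at the gate one by one and wait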
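The reuse of a barrier for successive batches can be checked with six threads and a barrier of three parties. This is a sketch; horse, ids, and ids_lock are illustrative names and the 5 second timeout is only a safety net.

```python
import threading

barrier = threading.Barrier(3)
ids = []
ids_lock = threading.Lock()

def horse() -> None:
    index = barrier.wait(timeout=5)  # blocks until 3 threads have arrived
    with ids_lock:
        ids.append(index)

threads = [threading.Thread(target=horse) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(ids))
```

Each batch of three threads receives the indexes 0, 1, and 2, so every index appears twice.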

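Name      Meaning
broken    True if the barrier is in the broken state
abort()   Put the barrier into the broken state. Threads already waiting, and threads that call wait afterwards, raise BrokenBarrierError until reset restores the barrier
reset()   Restore the barrier so that it starts holding threads again

import threading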
import logging

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

 
def worker(barrier: threading.Barrier):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(3)

for x in range(9):
    if x == 2:
        barrier.abort()
    elif x == 6:
        barrier.reset()
    threading.Event().wait(1)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier,)).start()

logging.info('started')

# Output
2022-04-09 23:26:42,520 worker-0 123145321443328 waiting for 0 threads.
2022-04-09 23:26:43,520 worker-1 123145338232832 waiting for 1 threads.
2022-04-09 23:26:43,521 worker-1 123145338232832 Broken Barrier
2022-04-09 23:26:43,521 worker-0 123145321443328 Broken Barrier
2022-04-09 23:26:44,523 worker-2 123145321443328 waiting for 0 threads.
2022-04-09 23:26:44,523 worker-2 123145321443328 Broken Barrier
2022-04-09 23:26:45,529 worker-3 123145321443328 waiting for 0 threads.
2022-04-09 23:26:45,529 worker-3 123145321443328 Broken Barrier
2022-04-09 23:26:46,532 worker-4 123145321443328 waiting for 0 threads.
2022-04-09 23:26:46,533 worker-4 123145321443328 Broken Barrier
2022-04-09 23:26:47,536 worker-5 123145321443328 waiting for 0 threads.
2022-04-09 23:26:47,536 worker-5 123145321443328 Broken Barrier
2022-04-09 23:26:48,540 worker-6 123145321443328 waiting for 0 threads.
2022-04-09 23:26:49,543 worker-7 123145338232832 waiting for 1 threads.
2022-04-09 23:26:50,547 worker-8 123145355022336 waiting for 2 threads.
2022-04-09 23:26:50,547 MainThread 4739347968 started
2022-04-09 23:26:50,547 worker-8 123145355022336 after barrier 2
2022-04-09 23:26:50,547 worker-7 123145338232832 after barrier 1
2022-04-09 23:26:50,547 worker-6 123145321443328 after barrier 0

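In the example above, the barrier was broken while two threads were waiting in it. The waiting threads raised BrokenBarrierError, and threads that called wait afterwards raised it as well, until the barrier was reset and went back to holding threads according to parties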

wait timeout example

If a wait call times out, the barrier enters the broken state and stays there until reset is called

import threading
import logging

FORMAT = '%(asctime)s %(threadName)s %(thread)d %(message)s'
logging.basicConfig(format=FORMAT, level=logging.INFO)

 
def worker(barrier: threading.Barrier, i: int):
    logging.info('waiting for {} threads.'.format(barrier.n_waiting))
    try:
        logging.info(barrier.broken)  # is the barrier broken?
        if i < 3:
            barrier_id = barrier.wait(1)  # after the timeout, the barrier is broken
        else:
            if i == 6:
                barrier.reset()  # restore the barrier
            barrier_id = barrier.wait()
        logging.info('after barrier {}'.format(barrier_id))
    except threading.BrokenBarrierError:
        logging.info('Broken Barrier')

barrier = threading.Barrier(3)

for x in range(9):
    threading.Event().wait(2)
    threading.Thread(target=worker, name='worker-{}'.format(x), args=(barrier, x)).start()

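Barrier applications

Concurrent initialization
All threads must finish initializing before work can continue, for example loading data and running checks before startup; starting before these tasks are done would make the program malfunction
Ten threads each handle one of ten preparation tasks. Work can continue only after all ten have finished, so the threads that finish first wait for the rest
For example, starting a program may require loading files from disk, warming the cache, and initializing a connection pool. These tasks can run in parallel, but the program can proceed only when all of them have succeeded. If the database connection fails, initialization has failed: call abort to put the barrier in the broken state, and every thread receives an exception and exits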
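The failure case can be sketched as follows. The step names and the ok flag are made up for this illustration; a real initialization step would do actual work before reaching the barrier.

```python
import threading

barrier = threading.Barrier(3)
outcome = {}

def init_step(name: str, ok: bool) -> None:
    try:
        if not ok:
            barrier.abort()          # initialization failed: break the barrier for everyone
            outcome[name] = "failed"
            return
        barrier.wait(timeout=5)      # wait for the other initialization steps
        outcome[name] = "ready"
    except threading.BrokenBarrierError:
        outcome[name] = "aborted"    # another step failed, so give up

steps = [("load_files", True), ("warm_cache", True), ("connect_db", False)]
threads = [threading.Thread(target=init_step, args=step) for step in steps]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(outcome)
```

Whether a healthy step reaches the barrier before or after the abort, it ends with BrokenBarrierError, so no thread continues with a half-initialized program.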

Workload
There are 10 computation tasks, and the job counts as done once 6 of them complete

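Semaphore

A semaphore is much like a Lock. It maintains an internal countdown counter, and each acquire decrements it by 1. When acquire finds the counter at 0, it blocks the calling thread until another thread calls release, which makes the counter greater than 0 and wakes a blocked thread

Name                                   Meaning
Semaphore(value=1)                     Constructor. A value less than 0 raises ValueError
acquire(blocking=True, timeout=None)   Acquire the semaphore, decrementing the counter by 1. Returns True on success
release()                              Release the semaphore, incrementing the counter by 1

The counter never drops below 0, because any acquire that finds it at 0 blocks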
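A common use of the counter is to cap how many threads run a section at the same time. The sketch below starts 10 threads but lets at most 3 into the section; stats, guard, and job are illustrative names, and the 0.05 second sleep stands in for real work.

```python
import threading
import time

limit = threading.Semaphore(3)
guard = threading.Lock()             # protects the statistics below
stats = {"active": 0, "peak": 0}

def job() -> None:
    with limit:                      # acquire on entry, release on exit
        with guard:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)             # simulated work
        with guard:
            stats["active"] -= 1

threads = [threading.Thread(target=job) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(stats)
```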

import threading
import logging
import time

# log output format
FORMAT = "%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)

def worker(s: threading.Semaphore):
    logging.info('in sub thread')
    logging.info(s.acquire())  # blocks until another thread releases
    logging.info('sub thread over')

# semaphore
s = threading.Semaphore(3)
logging.info(s.acquire())
print(s._value)
logging.info(s.acquire())
print(s._value)
logging.info(s.acquire())
print(s._value)

threading.Thread(target=worker, args=(s,)).start()

time.sleep(2)

logging.info(s.acquire(False))
logging.info(s.acquire(timeout=3))  # blocks for up to 3 seconds

# release
logging.info('released')
s.release()

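Application example

Connection pool
Because resources are limited and opening a connection is expensive, a connection pool is used

A simple connection pool
A connection pool should have a capacity (total count), a factory method for obtaining a connection, and a way to return unused connections so that other callers can use them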

import threading

class Conn:
    def __init__(self, name) -> None:
        self.name = name

class Pool:
    def __init__(self, count: int) -> None:
        self.count = count
        # the pool is a list of connection objects
        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]

    def _connect(self, conn_name):
        # create a connection; here it simply returns a named Conn object
        return Conn(conn_name)

    def get_conn(self):
        # take a connection out of the pool
        if len(self.pool) > 0:
            return self.pool.pop()

    def return_conn(self, conn: Conn):
        # put a connection back into the pool
        self.pool.append(conn)

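A real connection pool is far more complex than this example; only one simple feature is implemented here
In this example, get_conn() is not thread-safe
Suppose exactly one connection is left in the pool. Several threads may each find the pool length greater than 0; once one of them takes the connection, the others raise an exception when they pop. How can this be solved?

  1. Use a lock around the reads and writes
  2. Use a Semaphore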
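The first option can be sketched by putting the length check and the pop under one lock. LockedPool is a hypothetical name for this variant; it returns None when the pool is empty, as the original get_conn does implicitly.

```python
import threading

class Conn:
    def __init__(self, name) -> None:
        self.name = name

class LockedPool:
    """Hypothetical lock-based variant of the Pool above."""

    def __init__(self, count: int) -> None:
        self._lock = threading.Lock()
        self.pool = [Conn("conn-{}".format(x)) for x in range(count)]

    def get_conn(self):
        with self._lock:             # check and pop happen as one step
            if self.pool:
                return self.pool.pop()
            return None

    def return_conn(self, conn: Conn) -> None:
        with self._lock:
            self.pool.append(conn)

pool = LockedPool(2)
results = []
threads = [threading.Thread(target=lambda: results.append(pool.get_conn())) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

got = sum(conn is not None for conn in results)
print(got, len(results))
```

With the lock, exactly two of the five threads get a connection and the rest get None; a caller that should wait instead of receiving None is the case the semaphore handles.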

The example above, modified to use a semaphore

import threading
import logging
import random

FORMAT = "%(asctime)-15s\t [%(threadName)s, %(thread)8d] %(message)s"
logging.basicConfig(level=logging.INFO, format=FORMAT)


class Conn:
    def __init__(self, name) -> None:
        self.name = name

class Pool:
    def __init__(self, count: int) -> None:
        self.count = count
        # the pool is a list of connection objects
        self.pool = [self._connect("conn-{}".format(x)) for x in range(self.count)]
        self.semaphore = threading.Semaphore(count)

    def _connect(self, conn_name):
        # create a connection; here it simply returns a named Conn object
        return Conn(conn_name)

    def get_conn(self):
        # take a connection out of the pool
        print('----------')
        self.semaphore.acquire()
        print('==========')
        conn = self.pool.pop()
        return conn

    def return_conn(self, conn: Conn):
        # put a connection back into the pool
        self.pool.append(conn)
        self.semaphore.release()


# initialize the connection pool
pool = Pool(3)

def worker(pool: Pool):
    conn = pool.get_conn()
    logging.info(conn)
    # simulate using the connection for a while
    threading.Event().wait(random.randint(1, 4))
    pool.return_conn(conn)

for i in range(6):
    threading.Thread(target=worker, name="worker-{}".format(i), args=(pool,)).start()

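In the example above, a semaphore solves the problem of limited resources
If the pool has resources, a requester decrements the semaphore by 1 and takes one. When requests exceed the number of resources, requesters have to wait. When a user returns a resource, the semaphore is incremented by 1 and a waiting thread can be woken to take it
Note: this example is not fit for production. It only illustrates how a semaphore is used, and many features are missing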

Questions

Does the statement self.pool.append(conn) need a lock?

What happens if release is called before the semaphore has been acquired?

import logging
import threading

sema = threading.Semaphore(3)
logging.warning(sema.__dict__)
for i in range(3):
    sema.acquire()

logging.warning('~~~~~~~~~~')
logging.warning(sema.__dict__)

for i in range(4):
    sema.release()
logging.warning(sema.__dict__)

for i in range(3):
    sema.acquire()
logging.warning('~~~~~~~~~~')
logging.warning(sema.__dict__)
sema.acquire()
logging.warning('~~~~~~~~~~')
logging.warning(sema.__dict__)

The output shows that the internal counter actually reached 4, which exceeds the intended maximum of 3. This needs to be fixed

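BoundedSemaphore class

A bounded semaphore does not allow release to push the counter above its initial value; if that would happen, it raises ValueError

Rewriting the pool with a bounded semaphore therefore guarantees that returning too many connections through return_conn raises an exception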
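The difference from a plain Semaphore can be shown with public methods only. In this sketch, extra counts how many non-blocking acquires succeed after one surplus release; the variable names are illustrative.

```python
import threading

plain = threading.Semaphore(2)
plain.release()                        # accepted silently: the counter is now above its initial value
extra = 0
while plain.acquire(blocking=False):   # count how many acquires succeed
    extra += 1

bounded = threading.BoundedSemaphore(2)
bounded.acquire()
bounded.release()                      # back at the initial value
try:
    bounded.release()                  # one release too many
    error = None
except ValueError as exc:
    error = exc

print(extra, type(error).__name__)
```

The plain semaphore hands out three permits although it was created with two, while the bounded one rejects the surplus release.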

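What if the same connection is returned more than once? That is easy to detect by checking for duplicates

If the semaphore is in use but not exhausted

self.pool.append(conn)
self.semaphore.release()

Consider an extreme case: the counter is 1 short of full, and three threads A, B, and C have all executed the first statement but none has called release yet. Thread A releases first, which is a normal release. When thread C then releases, the counter would exceed the bound, so an exception is raised immediately
So a bounded semaphore guarantees that connections can never be over-returned

If many threads have exhausted the semaphore

Every thread that failed to acquire the semaphore is blocked, so no thread competes with the one returning a connection. release happens only after append, and only then can another thread pop. In other words, a thread without the semaphore cannot pop, and that is what makes this safe

From the analysis above, using a semaphore is better than checking the list length: it is thread-safe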

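Semaphores and locks

A lock allows only one thread at a time to hold a resource exclusively. A semaphore allows several threads to access a shared resource at once, but the number of resources is limited
A lock can be seen as a special semaphore whose counter has an initial value of 1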