concurrent.futures

3.2版本引入的模块

异步并行任务编程模块,提供一个高级的异步可执行的便利接口

提供了2个池执行器
ThreadPoolExecutor 异步调用的线程池的Executor
ProcessPoolExecutor 异步调用的进程池的Executor

ThreadPoolExecutor对象

首先需要定义一个池的执行器对象,Executor类子类对象

方法含义
ThreadPoolExecutor(max_workers=1)池中至多创建max_workers个线程的池来同时异步执行,返回Executor实例
submit(fn, *args, **kwargs)提交执行的函数及其参数,返回Future实例
shutdown(wait=True)清理池

Future类

方法含义
done()如果调用被成功的取消或者执行完成,返回True
cancelled()如果调用被成功的取消,返回True
running()如果正在运行且不能被取消,返回True
cancel()尝试取消调用。如果已经执行且不能取消返回False,否则返回True
result(timeout=None)取返回的结果,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常
exception(timeout=None)取返回的异常,timeout为None,一直等待返回;timeout设置到期,抛出concurrent.futures.TimeoutError异常
import threading
from concurrent import futures

import logging
import time

FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))


# 创建线程池,池的容量为3
executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []

for i in range(3):
    futures = executor.submit(worker, i)
    fs.append(futures)

for i in range(3, 6):
    futures = executor.submit(worker, i)
    fs.append(futures)

while True:
    time.sleep(2)
    logging.info(threading.enumerate())

    flag = True
    for f in fs:
        logging.info(f.done())
        flag = flag and f.done()
        # if not flag:
        #     break
    print('-'*30)

    if flag:
        executor.shutdown()  # 清理池,池中线程全部杀掉
        logging.info(threading.enumerate())
        break
   
# 线程池中一旦创建了线程,就不要用频繁清除

ProcessPoolExecutor对象

方法一样。就是使用多进程完成

import threading
from concurrent import futures

import logging
import time

FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))


# 创建进程池,池的容量为3
if __name__ == '__main__'
    executor = futures.ProcessPoolExecutor(max_workers=3)
    fs = []

    for i in range(3):
        futures = executor.submit(worker, i)
        fs.append(futures)

    for i in range(3, 6):
        futures = executor.submit(worker, i)
        fs.append(futures)

    while True:
        time.sleep(2)
        logging.info(threading.enumerate())

        flag = True
        for f in fs:
            logging.info(f.done())
            flag = flag and f.done()
            # if not flag:
            #     break
        print('-'*30)

        if flag:
            executor.shutdown()  # 清理池,池中线程全部杀掉
            logging.info(threading.enumerate())
            break
    

支持上下文管理

concurrent.futures.ProcessPoolExecutor继承自concurrent.futures.base.Executer,而父类有__enter____exit__方法,支持上下文管理。可以使用with语句
__exit__方法本质还是调用shutdown(wait=True),就是一直阻塞到所有运行的任务完成

使用方法

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())

使用上下文改造上面的例子,增加返回计算的结果

import threading
from concurrent import futures

import logging
import time

FORMAT = '%(asctime)-15s\t [%(processName)s:%(threadName)s, %(process)d:%(thread)8d] %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)


def worker(n):
    logging.info('begin to work{}'.format(n))
    time.sleep(5)
    logging.info('finished{}'.format(n))


if __name__ == '__main__':
    # 创建线程池,池的容量为3
    executor = futures.ThreadPoolExecutor(max_workers=3)
    
    with executor:
        fs = []

        for i in range(3):
            futures = executor.submit(worker, i)
            fs.append(futures)

        for i in range(3, 6):
            futures = executor.submit(worker, i)
            fs.append(futures)

        while True:
            time.sleep(2)
            logging.info(threading.enumerate())

            flag = True
            for f in fs:
                logging.info(f.done())
                flag = flag and f.done()
                # if not flag:
                #     break
            print('-'*30)

            if flag:
                break
    # executor.shutdown()
    logging.info('=====end=====')
    logging.info(threading.enumerate()) 

总结

该库统一了线程池、进程池调用,简化了编程
是Python简单哲学的体验

唯一的缺点:无法设置线程名称。但这都不值得一提