yan's blog

Python ThreadPoolExecutor usage

Introduction to Python thread pools

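Python's multithreading can still deliver a substantial speedup for IO-bound tasks. Once you use multiple threads, you inevitably have to decide how many threads to run. If you implement a thread pool yourself, you may spend a lot of effort and still end up with plenty of bugs. Fortunately, Python ships a ready-to-use ThreadPoolExecutor in the concurrent.futures module; see the official documentation for details.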

A simple example

Serial

import requests
import time


def get(i):
    url = "https://www.baidu.com"
    res = requests.get(url)
    print(res.status_code)


start = time.time()

for i in range(10):
    get(i)
print(time.time() - start)

# 0.5793931484222412

ThreadPoolExecutor

import requests
from concurrent.futures import ThreadPoolExecutor
import time


def get(i):
    url = "https://www.baidu.com"
    res = requests.get(url)
    print(res.status_code)
    i / (i-1)

start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    result = executor.map(get, [1, 2, 3, 4, 5, 6, 7, 8, 9])
print(time.time() - start)

# 0.11638116836547852

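The improvement is obvious. That raises a question: is a larger number of workers always better? Not really. With too many workers, the cost of switching between threads grows and can actually reduce performance. So how many workers should you use? Since Python 3.8, the default value of max_workers is min(32, os.cpu_count() + 4). You can either leave it unset and rely on the default, or try several values yourself and keep the one that performs best.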
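One way to try several values is to time the same batch of work under different pool sizes. The following is only a rough sketch: `fake_io` is a hypothetical stand-in that sleeps instead of making an HTTP request, `default_max_workers` simply restates the formula quoted above, and the candidate pool sizes are arbitrary.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def default_max_workers():
    # Formula quoted above for Python 3.8; os.cpu_count() may return None.
    return min(32, (os.cpu_count() or 1) + 4)


def fake_io(i):
    # Hypothetical stand-in for a network call: just sleep briefly.
    time.sleep(0.05)
    return i


def time_pool(max_workers, n_tasks=20):
    # Run n_tasks simulated IO calls and return the elapsed wall-clock time.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fake_io, range(n_tasks)))
    assert results == list(range(n_tasks))
    return time.perf_counter() - start


if __name__ == "__main__":
    print("default max_workers:", default_max_workers())
    for workers in (1, 5, 10, 20):
        print(workers, "workers:", round(time_pool(workers), 3), "s")
```

With a real workload the best value depends on the remote service, the network, and the machine, so the numbers from a sleep-based stand-in are only indicative.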

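Did you notice the pitfall in the code above? When i = 1, the expression i / (i - 1) should raise an error, yet when you actually run the code no exception appears. The official Python documentation explains why: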

If a func call raises an exception, then that exception will be raised when its value is retrieved from the iterator.
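A minimal sketch of what that sentence means in practice. To keep it runnable offline, the hypothetical `risky` function drops the HTTP request and keeps only the division, and `collect` is an illustrative helper. The exception surfaces while the results are being iterated, not when `map` is called.

```python
from concurrent.futures import ThreadPoolExecutor


def risky(i):
    # Same division as in the post; raises ZeroDivisionError when i == 1.
    return i / (i - 1)


def collect(values):
    results = []
    error = None
    with ThreadPoolExecutor(max_workers=5) as executor:
        # map() returns an iterator right away; nothing is raised here.
        iterator = executor.map(risky, values)
        try:
            # Results come back in input order; the stored exception is
            # re-raised when the failing item's value is retrieved.
            for value in iterator:
                results.append(value)
        except ZeroDivisionError as exc:
            error = exc
    return results, error


if __name__ == "__main__":
    print(collect([2, 3, 1, 4]))
```

Note that iteration stops at the first failing item, so results for later inputs are never retrieved from this iterator.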

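Sometimes, though, we don't care about the results and only want to know whether anything failed. In that case we can wrap the function body in try ... except, so that any exception is caught and logged.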

import logging
import requests
from concurrent.futures import ThreadPoolExecutor
import time


def get(i):
    try:
        url = "https://www.baidu.com"
        res = requests.get(url)
        print(res.status_code)
        i / (i - 1)
    except Exception as e:
        logging.warning(f"has exception {str(e)}")


start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    result = executor.map(get, [1, 2, 3, 4, 5, 6, 7, 8, 9])
print(time.time() - start)

#200
#200
#200
#200
#200
#200
#200
#200
#200
#200
#WARNING:root:has exception division by zero
#0.2282712459564209

As you can see, the exception was caught.

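You may find the try ... except above a bit clumsy; a decorator would do the job. And if you use the Loguru logging library, you can use its @logger.catch decorator instead of writing your own. The exception information it reports is also very detailed.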

import requests
from concurrent.futures import ThreadPoolExecutor
import time

from loguru import logger

@logger.catch
def get(i):
    url = "https://www.baidu.com"
    res = requests.get(url)
    print(res.status_code)
    i / (i - 1)


start = time.time()
with ThreadPoolExecutor(max_workers=5) as executor:
    result = executor.map(get, [1, 2, 3, 4, 5, 6, 7, 8, 9])
print(time.time() - start)

If you run it, the log looks like this:

200
200
200
200
200
2021-08-04 18:13:14.662 | ERROR    | concurrent.futures.thread:run:52 - An error has been caught in function 'run', process 'MainProcess' (12314), thread 'ThreadPoolExecutor-0_0' (6187937792):
Traceback (most recent call last):

  File "/opt/homebrew/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 930, in _bootstrap
    self._bootstrap_inner()
         <function Thread._bootstrap_inner at 0x100856d30>
     <Thread(ThreadPoolExecutor-0_0, started 6187937792)>

  File "/opt/homebrew/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
         <function Thread.run at 0x100856a60>
     <Thread(ThreadPoolExecutor-0_0, started 6187937792)>

  File "/opt/homebrew/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
                                 {}
                             <Thread(ThreadPoolExecutor-0_0, started 6187937792)>
                     (<weakref at 0x10169b220; to 'ThreadPoolExecutor' at 0x1007c5f70>, <_queue.SimpleQueue object at 0x101348590>, None, ())
                 <Thread(ThreadPoolExecutor-0_0, started 6187937792)>
         <function _worker at 0x10135a040>
     <Thread(ThreadPoolExecutor-0_0, started 6187937792)>

  File "/opt/homebrew/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 77, in _worker
    work_item.run()
              <function _WorkItem.run at 0x10135a160>
     <concurrent.futures.thread._WorkItem object at 0x101681760>

> File "/opt/homebrew/Cellar/python@3.9/3.9.6/Frameworks/Python.framework/Versions/3.9/lib/python3.9/concurrent/futures/thread.py", line 52, in run
    result = self.fn(*self.args, **self.kwargs)
                                    {}
                                <concurrent.futures.thread._WorkItem object at 0x101681760>
                         (1,)
                     <concurrent.futures.thread._WorkItem object at 0x101681760>
                  <function get at 0x101616c10>
              <concurrent.futures.thread._WorkItem object at 0x101681760>

  File "/Users/darcy/dfhx/work/projects/opcc_related/benchmark/c.py", line 13, in get
    i / (i - 1)
         1
     1

ZeroDivisionError: division by zero
200
200
200
200
0.13989615440368652
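For comparison, the hand-written decorator mentioned earlier might look like the sketch below. The names `log_exceptions` and `divide` are hypothetical, the HTTP request is left out so it runs offline, and, like the try ... except version, the wrapper swallows the exception and returns None for the failing call.

```python
import functools
import logging
from concurrent.futures import ThreadPoolExecutor


def log_exceptions(func):
    # Hypothetical helper: log any exception raised by func instead of
    # letting it stay hidden inside the worker thread.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            # logging.exception records the message plus the traceback.
            logging.exception("exception in %s%r", func.__name__, args)
            return None
    return wrapper


@log_exceptions
def divide(i):
    # Same division as in the post; fails when i == 1.
    return i / (i - 1)


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=5) as executor:
        print(list(executor.map(divide, [1, 2, 3])))
```

Because the wrapper returns None on failure, callers that do use the results need to distinguish None from a real value.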