Python3 多线程编程指南

一、什么是线程?

进程是操作系统分配资源的基本单位,线程是进程内的执行单元。一个进程可以包
含多个线程,它们共享进程的内存空间,切换开销远小于进程。

进程
├── 线程 1 (主线程)
├── 线程 2
└── 线程 3

多线程适合的场景:

  • I/O 密集型任务:网络请求、文件读写、数据库查询 — 线程等待 I/O 时 CPU 可以
    切换执行其他线程
  • 不适合 CPU 密集型任务:由于 GIL 的存在,Python 多线程无法真正并行执行计算
    密集型代码(应使用 multiprocessing)

二、GIL:Python 多线程的核心限制

GIL(Global Interpreter Lock,全局解释器锁)是 CPython 解释器的一把全局锁,
同一时刻只允许一个线程执行 Python 字节码。

时间轴:
线程 A ██████░░░░██████░░░░
线程 B ░░░░██████░░░░██████
↑交替执行,并非真正并行
| 场景               | 多线程效果 | 推荐方案        |
|--------------------|------------|-----------------|
| 网络请求、文件 I/O | 显著提速   | threading       |
| 数学计算、图像处理 | 几乎无提速 | multiprocessing |
| 异步 I/O           | 更低开销   | asyncio         |

三、基础用法

3.1 创建线程

import threading
import time


def worker(name, delay):
    """Print a start message, sleep for *delay* seconds, then print a done message."""
    print(f"[{name}] 开始")
    time.sleep(delay)
    print(f"[{name}] 完成,耗时 {delay}s")


# Method 1: pass a target function directly to Thread
t1 = threading.Thread(target=worker, args=("线程A", 2))
t2 = threading.Thread(target=worker, args=("线程B", 1))

t1.start()
t2.start()

t1.join()  # wait for t1 to finish
t2.join()  # wait for t2 to finish

print("所有线程完成")

输出:

[线程A] 开始
[线程B] 开始
[线程B] 完成,耗时 1s
[线程A] 完成,耗时 2s
所有线程完成

3.2 继承 Thread 类

适合逻辑复杂、需要封装状态的场景:

import threading
import time


class DownloadThread(threading.Thread):
    """Thread subclass that simulates downloading a URL and stores the result.

    Suitable when the thread needs to encapsulate its own state: the
    downloaded content is kept on the instance in ``self.result``.
    """

    def __init__(self, url):
        super().__init__()
        self.url = url       # URL to download
        self.result = None   # filled in by run(); None until the download finishes

    def run(self):
        print(f"开始下载: {self.url}")
        time.sleep(1)  # simulate a network request
        self.result = f"{self.url} 的内容"
        print(f"完成下载: {self.url}")


threads = [
    DownloadThread("https://example.com/file1"),
    DownloadThread("https://example.com/file2"),
    DownloadThread("https://example.com/file3"),
]

for t in threads:
    t.start()

for t in threads:
    t.join()

for t in threads:
    print(f"结果: {t.result}")

3.3 守护线程(Daemon Thread)

守护线程会在主线程退出时自动终止,适合后台任务(如心跳检测、日志刷新):

import threading
import time


def heartbeat():
    """Background loop: print a heartbeat message every second, forever."""
    while True:
        print("💓 心跳检测...")
        time.sleep(1)


# daemon=True: the thread is terminated automatically when the main thread exits
t = threading.Thread(target=heartbeat, daemon=True)
t.start()

print("主程序运行中...")
time.sleep(3)
print("主程序退出,守护线程自动终止")

四、线程同步

多个线程共享数据时,可能出现竞态条件(Race Condition):

import threading

counter = 0


def increment():
    """Increment the shared counter 100,000 times WITHOUT any locking."""
    global counter
    for _ in range(100000):
        counter += 1  # not atomic: the read-modify-write can interleave across threads


threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"期望: 500000,实际: {counter}")  # the actual value differs between runs!

4.1 Lock(互斥锁)

最基础的同步原语,同一时刻只有一个线程可以持有锁:

import threading

counter = 0
lock = threading.Lock()


def safe_increment():
    """Increment the shared counter 100,000 times, each step guarded by the lock."""
    global counter
    for _ in range(100000):
        with lock:  # acquire / release automatically
            counter += 1


threads = [threading.Thread(target=safe_increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"结果: {counter}")  # reliably 500000

4.2 RLock(可重入锁)

同一线程可以多次 acquire,避免自己把自己锁死:

import threading

rlock = threading.RLock()


def outer():
    """Acquire the re-entrant lock, then call inner(), which acquires it again."""
    with rlock:
        print("外层获取锁")
        inner()  # same thread re-acquires the RLock without deadlocking


def inner():
    """Acquire the same re-entrant lock a second time from the same thread."""
    with rlock:
        print("内层获取锁")


t = threading.Thread(target=outer)
t.start()
t.join()

4.3 Event(事件通知)

一个线程通知另一个线程某件事已发生:

import threading
import time

ready_event = threading.Event()


def producer():
    """Prepare data (simulated by a sleep), then fire the event."""
    print("生产者:准备数据中...")
    time.sleep(2)
    print("生产者:数据已就绪,发出通知")
    ready_event.set()  # fire the event


def consumer():
    """Block until the producer fires the event, then proceed."""
    print("消费者:等待数据...")
    ready_event.wait()  # blocks until the event is set
    print("消费者:收到通知,开始消费数据")


t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t2.start()
t1.start()

t1.join()
t2.join()

4.4 Semaphore(信号量)

控制同时访问某资源的线程数量,常用于限制并发连接数:

import threading
import time

# At most 3 threads may hold the semaphore at the same time
sem = threading.Semaphore(3)


def access_resource(name):
    """Acquire the semaphore, hold the resource for one second, then release it."""
    with sem:
        print(f"{name} 获得访问权限")
        time.sleep(1)
        print(f"{name} 释放访问权限")


threads = [threading.Thread(target=access_resource, args=(f"线程{i}",))
           for i in range(8)]

for t in threads:
    t.start()
for t in threads:
    t.join()

五、线程间通信:Queue

queue.Queue 是线程安全的队列,是多线程间传递数据的推荐方式,内部已封装了锁:

import threading
import queue
import time
import random

task_queue = queue.Queue()


def producer(q):
    """Put 10 items on the queue, then a None sentinel to signal completion."""
    for i in range(10):
        item = f"任务-{i}"
        q.put(item)
        print(f"生产: {item}")
        time.sleep(random.uniform(0.1, 0.3))
    q.put(None)  # sentinel: tells the consumer to stop


def consumer(q):
    """Consume items until the None sentinel is received."""
    while True:
        item = q.get()
        if item is None:
            break
        print(f"消费: {item}")
        time.sleep(random.uniform(0.2, 0.5))
        q.task_done()


p = threading.Thread(target=producer, args=(task_queue,))
c = threading.Thread(target=consumer, args=(task_queue,))

p.start()
c.start()

p.join()
c.join()
print("生产消费完成")

六、线程池:ThreadPoolExecutor

手动管理线程生命周期很繁琐,concurrent.futures.ThreadPoolExecutor 提供了更高层
的接口:

6.1 基本用法

from concurrent.futures import ThreadPoolExecutor
import time


def fetch_data(url):
    """Simulate a 1-second network request and return the fetched data."""
    time.sleep(1)
    return f"{url} 的数据"


urls = [
    "https://api.example.com/user/1",
    "https://api.example.com/user/2",
    "https://api.example.com/user/3",
    "https://api.example.com/user/4",
]

# map() preserves input order; the pool runs up to 4 calls concurrently
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch_data, urls))

for r in results:
    print(r)

6.2 使用 Future 获取结果

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random


def process(task_id):
    """Sleep for a random 0.5–2 s, then return the task id and its delay."""
    delay = random.uniform(0.5, 2)
    time.sleep(delay)
    return {"id": task_id, "delay": round(delay, 2)}


with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(process, i): i for i in range(6)}

    # as_completed yields futures in completion order, not submission order
    for future in as_completed(futures):
        result = future.result()
        print(f"任务 {result['id']} 完成,耗时 {result['delay']}s")

6.3 异常处理

from concurrent.futures import ThreadPoolExecutor


def risky_task(n):
    """Return n * 2, but raise ValueError when n == 3."""
    if n == 3:
        raise ValueError(f"任务 {n} 出错了")
    return n * 2


with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(risky_task, i) for i in range(6)]

for i, future in enumerate(futures):
    try:
        # result() re-raises any exception raised inside the worker thread
        result = future.result()
        print(f"任务 {i} 结果: {result}")
    except ValueError as e:
        print(f"捕获异常: {e}")

七、实战示例:并发批量 HTTP 请求

import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.error import URLError


def fetch(url, timeout=5):
    """Issue an HTTP GET for *url* and return a status dict.

    On success: {"url", "status", "size", "time"}.
    On failure (URLError): {"url", "error"}.
    """
    start = time.time()
    try:
        with urlopen(url, timeout=timeout) as resp:
            size = len(resp.read())
            elapsed = round(time.time() - start, 2)
            return {"url": url, "status": resp.status, "size": size, "time": elapsed}
    except URLError as e:
        return {"url": url, "error": str(e)}


urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
]

print(f"串行请求预计耗时: ~{len(urls)}s")
print(f"并发请求最多耗时: ~1s\n")

start = time.time()

with ThreadPoolExecutor(max_workers=len(urls)) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        result = future.result()
        if "error" in result:
            print(f"✗ {result['url']}: {result['error']}")
        else:
            print(f"✓ {result['url']} → {result['status']} "
                  f"({result['size']} bytes, {result['time']}s)")

print(f"\n总耗时: {round(time.time() - start, 2)}s")

八、常见陷阱

陷阱 1:闭包变量捕获

import threading

# ❌ Wrong: every lambda captures the same variable i (late binding)
threads = []
for i in range(5):
    t = threading.Thread(target=lambda: print(i))
    threads.append(t)

for t in threads:
    t.start()
# may print 4 five times

# ✅ Correct: bind the current value via a default argument
threads = []
for i in range(5):
    t = threading.Thread(target=lambda x=i: print(x))
    threads.append(t)

for t in threads:
    t.start()

陷阱 2:忘记 join 导致主线程提前退出

import threading
import time

results = []


def worker():
    """Sleep one second, then append 42 to the shared results list."""
    time.sleep(1)
    results.append(42)


t = threading.Thread(target=worker)
t.start()
# ❌ Forgot t.join(): results may still be empty here
print(results)

# ✅ Correct: wait for the worker before reading its output
t.join()
print(results)  # [42]

陷阱 3:死锁

import threading
import time  # needed: thread1/thread2 below call time.sleep

lock_a = threading.Lock()
lock_b = threading.Lock()


# ❌ Thread 1 holds A and waits for B; thread 2 holds B and waits for A → deadlock
def thread1():
    with lock_a:
        time.sleep(0.1)
        with lock_b:
            print("线程1完成")


def thread2():
    with lock_b:
        time.sleep(0.1)
        with lock_a:
            print("线程2完成")


# ✅ Fix: every thread acquires the locks in the same order
def thread1_safe():
    with lock_a:
        with lock_b:
            print("线程1完成")


def thread2_safe():
    with lock_a:  # same order as thread1
        with lock_b:
            print("线程2完成")

九、选型建议

| 需求                    | 推荐方案                      |
|-------------------------|-------------------------------|
| 并发网络请求、文件 I/O  | ThreadPoolExecutor            |
| CPU 密集型计算          | multiprocessing.Pool          |
| 高并发 I/O(万级连接)  | asyncio                       |
| 简单后台任务            | threading.Thread(daemon=True) |
| 生产者消费者模型        | threading + queue.Queue       |

总结

  • Python 多线程受 GIL 限制,适合 I/O 密集型,不适合 CPU 密集型
  • 共享数据必须用 Lock 保护,推荐用 with lock: 语法避免忘记释放
  • 线程间通信优先用 queue.Queue,它是线程安全的
  • 实际项目中优先使用 ThreadPoolExecutor,比手动管理线程更简洁安全
  • 注意闭包变量捕获和死锁两大经典陷阱