# Python 3 Multithreading Guide

## 1. What Is a Thread?

A **process** is the operating system's basic unit of resource allocation; a **thread** is a unit of execution inside a process. One process can contain multiple threads, which share the process's memory space, and switching between threads is far cheaper than switching between processes.
```
Process
├── Thread 1 (main thread)
├── Thread 2
└── Thread 3
```
Where multithreading fits:

- **I/O-bound tasks**: network requests, file reads/writes, database queries — while one thread waits on I/O, the CPU can switch to another thread
- **Not CPU-bound tasks**: because of the GIL, Python threads cannot truly run compute-heavy code in parallel (use `multiprocessing` instead)
## 2. The GIL: The Core Limitation of Python Multithreading

The GIL (Global Interpreter Lock) is a global lock in the CPython interpreter: **at any given moment, only one thread may execute Python bytecode**.

```
Timeline:
Thread A ██████░░░░██████░░░░
Thread B ░░░░██████░░░░██████
         ↑ interleaved execution, not true parallelism
```
| Scenario | Multithreading effect | Recommended approach |
|---|---|---|
| Network requests, file I/O | Significant speedup | `threading` |
| Math, image processing | Almost no speedup | `multiprocessing` |
| Asynchronous I/O | Even lower overhead | `asyncio` |
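To see the GIL's effect directly, here is a minimal sketch (loop size and timings are illustrative and will vary by machine) that runs the same pure-Python CPU-bound loop twice, first serially and then on two threads — on standard CPython the threaded run is not meaningfully faster:

```python
import threading
import time

def count(n):
    # Pure-Python loop: CPU-bound, so it holds the GIL while it runs
    total = 0
    for i in range(n):
        total += i
    return total

N = 2_000_000

start = time.perf_counter()
count(N)
count(N)
serial = time.perf_counter() - start

start = time.perf_counter()
t1 = threading.Thread(target=count, args=(N,))
t2 = threading.Thread(target=count, args=(N,))
t1.start(); t2.start()
t1.join(); t2.join()
threaded = time.perf_counter() - start

print(f"serial: {serial:.2f}s  threaded: {threaded:.2f}s")
```

Because the loop never releases the GIL, the two threads merely take turns; with `time.sleep` (which does release the GIL) in place of the loop, the threaded version would finish in roughly half the time.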
## 3. Basic Usage

### 3.1 Creating Threads

```python
import threading
import time

def worker(name, delay):
    print(f"[{name}] starting")
    time.sleep(delay)
    print(f"[{name}] done in {delay}s")

t1 = threading.Thread(target=worker, args=("Thread A", 2))
t2 = threading.Thread(target=worker, args=("Thread B", 1))
t1.start()
t2.start()
t1.join()   # block until the thread finishes
t2.join()
print("All threads finished")
```
Output:

```
[Thread A] starting
[Thread B] starting
[Thread B] done in 1s
[Thread A] done in 2s
All threads finished
```
### 3.2 Subclassing `Thread`

Suitable when the logic is complex and you want to encapsulate state:
```python
import threading
import time

class DownloadThread(threading.Thread):
    def __init__(self, url):
        super().__init__()
        self.url = url
        self.result = None

    def run(self):
        print(f"Downloading: {self.url}")
        time.sleep(1)  # simulate network I/O
        self.result = f"contents of {self.url}"
        print(f"Finished: {self.url}")

threads = [
    DownloadThread("https://example.com/file1"),
    DownloadThread("https://example.com/file2"),
    DownloadThread("https://example.com/file3"),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
for t in threads:
    print(f"Result: {t.result}")
```
### 3.3 Daemon Threads

A daemon thread is terminated automatically when the main thread exits — suitable for background tasks such as heartbeat checks or log flushing:
```python
import threading
import time

def heartbeat():
    while True:
        print("💓 heartbeat...")
        time.sleep(1)

t = threading.Thread(target=heartbeat, daemon=True)
t.start()

print("Main program running...")
time.sleep(3)
print("Main program exiting; daemon thread terminates with it")
```
## 4. Thread Synchronization

When multiple threads share data, a **race condition** can occur:
```python
import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1  # not atomic: read, add, write back

threads = [threading.Thread(target=increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Expected: 500000, actual: {counter}")
```
### 4.1 Lock (Mutex)

The most basic synchronization primitive — only one thread may hold the lock at a time:
```python
import threading

counter = 0
lock = threading.Lock()

def safe_increment():
    global counter
    for _ in range(100000):
        with lock:  # acquired on entry, released on exit
            counter += 1

threads = [threading.Thread(target=safe_increment) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Result: {counter}")
```
### 4.2 RLock (Reentrant Lock)

The same thread may acquire it multiple times without deadlocking against itself:
```python
import threading

rlock = threading.RLock()

def outer():
    with rlock:
        print("outer acquired the lock")
        inner()  # re-acquiring the same RLock is fine

def inner():
    with rlock:
        print("inner acquired the lock")

t = threading.Thread(target=outer)
t.start()
t.join()
```
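A quick way to see the difference: a plain `Lock` refuses a second `acquire` from the thread that already holds it (probed here with a timeout so the sketch doesn't hang), while an `RLock` allows it:

```python
import threading

lock = threading.Lock()
lock.acquire()
# A second blocking acquire() would deadlock the thread against itself,
# so probe with a timeout instead of blocking forever.
reacquired = lock.acquire(timeout=0.1)
print(reacquired)  # False: a plain Lock is not reentrant
lock.release()

rlock = threading.RLock()
rlock.acquire()
reentered = rlock.acquire(blocking=False)
print(reentered)  # True: the owning thread may acquire again
rlock.release()
rlock.release()  # one release per acquire
```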
### 4.3 Event (Notification)

One thread signals another that something has happened:
```python
import threading
import time

ready_event = threading.Event()

def producer():
    print("Producer: preparing data...")
    time.sleep(2)
    print("Producer: data ready, signaling")
    ready_event.set()

def consumer():
    print("Consumer: waiting for data...")
    ready_event.wait()  # blocks until set() is called
    print("Consumer: signal received, consuming data")

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t2.start()
t1.start()
t1.join()
t2.join()
```
### 4.4 Semaphore

Limits how many threads may access a resource at once — commonly used to cap concurrent connections:
```python
import threading
import time

sem = threading.Semaphore(3)  # at most 3 threads inside at a time

def access_resource(name):
    with sem:
        print(f"{name} acquired access")
        time.sleep(1)
        print(f"{name} released access")

threads = [threading.Thread(target=access_resource, args=(f"thread-{i}",))
           for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```
## 5. Inter-Thread Communication: Queue

`queue.Queue` is a thread-safe queue and the recommended way to pass data between threads; the locking is handled internally:
```python
import threading
import queue
import time
import random

task_queue = queue.Queue()

def producer(q):
    for i in range(10):
        item = f"task-{i}"
        q.put(item)
        print(f"produced: {item}")
        time.sleep(random.uniform(0.1, 0.3))
    q.put(None)  # sentinel: tells the consumer to stop

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f"consumed: {item}")
        time.sleep(random.uniform(0.2, 0.5))
        q.task_done()

p = threading.Thread(target=producer, args=(task_queue,))
c = threading.Thread(target=consumer, args=(task_queue,))
p.start()
c.start()
p.join()
c.join()
print("Producer/consumer finished")
```
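The sentinel pattern works well for a single consumer. An alternative sketch (the worker count and the doubling step are illustrative) uses several daemon consumers plus `q.join()`, which blocks until `task_done()` has been called once for every item that was put on the queue, so no sentinels are needed:

```python
import threading
import queue

q = queue.Queue()
results = []
results_lock = threading.Lock()

def consumer():
    while True:
        item = q.get()
        try:
            with results_lock:
                results.append(item * 2)
        finally:
            q.task_done()  # mark the item processed, even on error

# Daemon workers die with the main thread, so they need no stop signal
for _ in range(3):
    threading.Thread(target=consumer, daemon=True).start()

for i in range(10):
    q.put(i)

q.join()  # blocks until every queued item has been marked done
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```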
## 6. Thread Pools: ThreadPoolExecutor

Managing thread lifecycles by hand is tedious; `concurrent.futures.ThreadPoolExecutor` provides a higher-level interface:
### 6.1 Basic Usage

```python
from concurrent.futures import ThreadPoolExecutor
import time

def fetch_data(url):
    time.sleep(1)  # simulate network I/O
    return f"data from {url}"

urls = [
    "https://api.example.com/user/1",
    "https://api.example.com/user/2",
    "https://api.example.com/user/3",
    "https://api.example.com/user/4",
]

with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fetch_data, urls))  # preserves input order

for r in results:
    print(r)
```
### 6.2 Getting Results via Future

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import random

def process(task_id):
    delay = random.uniform(0.5, 2)
    time.sleep(delay)
    return {"id": task_id, "delay": round(delay, 2)}

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = {executor.submit(process, i): i for i in range(6)}
    for future in as_completed(futures):  # yields futures as they finish
        result = future.result()
        print(f"task {result['id']} done in {result['delay']}s")
```
### 6.3 Exception Handling

```python
from concurrent.futures import ThreadPoolExecutor

def risky_task(n):
    if n == 3:
        raise ValueError(f"task {n} failed")
    return n * 2

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(risky_task, i) for i in range(6)]
    for i, future in enumerate(futures):
        try:
            result = future.result()  # re-raises the task's exception, if any
            print(f"task {i} result: {result}")
        except ValueError as e:
            print(f"caught exception: {e}")
```
## 7. Worked Example: Concurrent Batch HTTP Requests

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.request import urlopen
from urllib.error import URLError

def fetch(url, timeout=5):
    """Issue an HTTP request and return its status."""
    start = time.time()
    try:
        with urlopen(url, timeout=timeout) as resp:
            size = len(resp.read())
            elapsed = round(time.time() - start, 2)
            return {"url": url, "status": resp.status, "size": size, "time": elapsed}
    except URLError as e:
        return {"url": url, "error": str(e)}

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/ip",
    "https://httpbin.org/user-agent",
    "https://httpbin.org/headers",
]

print(f"Serial requests, estimated: ~{len(urls)}s")
print(f"Concurrent requests, at most: ~1s\n")

start = time.time()
with ThreadPoolExecutor(max_workers=len(urls)) as executor:
    futures = {executor.submit(fetch, url): url for url in urls}
    for future in as_completed(futures):
        result = future.result()
        if "error" in result:
            print(f"✗ {result['url']}: {result['error']}")
        else:
            print(f"✓ {result['url']} → {result['status']} "
                  f"({result['size']} bytes, {result['time']}s)")

print(f"\nTotal: {round(time.time() - start, 2)}s")
```
## 8. Common Pitfalls

### Pitfall 1: Closure Variable Capture

```python
import threading

# Buggy: the lambda captures the *variable* i, not its value at creation
# time, so every thread may print whatever i holds when it finally runs.
threads = []
for i in range(5):
    t = threading.Thread(target=lambda: print(i))
    threads.append(t)
for t in threads:
    t.start()

# Fixed: bind the current value via a default argument.
threads = []
for i in range(5):
    t = threading.Thread(target=lambda x=i: print(x))
    threads.append(t)
for t in threads:
    t.start()
```

Passing the value through `args=(i,)` also works and is usually clearer.
### Pitfall 2: Forgetting `join()`, So the Main Thread Moves On Too Early

```python
import threading
import time

results = []

def worker():
    time.sleep(1)
    results.append(42)

t = threading.Thread(target=worker)
t.start()
print(results)  # [] — the worker has not finished yet
t.join()
print(results)  # [42]
```
### Pitfall 3: Deadlock

```python
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Deadlock-prone: the two threads take the locks in opposite order.
def thread1():
    with lock_a:
        time.sleep(0.1)
        with lock_b:
            print("thread 1 done")

def thread2():
    with lock_b:
        time.sleep(0.1)
        with lock_a:
            print("thread 2 done")

# Safe: every thread acquires the locks in the same order (a before b).
def thread1_safe():
    with lock_a:
        with lock_b:
            print("thread 1 done")

def thread2_safe():
    with lock_a:
        with lock_b:
            print("thread 2 done")
```
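Besides fixing the lock order, another common mitigation (sketched here with an illustrative timeout value) is to acquire the second lock with a timeout and back off on failure, so neither thread can wait forever:

```python
import threading

lock_a = threading.Lock()
lock_b = threading.Lock()
finished = []

def worker(first, second, name):
    # Take the first lock, then *try* the second with a timeout;
    # on failure the `with` block releases the first lock and we retry,
    # giving the other thread a chance to make progress.
    while True:
        with first:
            if second.acquire(timeout=0.05):
                try:
                    finished.append(name)  # list.append is thread-safe
                    return
                finally:
                    second.release()

# Opposite lock order, which would deadlock with plain blocking acquires
t1 = threading.Thread(target=worker, args=(lock_a, lock_b, "thread1"))
t2 = threading.Thread(target=worker, args=(lock_b, lock_a, "thread2"))
t1.start(); t2.start()
t1.join(); t2.join()
print(sorted(finished))  # ['thread1', 'thread2']
```

This trades the deadlock for retries; consistent lock ordering remains the simpler and more robust fix when it is possible.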
## 9. Choosing an Approach

| Need | Recommended approach |
|---|---|
| Concurrent network requests, file I/O | `ThreadPoolExecutor` |
| CPU-bound computation | `multiprocessing.Pool` |
| High-concurrency I/O (tens of thousands of connections) | `asyncio` |
| Simple background tasks | `threading.Thread(daemon=True)` |
| Producer/consumer model | `threading` + `queue.Queue` |
## Summary

- Python threads are limited by the GIL: good for I/O-bound work, not for CPU-bound work
- Shared data must be protected with a `Lock`; prefer the `with lock:` form so the lock is always released
- For inter-thread communication, prefer `queue.Queue` — it is thread-safe
- In real projects, prefer `ThreadPoolExecutor` over managing threads by hand; it is simpler and safer
- Watch out for the two classic pitfalls: closure variable capture and deadlock