Python 协程详解与技巧总结

Python 协程详解与技巧总结
Python 协程详解与技巧总结一、协程是什么协程Coroutine是一种用户态的轻量级线程由程序自主控制调度通过协作式多任务处理实现并发。与普通函数不同协程可以在执行中途主动挂起自己稍后再从暂停处恢复运行。通俗理解普通函数像一次性的过山车上车跑完全程到站下车协程像一辆可以随时靠站停车、再随时启动的私家车状态变量和上下文全都在。协程 vs 线程 vs 进程特性协程线程进程调度方式用户态协作式主动让出内核态抢占式时间片轮转内核态独立调度内存占用1-10KB1-10MB独立地址空间MB级并发数量10万级数百级数十级适用场景I/O密集型I/O密集型有限并发CPU密集型多核并行每个协程仅需约5KB内存线程约8MB切换耗时仅0.1-1μs线程切换需5-30μs。由于所有协程在同一线程内调度天然避免了多线程的数据竞争问题。二、协程的演进历程Python协程经历了三个阶段Python 2.5生成器协程通过yield和.send()手动实现协程语法晦涩极易出错。Python 3.4过渡期asyncio.coroutineyield from正式纳入标准库但语法仍有缝合感。Python 3.7现代协程async def/await语法优雅自然是目前推荐的写法。三、核心语法与API3.1 基础语法import asyncio # async def 定义协程函数调用返回协程对象不立即执行 async def say_after(delay, what): await asyncio.sleep(delay) # await 挂起当前协程等待异步操作完成 print(what) # asyncio.run() 启动事件循环的入口程序中仅调用一次 async def main(): # 直接 await 是串行执行 await say_after(1, hello) await say_after(2, world) # 总耗时约 3 秒 asyncio.run(main())async def定义协程函数返回协程对象await挂起当前协程等待右侧可等待对象协程/Task/Future完成asyncio.run()创建事件循环、运行协程并关闭循环Python 3.7推荐入口3.2 并发执行create_task vs gather直接 await 是串行要实现并发必须创建 Taskasync def main(): # 方式一create_task 创建任务实现并发 task1 asyncio.create_task(say_after(1, hello)) task2 asyncio.create_task(say_after(2, world)) await task1 await task2 # 总耗时约 2 秒 # 方式二gather 批量并发 await asyncio.gather( say_after(1, hello), say_after(2, world) )create_task相当于发射后不管后面需要结果时再await拿结果-。asyncio.gather则同时等待所有任务完成。3.3 TaskGroupPython 3.11TaskGroup是create_task的更现代替代方案上下文管理器退出时自动等待所有任务完成async def main(): async with asyncio.TaskGroup() as tg: task1 tg.create_task(say_after(1, hello)) task2 tg.create_task(say_after(2, world)) # 退出上下文时自动 await 所有任务四、底层原理事件循环与调度机制4.1 事件循环Event Loop事件循环是协程的调度中心本质上是一个无限循环负责监听、调度和执行异步任务。工作流程任务注册协程通过create_task注册到事件循环任务执行协程执行到await时主动挂起释放控制权I/O监听事件循环通过epoll/kqueue监听文件描述符任务恢复I/O就绪后事件循环唤醒协程从暂停处继续执行4.2 协程的底层本质async def函数底层仍基于生成器generator实现遇到await时协程保存当前执行上下文局部变量、指令指针返回控制权给事件循环被await的对象就绪后事件循环调用协程的send(None)从上次暂停处继续执行整个过程在单线程内完成没有线程切换开销4.3 可等待对象Awaitable可在await语句中使用的对象有三种主要类型协程Coroutineasync def定义的函数任务Task用create_task包装的协程可跟踪执行状态Future表示异步操作的最终结果五、实战技巧技巧1控制并发度 —— Semaphore信号量批量请求时需限制并发数避免打爆目标服务器或耗尽本地资源import asyncio import aiohttp sem asyncio.Semaphore(3) # 最多同时 3 个协程执行 async def fetch(url, session): async with sem: # 申请并发名额完成后自动释放 timeout aiohttp.ClientTimeout(total5) async with session.get(url, timeouttimeout) as response: return await response.text() async def main(): async with aiohttp.ClientSession() as session: tasks [fetch(fhttps://example.com/{i}, session) for i in range(100)] results await asyncio.gather(*tasks, return_exceptionsTrue)建议并发数控制在50-100之间根据目标服务器承受能力动态调整-20。技巧2异常处理 —— return_exceptions异步任务中的异常不会自动传播使用return_exceptionsTrue收集异常作为结果返回避免单个任务失败导致整个gather崩溃results await asyncio.gather( task1, task2, task3, return_exceptionsTrue ) for result in results: if isinstance(result, Exception): print(f任务失败: {result}) else: print(f任务成功: {result})技巧3超时控制 —— wait_forasync def long_task(): await asyncio.sleep(10) async def main(): try: await asyncio.wait_for(long_task(), timeout1.0) except asyncio.TimeoutError: print(任务超时)技巧4异步上下文管理器 —— async with管理需要异步获取和释放的资源数据库连接、网络连接等class AsyncDatabaseConnection: async def __aenter__(self): await asyncio.sleep(1) # 模拟异步连接 return self async def __aexit__(self, exc_type, exc, tb): await asyncio.sleep(0.5) # 模拟异步关闭 async def main(): async with AsyncDatabaseConnection() as db: await asyncio.sleep(2) # 执行查询技巧5复用连接池使用aiohttp.ClientSession或httpx.AsyncClient复用TCP连接避免每次请求都经历三次握手async def main(): async with aiohttp.ClientSession() as session: # 所有请求复用同一个 session共享连接池 tasks [fetch(url, session) for url in urls] await asyncio.gather(*tasks)六、常见坑与避坑指南❌ 坑1在协程中调用阻塞函数# 错误会冻结整个事件循环 async def bad(): time.sleep(1) # 阻塞 requests.get(url) # 阻塞✅ 正确做法使用异步替代方案async def good(): await asyncio.sleep(1) # 异步休眠 async with aiohttp.ClientSession() as session: await session.get(url) # 异步HTTP请求❌ 坑2混淆 await 和并发# 错误串行执行总耗时 3 秒 await coro1() await coro2() await coro3() # 正确并发执行总耗时约 1 秒 await asyncio.gather(coro1(), coro2(), coro3())await不等于并发await asyncio.gather(...)才是并发。❌ 坑3直接调用协程函数# 错误直接调用不会执行只返回协程对象 main() # coroutine object main at 0x... # 正确必须 await 或通过 asyncio.run 执行 asyncio.run(main())❌ 坑4CPU密集型任务使用协程协程提升的是I/O并发效率不是CPU计算速度。CPU密集型任务应使用多进程或run_in_executor否则会阻塞事件循环import concurrent.futures async def cpu_bound_task(): loop asyncio.get_running_loop() with concurrent.futures.ProcessPoolExecutor() as pool: result await loop.run_in_executor(pool, heavy_computation) return result七、总结要点说明适用场景I/O密集型任务网络请求、文件读写、数据库查询不适用CPU密集型任务应使用多进程核心优势单线程高并发、低资源消耗、无锁竞争、代码可读性好关键APIasync def、await、asyncio.run()、create_task、gather、SemaphorePython版本建议 Python 3.7TaskGroup 需要 3.11协程的核心可以总结为一句话在I/O等待时主动让出CPU让单线程在等待期间去处理其他任务从而实现高效的并发。