如何在Python 的線程中運行協(xié)程
在一篇文章理解Python異步編程的基本原理這篇文章中,我們講到,如果在異步代碼里面又包含了一段非常耗時的同步代碼,異步代碼就會被卡住。
那么有沒有辦法讓同步代碼與異步代碼看起來也是同時運行的呢?方法就是使用事件循環(huán)的.run_in_executor()方法。
我們來看一下 Python 官方文檔[1]中的說法:
那么怎么使用呢?還是以非常耗時的遞歸方式計算斐波那契數(shù)列的這個函數(shù)為例:
- def sync_calc_fib(n):
- if n in [1, 2]:
- return1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- async def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項計算完成,結(jié)果是:{result}')
- return result
我們現(xiàn)在需要用 aiohttp 訪問一個延遲5秒的網(wǎng)頁,同時計算斐波那契數(shù)列第36項。
首先我們看看單獨計算第36項需要5秒鐘:
我們再來看看如果直接把這計算斐波那契數(shù)列和請求網(wǎng)站的兩個異步任務(wù)放在一起“并行”,實際時間是兩個任務(wù)的時間疊加:
具體原因我在上一篇文章里面已經(jīng)做了說明。
現(xiàn)在,我想讓兩個任務(wù)“同時運行”,于是就可以這樣修改代碼:
- import aiohttp
- import asyncio
- import time
- from concurrent.futures import ThreadPoolExecutor
- async def request(sleep_time):
- async with aiohttp.ClientSession() as client:
- resp = await client.get(f'http://127.0.0.1:8000/sleep/{sleep_time}')
- resp_json = await resp.json()
- print(resp_json)
- def sync_calc_fib(n):
- if n in [1, 2]:
- return 1
- return sync_calc_fib(n - 1) + sync_calc_fib(n - 2)
- def calc_fib(n):
- result = sync_calc_fib(n)
- print(f'第 {n} 項計算完成,結(jié)果是:{result}')
- return result
- async def main():
- start = time.perf_counter()
- loop = asyncio.get_event_loop()
- with ThreadPoolExecutor(max_workers=4) as executor:
- tasks_list = [
- loop.run_in_executor(executor, calc_fib, 36),
- asyncio.create_task(request(5))
- ]
- await asyncio.gather(*tasks_list)
- end = time.perf_counter()
- print(f'總計耗時:{end - start}')
- asyncio.run(main())
運行效果如下圖所示:
在5秒鐘的時間,就把計算斐波那契數(shù)列和請求5秒延遲的網(wǎng)站都做完了。
實現(xiàn)這樣的轉(zhuǎn)變,關(guān)鍵的代碼就是:loop.run_in_executor(executor, calc_fib, 36)
其中的 loop就是主線程的事件循環(huán)(event loop),它是用來調(diào)度同一個線程里面的多個協(xié)程。
executor是我們使用ThreadPoolExecutor(max_workers=4)創(chuàng)建的一個有4個線程的線程池,calc_fib是一個耗時的同步函數(shù),36是傳入calc_fib的參數(shù)。loop.run_in_executor(executor, calc_fib, 36)的意思是說:
- 把calc_fib函數(shù)放到線程池里面去運行
- 給線程池增加一個回調(diào)函數(shù),這個回調(diào)函數(shù)會在運行結(jié)束后的下一次事件循環(huán)把結(jié)果保存下來。
請注意上圖中紅色箭頭對應(yīng)的calc_fib這是一個同步函數(shù),請與上一篇文章中的異步函數(shù)區(qū)分開。run_in_executor的第二個參數(shù)需要是一個同步函數(shù)的函數(shù)名。
在上面的例子中,我們創(chuàng)建的是有4個線程的線程池。所以這個線程池最多允許4個阻塞式的同步函數(shù)“并行”。

































