?
?
?
?1、常見(jiàn)并發(fā)類型
I/ O密集型:
藍(lán)色框表示程序執(zhí)行工作的時(shí)間,紅色框表示等待I/O操作完成的時(shí)間。此圖沒(méi)有按比例顯示,因?yàn)閕nternet上的請(qǐng)求可能比CPU指令要多花費(fèi)幾個(gè)數(shù)量級(jí)的時(shí)間,所以你的程序可能會(huì)花費(fèi)大部分時(shí)間進(jìn)行等待。
?CPU密集型:
IO密集型程序?qū)r(shí)間花在cpu計(jì)算上。
常見(jiàn)并發(fā)類型以及區(qū)別:
?
?
?2、同步版本
?我們將使用requests訪問(wèn)100個(gè)網(wǎng)頁(yè),使用同步的方式,requests的請(qǐng)求是同步的,所以代碼就很好寫了。
同步的版本代碼邏輯簡(jiǎn)單,編寫也會(huì)很相對(duì)容易。
import requests import time def download_site(url,session): with session.get(url) as response: print (len(response.content)) def download_all_site(sites): with requests.Session() as session: for url in sites: download_site(url,session) if __name__ == " __main__ " : sites = [ " https://www.baidu.com " , " https://www.jython.org " ] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print ( " 執(zhí)行時(shí)間:%s " % (end_time - start_time) + " 秒 " ) # download_site()只從一個(gè)URL下載內(nèi)容并打印其大小 # 需要知道的是我們這里沒(méi)有使用requests.get(),而使用了session.get(),我們使用requests.Session()創(chuàng)建了一個(gè)Session對(duì)象,每次請(qǐng)求使用了session.get(url,因?yàn)榭梢宰宺equests運(yùn)用一些神奇的網(wǎng)絡(luò)小技巧,從而真正使程序加速。 # 執(zhí)行時(shí)間:33.91123294830322秒
?
?
?3、多線程
?ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。
你已經(jīng)了解了Thread部分。那只是我們之前提到的一個(gè)思路。Pool部分是開(kāi)始變得有趣的地方。這個(gè)對(duì)象將創(chuàng)建一個(gè)線程池,其中的每個(gè)線程都可以并發(fā)運(yùn)行。最后,Executor是控制線程池中的每個(gè)線程如何以及何時(shí)運(yùn)行的部分。它將在線程池中執(zhí)行請(qǐng)求。
對(duì)我們很有幫助的是,標(biāo)準(zhǔn)庫(kù)將ThreadPoolExecutor實(shí)現(xiàn)為一個(gè)上下文管理器,因此你可以使用with語(yǔ)法來(lái)管理Threads池的創(chuàng)建和釋放。
一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每個(gè)站點(diǎn)上運(yùn)行傳入函數(shù)。最重要的是,它使用自己管理的線程池自動(dòng)并發(fā)地運(yùn)行它們。
來(lái)自其他語(yǔ)言,甚至Python 2的人可能想知道,在處理threading時(shí),管理你習(xí)慣的細(xì)節(jié)的常用對(duì)象和函數(shù)在哪里,比如Thread.start()、Thread.join()和Queue。
這些都還在那里,你可以使用它們來(lái)實(shí)現(xiàn)對(duì)線程運(yùn)行方式的精細(xì)控制。但是,從Python 3.2開(kāi)始,標(biāo)準(zhǔn)庫(kù)添加了一個(gè)更高級(jí)別的抽象,稱為Executor,如果你不需要精細(xì)控制,它可以為你管理許多細(xì)節(jié)。
本例中另一個(gè)有趣的更改是,每個(gè)線程都需要?jiǎng)?chuàng)建自己的request . Session()對(duì)象。當(dāng)你查看requests的文檔時(shí),不一定就能很容易地看出,但在閱讀這個(gè)問(wèn)題(https://github.com/requests/requests/issues/2766? )時(shí),你會(huì)清晰地發(fā)現(xiàn)每個(gè)線程都需要一個(gè)單獨(dú)的Session。
這是threading中有趣且困難的問(wèn)題之一。因?yàn)椴僮飨到y(tǒng)可以控制任務(wù)何時(shí)中斷,何時(shí)啟動(dòng)另一個(gè)任務(wù),所以線程之間共享的任何數(shù)據(jù)都需要被保護(hù)起來(lái),或者說(shuō)是線程安全的。不幸的是,requests . Session()不是線程安全的。
根據(jù)數(shù)據(jù)是什么以及如何你使用它們,有幾種策略可以使數(shù)據(jù)訪問(wèn)變成線程安全的。其中之一是使用線程安全的數(shù)據(jù)結(jié)構(gòu),比如來(lái)自 Python的queue模塊的Queue。
這些對(duì)象使用低級(jí)基本數(shù)據(jù)類型,比如threading.Lock,以確保只有一個(gè)線程可以同時(shí)訪問(wèn)代碼塊或內(nèi)存塊。你可以通過(guò)ThreadPoolExecutor對(duì)象間接地使用此策略。
import requests import concurrent.futures import threading import time # 創(chuàng)建線程池 thread_local= threading.local() def get_session(): if not getattr(thread_local, " session " ,None): thread_local.session = requests.Session() return thread_local.session def download_site(url): session = get_session() with session.get(url) as response: print (len(response.content)) def download_all_site(sites): with concurrent.futures.ThreadPoolExecutor(max_workers =5 ) as exector: exector.map(download_site,sites) if __name__ == " __main__ " : sites = [ " https://www.baidu.com " , " https://www.jython.org " ] * 50 start_time = time.time() download_all_site(sites) end_time = time.time() print ( " 執(zhí)行時(shí)間:%s " % (end_time - start_time) + " 秒 " ) # 執(zhí)行時(shí)間:6.152076244354248秒
?
這里要使用的另一種策略是線程本地存儲(chǔ)。Threading.local()會(huì)創(chuàng)建一個(gè)對(duì)象,它看起來(lái)像一個(gè)全局對(duì)象但又是特定于每個(gè)線程的。在我們的示例中,這是通過(guò)threadLocal和get_session()完成的:
ThreadLocal是threading模塊中專門用來(lái)解決這個(gè)問(wèn)題的。它看起來(lái)有點(diǎn)奇怪,但是你只想創(chuàng)建其中一個(gè)對(duì)象,而不是為每個(gè)線程創(chuàng)建一個(gè)對(duì)象。對(duì)象本身將負(fù)責(zé)從不同的線程到不同的數(shù)據(jù)的分開(kāi)訪問(wèn)。
當(dāng)get_session()被調(diào)用時(shí),它所查找的session是特定于它所運(yùn)行的線程的。因此,每個(gè)線程都將在第一次調(diào)用get_session()時(shí)創(chuàng)建一個(gè)單個(gè)的會(huì)話,然后在整個(gè)生命周期中對(duì)每個(gè)后續(xù)調(diào)用使用該會(huì)話。
最后,簡(jiǎn)要介紹一下選擇線程的數(shù)量。你可以看到示例代碼使用了5個(gè)線程。隨意改變這個(gè)數(shù)字,看看總時(shí)間是如何變化的。你可能認(rèn)為每次下載只有一個(gè)線程是最快的,但至少在我的系統(tǒng)上不是這樣。我在5到10個(gè)線程之間找到了最快的結(jié)果。如果超過(guò)這個(gè)值,那么創(chuàng)建和銷毀線程的額外開(kāi)銷就會(huì)抵消程序節(jié)省的時(shí)間。
這里比較困難的答案是,從一個(gè)任務(wù)到另一個(gè)任務(wù)的正確線程數(shù)不是一個(gè)常量。需要進(jìn)行一些實(shí)驗(yàn)來(lái)得到。
?注意:request . Session()不是線程安全的。這意味著,如果多個(gè)線程使用同一個(gè)Session,那么在某些地方可能會(huì)發(fā)生上面描述的交互類型問(wèn)題。
?
多線程代碼的執(zhí)行時(shí)序表:
?
?
?4、異步IO
asyncio的一般概念是一個(gè)單個(gè)的Python對(duì)象,稱為事件循環(huán),它控制每個(gè)任務(wù)如何以及何時(shí)運(yùn)行。事件循環(huán)會(huì)關(guān)注每個(gè)任務(wù)并知道它處于什么狀態(tài)。在實(shí)際中,任務(wù)可以處于許多狀態(tài),但現(xiàn)在我們假設(shè)一個(gè)簡(jiǎn)化的只有兩種狀態(tài)的事件循環(huán)。
就緒狀態(tài)將表明一個(gè)任務(wù)有工作要做,并且已經(jīng)準(zhǔn)備好運(yùn)行,而等待狀態(tài)意味著該任務(wù)正在等待一些外部工作完成,例如網(wǎng)絡(luò)操作。
我們簡(jiǎn)化的事件循環(huán)維護(hù)兩個(gè)任務(wù)列表,每一個(gè)對(duì)應(yīng)這些狀態(tài)。它會(huì)選擇一個(gè)就緒的任務(wù),然后重新啟動(dòng)它。該任務(wù)處于完全控制之中,直到它配合地將控制權(quán)交還給事件循環(huán)為止。
當(dāng)正在運(yùn)行的任務(wù)將控制權(quán)交還給事件循環(huán)時(shí),事件循環(huán)將該任務(wù)放入就緒或等待列表中,然后遍歷等待列表中的每個(gè)任務(wù),以查看I/O操作完成后某個(gè)任務(wù)是否已經(jīng)就緒。時(shí)間循環(huán)知道就緒列表中的任務(wù)仍然是就緒的,因?yàn)樗浪鼈冞€沒(méi)有運(yùn)行。
一旦所有的任務(wù)都重新排序到正確的列表中,事件循環(huán)將選擇下一個(gè)要運(yùn)行的任務(wù),然后重復(fù)這個(gè)過(guò)程。我們簡(jiǎn)化的事件循環(huán)會(huì)選擇等待時(shí)間最長(zhǎng)的任務(wù)并運(yùn)行該任務(wù)。此過(guò)程會(huì)一直重復(fù),直到事件循環(huán)結(jié)束。
asyncio的一個(gè)重要之處在于,如果沒(méi)有刻意去釋放控制權(quán),任務(wù)是永遠(yuǎn)不會(huì)放棄控制權(quán)的。它們?cè)诓僮鬟^(guò)程中從不會(huì)被打斷。這使得我們?cè)赼syncio中比在threading中能更容易地共享資源。你不必?fù)?dān)心代碼是否是線程安全的。
import time import asyncio from aiohttp import ClientSession async def download_site(session,url): global i try : async with session.get(url) as response: i =i+1 print (i) return await response.read() except Exception as e: pass async def download_all_site(sites): async with ClientSession() as session: tasks = [] for url in sites: task = asyncio.create_task(download_site(session,url)) tasks.append(task) result = await asyncio.gather(*tasks) # 等待一組協(xié)程運(yùn)行結(jié)束并接收結(jié)果 print (result) if __name__ == " __main__ " : i = 0 sites = [ " http://www.360kuai.com/ " , " https://www.jython.org " ] * 50 start_time = time.time() asyncio.run(download_all_site(sites)) end_time = time.time() print ( " 執(zhí)行時(shí)間:%s " % (end_time - start_time) + " 秒 " )
#執(zhí)行時(shí)間:5.29184889793396秒
異步IO的執(zhí)行時(shí)序表:
asyncio版本的問(wèn)題
此時(shí)asyncio有兩個(gè)問(wèn)題。你需要特殊的異步版本的庫(kù)來(lái)充分利用asycio。如果你只是使用requests下載站點(diǎn),那么速度會(huì)慢得多,因?yàn)閞equests的設(shè)計(jì)目的不是通知事件循環(huán)它被阻塞了。隨著時(shí)間的推移,這個(gè)問(wèn)題變得微不足道,因?yàn)樵絹?lái)越多的庫(kù)包含了asyncio。
另一個(gè)更微妙的問(wèn)題是,如果其中一個(gè)任務(wù)不合作,那么協(xié)作多任務(wù)處理的所有優(yōu)勢(shì)都將不存在。代碼中的一個(gè)小錯(cuò)誤可能會(huì)導(dǎo)致任務(wù)運(yùn)行超時(shí)并長(zhǎng)時(shí)間占用處理器,使需要運(yùn)行的其他任務(wù)無(wú)法運(yùn)行。如果一個(gè)任務(wù)沒(méi)有將控制權(quán)交還給事件循環(huán),則事件循環(huán)無(wú)法中斷它。
考慮到這一點(diǎn),我們來(lái)開(kāi)始討論一種完全不同的并發(fā)性——multiprocessing。
?
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
