日韩久久久精品,亚洲精品久久久久久久久久久,亚洲欧美一区二区三区国产精品 ,一区二区福利

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響

系統(tǒng) 1899 0

?

?

?

?1、常見(jiàn)并發(fā)類型

I/ O密集型:

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第1張圖片

            藍(lán)色框表示程序執(zhí)行工作的時(shí)間,紅色框表示等待I/O操作完成的時(shí)間。此圖沒(méi)有按比例顯示,因?yàn)閕nternet上的請(qǐng)求可能比CPU指令要多花費(fèi)幾個(gè)數(shù)量級(jí)的時(shí)間,所以你的程序可能會(huì)花費(fèi)大部分時(shí)間進(jìn)行等待。
          

?CPU密集型:

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第2張圖片

            IO密集型程序?qū)r(shí)間花在cpu計(jì)算上。
          

常見(jiàn)并發(fā)類型以及區(qū)別:

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第3張圖片

?

?

?2、同步版本

?我們將使用requests訪問(wèn)100個(gè)網(wǎng)頁(yè),使用同步的方式,requests的請(qǐng)求是同步的,所以代碼就很好寫了。

同步的版本代碼邏輯簡(jiǎn)單,編寫也會(huì)很相對(duì)容易。

            
              import
            
            
               requests

            
            
              import
            
            
               time

            
            
              def
            
            
               download_site(url,session):
    with session.get(url) as response:
        
            
            
              print
            
            
              (len(response.content))


            
            
              def
            
            
               download_all_site(sites):
    with requests.Session() as session:
        
            
            
              for
            
             url 
            
              in
            
            
               sites:
            download_site(url,session)


            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    sites 
            
            = [
            
              "
            
            
              https://www.baidu.com
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    download_all_site(sites)
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              執(zhí)行時(shí)間:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            
              )

            
            
              #
            
            
              download_site()只從一個(gè)URL下載內(nèi)容并打印其大小
            
            
              
#
            
            
              需要知道的是我們這里沒(méi)有使用requests.get(),而使用了session.get(),我們使用requests.Session()創(chuàng)建了一個(gè)Session對(duì)象,每次請(qǐng)求使用了session.get(url,因?yàn)榭梢宰宺equests運(yùn)用一些神奇的網(wǎng)絡(luò)小技巧,從而真正使程序加速。
            
            
              
#
            
            
              執(zhí)行時(shí)間:33.91123294830322秒
            
          

?

?

?3、多線程

?ThreadPoolExecutor,: ThreadPoolExecutor =Thread+Pool+ Executor。

你已經(jīng)了解了Thread部分。那只是我們之前提到的一個(gè)思路。Pool部分是開(kāi)始變得有趣的地方。這個(gè)對(duì)象將創(chuàng)建一個(gè)線程池,其中的每個(gè)線程都可以并發(fā)運(yùn)行。最后,Executor是控制線程池中的每個(gè)線程如何以及何時(shí)運(yùn)行的部分。它將在線程池中執(zhí)行請(qǐng)求。

對(duì)我們很有幫助的是,標(biāo)準(zhǔn)庫(kù)將ThreadPoolExecutor實(shí)現(xiàn)為一個(gè)上下文管理器,因此你可以使用with語(yǔ)法來(lái)管理Threads池的創(chuàng)建和釋放。

一旦有了ThreadPoolExecutor,你就可以使用它方便的.map()方法。此方法在列表中的每個(gè)站點(diǎn)上運(yùn)行傳入函數(shù)。最重要的是,它使用自己管理的線程池自動(dòng)并發(fā)地運(yùn)行它們。

來(lái)自其他語(yǔ)言,甚至Python 2的人可能想知道,在處理threading時(shí),管理你習(xí)慣的細(xì)節(jié)的常用對(duì)象和函數(shù)在哪里,比如Thread.start()、Thread.join()和Queue。

這些都還在那里,你可以使用它們來(lái)實(shí)現(xiàn)對(duì)線程運(yùn)行方式的精細(xì)控制。但是,從Python 3.2開(kāi)始,標(biāo)準(zhǔn)庫(kù)添加了一個(gè)更高級(jí)別的抽象,稱為Executor,如果你不需要精細(xì)控制,它可以為你管理許多細(xì)節(jié)。

本例中另一個(gè)有趣的更改是,每個(gè)線程都需要?jiǎng)?chuàng)建自己的request . Session()對(duì)象。當(dāng)你查看requests的文檔時(shí),不一定就能很容易地看出,但在閱讀這個(gè)問(wèn)題(https://github.com/requests/requests/issues/2766? )時(shí),你會(huì)清晰地發(fā)現(xiàn)每個(gè)線程都需要一個(gè)單獨(dú)的Session。

這是threading中有趣且困難的問(wèn)題之一。因?yàn)椴僮飨到y(tǒng)可以控制任務(wù)何時(shí)中斷,何時(shí)啟動(dòng)另一個(gè)任務(wù),所以線程之間共享的任何數(shù)據(jù)都需要被保護(hù)起來(lái),或者說(shuō)是線程安全的。不幸的是,requests . Session()不是線程安全的。

根據(jù)數(shù)據(jù)是什么以及如何你使用它們,有幾種策略可以使數(shù)據(jù)訪問(wèn)變成線程安全的。其中之一是使用線程安全的數(shù)據(jù)結(jié)構(gòu),比如來(lái)自 Python的queue模塊的Queue。

這些對(duì)象使用低級(jí)基本數(shù)據(jù)類型,比如threading.Lock,以確保只有一個(gè)線程可以同時(shí)訪問(wèn)代碼塊或內(nèi)存塊。你可以通過(guò)ThreadPoolExecutor對(duì)象間接地使用此策略。

            
              import
            
            
               requests

            
            
              import
            
            
               concurrent.futures

            
            
              import
            
            
               threading

            
            
              import
            
            
               time


            
            
              #
            
            
              創(chuàng)建線程池
            
            
thread_local=
            
               threading.local()


            
            
              def
            
            
               get_session():
    
            
            
              if
            
            
              not
            
             getattr(thread_local,
            
              "
            
            
              session
            
            
              "
            
            
              ,None):
        thread_local.session 
            
            =
            
               requests.Session()
    
            
            
              return
            
            
               thread_local.session


            
            
              def
            
            
               download_site(url):
    session 
            
            =
            
               get_session()
    with session.get(url) as response:
        
            
            
              print
            
            
              (len(response.content))


            
            
              def
            
            
               download_all_site(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers
            
            =5
            
              ) as exector:
        exector.map(download_site,sites)


            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    sites 
            
            = [
            
              "
            
            
              https://www.baidu.com
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    download_all_site(sites)
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              執(zhí)行時(shí)間:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            
              )

            
            
              #
            
            
              執(zhí)行時(shí)間:6.152076244354248秒
            
          

?

這里要使用的另一種策略是線程本地存儲(chǔ)。Threading.local()會(huì)創(chuàng)建一個(gè)對(duì)象,它看起來(lái)像一個(gè)全局對(duì)象但又是特定于每個(gè)線程的。在我們的示例中,這是通過(guò)threadLocal和get_session()完成的:

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第4張圖片

ThreadLocal是threading模塊中專門用來(lái)解決這個(gè)問(wèn)題的。它看起來(lái)有點(diǎn)奇怪,但是你只想創(chuàng)建其中一個(gè)對(duì)象,而不是為每個(gè)線程創(chuàng)建一個(gè)對(duì)象。對(duì)象本身將負(fù)責(zé)從不同的線程到不同的數(shù)據(jù)的分開(kāi)訪問(wèn)。

當(dāng)get_session()被調(diào)用時(shí),它所查找的session是特定于它所運(yùn)行的線程的。因此,每個(gè)線程都將在第一次調(diào)用get_session()時(shí)創(chuàng)建一個(gè)單個(gè)的會(huì)話,然后在整個(gè)生命周期中對(duì)每個(gè)后續(xù)調(diào)用使用該會(huì)話。

最后,簡(jiǎn)要介紹一下選擇線程的數(shù)量。你可以看到示例代碼使用了5個(gè)線程。隨意改變這個(gè)數(shù)字,看看總時(shí)間是如何變化的。你可能認(rèn)為每次下載只有一個(gè)線程是最快的,但至少在我的系統(tǒng)上不是這樣。我在5到10個(gè)線程之間找到了最快的結(jié)果。如果超過(guò)這個(gè)值,那么創(chuàng)建和銷毀線程的額外開(kāi)銷就會(huì)抵消程序節(jié)省的時(shí)間。

這里比較困難的答案是,從一個(gè)任務(wù)到另一個(gè)任務(wù)的正確線程數(shù)不是一個(gè)常量。需要進(jìn)行一些實(shí)驗(yàn)來(lái)得到。

?注意:request . Session()不是線程安全的。這意味著,如果多個(gè)線程使用同一個(gè)Session,那么在某些地方可能會(huì)發(fā)生上面描述的交互類型問(wèn)題。

?

多線程代碼的執(zhí)行時(shí)序表:

? python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第5張圖片

?

?

?4、異步IO

asyncio的一般概念是一個(gè)單個(gè)的Python對(duì)象,稱為事件循環(huán),它控制每個(gè)任務(wù)如何以及何時(shí)運(yùn)行。事件循環(huán)會(huì)關(guān)注每個(gè)任務(wù)并知道它處于什么狀態(tài)。在實(shí)際中,任務(wù)可以處于許多狀態(tài),但現(xiàn)在我們假設(shè)一個(gè)簡(jiǎn)化的只有兩種狀態(tài)的事件循環(huán)。

就緒狀態(tài)將表明一個(gè)任務(wù)有工作要做,并且已經(jīng)準(zhǔn)備好運(yùn)行,而等待狀態(tài)意味著該任務(wù)正在等待一些外部工作完成,例如網(wǎng)絡(luò)操作。

我們簡(jiǎn)化的事件循環(huán)維護(hù)兩個(gè)任務(wù)列表,每一個(gè)對(duì)應(yīng)這些狀態(tài)。它會(huì)選擇一個(gè)就緒的任務(wù),然后重新啟動(dòng)它。該任務(wù)處于完全控制之中,直到它配合地將控制權(quán)交還給事件循環(huán)為止。

當(dāng)正在運(yùn)行的任務(wù)將控制權(quán)交還給事件循環(huán)時(shí),事件循環(huán)將該任務(wù)放入就緒或等待列表中,然后遍歷等待列表中的每個(gè)任務(wù),以查看I/O操作完成后某個(gè)任務(wù)是否已經(jīng)就緒。時(shí)間循環(huán)知道就緒列表中的任務(wù)仍然是就緒的,因?yàn)樗浪鼈冞€沒(méi)有運(yùn)行。

一旦所有的任務(wù)都重新排序到正確的列表中,事件循環(huán)將選擇下一個(gè)要運(yùn)行的任務(wù),然后重復(fù)這個(gè)過(guò)程。我們簡(jiǎn)化的事件循環(huán)會(huì)選擇等待時(shí)間最長(zhǎng)的任務(wù)并運(yùn)行該任務(wù)。此過(guò)程會(huì)一直重復(fù),直到事件循環(huán)結(jié)束。

asyncio的一個(gè)重要之處在于,如果沒(méi)有刻意去釋放控制權(quán),任務(wù)是永遠(yuǎn)不會(huì)放棄控制權(quán)的。它們?cè)诓僮鬟^(guò)程中從不會(huì)被打斷。這使得我們?cè)赼syncio中比在threading中能更容易地共享資源。你不必?fù)?dān)心代碼是否是線程安全的。

            
              import
            
            
               time

            
            
              import
            
            
               asyncio

            
            
              from
            
             aiohttp 
            
              import
            
            
               ClientSession
async 
            
            
              def
            
            
               download_site(session,url):
    
            
            
              global
            
            
               i
    
            
            
              try
            
            
              :
        async with session.get(url) as response:
            i
            
            =i+1
            
            
              print
            
            
              (i)
            
            
            
              return
            
            
               await response.read()
    
            
            
              except
            
            
               Exception as e:
         
            
            
              pass
            
            
              
async 
            
            
              def
            
            
               download_all_site(sites):
    async with ClientSession() as session:
        tasks 
            
            =
            
               []
        
            
            
              for
            
             url 
            
              in
            
            
               sites:
            task 
            
            =
            
               asyncio.create_task(download_site(session,url))
            tasks.append(task)
        result 
            
            = await asyncio.gather(*tasks) 
            
              #
            
            
              等待一組協(xié)程運(yùn)行結(jié)束并接收結(jié)果
            
            
              print
            
            
              (result)



            
            
              if
            
            
              __name__
            
             ==
            
              "
            
            
              __main__
            
            
              "
            
            
              :
    i
            
            =
            
              0
    sites 
            
            = [
            
              "
            
            
              http://www.360kuai.com/
            
            
              "
            
            ,
            
              "
            
            
              https://www.jython.org
            
            
              "
            
            ] * 50
            
              
    start_time 
            
            =
            
               time.time()
    asyncio.run(download_all_site(sites))
    end_time 
            
            =
            
               time.time()
    
            
            
              print
            
            (
            
              "
            
            
              執(zhí)行時(shí)間:%s
            
            
              "
            
             % (end_time - start_time) + 
            
              "
            
            
            
              "
            
            )
            

#執(zhí)行時(shí)間:5.29184889793396秒

異步IO的執(zhí)行時(shí)序表:

python中同步、多線程、異步IO、多線程對(duì)IO密集型的影響_第6張圖片

asyncio版本的問(wèn)題

此時(shí)asyncio有兩個(gè)問(wèn)題。你需要特殊的異步版本的庫(kù)來(lái)充分利用asycio。如果你只是使用requests下載站點(diǎn),那么速度會(huì)慢得多,因?yàn)閞equests的設(shè)計(jì)目的不是通知事件循環(huán)它被阻塞了。隨著時(shí)間的推移,這個(gè)問(wèn)題變得微不足道,因?yàn)樵絹?lái)越多的庫(kù)包含了asyncio。

另一個(gè)更微妙的問(wèn)題是,如果其中一個(gè)任務(wù)不合作,那么協(xié)作多任務(wù)處理的所有優(yōu)勢(shì)都將不存在。代碼中的一個(gè)小錯(cuò)誤可能會(huì)導(dǎo)致任務(wù)運(yùn)行超時(shí)并長(zhǎng)時(shí)間占用處理器,使需要運(yùn)行的其他任務(wù)無(wú)法運(yùn)行。如果一個(gè)任務(wù)沒(méi)有將控制權(quán)交還給事件循環(huán),則事件循環(huán)無(wú)法中斷它。

考慮到這一點(diǎn),我們來(lái)開(kāi)始討論一種完全不同的并發(fā)性——multiprocessing。

?


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺(jué)我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦?。?!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 镇康县| 麻城市| 五常市| 柘城县| 沁水县| 高州市| 固始县| 高邮市| 岢岚县| 洛浦县| 民权县| 酒泉市| 澄迈县| 定边县| 孟津县| 出国| 诸城市| 思茅市| 永仁县| 杭锦旗| 福建省| 汉川市| 区。| 维西| 临西县| 伽师县| 囊谦县| 黄石市| 大余县| 大田县| 布尔津县| 鲁甸县| 婺源县| 上虞市| 万荣县| 阳东县| 仪征市| 阳朔县| 岗巴县| 莆田市| 花垣县|