目錄
-
Python并發(fā)編程05/ 死鎖/遞歸鎖/信號(hào)量/GIL鎖/進(jìn)程池/線程池
- 1.昨日回顧
-
2.死鎖現(xiàn)象與遞歸鎖
- 2.1死鎖現(xiàn)象
- 2.2遞歸鎖
- 3.信號(hào)量
-
4.GIL全局解釋器鎖
- 4.1背景
- 4.2為什么加鎖
- 5.GIL與Lock鎖的區(qū)別
-
6.驗(yàn)證計(jì)算密集型IO密集型的效率
- 6.1 IO密集型
- 6.2 計(jì)算密集型
-
7.多線程實(shí)現(xiàn)socket通信
- 7.1服務(wù)端
- 7.2客戶端
- 8.進(jìn)程池,線程池
Python并發(fā)編程05/ 死鎖/遞歸鎖/信號(hào)量/GIL鎖/進(jìn)程池/線程池
1.昨日回顧
#生產(chǎn)者消費(fèi)者模型.
# 生產(chǎn)者: 產(chǎn)生數(shù)據(jù),
# 消費(fèi)者: 接收數(shù)據(jù)并做下一步處理
# 容器: 隊(duì)列.
#進(jìn)程, 線程:
# 進(jìn)程就是資源單位,線程就是執(zhí)行單位.
#進(jìn)程與線程的區(qū)別:
# 線程: 開銷小,速度快,同一個(gè)進(jìn)程下的線程資源內(nèi)存級(jí)別共享.
# 進(jìn)程: 開銷巨大,速度慢, 不同進(jìn)程的數(shù)據(jù)內(nèi)存級(jí)別不共享.
#join: 阻塞,
# t1.join() 阻塞.
# print('主')
#getname, setname .name
#activeCount() 線程的數(shù)量
守護(hù)線程: 如果守護(hù)線程的生命周期小于其他線程,則他肯定先結(jié)束,否則等待其他非守護(hù)線程和主線程結(jié)束之后結(jié)束.
#互斥鎖,鎖
2.死鎖現(xiàn)象與遞歸鎖
#遞歸鎖可以解決死鎖現(xiàn)象,業(yè)務(wù)需要多個(gè)鎖時(shí),先要考慮遞歸鎖
2.1死鎖現(xiàn)象
# from threading import Thread
# from threading import Lock
# import time
#
# lock_A = Lock()
# lock_B = Lock()
#
#
# class MyThread(Thread):
#
# def run(self):
# self.f1()
# self.f2()
#
#
# def f1(self):
#
# lock_A.acquire()
# print(f'{self.name}拿到了A鎖')
#
# lock_B.acquire()
# print(f'{self.name}拿到了B鎖')
#
# lock_B.release()
#
# lock_A.release()
#
# def f2(self):
#
# lock_B.acquire()
# print(f'{self.name}拿到了B鎖')
#
# time.sleep(0.1)
# lock_A.acquire()
# print(f'{self.name}拿到了A鎖')
#
# lock_A.release()
# lock_B.release()
#
# if __name__ == '__main__':
#
# for i in range(3):
# t = MyThread()
# t.start()
2.2遞歸鎖
遞歸鎖有一個(gè)計(jì)數(shù)的功能, 原數(shù)字為0,上一次鎖,計(jì)數(shù)+1,釋放一次鎖,計(jì)數(shù)-1,
只要遞歸鎖上面的數(shù)字不為零,其他線程就不能搶鎖.
# from threading import Thread
# from threading import RLock
# import time
#
# lock_A = lock_B = RLock()
# 遞歸鎖有一個(gè)計(jì)數(shù)的功能, 原數(shù)字為0,上一次鎖,計(jì)數(shù)+1,釋放一次鎖,計(jì)數(shù)-1,
# 只要遞歸鎖上面的數(shù)字不為零,其他線程就不能搶鎖.
# class MyThread(Thread):
#
# def run(self):
# self.f1()
# self.f2()
#
#
# def f1(self):
#
# lock_A.acquire()
# print(f'{self.name}拿到了A鎖')
#
# lock_B.acquire()
# print(f'{self.name}拿到了B鎖')
#
# lock_B.release()
#
# lock_A.release()
#
# def f2(self):
#
# lock_B.acquire()
# print(f'{self.name}拿到了B鎖')
#
# time.sleep(0.1)
# lock_A.acquire()
# print(f'{self.name}拿到了A鎖')
#
# lock_A.release()
#
# lock_B.release()
#
# if __name__ == '__main__':
#
# for i in range(3):
# t = MyThread()
# t.start()
3.信號(hào)量
也是一種鎖,控制并發(fā)數(shù)量
# from threading import Thread, Semaphore, current_thread
# import time
# import random
# sem = Semaphore(5)
#
# def task():
# sem.acquire()
#
# print(f'{current_thread().name} 廁所ing')
# time.sleep(random.randint(1,3))
#
# sem.release()
#
#
# if __name__ == '__main__':
# for i in range(20):
# t = Thread(target=task,)
# t.start()
4.GIL全局解釋器鎖
4.1背景
#理論上來說:單個(gè)進(jìn)程的多線程可以利用多核.
#但是,開發(fā)Cpython解釋器的程序員,給進(jìn)入解釋器的線程加了鎖.
4.2為什么加鎖
1. 當(dāng)時(shí)都是單核時(shí)代,而且cpu價(jià)格非常貴.
2. 如果不加全局解釋器鎖, 開發(fā)Cpython解釋器的程序員就會(huì)在源碼內(nèi)部各種主動(dòng)加鎖,解鎖,非常麻煩,各種死鎖現(xiàn)象等等.他為了省事兒,直接進(jìn)入解釋器時(shí)給線程加一個(gè)鎖.
優(yōu)點(diǎn): 保證了Cpython解釋器的數(shù)據(jù)資源的安全.
缺點(diǎn): 單個(gè)進(jìn)程的多線程不能利用多核.
#Jpython沒有GIL鎖.
#pypy也沒有GIL鎖.
#現(xiàn)在多核時(shí)代, 我將Cpython的GIL鎖去掉行么?
#因?yàn)镃python解釋器所有的業(yè)務(wù)邏輯都是圍繞著單個(gè)線程實(shí)現(xiàn)的,去掉這個(gè)GIL鎖,幾乎不可能.
單個(gè)進(jìn)程的多線程可以并發(fā),但是不能利用多核,不能并行.
多個(gè)進(jìn)程可以并發(fā),并行.
5.GIL與Lock鎖的區(qū)別
相同點(diǎn): 都是同種鎖,互斥鎖.
不同點(diǎn):
GIL鎖全局解釋器鎖,保護(hù)解釋器內(nèi)部的資源數(shù)據(jù)的安全.
GIL鎖 上鎖,釋放無需手動(dòng)操作.
自己代碼中定義的互斥鎖保護(hù)進(jìn)程中的資源數(shù)據(jù)的安全.
自己定義的互斥鎖必須自己手動(dòng)上鎖,釋放鎖.
6.驗(yàn)證計(jì)算密集型IO密集型的效率
6.1 IO密集型
# IO密集型: 單個(gè)進(jìn)程的多線程并發(fā) vs 多個(gè)進(jìn)程的并發(fā)并行
# def task():
# count = 0
# time.sleep(random.randint(1,3))
# count += 1
# if __name__ == '__main__':
# 多進(jìn)程的并發(fā),并行
# start_time = time.time()
# l1 = []
# for i in range(50):
# p = Process(target=task,)
# l1.append(p)
# p.start()
#
# for p in l1:
# p.join()
#
# print(f'執(zhí)行效率:{time.time()- start_time}') # 8.000000000
# 多線程的并發(fā)
# start_time = time.time()
# l1 = []
# for i in range(50):
# p = Thread(target=task,)
# l1.append(p)
# p.start()
#
# for p in l1:
# p.join()
#
# print(f'執(zhí)行效率:{time.time()- start_time}') # 3.0294392108917236
對(duì)于IO密集型: 單個(gè)進(jìn)程的多線程的并發(fā)效率高.
6.2 計(jì)算密集型
#from threading import Thread
#from multiprocessing import Process
#import time
#import random
# # 計(jì)算密集型: 單個(gè)進(jìn)程的多線程并發(fā) vs 多個(gè)進(jìn)程的并發(fā)并行
#
# def task():
# count = 0
# for i in range(10000000):
# count += 1
#
#
# if __name__ == '__main__':
#
# # 多進(jìn)程的并發(fā),并行
# # start_time = time.time()
# # l1 = []
# # for i in range(4):
# # p = Process(target=task,)
# # l1.append(p)
# # p.start()
# #
# # for p in l1:
# # p.join()
# #
# # print(f'執(zhí)行效率:{time.time()- start_time}') # 3.1402080059051514
#
# # 多線程的并發(fā)
# # start_time = time.time()
# # l1 = []
# # for i in range(4):
# # p = Thread(target=task,)
# # l1.append(p)
# # p.start()
# #
# # for p in l1:
# # p.join()
# #
# # print(f'執(zhí)行效率:{time.time()- start_time}') # 4.5913777351379395
總結(jié): 計(jì)算密集型: 多進(jìn)程的并發(fā)并行效率高.
7.多線程實(shí)現(xiàn)socket通信
#無論是多線程還是多進(jìn)程,如果按照上面的寫法,來一個(gè)客戶端請(qǐng)求,我就開一個(gè)線程,來一個(gè)請(qǐng)求開一個(gè)線程,
#應(yīng)該是這樣: 你的計(jì)算機(jī)允許范圍內(nèi),開啟的線程進(jìn)程數(shù)量越多越好.
7.1服務(wù)端
# import socket
# from threading import Thread
#
# def communicate(conn,addr):
# while 1:
# try:
# from_client_data = conn.recv(1024)
# print(f'來自客戶端{(lán)addr[1]}的消息: {from_client_data.decode("utf-8")}')
# to_client_data = input('>>>').strip()
# conn.send(to_client_data.encode('utf-8'))
# except Exception:
# break
# conn.close()
#
# def _accept():
# server = socket.socket()
# server.bind(('127.0.0.1', 8848))
# server.listen(5)
#
# while 1:
# conn, addr = server.accept()
# t = Thread(target=communicate,args=(conn,addr))
# t.start()
#
# if __name__ == '__main__':
# _accept()
7.2客戶端
# import socket
# client = socket.socket()
# client.connect(('127.0.0.1',8848))
#
# while 1:
# try:
# to_server_data = input('>>>').strip()
# client.send(to_server_data.encode('utf-8'))
#
# from_server_data = client.recv(1024)
# print(f'來自服務(wù)端的消息: {from_server_data.decode("utf-8")}')
#
# except Exception:
# break
# client.close()
8.進(jìn)程池,線程池
#線程池: 一個(gè)容器,這個(gè)容器限制住你開啟線程的數(shù)量,比如4個(gè),第一次肯定只能并發(fā)的處理4個(gè)任務(wù),只要有任務(wù)完成,線程馬上就會(huì)接下一個(gè)任務(wù).
以時(shí)間換空間.
# from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# import os
# import time
# import random
#
# # print(os.cpu_count())
# def task(n):
# print(f'{os.getpid()} 接客')
# time.sleep(random.randint(1,3))
#
#
# if __name__ == '__main__':
# 開啟進(jìn)程池 (并行(并行+并發(fā)))
# p = ProcessPoolExecutor() # 默認(rèn)不寫,進(jìn)程池里面的進(jìn)程數(shù)與cpu個(gè)數(shù)相等
#
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# # p.submit(task,1)
# for i in range(20):
# p.submit(task,i)
#
# 開啟線程池 (并發(fā))
# t = ThreadPoolExecutor() # 默認(rèn)不寫, cpu個(gè)數(shù)*5 線程數(shù)
# t = ThreadPoolExecutor(100) # 100個(gè)線程
# for i in range(20):
# t.submit(task,i)
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長非常感激您!手機(jī)微信長按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
