
python線程、進程和協(xié)程詳解
我們都知道python網(wǎng)絡(luò)編程的兩大必學(xué)模塊socket和socketserver,其中的socketserver是一個支持IO多路復(fù)用和多線程、多進程的模塊。一般我們在socketserver服務(wù)端代碼中都會寫這么一句:
server = socketserver.ThreadingTCPServer(settings.IP_PORT, MyServer)
ThreadingTCPServer這個類是一個支持多線程和TCP協(xié)議的socketserver,它的繼承關(guān)系是這樣的:
class ThreadingTCPServer(ThreadingMixIn, TCPServer): pass
右邊的TCPServer實際上是它主要的功能父類,而左邊的ThreadingMixIn則是實現(xiàn)了多線程的類,它自己本身則沒有任何代碼。
MixIn在python的類命名中,很常見,一般被稱為“混入”,戲稱“亂入”,通常為了某種重要功能被子類繼承。
class ThreadingMixIn:
daemon_threads = False
def process_request_thread(self, request, client_address):
try:
self.finish_request(request, client_address)
self.shutdown_request(request)
except:
self.handle_error(request, client_address)
self.shutdown_request(request)
def process_request(self, request, client_address):
t = threading.Thread(target = self.process_request_thread,
args = (request, client_address))
t.daemon = self.daemon_threads
t.start()
在ThreadingMixIn類中,其實就定義了一個屬性,兩個方法。在process_request方法中實際調(diào)用的正是python內(nèi)置的多線程模塊threading。這個模塊是python中所有多線程的基礎(chǔ),socketserver本質(zhì)上也是利用了這個模塊。
一、線程
復(fù)制代碼 代碼如下:
線程,有時被稱為輕量級進程(Lightweight Process,LWP),是程序執(zhí)行流的最小單元。一個標(biāo)準(zhǔn)的線程由線程ID,當(dāng)前指令指針(PC),寄存器集合和堆棧組成。另外,線程是進程中的一個實體,是被系統(tǒng)獨立調(diào)度和分派的基本單位,線程自己不獨立擁有系統(tǒng)資源,但它可與同屬一個進程的其它線程共享該進程所擁有的全部資源。一個線程可以創(chuàng)建和撤消另一個線程,同一進程中的多個線程之間可以并發(fā)執(zhí)行。由于線程之間的相互制約,致使線程在運行中呈現(xiàn)出間斷性。線程也有就緒、阻塞和運行三種基本狀態(tài)。就緒狀態(tài)是指線程具備運行的所有條件,邏輯上可以運行,在等待處理機;運行狀態(tài)是指線程占有處理機正在運行;阻塞狀態(tài)是指線程在等待一個事件(如某個信號量),邏輯上不可執(zhí)行。每一個應(yīng)用程序都至少有一個進程和一個線程。線程是程序中一個單一的順序控制流程。在單個程序中同時運行多個線程完成不同的被劃分成一塊一塊的工作,稱為多線程。
以上那一段,可以不用看!舉個例子,廠家要生產(chǎn)某個產(chǎn)品,在它的生產(chǎn)基地建設(shè)了很多廠房,每個廠房內(nèi)又有多條流水生產(chǎn)線。所有廠房配合將整個產(chǎn)品生產(chǎn)出來,某個廠房內(nèi)的所有流水線將這個廠房負(fù)責(zé)的產(chǎn)品部分生產(chǎn)出來。每個廠房擁有自己的材料庫,廠房內(nèi)的生產(chǎn)線共享這些材料。而每一個廠家要實現(xiàn)生產(chǎn)必須擁有至少一個廠房一條生產(chǎn)線。那么這個廠家就是某個應(yīng)用程序;每個廠房就是一個進程;每條生產(chǎn)線都是一個線程。
1.1 普通的多線程
在python中,threading模塊提供線程的功能。通過它,我們可以輕易的在進程中創(chuàng)建多個線程。下面是個例子:
import threading
import time
def show(arg):
time.sleep(1)
print('thread'+str(arg))
for i in range(10):
t = threading.Thread(target=show, args=(i,))
t.start()
print('main thread stop')
上述代碼創(chuàng)建了10個“前臺”線程,然后控制器就交給了CPU,CPU根據(jù)指定算法進行調(diào)度,分片執(zhí)行指令。
下面是Thread類的主要方法:
start 線程準(zhǔn)備就緒,等待CPU調(diào)度
setName 為線程設(shè)置名稱
getName 獲取線程名稱
setDaemon 設(shè)置為后臺線程或前臺線程(默認(rèn))
如果是后臺線程,主線程執(zhí)行過程中,后臺線程也在進行,主線程執(zhí)行完畢后,后臺線程不論成功與否,均停止。如果是前臺線程,主線程執(zhí)行過程中,前臺線程也在進行,主線程執(zhí)行完畢后,等待前臺線程也執(zhí)行完成后,程序停止。
join 逐個執(zhí)行每個線程,執(zhí)行完畢后繼續(xù)往下執(zhí)行,該方法使得多線程變得無意義。
run 線程被cpu調(diào)度后自動執(zhí)行線程對象的run方法
1.2 自定義線程類
對于threading模塊中的Thread類,本質(zhì)上是執(zhí)行了它的run方法。因此可以自定義線程類,讓它繼承Thread類,然后重寫run方法。
import threading
class MyThreading(threading.Thread):
def __init__(self,func,arg):
super(MyThreading,self).__init__()
self.func = func
self.arg = arg
def run(self):
self.func(self.arg)
def f1(args):
print(args)
obj = MyThreading(f1, 123)
obj.start()
1.3 線程鎖
CPU執(zhí)行任務(wù)時,在線程之間是進行隨機調(diào)度的,并且每個線程可能只執(zhí)行n條代碼后就轉(zhuǎn)而執(zhí)行另外一條線程。由于在一個進程中的多個線程之間是共享資源和數(shù)據(jù)的,這就容易造成資源搶奪或臟數(shù)據(jù),于是就有了鎖的概念,限制某一時刻只有一個線程能訪問某個指定的數(shù)據(jù)。
1.3.1 未使用鎖
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
import time
NUM = 0
def show():
global NUM
NUM += 1
name = t.getName()
time.sleep(1) # 注意,這行語句的位置很重要,必須在NUM被修改后,否則觀察不到臟數(shù)據(jù)的現(xiàn)象。
print(name, "執(zhí)行完畢后,NUM的值為: ", NUM)
for i in range(10):
t = threading.Thread(target=show)
t.start()
print('main thread stop')
上述代碼運行后,結(jié)果如下:
main thread stop
Thread-1 執(zhí)行完畢后,NUM的值為: 10
Thread-2 執(zhí)行完畢后,NUM的值為: 10
Thread-4 執(zhí)行完畢后,NUM的值為: 10
Thread-9 執(zhí)行完畢后,NUM的值為: 10
Thread-3 執(zhí)行完畢后,NUM的值為: 10
Thread-6 執(zhí)行完畢后,NUM的值為: 10
Thread-8 執(zhí)行完畢后,NUM的值為: 10
Thread-7 執(zhí)行完畢后,NUM的值為: 10
Thread-5 執(zhí)行完畢后,NUM的值為: 10
Thread-10 執(zhí)行完畢后,NUM的值為: 10
由此可見,由于線程同時訪問一個數(shù)據(jù),產(chǎn)生了錯誤的結(jié)果。為了解決這個問題,python在threading模塊中定義了幾種線程鎖類,分別是:
Lock 普通鎖(不可嵌套)
RLock 普通鎖(可嵌套)常用
Semaphore 信號量
event 事件
condition 條件
1.3.2 普通鎖Lock和RLock
類名:Lock或RLock
普通鎖,也叫互斥鎖,是獨占的,同一時刻只有一個線程被放行。
import time
import threading
NUM = 10
def func(lock):
global NUM
lock.acquire() # 讓鎖開始起作用
NUM -= 1
time.sleep(1)
print(NUM)
lock.release() # 釋放鎖
lock = threading.Lock() # 實例化一個鎖對象
for i in range(10):
t = threading.Thread(target=func, args=(lock,)) # 記得把鎖當(dāng)作參數(shù)傳遞給func參數(shù)
t.start()
以上是threading模塊的Lock類,它不支持嵌套鎖。RLcok類的用法和Lock一模一樣,但它支持嵌套,因此我們一般直接使用RLcok類。
1.3.3 信號量(Semaphore)
類名:BoundedSemaphore
這種鎖允許一定數(shù)量的線程同時更改數(shù)據(jù),它不是互斥鎖。比如地鐵安檢,排隊人很多,工作人員只允許一定數(shù)量的人進入安檢區(qū),其它的人繼續(xù)排隊。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
def run(n):
semaphore.acquire()
print("run the thread: %s" % n)
time.sleep(1)
semaphore.release()
num = 0
semaphore = threading.BoundedSemaphore(5) # 最多允許5個線程同時運行
for i in range(20):
t = threading.Thread(target=run, args=(i,))
t.start()
1.3.4 事件(Event)
類名:Event
事件主要提供了三個方法 set、wait、clear。
事件機制:全局定義了一個“Flag”,如果“Flag”的值為False,那么當(dāng)程序執(zhí)行wait方法時就會阻塞,如果“Flag”值為True,那么wait方法時便不再阻塞。這種鎖,類似交通紅綠燈(默認(rèn)是紅燈),它屬于在紅燈的時候一次性阻擋所有線程,在綠燈的時候,一次性放行所有的排隊中的線程。
clear:將“Flag”設(shè)置為False
set:將“Flag”設(shè)置為True
import threading
def func(e,i):
print(i)
e.wait() # 檢測當(dāng)前event是什么狀態(tài),如果是紅燈,則阻塞,如果是綠燈則繼續(xù)往下執(zhí)行。默認(rèn)是紅燈。
print(i+100)
event = threading.Event()
for i in range(10):
t = threading.Thread(target=func, args=(event, i))
t.start()
event.clear() # 主動將狀態(tài)設(shè)置為紅燈
inp = input(">>>")
if inp == "1":
event.set() # 主動將狀態(tài)設(shè)置為綠燈
1.3.5 條件(condition)
類名:Condition
該機制會使得線程等待,只有滿足某條件時,才釋放n個線程。
import threading
def condition():
ret = False
r = input(">>>")
if r == "yes":
ret = True
return ret
def func(conn, i):
print(i)
conn.acquire()
conn.wait_for(condition) # 這個方法接受一個函數(shù)的返回值
print(i+100)
conn.release()
c = threading.Condition()
for i in range(10):
t = threading.Thread(target=func, args=(c, i,))
t.start()
上面的例子,每輸入一次“yes”放行了一個線程。下面這個,可以選擇一次放行幾個線程。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import threading
def run(n):
con.acquire()
con.wait()
print("run the thread: %s" %n)
con.release()
if __name__ == '__main__':
con = threading.Condition()
for i in range(10):
t = threading.Thread(target=run, args=(i,))
t.start()
while True:
inp = input('>>>')
if inp == "q":
break
# 下面這三行是固定語法
con.acquire()
con.notify(int(inp)) # 這個方法接收一個整數(shù),表示讓多少個線程通過
con.release()
1.3 全局解釋器鎖(GIL)
既然介紹了多線程和線程鎖,那就不得不提及python的GIL,也就是全局解釋器鎖。在編程語言的世界,python因為GIL的問題廣受詬病,因為它在解釋器的層面限制了程序在同一時間只有一個線程被CPU實際執(zhí)行,而不管你的程序里實際開了多少條線程。所以我們經(jīng)常能發(fā)現(xiàn),python中的多線程編程有時候效率還不如單線程,就是因為這個原因。那么,對于這個GIL,一些普遍的問題如下:
每種編程語言都有GIL嗎?
以python官方Cpython解釋器為代表....其他語言好像未見。
為什么要有GIL?
作為解釋型語言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會遇到的問題。解釋器要留意的是避免在不同的線程操作內(nèi)部共享的數(shù)據(jù)。同時它還要保證在管理用戶線程時總是有最大化的計算資源。那么,不同線程同時訪問時,數(shù)據(jù)的保護機制是怎樣的呢?答案是解釋器全局鎖GIL。GIL對諸如當(dāng)前線程狀態(tài)和為垃圾回收而用的堆分配對象這樣的東西的訪問提供著保護。
為什么不能去掉GIL?
首先,在早期的python解釋器依賴較多的全局狀態(tài),傳承下來,使得想要移除當(dāng)今的GIL變得更加困難。其次,對于程序員而言,僅僅是想要理解它的實現(xiàn)就需要對操作系統(tǒng)設(shè)計、多線程編程、C語言、解釋器設(shè)計和CPython解釋器的實現(xiàn)有著非常徹底的理解。
在1999年,針對Python1.5,一個“freethreading”補丁已經(jīng)嘗試移除GIL,用細粒度的鎖來代替。然而,GIL的移除給單線程程序的執(zhí)行速度帶來了一定的負(fù)面影響。當(dāng)用單線程執(zhí)行時,速度大約降低了40%。雖然使用兩個線程時在速度上得到了提高,但這個提高并沒有隨著核數(shù)的增加而線性增長。因此這個補丁沒有被采納。
另外,在python的不同解釋器實現(xiàn)中,如PyPy就移除了GIL,其執(zhí)行速度更快(不單單是去除GIL的原因)。然而,我們通常使用的CPython占有著統(tǒng)治地位的使用量,所以,你懂的。
在Python 3.2中實現(xiàn)了一個新的GIL,并且?guī)е恍┓e極的結(jié)果。這是自1992年以來,GIL的一次最主要改變。舊的GIL通過對Python指令進行計數(shù)來確定何時放棄GIL。在新的GIL實現(xiàn)中,用一個固定的超時時間來指示當(dāng)前的線程以放棄這個鎖。在當(dāng)前線程保持這個鎖,且當(dāng)?shù)诙€線程請求這個鎖的時候,當(dāng)前線程就會在5ms后被強制釋放掉這個鎖(這就是說,當(dāng)前線程每5ms就要檢查其是否需要釋放這個鎖)。當(dāng)任務(wù)是可行的時候,這會使得線程間的切換更加可預(yù)測。
GIL對我們有什么影響?
最大的影響是我們不能隨意使用多線程。要區(qū)分任務(wù)場景。
在單核cpu情況下對性能的影響可以忽略不計,多線程多進程都差不多。在多核CPU時,多線程效率較低。GIL對單進程和多進程沒有影響。
在實際使用中有什么好的建議?
建議在IO密集型任務(wù)中使用多線程,在計算密集型任務(wù)中使用多進程。深入研究python的協(xié)程機制,你會有驚喜的。
更多的詳細介紹和說明請參考下面的文獻:
原文:Python's Hardest Problem
譯文:Python 最難的問題
1.4 定時器(Timer)
定時器,指定n秒后執(zhí)行某操作。很簡單但很使用的東西。
from threading import Timer
def hello():
print("hello, world")
t = Timer(1, hello) # 表示1秒后執(zhí)行hello函數(shù)
t.start()
1.5 隊列
通常而言,隊列是一種先進先出的數(shù)據(jù)結(jié)構(gòu),與之對應(yīng)的是堆棧這種后進先出的結(jié)構(gòu)。但是在python中,它內(nèi)置了一個queue模塊,它不但提供普通的隊列,還提供一些特殊的隊列。具體如下:
queue.Queue :先進先出隊列
queue.LifoQueue :后進先出隊列
queue.PriorityQueue :優(yōu)先級隊列
queue.deque :雙向隊列
1.5.1 Queue:先進先出隊列
這是最常用也是最普遍的隊列,先看一個例子。
import queue
q = queue.Queue(5)
q.put(11)
q.put(22)
q.put(33)
print(q.get())
print(q.get())
print(q.get())
Queue類的參數(shù)和方法:
maxsize 隊列的最大元素個數(shù),也就是queue.Queue(5)中的5。當(dāng)隊列內(nèi)的元素達到這個值時,后來的元素默認(rèn)會阻塞,等待隊列騰出位置。
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._init(maxsize)
qsize() 獲取當(dāng)前隊列中元素的個數(shù),也就是隊列的大小
empty() 判斷當(dāng)前隊列是否為空,返回True或者False
full() 判斷當(dāng)前隊列是否已滿,返回True或者False
put(self, block=True, timeout=None)
往隊列里放一個元素,默認(rèn)是阻塞和無時間限制的。如果,block設(shè)置為False,則不阻塞,這時,如果隊列是滿的,放不進去,就會彈出異常。如果timeout設(shè)置為n秒,則會等待這個秒數(shù)后才put,如果put不進去則彈出異常。
get(self, block=True, timeout=None)
從隊列里獲取一個元素。參數(shù)和put是一樣的意思。
join() 阻塞進程,直到所有任務(wù)完成,需要配合另一個方法task_done。
def join(self):
with self.all_tasks_done:
while self.unfinished_tasks:
self.all_tasks_done.wait()
task_done() 表示某個任務(wù)完成。每一條get語句后需要一條task_done。
import queue
q = queue.Queue(5)
q.put(11)
q.put(22)
print(q.get())
q.task_done()
print(q.get())
q.task_done()
q.join()
1.5.2 LifoQueue:后進先出隊列
類似于“堆?!?,后進先出。也較常用。
import queue
q = queue.LifoQueue()
q.put(123)
q.put(456)
print(q.get())
上述代碼運行結(jié)果是:456
1.5.3 PriorityQueue:優(yōu)先級隊列
帶有權(quán)重的隊列,每個元素都是一個元組,前面的數(shù)字表示它的優(yōu)先級,數(shù)字越小優(yōu)先級越高,同樣的優(yōu)先級先進先出
q = queue.PriorityQueue()
q.put((1,"alex1"))
q.put((1,"alex2"))
q.put((1,"alex3"))
q.put((3,"alex3"))
print(q.get())
1.5.4 deque:雙向隊列
Queue和LifoQueue的“綜合體”,雙向進出。方法較多,使用復(fù)雜,慎用!
q = queue.deque()
q.append(123)
q.append(333)
q.appendleft(456)
q.pop()
q.popleft()
1.6 生產(chǎn)者消費者模型
利用多線程和隊列可以搭建一個生產(chǎn)者消費者模型,用于處理大并發(fā)的服務(wù)。
在并發(fā)編程中使用生產(chǎn)者和消費者模式能夠解決絕大多數(shù)并發(fā)問題。該模式通過平衡生產(chǎn)線程和消費線程的工作能力來提高程序的整體處理數(shù)據(jù)的速度。
為什么要使用生產(chǎn)者和消費者模式
在線程世界里,生產(chǎn)者就是生產(chǎn)數(shù)據(jù)的線程,消費者就是消費數(shù)據(jù)的線程。在多線程開發(fā)當(dāng)中,如果生產(chǎn)者處理速度很快,而消費者處理速度很慢,那么生產(chǎn)者就必須等待消費者處理完,才能繼續(xù)生產(chǎn)數(shù)據(jù)。同樣的道理,如果消費者的處理能力大于生產(chǎn)者,那么消費者就必須等待生產(chǎn)者。為了解決這個問題于是引入了生產(chǎn)者和消費者模式。
什么是生產(chǎn)者消費者模式
生產(chǎn)者消費者模式是通過一個容器來解決生產(chǎn)者和消費者的強耦合問題。生產(chǎn)者和消費者彼此之間不直接通訊,而通過阻塞隊列來進行通訊,所以生產(chǎn)者生產(chǎn)完數(shù)據(jù)之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產(chǎn)者要數(shù)據(jù),而是直接從阻塞隊列里取,阻塞隊列就相當(dāng)于一個緩沖區(qū),平衡了生產(chǎn)者和消費者的處理能力。
這個阻塞隊列就是用來給生產(chǎn)者和消費者解耦的??v觀大多數(shù)設(shè)計模式,都會找一個第三者出來進行解耦,如工廠模式的第三者是工廠類,模板模式的第三者是模板類。在學(xué)習(xí)一些設(shè)計模式的過程中,如果先找到這個模式的第三者,能幫助我們快速熟悉一個設(shè)計模式。
以上摘自方騰飛的《聊聊并發(fā)——生產(chǎn)者消費者模式》
下面是一個簡單的廚師做包子,顧客吃包子的例子。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import time
import queue
import threading
q = queue.Queue(10)
def productor(i):
while True:
q.put("廚師 %s 做的包子!"%i)
time.sleep(2)
def consumer(k):
while True:
print("顧客 %s 吃了一個 %s"%(k,q.get()))
time.sleep(1)
for i in range(3):
t = threading.Thread(target=productor,args=(i,))
t.start()
for k in range(10):
v = threading.Thread(target=consumer,args=(k,))
v.start()
1.7 線程池
在使用多線程處理任務(wù)時也不是線程越多越好,由于在切換線程的時候,需要切換上下文環(huán)境,依然會造成cpu的大量開銷。為解決這個問題,線程池的概念被提出來了。預(yù)先創(chuàng)建好一個較為優(yōu)化的數(shù)量的線程,讓過來的任務(wù)立刻能夠使用,就形成了線程池。在python中,沒有內(nèi)置的較好的線程池模塊,需要自己實現(xiàn)或使用第三方模塊。下面是一個簡單的線程池:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:Liu Jiang
import queue
import time
import threading
class MyThreadPool:
def __init__(self, maxsize=5):
self.maxsize = maxsize
self._q = queue.Queue(maxsize)
for i in range(maxsize):
self._q.put(threading.Thread)
def get_thread(self):
return self._q.get()
def add_thread(self):
self._q.put(threading.Thread)
def task(i, pool):
print(i)
time.sleep(1)
pool.add_thread()
pool = MyThreadPool(5)
for i in range(100):
t = pool.get_thread()
obj = t(target=task, args=(i,pool))
obj.start()
上面的例子是把線程類當(dāng)做元素添加到隊列內(nèi)。實現(xiàn)方法比較糙,每個線程使用后就被拋棄,一開始就將線程開到滿,因此性能較差。下面是一個相對好一點的例子:
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import queue
import threading
import contextlib
import time
StopEvent = object() # 創(chuàng)建空對象
class ThreadPool(object):
def __init__(self, max_num, max_task_num = None):
if max_task_num:
self.q = queue.Queue(max_task_num)
else:
self.q = queue.Queue()
self.max_num = max_num
self.cancel = False
self.terminal = False
self.generate_list = []
self.free_list = []
def run(self, func, args, callback=None):
"""
線程池執(zhí)行一個任務(wù)
:param func: 任務(wù)函數(shù)
:param args: 任務(wù)函數(shù)所需參數(shù)
:param callback: 任務(wù)執(zhí)行失敗或成功后執(zhí)行的回調(diào)函數(shù),回調(diào)函數(shù)有兩個參數(shù)1、任務(wù)函數(shù)執(zhí)行狀態(tài);2、任務(wù)函數(shù)返回值(默認(rèn)為None,即:不執(zhí)行回調(diào)函數(shù))
:return: 如果線程池已經(jīng)終止,則返回True否則None
"""
if self.cancel:
return
if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
self.generate_thread()
w = (func, args, callback,)
self.q.put(w)
def generate_thread(self):
"""
創(chuàng)建一個線程
"""
t = threading.Thread(target=self.call)
t.start()
def call(self):
"""
循環(huán)去獲取任務(wù)函數(shù)并執(zhí)行任務(wù)函數(shù)
"""
current_thread = threading.currentThread
self.generate_list.append(current_thread)
event = self.q.get()
while event != StopEvent:
func, arguments, callback = event
try:
result = func(*arguments)
success = True
except Exception as e:
success = False
result = None
if callback is not None:
try:
callback(success, result)
except Exception as e:
pass
with self.worker_state(self.free_list, current_thread):
if self.terminal:
event = StopEvent
else:
event = self.q.get()
else:
self.generate_list.remove(current_thread)
def close(self):
"""
執(zhí)行完所有的任務(wù)后,所有線程停止
"""
self.cancel = True
full_size = len(self.generate_list)
while full_size:
self.q.put(StopEvent)
full_size -= 1
def terminate(self):
"""
無論是否還有任務(wù),終止線程
"""
self.terminal = True
while self.generate_list:
self.q.put(StopEvent)
self.q.empty()
@contextlib.contextmanager
def worker_state(self, state_list, worker_thread):
"""
用于記錄線程中正在等待的線程數(shù)
"""
state_list.append(worker_thread)
try:
yield
finally:
state_list.remove(worker_thread)
# How to use
pool = ThreadPool(5)
def callback(status, result):
# status, execute action status
# result, execute action return value
pass
def action(i):
print(i)
for i in range(30):
ret = pool.run(action, (i,), callback)
time.sleep(5)
print(len(pool.generate_list), len(pool.free_list))
print(len(pool.generate_list), len(pool.free_list))
# pool.close()
# pool.terminate()
二、進程
在python中multiprocess模塊提供了Process類,實現(xiàn)進程相關(guān)的功能。但是,由于它是基于fork機制的,因此不被windows平臺支持。想要在windows中運行,必須使用if __name__ == '__main__:的方式,顯然這只能用于調(diào)試和學(xué)習(xí),不能用于實際環(huán)境。
(PS:在這里我必須吐槽一下python的包、模塊和類的組織結(jié)構(gòu)。在multiprocess中你既可以import大寫的Process,也可以import小寫的process,這兩者是完全不同的東西。這種情況在python中很多,新手容易傻傻分不清。)
下面是一個簡單的多進程例子,你會發(fā)現(xiàn)Process的用法和Thread的用法幾乎一模一樣。
from multiprocessing import Process
def foo(i):
print("This is Process ", i)
if __name__ == '__main__':
for i in range(5):
p = Process(target=foo, args=(i,))
p.start()
2.1 進程的數(shù)據(jù)共享
每個進程都有自己獨立的數(shù)據(jù)空間,不同進程之間通常是不能共享數(shù)據(jù),創(chuàng)建一個進程需要非常大的開銷。
from multiprocessing import Process
list_1 = []
def foo(i):
list_1.append(i)
print("This is Process ", i," and list_1 is ", list_1)
if __name__ == '__main__':
for i in range(5):
p = Process(target=foo, args=(i,))
p.start()
print("The end of list_1:", list_1)
運行上面的代碼,你會發(fā)現(xiàn)列表list_1在各個進程中只有自己的數(shù)據(jù),完全無法共享。想要進程之間進行資源共享可以使用queues/Array/Manager這三個multiprocess模塊提供的類。
2.1.1 使用Array共享數(shù)據(jù)
from multiprocessing import Process
from multiprocessing import Array
def Foo(i,temp):
temp[0] += 100
for item in temp:
print(i,'----->',item)
if __name__ == '__main__':
temp = Array('i', [11, 22, 33, 44])
for i in range(2):
p = Process(target=Foo, args=(i,temp))
p.start()
對于Array數(shù)組類,括號內(nèi)的“i”表示它內(nèi)部的元素全部是int類型,而不是指字符i,列表內(nèi)的元素可以預(yù)先指定,也可以指定列表長度。概括的來說就是Array類在實例化的時候就必須指定數(shù)組的數(shù)據(jù)類型和數(shù)組的大小,類似temp = Array('i', 5)。對于數(shù)據(jù)類型有下面的表格對應(yīng):
'c': ctypes.c_char, 'u': ctypes.c_wchar,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
2.1.2 使用Manager共享數(shù)據(jù)
from multiprocessing import Process,Manager
def Foo(i,dic):
dic[i] = 100+i
print(dic.values())
if __name__ == '__main__':
manage = Manager()
dic = manage.dict()
for i in range(10):
p = Process(target=Foo, args=(i,dic))
p.start()
p.join()
Manager比Array要好用一點,因為它可以同時保存多種類型的數(shù)據(jù)格式。
2.1.3 使用queues的Queue類共享數(shù)據(jù)
import multiprocessing
from multiprocessing import Process
from multiprocessing import queues
def foo(i,arg):
arg.put(i)
print('The Process is ', i, "and the queue's size is ", arg.qsize())
if __name__ == "__main__":
li = queues.Queue(20, ctx=multiprocessing)
for i in range(10):
p = Process(target=foo, args=(i,li,))
p.start()
這里就有點類似上面的隊列了。從運行結(jié)果里,你還能發(fā)現(xiàn)數(shù)據(jù)共享中存在的臟數(shù)據(jù)問題。另外,比較悲催的是multiprocessing里還有一個Queue,一樣能實現(xiàn)這個功能。
2.2 進程鎖
為了防止和多線程一樣的出現(xiàn)數(shù)據(jù)搶奪和臟數(shù)據(jù)的問題,同樣需要設(shè)置進程鎖。與threading類似,在multiprocessing里也有同名的鎖類RLock, Lock, Event, Condition, Semaphore,連用法都是一樣樣的?。ㄟ@個我喜歡)
from multiprocessing import Process
from multiprocessing import queues
from multiprocessing import Array
from multiprocessing import RLock, Lock, Event, Condition, Semaphore
import multiprocessing
import time
def foo(i,lis,lc):
lc.acquire()
lis[0] = lis[0] - 1
time.sleep(1)
print('say hi',lis[0])
lc.release()
if __name__ == "__main__":
# li = []
li = Array('i', 1)
li[0] = 10
lock = RLock()
for i in range(10):
p = Process(target=foo,args=(i,li,lock))
p.start()
2.3 進程池
既然有線程池,那必然也有進程池。但是,python給我們內(nèi)置了一個進程池,不需要像線程池那樣需要自定義,你只需要簡單的from multiprocessing import Pool。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from multiprocessing import Pool
import time
def f1(args):
time.sleep(1)
print(args)
if __name__ == '__main__':
p = Pool(5)
for i in range(30):
p.apply_async(func=f1, args= (i,))
p.close() # 等子進程執(zhí)行完畢后關(guān)閉線程池
# time.sleep(2)
# p.terminate() # 立刻關(guān)閉線程池
p.join()
進程池內(nèi)部維護一個進程序列,當(dāng)使用時,去進程池中獲取一個進程,如果進程池序列中沒有可供使用的進程,那么程序就會等待,直到進程池中有可用進程為止。
進程池中有以下幾個主要方法:
apply:從進程池里取一個進程并執(zhí)行
apply_async:apply的異步版本
terminate:立刻關(guān)閉線程池
join:主進程等待所有子進程執(zhí)行完畢,必須在close或terminate之后
close:等待所有進程結(jié)束后,才關(guān)閉線程池
三、協(xié)程
線程和進程的操作是由程序觸發(fā)系統(tǒng)接口,最后的執(zhí)行者是系統(tǒng),它本質(zhì)上是操作系統(tǒng)提供的功能。而協(xié)程的操作則是程序員指定的,在python中通過yield,人為的實現(xiàn)并發(fā)處理。
協(xié)程存在的意義:對于多線程應(yīng)用,CPU通過切片的方式來切換線程間的執(zhí)行,線程切換時需要耗時。協(xié)程,則只使用一個線程,分解一個線程成為多個“微線程”,在一個線程中規(guī)定某個代碼塊的執(zhí)行順序。
協(xié)程的適用場景:當(dāng)程序中存在大量不需要CPU的操作時(IO)。
在不需要自己“造輪子”的年代,同樣有第三方模塊為我們提供了高效的協(xié)程,這里介紹一下greenlet和gevent。本質(zhì)上,gevent是對greenlet的高級封裝,因此一般用它就行,這是一個相當(dāng)高效的模塊。
在使用它們之前,需要先安裝,可以通過源碼,也可以通過pip。
3.1 greenlet
from greenlet import greenlet
def test1():
print(12)
gr2.switch()
print(34)
gr2.switch()
def test2():
print(56)
gr1.switch()
print(78)
gr1 = greenlet(test1)
gr2 = greenlet(test2)
gr1.switch()
實際上,greenlet就是通過switch方法在不同的任務(wù)之間進行切換。
3.2 gevent
from gevent import monkey; monkey.patch_all()
import gevent
import requests
def f(url):
print('GET: %s' % url)
resp = requests.get(url)
data = resp.text
print('%d bytes received from %s.' % (len(data), url))
gevent.joinall([
gevent.spawn(f, 'https://www.python.org/'),
gevent.spawn(f, 'https://www.yahoo.com/'),
gevent.spawn(f, 'https://github.com/'),
])
通過joinall將任務(wù)f和它的參數(shù)進行統(tǒng)一調(diào)度,實現(xiàn)單線程中的協(xié)程。代碼封裝層次很高,實際使用只需要了解它的幾個主要方法即可。
數(shù)據(jù)分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
SQL Server 中 CONVERT 函數(shù)的日期轉(zhuǎn)換:從基礎(chǔ)用法到實戰(zhàn)優(yōu)化 在 SQL Server 的數(shù)據(jù)處理中,日期格式轉(zhuǎn)換是高頻需求 —— 無論 ...
2025-09-18MySQL 大表拆分與關(guān)聯(lián)查詢效率:打破 “拆分必慢” 的認(rèn)知誤區(qū) 在 MySQL 數(shù)據(jù)庫管理中,“大表” 始終是性能優(yōu)化繞不開的話題。 ...
2025-09-18CDA 數(shù)據(jù)分析師:表結(jié)構(gòu)數(shù)據(jù) “獲取 - 加工 - 使用” 全流程的賦能者 表結(jié)構(gòu)數(shù)據(jù)(如數(shù)據(jù)庫表、Excel 表、CSV 文件)是企業(yè)數(shù)字 ...
2025-09-18DSGE 模型中的 Et:理性預(yù)期算子的內(nèi)涵、作用與應(yīng)用解析 動態(tài)隨機一般均衡(Dynamic Stochastic General Equilibrium, DSGE)模 ...
2025-09-17Python 提取 TIF 中地名的完整指南 一、先明確:TIF 中的地名有哪兩種存在形式? 在開始提取前,需先判斷 TIF 文件的類型 —— ...
2025-09-17CDA 數(shù)據(jù)分析師:解鎖表結(jié)構(gòu)數(shù)據(jù)特征價值的專業(yè)核心 表結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 規(guī)范存儲的結(jié)構(gòu)化數(shù)據(jù),如數(shù)據(jù)庫表、Excel 表、 ...
2025-09-17Excel 導(dǎo)入數(shù)據(jù)含缺失值?詳解 dropna 函數(shù)的功能與實戰(zhàn)應(yīng)用 在用 Python(如 pandas 庫)處理 Excel 數(shù)據(jù)時,“缺失值” 是高頻 ...
2025-09-16深入解析卡方檢驗與 t 檢驗:差異、適用場景與實踐應(yīng)用 在數(shù)據(jù)分析與統(tǒng)計學(xué)領(lǐng)域,假設(shè)檢驗是驗證研究假設(shè)、判斷數(shù)據(jù)差異是否 “ ...
2025-09-16CDA 數(shù)據(jù)分析師:掌控表格結(jié)構(gòu)數(shù)據(jù)全功能周期的專業(yè)操盤手 表格結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 存儲的結(jié)構(gòu)化數(shù)據(jù),如 Excel 表、數(shù)據(jù) ...
2025-09-16MySQL 執(zhí)行計劃中 rows 數(shù)量的準(zhǔn)確性解析:原理、影響因素與優(yōu)化 在 MySQL SQL 調(diào)優(yōu)中,EXPLAIN執(zhí)行計劃是核心工具,而其中的row ...
2025-09-15解析 Python 中 Response 對象的 text 與 content:區(qū)別、場景與實踐指南 在 Python 進行 HTTP 網(wǎng)絡(luò)請求開發(fā)時(如使用requests ...
2025-09-15CDA 數(shù)據(jù)分析師:激活表格結(jié)構(gòu)數(shù)據(jù)價值的核心操盤手 表格結(jié)構(gòu)數(shù)據(jù)(如 Excel 表格、數(shù)據(jù)庫表)是企業(yè)最基礎(chǔ)、最核心的數(shù)據(jù)形態(tài) ...
2025-09-15Python HTTP 請求工具對比:urllib.request 與 requests 的核心差異與選擇指南 在 Python 處理 HTTP 請求(如接口調(diào)用、數(shù)據(jù)爬取 ...
2025-09-12解決 pd.read_csv 讀取長浮點數(shù)據(jù)的科學(xué)計數(shù)法問題 為幫助 Python 數(shù)據(jù)從業(yè)者解決pd.read_csv讀取長浮點數(shù)據(jù)時的科學(xué)計數(shù)法問題 ...
2025-09-12CDA 數(shù)據(jù)分析師:業(yè)務(wù)數(shù)據(jù)分析步驟的落地者與價值優(yōu)化者 業(yè)務(wù)數(shù)據(jù)分析是企業(yè)解決日常運營問題、提升執(zhí)行效率的核心手段,其價值 ...
2025-09-12用 SQL 驗證業(yè)務(wù)邏輯:從規(guī)則拆解到數(shù)據(jù)把關(guān)的實戰(zhàn)指南 在業(yè)務(wù)系統(tǒng)落地過程中,“業(yè)務(wù)邏輯” 是連接 “需求設(shè)計” 與 “用戶體驗 ...
2025-09-11塔吉特百貨孕婦營銷案例:數(shù)據(jù)驅(qū)動下的精準(zhǔn)零售革命與啟示 在零售行業(yè) “流量紅利見頂” 的當(dāng)下,精準(zhǔn)營銷成為企業(yè)突圍的核心方 ...
2025-09-11CDA 數(shù)據(jù)分析師與戰(zhàn)略 / 業(yè)務(wù)數(shù)據(jù)分析:概念辨析與協(xié)同價值 在數(shù)據(jù)驅(qū)動決策的體系中,“戰(zhàn)略數(shù)據(jù)分析”“業(yè)務(wù)數(shù)據(jù)分析” 是企業(yè) ...
2025-09-11Excel 數(shù)據(jù)聚類分析:從操作實踐到業(yè)務(wù)價值挖掘 在數(shù)據(jù)分析場景中,聚類分析作為 “無監(jiān)督分組” 的核心工具,能從雜亂數(shù)據(jù)中挖 ...
2025-09-10統(tǒng)計模型的核心目的:從數(shù)據(jù)解讀到?jīng)Q策支撐的價值導(dǎo)向 統(tǒng)計模型作為數(shù)據(jù)分析的核心工具,并非簡單的 “公式堆砌”,而是圍繞特定 ...
2025-09-10