
Python中使用多進(jìn)程來(lái)實(shí)現(xiàn)并行處理的方法小結(jié)
本篇文章主要介紹了Python中使用多進(jìn)程來(lái)實(shí)現(xiàn)并行處理的方法小結(jié),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
進(jìn)程和線程是計(jì)算機(jī)軟件領(lǐng)域里很重要的概念,進(jìn)程和線程有區(qū)別,也有著密切的聯(lián)系,先來(lái)辨析一下這兩個(gè)概念:
1.定義
進(jìn)程是具有一定獨(dú)立功能的程序關(guān)于某個(gè)數(shù)據(jù)集合上的一次運(yùn)行活動(dòng),進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位.
線程是進(jìn)程的一個(gè)實(shí)體,是CPU調(diào)度和分派的基本單位,它是比進(jìn)程更小的能獨(dú)立運(yùn)行的基本單位.線程自己基本上不擁有系統(tǒng)資源,只擁有一點(diǎn)在運(yùn)行中必不可少的資源(如程序計(jì)數(shù)器,一組寄存器和棧),但是它可與同屬一個(gè)進(jìn)程的其他的線程共享進(jìn)程所擁有的全部資源.
2.關(guān)系
一個(gè)線程可以創(chuàng)建和撤銷(xiāo)另一個(gè)線程;同一個(gè)進(jìn)程中的多個(gè)線程之間可以并發(fā)執(zhí)行.
相對(duì)進(jìn)程而言,線程是一個(gè)更加接近于執(zhí)行體的概念,它可以與同進(jìn)程中的其他線程共享數(shù)據(jù),但擁有自己的棧空間,擁有獨(dú)立的執(zhí)行序列。
3.區(qū)別
進(jìn)程和線程的主要差別在于它們是不同的操作系統(tǒng)資源管理方式。進(jìn)程有獨(dú)立的地址空間,一個(gè)進(jìn)程崩潰后,在保護(hù)模式下不會(huì)對(duì)其它進(jìn)程產(chǎn)生影響,而線程只是一個(gè)進(jìn)程中的不同執(zhí)行路徑。線程有自己的堆棧和局部變量,但線程之間沒(méi)有單獨(dú)的地址空間,一個(gè)線程死掉就等于整個(gè)進(jìn)程死掉,所以多進(jìn)程的程序要比多線程的程序健壯,但在進(jìn)程切換時(shí),耗費(fèi)資源較大,效率要差一些。但對(duì)于一些要求同時(shí)進(jìn)行并且又要共享某些變量的并發(fā)操作,只能用線程,不能用進(jìn)程。
1) 簡(jiǎn)而言之,一個(gè)程序至少有一個(gè)進(jìn)程,一個(gè)進(jìn)程至少有一個(gè)線程.
2) 線程的劃分尺度小于進(jìn)程,使得多線程程序的并發(fā)性高。
3) 另外,進(jìn)程在執(zhí)行過(guò)程中擁有獨(dú)立的內(nèi)存單元,而多個(gè)線程共享內(nèi)存,從而極大地提高了程序的運(yùn)行效率。
4) 線程在執(zhí)行過(guò)程中與進(jìn)程還是有區(qū)別的。每個(gè)獨(dú)立的線程有一個(gè)程序運(yùn)行的入口、順序執(zhí)行序列和程序的出口。但是線程不能夠獨(dú)立執(zhí)行,必須依存在應(yīng)用程序中,由應(yīng)用程序提供多個(gè)線程執(zhí)行控制。
5) 從邏輯角度來(lái)看,多線程的意義在于一個(gè)應(yīng)用程序中,有多個(gè)執(zhí)行部分可以同時(shí)執(zhí)行。但操作系統(tǒng)并沒(méi)有將多個(gè)線程看做多個(gè)獨(dú)立的應(yīng)用,來(lái)實(shí)現(xiàn)進(jìn)程的調(diào)度和管理以及資源分配。這就是進(jìn)程和線程的重要區(qū)別。
4.優(yōu)缺點(diǎn)
線程和進(jìn)程在使用上各有優(yōu)缺點(diǎn):線程執(zhí)行開(kāi)銷(xiāo)小,但不利于資源的管理和保護(hù);而進(jìn)程正相反。同時(shí),線程適合于在SMP機(jī)器上運(yùn)行,而進(jìn)程則可以跨機(jī)器遷移。
這篇文章主要講多進(jìn)程在Python中的應(yīng)用
Unix/Linux操作系統(tǒng)提供了一個(gè)fork()系統(tǒng)調(diào)用,它非常特殊。普通的函數(shù)調(diào)用,調(diào)用一次,返回一次,但是fork()調(diào)用一次,返回兩次,因?yàn)椴僮飨到y(tǒng)自動(dòng)把當(dāng)前進(jìn)程(稱為父進(jìn)程)復(fù)制了一份(稱為子進(jìn)程),然后,分別在父進(jìn)程和子進(jìn)程內(nèi)返回。
子進(jìn)程永遠(yuǎn)返回0,而父進(jìn)程返回子進(jìn)程的ID。這樣做的理由是,一個(gè)父進(jìn)程可以fork出很多子進(jìn)程,所以,父進(jìn)程要記下每個(gè)子進(jìn)程的ID,而子進(jìn)程只需要調(diào)用getpid()就可以拿到父進(jìn)程的ID。
python的os模塊封裝了常見(jiàn)的系統(tǒng)調(diào)用,其中就包括fork,可以在Python程序中輕松創(chuàng)建子進(jìn)程:
import os
print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print('I (%s) just created a child process (%s).' % (os.getpid(), pid))
運(yùn)行結(jié)果如下:
Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.
由于Windows沒(méi)有fork調(diào)用,上面的代碼在Windows上無(wú)法運(yùn)行。
有了fork調(diào)用,一個(gè)進(jìn)程在接到新任務(wù)時(shí)就可以復(fù)制出一個(gè)子進(jìn)程來(lái)處理新任務(wù),常見(jiàn)的Apache服務(wù)器就是由父進(jìn)程監(jiān)聽(tīng)端口,每當(dāng)有新的http請(qǐng)求時(shí),就fork出子進(jìn)程來(lái)處理新的http請(qǐng)求。
multiprocessing
如果你打算編寫(xiě)多進(jìn)程的服務(wù)程序,Unix/linux無(wú)疑是正確的選擇。由于Windows沒(méi)有fork調(diào)用,難道在Windows上無(wú)法用Python編寫(xiě)多進(jìn)程的程序?
由于Python是跨平臺(tái)的,自然也應(yīng)該提供一個(gè)跨平臺(tái)的多進(jìn)程支持。multiprocessing模塊就是跨平臺(tái)版本的多進(jìn)程模塊。
multiprocessing模塊提供了一個(gè)Process類來(lái)代表一個(gè)進(jìn)程對(duì)象,下面的例子演示了啟動(dòng)一個(gè)子進(jìn)程并等待其結(jié)束:
from multiprocessing import Process
import os
# 子進(jìn)程要執(zhí)行的代碼
def run_proc(name):
print('Run child process %s (%s)...' % (name, os.getpid()))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Process(target=run_proc, args=('test',))
print('Child process will start.')
p.start()
p.join()
print('Child process end.')
創(chuàng)建子進(jìn)程時(shí),只需要傳入一個(gè)執(zhí)行函數(shù)和函數(shù)的參數(shù),創(chuàng)建一個(gè)Process實(shí)例,用start()方法啟動(dòng),這樣創(chuàng)建進(jìn)程比f(wàn)ork()還要簡(jiǎn)單。
join()方法可以等待子進(jìn)程結(jié)束后再繼續(xù)往下運(yùn)行,通常用于進(jìn)程間的同步。
Pool
如果要啟動(dòng)大量的子進(jìn)程,可以用進(jìn)程池的方式批量創(chuàng)建子進(jìn)程:
from multiprocessing import Pool
import os, time, random
def long_time_task(name):
print('Run task %s (%s)...' % (name, os.getpid()))
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__=='__main__':
print('Parent process %s.' % os.getpid())
p = Pool(4)
for i in range(5):
p.apply_async(long_time_task, args=(i,))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
執(zhí)行結(jié)果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.
代碼解讀:
對(duì)Pool對(duì)象調(diào)用join()方法會(huì)等待所有子進(jìn)程執(zhí)行完畢,調(diào)用join()之前必須先調(diào)用close(),調(diào)用close()之后就不能繼續(xù)添加新的Process了。
請(qǐng)注意輸出的結(jié)果,task 0,1,2,3是立刻執(zhí)行的,而task 4要等待前面某個(gè)task完成后才執(zhí)行,這是因?yàn)镻ool的默認(rèn)大小在我的電腦上是4,因此,最多同時(shí)執(zhí)行4個(gè)進(jìn)程。這是Pool有意設(shè)計(jì)的限制,并不是操作系統(tǒng)的限制。如果改成:
p = Pool(5)
就可以同時(shí)跑5個(gè)進(jìn)程。
由于Pool的默認(rèn)大小是CPU的核數(shù),如果你不幸擁有8核CPU,你要提交至少9個(gè)子進(jìn)程才能看到上面的等待效果。
子進(jìn)程
很多時(shí)候,子進(jìn)程并不是自身,而是一個(gè)外部進(jìn)程。我們創(chuàng)建了子進(jìn)程后,還需要控制子進(jìn)程的輸入和輸出。
subprocess模塊可以讓我們非常方便地啟動(dòng)一個(gè)子進(jìn)程,然后控制其輸入和輸出。
下面的例子演示了如何在Python代碼中運(yùn)行命令nslookup www.python.org,這和命令行直接運(yùn)行的效果是一樣的:
import subprocess
print('$ nslookup www.python.org')
r = subprocess.call(['nslookup', 'www.python.org'])
print('Exit code:', r)
運(yùn)行結(jié)果:
$ nslookup www.python.org
Server: 192.168.19.4
Address: 192.168.19.4#53
Non-authoritative answer:
www.python.org canonical name = python.map.fastly.net.
Name: python.map.fastly.net
Address: 199.27.79.223
Exit code: 0
如果子進(jìn)程還需要輸入,則可以通過(guò)communicate()方法輸入:
import subprocess
print('$ nslookup')
p = subprocess.Popen(['nslookup'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
output, err = p.communicate(b'set q=mx\npython.org\nexit\n')
print(output.decode('utf-8'))
print('Exit code:', p.returncode)
上面的代碼相當(dāng)于在命令行執(zhí)行命令nslookup,然后手動(dòng)輸入:
set q=mx
python.org
exit
進(jìn)程間通信
Process之間肯定是需要通信的,操作系統(tǒng)提供了很多機(jī)制來(lái)實(shí)現(xiàn)進(jìn)程間的通信。Python的multiprocessing模塊包裝了底層的機(jī)制,提供了Queue、Pipes等多種方式來(lái)交換數(shù)據(jù)。
我們以Queue為例,在父進(jìn)程中創(chuàng)建兩個(gè)子進(jìn)程,一個(gè)往Queue里寫(xiě)數(shù)據(jù),一個(gè)從Queue里讀數(shù)據(jù):
from multiprocessing import Process, Queue
import os, time, random
# 寫(xiě)數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def write(q):
print('Process to write: %s' % os.getpid())
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 讀數(shù)據(jù)進(jìn)程執(zhí)行的代碼:
def read(q):
print('Process to read: %s' % os.getpid())
while True:
value = q.get(True)
print('Get %s from queue.' % value)
if __name__=='__main__':
# 父進(jìn)程創(chuàng)建Queue,并傳給各個(gè)子進(jìn)程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 啟動(dòng)子進(jìn)程pw,寫(xiě)入:
pw.start()
# 啟動(dòng)子進(jìn)程pr,讀取:
pr.start()
# 等待pw結(jié)束:
pw.join()
# pr進(jìn)程里是死循環(huán),無(wú)法等待其結(jié)束,只能強(qiáng)行終止:
pr.terminate()
運(yùn)行結(jié)果如下:
Process to write: 50563
Put A to queue...
Process to read: 50564
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.
在Unix/Linux下,multiprocessing模塊封裝了fork()調(diào)用,使我們不需要關(guān)注fork()的細(xì)節(jié)。由于Windows沒(méi)有fork調(diào)用,因此,multiprocessing需要“模擬”出fork的效果,父進(jìn)程所有Python對(duì)象都必須通過(guò)pickle序列化再傳到子進(jìn)程去,所有,如果multiprocessing在Windows下調(diào)用失敗了,要先考慮是不是pickle失敗了。
小結(jié)
在Unix/Linux下,可以使用fork()調(diào)用實(shí)現(xiàn)多進(jìn)程。
要實(shí)現(xiàn)跨平臺(tái)的多進(jìn)程,可以使用multiprocessing模塊。
進(jìn)程間通信是通過(guò)Queue、Pipes等實(shí)現(xiàn)的。
多線程
多任務(wù)可以由多進(jìn)程完成,也可以由一個(gè)進(jìn)程內(nèi)的多線程完成。進(jìn)程是由若干線程組成的,一個(gè)進(jìn)程至少有一個(gè)線程。
由于線程是操作系統(tǒng)直接支持的執(zhí)行單元,因此,高級(jí)語(yǔ)言通常都內(nèi)置多線程的支持,Python也不例外,并且,Python的線程是真正的Posix Thread,而不是模擬出來(lái)的線程。
Python的標(biāo)準(zhǔn)庫(kù)提供了兩個(gè)模塊:_thread 和 threading,_thread是低級(jí)模塊,threading是高級(jí)模塊,對(duì)_thread進(jìn)行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個(gè)高級(jí)模塊。
啟動(dòng)一個(gè)線程就是把一個(gè)函數(shù)傳入并創(chuàng)建Thread實(shí)例,然后調(diào)用start()開(kāi)始執(zhí)行:
import time, threading
# 新線程執(zhí)行的代碼:
def loop():
print('thread %s is running...' % threading.current_thread().name)
n = 0
while n < 5:
n = n + 1
print('thread %s >>> %s' % (threading.current_thread().name, n))
time.sleep(1)
print('thread %s ended.' % threading.current_thread().name)
print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
thread MainThread is running...
thread LoopThread is running...
thread LoopThread >>> 1
thread LoopThread >>> 2
thread LoopThread >>> 3
thread LoopThread >>> 4
thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
由于任何進(jìn)程默認(rèn)就會(huì)啟動(dòng)一個(gè)線程,我們把該線程稱為主線程,主線程又可以啟動(dòng)新的線程,Python的threading模塊有個(gè)current_thread()函數(shù),它永遠(yuǎn)返回當(dāng)前線程的實(shí)例。主線程實(shí)例的名字叫MainThread,子線程的名字在創(chuàng)建時(shí)指定,我們用LoopThread命名子線程。名字僅僅在打印時(shí)用來(lái)顯示,完全沒(méi)有其他意義,如果不起名字Python就自動(dòng)給線程命名為T(mén)hread-1,Thread-2……
Lock
多線程和多進(jìn)程最大的不同在于,多進(jìn)程中,同一個(gè)變量,各自有一份拷貝存在于每個(gè)進(jìn)程中,互不影響,而多線程中,所有變量都由所有線程共享,所以,任何一個(gè)變量都可以被任何一個(gè)線程修改,因此,線程之間共享數(shù)據(jù)最大的危險(xiǎn)在于多個(gè)線程同時(shí)改一個(gè)變量,把內(nèi)容給改亂了。
來(lái)看看多個(gè)線程同時(shí)操作一個(gè)變量怎么把內(nèi)容給改亂了:
import time, threading
# 假定這是你的銀行存款:
balance = 0
def change_it(n):
# 先存后取,結(jié)果應(yīng)該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print(balance)
我們定義了一個(gè)共享變量balance,初始值為0,并且啟動(dòng)兩個(gè)線程,先存后取,理論上結(jié)果應(yīng)該為0,但是,由于線程的調(diào)度是由操作系統(tǒng)決定的,當(dāng)t1、t2交替執(zhí)行時(shí),只要循環(huán)次數(shù)足夠多,balance的結(jié)果就不一定是0了。
原因是因?yàn)楦呒?jí)語(yǔ)言的一條語(yǔ)句在CPU執(zhí)行時(shí)是若干條語(yǔ)句,即使一個(gè)簡(jiǎn)單的計(jì)算:
balance = balance + n
也分兩步:
計(jì)算balance + n,存入臨時(shí)變量中;
將臨時(shí)變量的值賦給balance。
也就是可以看成:
x = balance + n
balance = x
數(shù)據(jù)錯(cuò)誤的原因:是因?yàn)樾薷腷alance需要多條語(yǔ)句,而執(zhí)行這幾條語(yǔ)句時(shí),線程可能中斷,從而導(dǎo)致多個(gè)線程把同一個(gè)對(duì)象的內(nèi)容改亂了。
兩個(gè)線程同時(shí)一存一取,就可能導(dǎo)致余額不對(duì),你肯定不希望你的銀行存款莫名其妙地變成了負(fù)數(shù),所以,我們必須確保一個(gè)線程在修改balance的時(shí)候,別的線程一定不能改。
如果我們要確保balance計(jì)算正確,就要給change_it()上一把鎖,當(dāng)某個(gè)線程開(kāi)始執(zhí)行change_it()時(shí),我們說(shuō),該線程因?yàn)楂@得了鎖,因此其他線程不能同時(shí)執(zhí)行change_it(),只能等待,直到鎖被釋放后,獲得該鎖以后才能改。由于鎖只有一個(gè),無(wú)論多少線程,同一時(shí)刻最多只有一個(gè)線程持有該鎖,所以,不會(huì)造成修改的沖突。創(chuàng)建一個(gè)鎖就是通過(guò)threading.Lock()來(lái)實(shí)現(xiàn):
balance = 0
lock = threading.Lock()
def run_thread(n):
for i in range(100000):
# 先要獲取鎖:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要釋放鎖:
lock.release()
當(dāng)多個(gè)線程同時(shí)執(zhí)行l(wèi)ock.acquire()時(shí),只有一個(gè)線程能成功地獲取鎖,然后繼續(xù)執(zhí)行代碼,其他線程就繼續(xù)等待直到獲得鎖為止。
獲得鎖的線程用完后一定要釋放鎖,否則那些苦苦等待鎖的線程將永遠(yuǎn)等待下去,成為死線程。所以我們用try...finally來(lái)確保鎖一定會(huì)被釋放。
鎖的好處就是確保了某段關(guān)鍵代碼只能由一個(gè)線程從頭到尾完整地執(zhí)行,壞處當(dāng)然也很多,首先是阻止了多線程并發(fā)執(zhí)行,包含鎖的某段代碼實(shí)際上只能以單線程模式執(zhí)行,效率就大大地下降了。其次,由于可以存在多個(gè)鎖,不同的線程持有不同的鎖,并試圖獲取對(duì)方持有的鎖時(shí),可能會(huì)造成死鎖,導(dǎo)致多個(gè)線程全部掛起,既不能執(zhí)行,也無(wú)法結(jié)束,只能靠操作系統(tǒng)強(qiáng)制終止。
多核CPU
如果你不幸擁有一個(gè)多核CPU,你肯定在想,多核應(yīng)該可以同時(shí)執(zhí)行多個(gè)線程。
如果寫(xiě)一個(gè)死循環(huán)的話,會(huì)出現(xiàn)什么情況呢?
打開(kāi)Mac OS X的Activity Monitor,或者Windows的Task Manager,都可以監(jiān)控某個(gè)進(jìn)程的CPU使用率。
我們可以監(jiān)控到一個(gè)死循環(huán)線程會(huì)100%占用一個(gè)CPU。如果有兩個(gè)死循環(huán)線程,在多核CPU中,可以監(jiān)控到會(huì)占用200%的CPU,也就是占用兩個(gè)CPU核心。要想把N核CPU的核心全部跑滿,就必須啟動(dòng)N個(gè)死循環(huán)線程。
試試用Python寫(xiě)個(gè)死循環(huán):
import threading, multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
啟動(dòng)與CPU核心數(shù)量相同的N個(gè)線程,在4核CPU上可以監(jiān)控到CPU占用率僅有102%,也就是僅使用了一核。
但是用C、C++或Java來(lái)改寫(xiě)相同的死循環(huán),直接可以把全部核心跑滿,4核就跑到400%,8核就跑到800%,為什么Python不行呢?
因?yàn)镻ython的線程雖然是真正的線程,但解釋器執(zhí)行代碼時(shí),有一個(gè)GIL鎖:Global Interpreter Lock,任何Python線程執(zhí)行前,必須先獲得GIL鎖,然后,每執(zhí)行100條字節(jié)碼,解釋器就自動(dòng)釋放GIL鎖,讓別的線程有機(jī)會(huì)執(zhí)行。這個(gè)GIL全局鎖實(shí)際上把所有線程的執(zhí)行代碼都給上了鎖,所以,多線程在Python中只能交替執(zhí)行,即使100個(gè)線程跑在100核CPU上,也只能用到1個(gè)核。
GIL是Python解釋器設(shè)計(jì)的歷史遺留問(wèn)題,通常我們用的解釋器是官方實(shí)現(xiàn)的CPython,要真正利用多核,除非重寫(xiě)一個(gè)不帶GIL的解釋器。
所以,在Python中,可以使用多線程,但不要指望能有效利用多核。如果一定要通過(guò)多線程利用多核,那只能通過(guò)C擴(kuò)展來(lái)實(shí)現(xiàn),不過(guò)這樣就失去了Python簡(jiǎn)單易用的特點(diǎn)。
不過(guò),也不用過(guò)于擔(dān)心,Python雖然不能利用多線程實(shí)現(xiàn)多核任務(wù),但可以通過(guò)多進(jìn)程實(shí)現(xiàn)多核任務(wù)。多個(gè)Python進(jìn)程有各自獨(dú)立的GIL鎖,互不影響。
多線程編程,模型復(fù)雜,容易發(fā)生沖突,必須用鎖加以隔離,同時(shí),又要小心死鎖的發(fā)生。
Python解釋器由于設(shè)計(jì)時(shí)有GIL全局鎖,導(dǎo)致了多線程無(wú)法利用多核。
ThreadLocal
在多線程環(huán)境下,每個(gè)線程都有自己的數(shù)據(jù)。一個(gè)線程使用自己的局部變量比使用全局變量好,因?yàn)榫植孔兞恐挥芯€程自己能看見(jiàn),不會(huì)影響其他線程,而全局變量的修改必須加鎖。但是局部變量也有問(wèn)題,就是在函數(shù)調(diào)用的時(shí)候,傳遞起來(lái)很麻煩:
import threading
# 創(chuàng)建全局ThreadLocal對(duì)象:
local_school = threading.local()
def process_student():
# 獲取當(dāng)前線程關(guān)聯(lián)的student:
std = local_school.student
print('Hello, %s (in %s)' % (std, threading.current_thread().name))
def process_thread(name):
# 綁定ThreadLocal的student:
local_school.student = name
process_student()
t1 = threading.Thread(target= process_thread, args=('Alice',), name='Thread-A')
t2 = threading.Thread(target= process_thread, args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
全局變量local_school就是一個(gè)ThreadLocal對(duì)象,每個(gè)Thread對(duì)它都可以讀寫(xiě)student屬性,但互不影響。你可以把local_school看成全局變量,但每個(gè)屬性如local_school.student都是線程的局部變量,可以任意讀寫(xiě)而互不干擾,也不用管理鎖的問(wèn)題,ThreadLocal內(nèi)部會(huì)處理。
可以理解為全局變量local_school是一個(gè)dict,不但可以用local_school.student,還可以綁定其他變量,如local_school.teacher等等。
ThreadLocal最常用的地方就是為每個(gè)線程綁定一個(gè)數(shù)據(jù)庫(kù)連接,HTTP請(qǐng)求,用戶身份信息等,這樣一個(gè)線程的所有調(diào)用到的處理函數(shù)都可以非常方便地訪問(wèn)這些資源。
一個(gè)ThreadLocal變量雖然是全局變量,但每個(gè)線程都只能讀寫(xiě)自己線程的獨(dú)立副本,互不干擾。ThreadLocal解決了參數(shù)在一個(gè)線程中各個(gè)函數(shù)之間互相傳遞的問(wèn)題。
進(jìn)程 vs. 線程
我們介紹了多進(jìn)程和多線程,這是實(shí)現(xiàn)多任務(wù)最常用的兩種方式?,F(xiàn)在,我們來(lái)討論一下這兩種方式的優(yōu)缺點(diǎn)。
首先,要實(shí)現(xiàn)多任務(wù),通常我們會(huì)設(shè)計(jì)Master-Worker模式,Master負(fù)責(zé)分配任務(wù),Worker負(fù)責(zé)執(zhí)行任務(wù),因此,多任務(wù)環(huán)境下,通常是一個(gè)Master,多個(gè)Worker。
如果用多進(jìn)程實(shí)現(xiàn)Master-Worker,主進(jìn)程就是Master,其他進(jìn)程就是Worker。
如果用多線程實(shí)現(xiàn)Master-Worker,主線程就是Master,其他線程就是Worker。
多進(jìn)程模式最大的優(yōu)點(diǎn)就是穩(wěn)定性高,因?yàn)橐粋€(gè)子進(jìn)程崩潰了,不會(huì)影響主進(jìn)程和其他子進(jìn)程。(當(dāng)然主進(jìn)程掛了所有進(jìn)程就全掛了,但是Master進(jìn)程只負(fù)責(zé)分配任務(wù),掛掉的概率低)著名的Apache最早就是采用多進(jìn)程模式。
多進(jìn)程模式的缺點(diǎn)是創(chuàng)建進(jìn)程的代價(jià)大,在Unix/Linux系統(tǒng)下,用fork調(diào)用還行,在Windows下創(chuàng)建進(jìn)程開(kāi)銷(xiāo)巨大。另外,操作系統(tǒng)能同時(shí)運(yùn)行的進(jìn)程數(shù)也是有限的,在內(nèi)存和CPU的限制下,如果有幾千個(gè)進(jìn)程同時(shí)運(yùn)行,操作系統(tǒng)連調(diào)度都會(huì)成問(wèn)題。
多線程模式通常比多進(jìn)程快一點(diǎn),但是也快不到哪去,而且,多線程模式致命的缺點(diǎn)就是任何一個(gè)線程掛掉都可能直接造成整個(gè)進(jìn)程崩潰,因?yàn)樗芯€程共享進(jìn)程的內(nèi)存。在Windows上,如果一個(gè)線程執(zhí)行的代碼出了問(wèn)題,你經(jīng)常可以看到這樣的提示:“該程序執(zhí)行了非法操作,即將關(guān)閉”,其實(shí)往往是某個(gè)線程出了問(wèn)題,但是操作系統(tǒng)會(huì)強(qiáng)制結(jié)束整個(gè)進(jìn)程。
在Windows下,多線程的效率比多進(jìn)程要高,所以微軟的IIS服務(wù)器默認(rèn)采用多線程模式。由于多線程存在穩(wěn)定性的問(wèn)題,IIS的穩(wěn)定性就不如Apache。為了緩解這個(gè)問(wèn)題,IIS和Apache現(xiàn)在又有多進(jìn)程+多線程的混合模式,真是把問(wèn)題越搞越復(fù)雜。
線程切換
無(wú)論是多進(jìn)程還是多線程,只要數(shù)量一多,效率肯定上不去,為什么呢?
我們打個(gè)比方,假設(shè)你不幸正在準(zhǔn)備中考,每天晚上需要做語(yǔ)文、數(shù)學(xué)、英語(yǔ)、物理、化學(xué)這5科的作業(yè),每項(xiàng)作業(yè)耗時(shí)1小時(shí)。
如果你先花1小時(shí)做語(yǔ)文作業(yè),做完了,再花1小時(shí)做數(shù)學(xué)作業(yè),這樣,依次全部做完,一共花5小時(shí),這種方式稱為單任務(wù)模型,或者批處理任務(wù)模型。
假設(shè)你打算切換到多任務(wù)模型,可以先做1分鐘語(yǔ)文,再切換到數(shù)學(xué)作業(yè),做1分鐘,再切換到英語(yǔ),以此類推,只要切換速度足夠快,這種方式就和單核CPU執(zhí)行多任務(wù)是一樣的了,以幼兒園小朋友的眼光來(lái)看,你就正在同時(shí)寫(xiě)5科作業(yè)。
但是,切換作業(yè)是有代價(jià)的,比如從語(yǔ)文切到數(shù)學(xué),要先收拾桌子上的語(yǔ)文書(shū)本、鋼筆(這叫保存現(xiàn)場(chǎng)),然后,打開(kāi)數(shù)學(xué)課本、找出圓規(guī)直尺(這叫準(zhǔn)備新環(huán)境),才能開(kāi)始做數(shù)學(xué)作業(yè)。操作系統(tǒng)在切換進(jìn)程或者線程時(shí)也是一樣的,它需要先保存當(dāng)前執(zhí)行的現(xiàn)場(chǎng)環(huán)境(CPU寄存器狀態(tài)、內(nèi)存頁(yè)等),然后,把新任務(wù)的執(zhí)行環(huán)境準(zhǔn)備好(恢復(fù)上次的寄存器狀態(tài),切換內(nèi)存頁(yè)等),才能開(kāi)始執(zhí)行。這個(gè)切換過(guò)程雖然很快,但是也需要耗費(fèi)時(shí)間。如果有幾千個(gè)任務(wù)同時(shí)進(jìn)行,操作系統(tǒng)可能就主要忙著切換任務(wù),根本沒(méi)有多少時(shí)間去執(zhí)行任務(wù)了,這種情況最常見(jiàn)的就是硬盤(pán)狂響,點(diǎn)窗口無(wú)反應(yīng),系統(tǒng)處于假死狀態(tài)。
所以,多任務(wù)一旦多到一個(gè)限度,就會(huì)消耗掉系統(tǒng)所有的資源,結(jié)果效率急劇下降,所有任務(wù)都做不好。
計(jì)算密集型 vs. IO密集型
是否采用多任務(wù)的第二個(gè)考慮是任務(wù)的類型。我們可以把任務(wù)分為計(jì)算密集型和IO密集型。
計(jì)算密集型任務(wù)的特點(diǎn)是要進(jìn)行大量的計(jì)算,消耗CPU資源,比如計(jì)算圓周率、對(duì)視頻進(jìn)行高清解碼等等,全靠CPU的運(yùn)算能力。這種計(jì)算密集型任務(wù)雖然也可以用多任務(wù)完成,但是任務(wù)越多,花在任務(wù)切換的時(shí)間就越多,CPU執(zhí)行任務(wù)的效率就越低,所以,要最高效地利用CPU,計(jì)算密集型任務(wù)同時(shí)進(jìn)行的數(shù)量應(yīng)當(dāng)?shù)扔贑PU的核心數(shù)。
計(jì)算密集型任務(wù)由于主要消耗CPU資源,因此,代碼運(yùn)行效率至關(guān)重要。Python這樣的腳本語(yǔ)言運(yùn)行效率很低,完全不適合計(jì)算密集型任務(wù)。對(duì)于計(jì)算密集型任務(wù),最好用C語(yǔ)言編寫(xiě)。
第二種任務(wù)的類型是IO密集型,涉及到網(wǎng)絡(luò)、磁盤(pán)IO的任務(wù)都是IO密集型任務(wù),這類任務(wù)的特點(diǎn)是CPU消耗很少,任務(wù)的大部分時(shí)間都在等待IO操作完成(因?yàn)镮O的速度遠(yuǎn)遠(yuǎn)低于CPU和內(nèi)存的速度)。對(duì)于IO密集型任務(wù),任務(wù)越多,CPU效率越高,但也有一個(gè)限度。常見(jiàn)的大部分任務(wù)都是IO密集型任務(wù),比如Web應(yīng)用。
IO密集型任務(wù)執(zhí)行期間,99%的時(shí)間都花在IO上,花在CPU上的時(shí)間很少,因此,用運(yùn)行速度極快的c語(yǔ)言替換用Python這樣運(yùn)行速度極低的腳本語(yǔ)言,完全無(wú)法提升運(yùn)行效率。對(duì)于IO密集型任務(wù),最合適的語(yǔ)言就是開(kāi)發(fā)效率最高(代碼量最少)的語(yǔ)言,腳本語(yǔ)言是首選,C語(yǔ)言最差。
異步IO
考慮到CPU和IO之間巨大的速度差異,一個(gè)任務(wù)在執(zhí)行的過(guò)程中大部分時(shí)間都在等待IO操作,單進(jìn)程單線程模型會(huì)導(dǎo)致別的任務(wù)無(wú)法并行執(zhí)行,因此,我們才需要多進(jìn)程模型或者多線程模型來(lái)支持多任務(wù)并發(fā)執(zhí)行。
現(xiàn)代操作系統(tǒng)對(duì)IO操作已經(jīng)做了巨大的改進(jìn),最大的特點(diǎn)就是支持異步IO。如果充分利用操作系統(tǒng)提供的異步IO支持,就可以用單進(jìn)程單線程模型來(lái)執(zhí)行多任務(wù),這種全新的模型稱為事件驅(qū)動(dòng)模型,Nginx就是支持異步IO的Web服務(wù)器,它在單核CPU上采用單進(jìn)程模型就可以高效地支持多任務(wù)。在多核CPU上,可以運(yùn)行多個(gè)進(jìn)程(數(shù)量與CPU核心數(shù)相同),充分利用多核CPU。由于系統(tǒng)總的進(jìn)程數(shù)量十分有限,因此操作系統(tǒng)調(diào)度非常高效。用異步IO編程模型來(lái)實(shí)現(xiàn)多任務(wù)是一個(gè)主要的趨勢(shì)。
對(duì)應(yīng)到Python語(yǔ)言,單進(jìn)程的異步編程模型稱為協(xié)程,有了協(xié)程的支持,就可以基于事件驅(qū)動(dòng)編寫(xiě)高效的多任務(wù)程序。我們會(huì)在后面討論如何編寫(xiě)協(xié)程。
分布式進(jìn)程
在Thread和Process中,應(yīng)當(dāng)優(yōu)選Process,因?yàn)镻rocess更穩(wěn)定,而且,Process可以分布到多臺(tái)機(jī)器上,而Thread最多只能分布到同一臺(tái)機(jī)器的多個(gè)CPU上。
Python的multiprocessing模塊不但支持多進(jìn)程,其中managers子模塊還支持把多進(jìn)程分布到多臺(tái)機(jī)器上。一個(gè)服務(wù)進(jìn)程可以作為調(diào)度者,將任務(wù)分布到其他多個(gè)進(jìn)程中,依靠網(wǎng)絡(luò)通信。由于managers模塊封裝很好,不必了解網(wǎng)絡(luò)通信的細(xì)節(jié),就可以很容易地編寫(xiě)分布式多進(jìn)程程序。
舉個(gè)例子:如果我們已經(jīng)有一個(gè)通過(guò)Queue通信的多進(jìn)程程序在同一臺(tái)機(jī)器上運(yùn)行,現(xiàn)在,由于處理任務(wù)的進(jìn)程任務(wù)繁重,希望把發(fā)送任務(wù)的進(jìn)程和處理任務(wù)的進(jìn)程分布到兩臺(tái)機(jī)器上。怎么用分布式進(jìn)程實(shí)現(xiàn)?
原有的Queue可以繼續(xù)使用,但是,通過(guò)managers模塊把Queue通過(guò)網(wǎng)絡(luò)暴露出去,就可以讓其他機(jī)器的進(jìn)程訪問(wèn)Queue了。
我們先看服務(wù)進(jìn)程,服務(wù)進(jìn)程負(fù)責(zé)啟動(dòng)Queue,把Queue注冊(cè)到網(wǎng)絡(luò)上,然后往Queue里面寫(xiě)入任務(wù):
import random, time, queue
from multiprocessing.managers import BaseManager
# 發(fā)送任務(wù)的隊(duì)列:
task_queue = queue.Queue()
# 接收結(jié)果的隊(duì)列:
result_queue = queue.Queue()
# 從BaseManager繼承的QueueManager:
class QueueManager(BaseManager):
pass
# 把兩個(gè)Queue都注冊(cè)到網(wǎng)絡(luò)上, callable參數(shù)關(guān)聯(lián)了Queue對(duì)象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 綁定端口5000, 設(shè)置驗(yàn)證碼'abc':
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 啟動(dòng)Queue:
manager.start()
# 獲得通過(guò)網(wǎng)絡(luò)訪問(wèn)的Queue對(duì)象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放幾個(gè)任務(wù)進(jìn)去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 從result隊(duì)列讀取結(jié)果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=10)
print('Result: %s' % r)
# 關(guān)閉:
manager.shutdown()
print('master exit.')
當(dāng)我們?cè)谝慌_(tái)機(jī)器上寫(xiě)多進(jìn)程程序時(shí),創(chuàng)建的Queue可以直接拿來(lái)用,但是,在分布式多進(jìn)程環(huán)境下,添加任務(wù)到Queue不可以直接對(duì)原始的task_queue進(jìn)行操作,那樣就繞過(guò)了QueueManager的封裝,必須通過(guò)manager.get_task_queue()獲得的Queue接口添加。
然后,在另一臺(tái)機(jī)器上啟動(dòng)任務(wù)進(jìn)程(本機(jī)上啟動(dòng)也可以):
import time, sys, queue
from multiprocessing.managers import BaseManager
# 創(chuàng)建類似的QueueManager:
class QueueManager(BaseManager):
pass
# 由于這個(gè)QueueManager只從網(wǎng)絡(luò)上獲取Queue,所以注冊(cè)時(shí)只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 連接到服務(wù)器,也就是運(yùn)行task_master.py的機(jī)器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和驗(yàn)證碼注意保持與task_master.py設(shè)置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
# 從網(wǎng)絡(luò)連接:
m.connect()
# 獲取Queue的對(duì)象:
task = m.get_task_queue()
result = m.get_result_queue()
# 從task隊(duì)列取任務(wù),并把結(jié)果寫(xiě)入result隊(duì)列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 處理結(jié)束:
print('worker exit.')
任務(wù)進(jìn)程要通過(guò)網(wǎng)絡(luò)連接到服務(wù)進(jìn)程,所以要指定服務(wù)進(jìn)程的IP。http://www.jb51.net/article/65112.htm
小結(jié)
Python的分布式進(jìn)程接口簡(jiǎn)單,封裝良好,適合需要把繁重任務(wù)分布到多臺(tái)機(jī)器的環(huán)境下。
注意Queue的作用是用來(lái)傳遞任務(wù)和接收結(jié)果,每個(gè)任務(wù)的描述數(shù)據(jù)量要盡量小。比如發(fā)送一個(gè)處理日志文件的任務(wù),就不要發(fā)送幾百兆的日志文件本身,而是發(fā)送日志文件存放的完整路徑,由Worker進(jìn)程再去共享的磁盤(pán)上讀取文件。
數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼
若不方便掃碼,搜微信號(hào):CDAshujufenxi
LSTM 模型輸入長(zhǎng)度選擇技巧:提升序列建模效能的關(guān)鍵? 在循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)家族中,長(zhǎng)短期記憶網(wǎng)絡(luò)(LSTM)憑借其解決長(zhǎng)序列 ...
2025-07-11CDA 數(shù)據(jù)分析師報(bào)考條件詳解與準(zhǔn)備指南? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代浪潮下,CDA 數(shù)據(jù)分析師認(rèn)證愈發(fā)受到矚目,成為眾多有志投身數(shù) ...
2025-07-11數(shù)據(jù)透視表中兩列相乘合計(jì)的實(shí)用指南? 在數(shù)據(jù)分析的日常工作中,數(shù)據(jù)透視表憑借其強(qiáng)大的數(shù)據(jù)匯總和分析功能,成為了 Excel 用戶 ...
2025-07-11尊敬的考生: 您好! 我們誠(chéng)摯通知您,CDA Level I和 Level II考試大綱將于 2025年7月25日 實(shí)施重大更新。 此次更新旨在確保認(rèn) ...
2025-07-10BI 大數(shù)據(jù)分析師:連接數(shù)據(jù)與業(yè)務(wù)的價(jià)值轉(zhuǎn)化者? ? 在大數(shù)據(jù)與商業(yè)智能(Business Intelligence,簡(jiǎn)稱 BI)深度融合的時(shí)代,BI ...
2025-07-10SQL 在預(yù)測(cè)分析中的應(yīng)用:從數(shù)據(jù)查詢到趨勢(shì)預(yù)判? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代,預(yù)測(cè)分析作為挖掘數(shù)據(jù)潛在價(jià)值的核心手段,正被廣泛 ...
2025-07-10數(shù)據(jù)查詢結(jié)束后:分析師的收尾工作與價(jià)值深化? ? 在數(shù)據(jù)分析的全流程中,“query end”(查詢結(jié)束)并非工作的終點(diǎn),而是將數(shù) ...
2025-07-10CDA 數(shù)據(jù)分析師考試:從報(bào)考到取證的全攻略? 在數(shù)字經(jīng)濟(jì)蓬勃發(fā)展的今天,數(shù)據(jù)分析師已成為各行業(yè)爭(zhēng)搶的核心人才,而 CDA(Certi ...
2025-07-09【CDA干貨】單樣本趨勢(shì)性檢驗(yàn):捕捉數(shù)據(jù)背后的時(shí)間軌跡? 在數(shù)據(jù)分析的版圖中,單樣本趨勢(shì)性檢驗(yàn)如同一位耐心的偵探,專注于從單 ...
2025-07-09year_month數(shù)據(jù)類型:時(shí)間維度的精準(zhǔn)切片? ? 在數(shù)據(jù)的世界里,時(shí)間是最不可或缺的維度之一,而year_month數(shù)據(jù)類型就像一把精準(zhǔn) ...
2025-07-09CDA 備考干貨:Python 在數(shù)據(jù)分析中的核心應(yīng)用與實(shí)戰(zhàn)技巧? ? 在 CDA 數(shù)據(jù)分析師認(rèn)證考試中,Python 作為數(shù)據(jù)處理與分析的核心 ...
2025-07-08SPSS 中的 Mann-Kendall 檢驗(yàn):數(shù)據(jù)趨勢(shì)與突變分析的有力工具? ? ? 在數(shù)據(jù)分析的廣袤領(lǐng)域中,準(zhǔn)確捕捉數(shù)據(jù)的趨勢(shì)變化以及識(shí)別 ...
2025-07-08備戰(zhàn) CDA 數(shù)據(jù)分析師考試:需要多久?如何規(guī)劃? CDA(Certified Data Analyst)數(shù)據(jù)分析師認(rèn)證作為國(guó)內(nèi)權(quán)威的數(shù)據(jù)分析能力認(rèn)證 ...
2025-07-08LSTM 輸出不確定的成因、影響與應(yīng)對(duì)策略? 長(zhǎng)短期記憶網(wǎng)絡(luò)(LSTM)作為循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)的一種變體,憑借獨(dú)特的門(mén)控機(jī)制,在 ...
2025-07-07統(tǒng)計(jì)學(xué)方法在市場(chǎng)調(diào)研數(shù)據(jù)中的深度應(yīng)用? 市場(chǎng)調(diào)研是企業(yè)洞察市場(chǎng)動(dòng)態(tài)、了解消費(fèi)者需求的重要途徑,而統(tǒng)計(jì)學(xué)方法則是市場(chǎng)調(diào)研數(shù) ...
2025-07-07CDA數(shù)據(jù)分析師證書(shū)考試全攻略? 在數(shù)字化浪潮席卷全球的當(dāng)下,數(shù)據(jù)已成為企業(yè)決策、行業(yè)發(fā)展的核心驅(qū)動(dòng)力,數(shù)據(jù)分析師也因此成為 ...
2025-07-07剖析 CDA 數(shù)據(jù)分析師考試題型:解鎖高效備考與答題策略? CDA(Certified Data Analyst)數(shù)據(jù)分析師考試作為衡量數(shù)據(jù)專業(yè)能力的 ...
2025-07-04SQL Server 字符串截取轉(zhuǎn)日期:解鎖數(shù)據(jù)處理的關(guān)鍵技能? 在數(shù)據(jù)處理與分析工作中,數(shù)據(jù)格式的規(guī)范性是保證后續(xù)分析準(zhǔn)確性的基礎(chǔ) ...
2025-07-04CDA 數(shù)據(jù)分析師視角:從數(shù)據(jù)迷霧中探尋商業(yè)真相? 在數(shù)字化浪潮席卷全球的今天,數(shù)據(jù)已成為企業(yè)決策的核心驅(qū)動(dòng)力,CDA(Certifie ...
2025-07-04CDA 數(shù)據(jù)分析師:開(kāi)啟數(shù)據(jù)職業(yè)發(fā)展新征程? ? 在數(shù)據(jù)成為核心生產(chǎn)要素的今天,數(shù)據(jù)分析師的職業(yè)價(jià)值愈發(fā)凸顯。CDA(Certified D ...
2025-07-03