
深入解析Python中的線程同步方法
同步訪問共享資源
在使用線程的時(shí)候,一個(gè)很重要的問題是要避免多個(gè)線程對(duì)同一變量或其它資源的訪問沖突。一旦你稍不留神,重疊訪問、在多個(gè)線程中修改(共享資源)等這些操作會(huì)導(dǎo)致各種各樣的問題;更嚴(yán)重的是,這些問題一般只會(huì)在比較極端(比如高并發(fā)、生產(chǎn)服務(wù)器、甚至在性能更好的硬件設(shè)備上)的情況下才會(huì)出現(xiàn)。
比如有這樣一個(gè)情況:需要追蹤對(duì)一事件處理的次數(shù)
counter = 0
def process_item(item):
global counter
... do something with item ...
counter += 1
如果你在多個(gè)線程中同時(shí)調(diào)用這個(gè)函數(shù),你會(huì)發(fā)現(xiàn)counter的值不是那么準(zhǔn)確。在大多數(shù)情況下它是對(duì)的,但有時(shí)它會(huì)比實(shí)際的少幾個(gè)。
出現(xiàn)這種情況的原因是,計(jì)數(shù)增加操作實(shí)際上分三步執(zhí)行:
考慮一下這種情況:在當(dāng)前線程獲取到counter值后,另一個(gè)線程搶占到了CPU,然后同樣也獲取到了counter值,并進(jìn)一步將counter值重新計(jì)算并完成回寫;之后時(shí)間片重新輪到當(dāng)前線程(這里僅作標(biāo)識(shí)區(qū)分,并非實(shí)際當(dāng)前),此時(shí)當(dāng)前線程獲取到counter值還是原來的,完成后續(xù)兩步操作后counter的值實(shí)際只加上1。
另一種常見情況是訪問不完整或不一致狀態(tài)。這類情況主要發(fā)生在一個(gè)線程正在初始化或更新數(shù)據(jù)時(shí),另一個(gè)進(jìn)程卻嘗試讀取正在更改的數(shù)據(jù)。
原子操作
實(shí)現(xiàn)對(duì)共享變量或其它資源的同步訪問最簡(jiǎn)單的方法是依靠解釋器的原子操作。原子操作是在一步完成執(zhí)行的操作,在這一步中其它線程無法獲得該共享資源。
通常情況下,這種同步方法只對(duì)那些只由單個(gè)核心數(shù)據(jù)類型組成的共享資源有效,譬如,字符串變量、數(shù)字、列表或者字典等。下面是幾個(gè)線程安全的操作:
注意,上面提到過,對(duì)一個(gè)變量或者屬性進(jìn)行讀操作,然后修改它,最終將其回寫不是線程安全的。因?yàn)榱硗庖粋€(gè)線程會(huì)在這個(gè)線程讀完卻沒有修改或回寫完成之前更改這個(gè)共享變量/屬性。
鎖
鎖是Python的threading模塊提供的最基本的同步機(jī)制。在任一時(shí)刻,一個(gè)鎖對(duì)象可能被一個(gè)線程獲取,或者不被任何線程獲取。如果一個(gè)線程嘗試去獲取一個(gè)已經(jīng)被另一個(gè)線程獲取到的鎖對(duì)象,那么這個(gè)想要獲取鎖對(duì)象的線程只能暫時(shí)終止執(zhí)行直到鎖對(duì)象被另一個(gè)線程釋放掉。
鎖通常被用來實(shí)現(xiàn)對(duì)共享資源的同步訪問。為每一個(gè)共享資源創(chuàng)建一個(gè)Lock對(duì)象,當(dāng)你需要訪問該資源時(shí),調(diào)用acquire方法來獲取鎖對(duì)象(如果其它線程已經(jīng)獲得了該鎖,則當(dāng)前線程需等待其被釋放),待資源訪問完后,再調(diào)用release方法釋放鎖:
lock = Lock()
lock.acquire() #: will block if lock is already held
... access shared resource
lock.release()
注意,即使在訪問共享資源的過程中出錯(cuò)了也應(yīng)該釋放鎖,可以用try-finally來達(dá)到這一目的:
lock.acquire()
try:
... access shared resource
finally:
lock.release() #: release lock, no matter what
在Python 2.5及以后的版本中,你可以使用with語句。在使用鎖的時(shí)候,with語句會(huì)在進(jìn)入語句塊之前自動(dòng)的獲取到該鎖對(duì)象,然后在語句塊執(zhí)行完成后自動(dòng)釋放掉鎖:
from __future__ import with_statement #: 2.5 only
with lock:
... access shared resource
acquire方法帶一個(gè)可選的等待標(biāo)識(shí),它可用于設(shè)定當(dāng)有其它線程占有鎖時(shí)是否阻塞。如果你將其值設(shè)為False,那么acquire方法將不再阻塞,只是如果該鎖被占有時(shí)它會(huì)返回False:
if not lock.acquire(False):
... 鎖資源失敗
else:
try:
... access shared resource
finally:
lock.release()
你可以使用locked方法來檢查一個(gè)鎖對(duì)象是否已被獲取,注意不能用該方法來判斷調(diào)用acquire方法時(shí)是否會(huì)阻塞,因?yàn)樵趌ocked方法調(diào)用完成到下一條語句(比如acquire)執(zhí)行之間該鎖有可能被其它線程占有。
if not lock.locked():
#: 其它線程可能在下一條語句執(zhí)行之前占有了該鎖
lock.acquire() #: 可能會(huì)阻塞
簡(jiǎn)單鎖的缺點(diǎn)
標(biāo)準(zhǔn)的鎖對(duì)象并不關(guān)心當(dāng)前是哪個(gè)線程占有了該鎖;如果該鎖已經(jīng)被占有了,那么任何其它嘗試獲取該鎖的線程都會(huì)被阻塞,即使是占有鎖的這個(gè)線程。考慮一下下面這個(gè)例子:
lock = threading.Lock()
def get_first_part():
lock.acquire()
try:
... 從共享對(duì)象中獲取第一部分?jǐn)?shù)據(jù)
finally:
lock.release()
return data
def get_second_part():
lock.acquire()
try:
... 從共享對(duì)象中獲取第二部分?jǐn)?shù)據(jù)
finally:
lock.release()
return data
示例中,我們有一個(gè)共享資源,有兩個(gè)分別取這個(gè)共享資源第一部分和第二部分的函數(shù)。兩個(gè)訪問函數(shù)都使用了鎖來確保在獲取數(shù)據(jù)時(shí)沒有其它線程修改對(duì)應(yīng)的共享數(shù)據(jù)。
現(xiàn)在,如果我們想添加第三個(gè)函數(shù)來獲取兩個(gè)部分的數(shù)據(jù),我們將會(huì)陷入泥潭。一個(gè)簡(jiǎn)單的方法是依次調(diào)用這兩個(gè)函數(shù),然后返回結(jié)合的結(jié)果:
def get_both_parts():
first = get_first_part()
seconde = get_second_part()
return first, second
這里的問題是,如有某個(gè)線程在兩個(gè)函數(shù)調(diào)用之間修改了共享資源,那么我們最終會(huì)得到不一致的數(shù)據(jù)。最明顯的解決方法是在這個(gè)函數(shù)中也使用lock:
def get_both_parts():
lock.acquire()
try:
first = get_first_part()
seconde = get_second_part()
finally:
lock.release()
return first, second
然而,這是不可行的。里面的兩個(gè)訪問函數(shù)將會(huì)阻塞,因?yàn)橥鈱诱Z句已經(jīng)占有了該鎖。為了解決這個(gè)問題,你可以通過使用標(biāo)記在訪問函數(shù)中讓外層語句釋放鎖,但這樣容易失去控制并導(dǎo)致出錯(cuò)。幸運(yùn)的是,threading模塊包含了一個(gè)更加實(shí)用的鎖實(shí)現(xiàn):re-entrant鎖。
Re-Entrant Locks (RLock)
RLock類是簡(jiǎn)單鎖的另一個(gè)版本,它的特點(diǎn)在于,同一個(gè)鎖對(duì)象只有在被其它的線程占有時(shí)嘗試獲取才會(huì)發(fā)生阻塞;而簡(jiǎn)單鎖在同一個(gè)線程中同時(shí)只能被占有一次。如果當(dāng)前線程已經(jīng)占有了某個(gè)RLock鎖對(duì)象,那么當(dāng)前線程仍能再次獲取到該RLock鎖對(duì)象。
lock = threading.Lock()
lock.acquire()
lock.acquire() #: 這里將會(huì)阻塞
lock = threading.RLock()
lock.acquire()
lock.acquire() #: 這里不會(huì)發(fā)生阻塞
RLock的主要作用是解決嵌套訪問共享資源的問題,就像前面描述的示例。要想解決前面示例中的問題,我們只需要將Lock換為RLock對(duì)象,這樣嵌套調(diào)用也會(huì)OK.
lock = threading.RLock()
def get_first_part():
... see above
def get_second_part():
... see above
def get_both_parts():
... see above
這樣既可以單獨(dú)訪問兩部分?jǐn)?shù)據(jù)也可以一次訪問兩部分?jǐn)?shù)據(jù)而不會(huì)被鎖阻塞或者獲得不一致的數(shù)據(jù)。
注意RLock會(huì)追蹤遞歸層級(jí),因此記得在acquire后進(jìn)行release操作。
Semaphores
信號(hào)量是一個(gè)更高級(jí)的鎖機(jī)制。信號(hào)量?jī)?nèi)部有一個(gè)計(jì)數(shù)器而不像鎖對(duì)象內(nèi)部有鎖標(biāo)識(shí),而且只有當(dāng)占用信號(hào)量的線程數(shù)超過信號(hào)量時(shí)線程才阻塞。這允許了多個(gè)線程可以同時(shí)訪問相同的代碼區(qū)。
semaphore = threading.BoundedSemaphore()
semaphore.acquire() #: counter減小
... 訪問共享資源
semaphore.release() #: counter增大
當(dāng)信號(hào)量被獲取的時(shí)候,計(jì)數(shù)器減??;當(dāng)信號(hào)量被釋放的時(shí)候,計(jì)數(shù)器增大。當(dāng)獲取信號(hào)量的時(shí)候,如果計(jì)數(shù)器值為0,則該進(jìn)程將阻塞。當(dāng)某一信號(hào)量被釋放,counter值增加為1時(shí),被阻塞的線程(如果有的話)中會(huì)有一個(gè)得以繼續(xù)運(yùn)行。
信號(hào)量通常被用來限制對(duì)容量有限的資源的訪問,比如一個(gè)網(wǎng)絡(luò)連接或者數(shù)據(jù)庫服務(wù)器。在這類場(chǎng)景中,只需要將計(jì)數(shù)器初始化為最大值,信號(hào)量的實(shí)現(xiàn)將為你完成剩下的事情。
max_connections = 10
semaphore = threading.BoundedSemaphore(max_connections)
如果你不傳任何初始化參數(shù),計(jì)數(shù)器的值會(huì)被初始化為1.
Python的threading模塊提供了兩種信號(hào)量實(shí)現(xiàn)。Semaphore類提供了一個(gè)無限大小的信號(hào)量,你可以調(diào)用release任意次來增大計(jì)數(shù)器的值。為了避免錯(cuò)誤出現(xiàn),最好使用BoundedSemaphore類,這樣當(dāng)你調(diào)用release的次數(shù)大于acquire次數(shù)時(shí)程序會(huì)出錯(cuò)提醒。
線程同步
鎖可以用在線程間的同步上。threading模塊包含了一些用于線程間同步的類。
Events
一個(gè)事件是一個(gè)簡(jiǎn)單的同步對(duì)象,事件表示為一個(gè)內(nèi)部標(biāo)識(shí)(internal flag),線程等待這個(gè)標(biāo)識(shí)被其它線程設(shè)定,或者自己設(shè)定、清除這個(gè)標(biāo)識(shí)。
event = threading.Event()
#: 一個(gè)客戶端線程等待flag被設(shè)定
event.wait()
#: 服務(wù)端線程設(shè)置或者清除flag
event.set()
event.clear()
一旦標(biāo)識(shí)被設(shè)定,wait方法就不做任何處理(不會(huì)阻塞),當(dāng)標(biāo)識(shí)被清除時(shí),wait將被阻塞直至其被重新設(shè)定。任意數(shù)量的線程可能會(huì)等待同一個(gè)事件。
Conditions
條件是事件對(duì)象的高級(jí)版本。條件表現(xiàn)為程序中的某種狀態(tài)改變,線程可以等待給定條件或者條件發(fā)生的信號(hào)。
下面是一個(gè)簡(jiǎn)單的生產(chǎn)者/消費(fèi)者實(shí)例。首先你需要?jiǎng)?chuàng)建一個(gè)條件對(duì)象:
#: 表示一個(gè)資源的附屬項(xiàng)
condition = threading.Condition()
生產(chǎn)者線程在通知消費(fèi)者線程有新生成資源之前需要獲得條件:
#: 生產(chǎn)者線程
... 生產(chǎn)資源項(xiàng)
condition.acquire()
... 將資源項(xiàng)添加到資源中
condition.notify() #: 發(fā)出有可用資源的信號(hào)
condition.release()
消費(fèi)者必須獲取條件(以及相關(guān)聯(lián)的鎖),然后嘗試從資源中獲取資源項(xiàng):
#: 消費(fèi)者線程
condition.acquire()
while True:
...從資源中獲取資源項(xiàng)
if item:
break
condition.wait() #: 休眠,直至有新的資源
condition.release()
... 處理資源
wait方法釋放了鎖,然后將當(dāng)前線程阻塞,直到有其它線程調(diào)用了同一條件對(duì)象的notify或者notifyAll方法,然后又重新拿到鎖。如果同時(shí)有多個(gè)線程在等待,那么notify方法只會(huì)喚醒其中的一個(gè)線程,而notifyAll則會(huì)喚醒全部線程。
為了避免在wait方法處阻塞,你可以傳入一個(gè)超時(shí)參數(shù),一個(gè)以秒為單位的浮點(diǎn)數(shù)。如果設(shè)置了超時(shí)參數(shù),wait將會(huì)在指定時(shí)間返回,即使notify沒被調(diào)用。一旦使用了超時(shí),你必須檢查資源來確定發(fā)生了什么。
注意,條件對(duì)象關(guān)聯(lián)著一個(gè)鎖,你必須在訪問條件之前獲取這個(gè)鎖;同樣的,你必須在完成對(duì)條件的訪問時(shí)釋放這個(gè)鎖。在生產(chǎn)代碼中,你應(yīng)該使用try-finally或者with.
可以通過將鎖對(duì)象作為條件構(gòu)造函數(shù)的參數(shù)來讓條件關(guān)聯(lián)一個(gè)已經(jīng)存在的鎖,這可以實(shí)現(xiàn)多個(gè)條件公用一個(gè)資源:
lock = threading.RLock()
condition_1 = threading.Condition(lock)
condition_2 = threading.Condition(lock)
互斥鎖同步
我們先來看一個(gè)例子:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import time, threading
# 假定這是你的銀行存款:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 先存后取,結(jié)果應(yīng)該為0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
# 循環(huán)次數(shù)一旦多起來,最后的數(shù)字就變成非0
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t3 = threading.Thread(target=run_thread, args=(9,))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print balance
結(jié)果 :
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
61
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
24
上面的例子引出了多線程編程的最常見問題:數(shù)據(jù)共享。當(dāng)多個(gè)線程都修改某一個(gè)共享數(shù)據(jù)的時(shí)候,需要進(jìn)行同步控制。
線程同步能夠保證多個(gè)線程安全訪問競(jìng)爭(zhēng)資源,最簡(jiǎn)單的同步機(jī)制是引入互斥鎖?;コ怄i為資源引入一個(gè)狀態(tài):鎖定/非鎖定。某個(gè)線程要更改共享數(shù)據(jù)時(shí),先將其鎖定,此時(shí)資源的狀態(tài)為“鎖定”,其他線程不能更改;直到該線程釋放資源,將資源的狀態(tài)變成“非鎖定”,其他的線程才能再次鎖定該資源?;コ怄i保證了每次只有一個(gè)線程進(jìn)行寫入操作,從而保證了多線程情況下數(shù)據(jù)的正確性。
threading模塊中定義了Lock類,可以方便的處理鎖定:
#創(chuàng)建鎖mutex = threading.Lock()
#鎖定mutex.acquire([timeout])
#釋放mutex.release()
其中,鎖定方法acquire可以有一個(gè)超時(shí)時(shí)間的可選參數(shù)timeout。如果設(shè)定了timeout,則在超時(shí)后通過返回值可以判斷是否得到了鎖,從而可以進(jìn)行一些其他的處理。
使用互斥鎖實(shí)現(xiàn)上面的例子的代碼如下:
balance = 0
muxlock = threading.Lock()
def change_it(n):
# 獲取鎖,確保只有一個(gè)線程操作這個(gè)數(shù)
muxlock.acquire()
global balance
balance = balance + n
balance = balance - n
# 釋放鎖,給其他被阻塞的線程繼續(xù)操作
muxlock.release()
def run_thread(n):
for i in range(10000):
change_it(n)
加鎖后的結(jié)果,就能確保數(shù)據(jù)正確:
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
[/data/web/test_python]$ python multhread_threading.py
0
數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼
若不方便掃碼,搜微信號(hào):CDAshujufenxi
訓(xùn)練與驗(yàn)證損失驟升:機(jī)器學(xué)習(xí)訓(xùn)練中的異常診斷與解決方案 在機(jī)器學(xué)習(xí)模型訓(xùn)練過程中,“損失曲線” 是反映模型學(xué)習(xí)狀態(tài)的核心指 ...
2025-09-19解析 DataHub 與 Kafka:數(shù)據(jù)生態(tài)中兩類核心工具的差異與協(xié)同 在數(shù)字化轉(zhuǎn)型加速的今天,企業(yè)對(duì)數(shù)據(jù)的需求已從 “存儲(chǔ)” 轉(zhuǎn)向 “ ...
2025-09-19CDA 數(shù)據(jù)分析師:讓統(tǒng)計(jì)基本概念成為業(yè)務(wù)決策的底層邏輯 統(tǒng)計(jì)基本概念是商業(yè)數(shù)據(jù)分析的 “基礎(chǔ)語言”—— 從描述數(shù)據(jù)分布的 “均 ...
2025-09-19CDA 數(shù)據(jù)分析師:表結(jié)構(gòu)數(shù)據(jù) “獲取 - 加工 - 使用” 全流程的賦能者 表結(jié)構(gòu)數(shù)據(jù)(如數(shù)據(jù)庫表、Excel 表、CSV 文件)是企業(yè)數(shù)字 ...
2025-09-19SQL Server 中 CONVERT 函數(shù)的日期轉(zhuǎn)換:從基礎(chǔ)用法到實(shí)戰(zhàn)優(yōu)化 在 SQL Server 的數(shù)據(jù)處理中,日期格式轉(zhuǎn)換是高頻需求 —— 無論 ...
2025-09-18MySQL 大表拆分與關(guān)聯(lián)查詢效率:打破 “拆分必慢” 的認(rèn)知誤區(qū) 在 MySQL 數(shù)據(jù)庫管理中,“大表” 始終是性能優(yōu)化繞不開的話題。 ...
2025-09-18DSGE 模型中的 Et:理性預(yù)期算子的內(nèi)涵、作用與應(yīng)用解析 動(dòng)態(tài)隨機(jī)一般均衡(Dynamic Stochastic General Equilibrium, DSGE)模 ...
2025-09-17Python 提取 TIF 中地名的完整指南 一、先明確:TIF 中的地名有哪兩種存在形式? 在開始提取前,需先判斷 TIF 文件的類型 —— ...
2025-09-17CDA 數(shù)據(jù)分析師:解鎖表結(jié)構(gòu)數(shù)據(jù)特征價(jià)值的專業(yè)核心 表結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 規(guī)范存儲(chǔ)的結(jié)構(gòu)化數(shù)據(jù),如數(shù)據(jù)庫表、Excel 表、 ...
2025-09-17Excel 導(dǎo)入數(shù)據(jù)含缺失值?詳解 dropna 函數(shù)的功能與實(shí)戰(zhàn)應(yīng)用 在用 Python(如 pandas 庫)處理 Excel 數(shù)據(jù)時(shí),“缺失值” 是高頻 ...
2025-09-16深入解析卡方檢驗(yàn)與 t 檢驗(yàn):差異、適用場(chǎng)景與實(shí)踐應(yīng)用 在數(shù)據(jù)分析與統(tǒng)計(jì)學(xué)領(lǐng)域,假設(shè)檢驗(yàn)是驗(yàn)證研究假設(shè)、判斷數(shù)據(jù)差異是否 “ ...
2025-09-16CDA 數(shù)據(jù)分析師:掌控表格結(jié)構(gòu)數(shù)據(jù)全功能周期的專業(yè)操盤手 表格結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 存儲(chǔ)的結(jié)構(gòu)化數(shù)據(jù),如 Excel 表、數(shù)據(jù) ...
2025-09-16MySQL 執(zhí)行計(jì)劃中 rows 數(shù)量的準(zhǔn)確性解析:原理、影響因素與優(yōu)化 在 MySQL SQL 調(diào)優(yōu)中,EXPLAIN執(zhí)行計(jì)劃是核心工具,而其中的row ...
2025-09-15解析 Python 中 Response 對(duì)象的 text 與 content:區(qū)別、場(chǎng)景與實(shí)踐指南 在 Python 進(jìn)行 HTTP 網(wǎng)絡(luò)請(qǐng)求開發(fā)時(shí)(如使用requests ...
2025-09-15CDA 數(shù)據(jù)分析師:激活表格結(jié)構(gòu)數(shù)據(jù)價(jià)值的核心操盤手 表格結(jié)構(gòu)數(shù)據(jù)(如 Excel 表格、數(shù)據(jù)庫表)是企業(yè)最基礎(chǔ)、最核心的數(shù)據(jù)形態(tài) ...
2025-09-15Python HTTP 請(qǐng)求工具對(duì)比:urllib.request 與 requests 的核心差異與選擇指南 在 Python 處理 HTTP 請(qǐng)求(如接口調(diào)用、數(shù)據(jù)爬取 ...
2025-09-12解決 pd.read_csv 讀取長(zhǎng)浮點(diǎn)數(shù)據(jù)的科學(xué)計(jì)數(shù)法問題 為幫助 Python 數(shù)據(jù)從業(yè)者解決pd.read_csv讀取長(zhǎng)浮點(diǎn)數(shù)據(jù)時(shí)的科學(xué)計(jì)數(shù)法問題 ...
2025-09-12CDA 數(shù)據(jù)分析師:業(yè)務(wù)數(shù)據(jù)分析步驟的落地者與價(jià)值優(yōu)化者 業(yè)務(wù)數(shù)據(jù)分析是企業(yè)解決日常運(yùn)營(yíng)問題、提升執(zhí)行效率的核心手段,其價(jià)值 ...
2025-09-12用 SQL 驗(yàn)證業(yè)務(wù)邏輯:從規(guī)則拆解到數(shù)據(jù)把關(guān)的實(shí)戰(zhàn)指南 在業(yè)務(wù)系統(tǒng)落地過程中,“業(yè)務(wù)邏輯” 是連接 “需求設(shè)計(jì)” 與 “用戶體驗(yàn) ...
2025-09-11塔吉特百貨孕婦營(yíng)銷案例:數(shù)據(jù)驅(qū)動(dòng)下的精準(zhǔn)零售革命與啟示 在零售行業(yè) “流量紅利見頂” 的當(dāng)下,精準(zhǔn)營(yíng)銷成為企業(yè)突圍的核心方 ...
2025-09-11