
用Python多線程實現(xiàn)生產(chǎn)者消費者模式
什么是生產(chǎn)者消費者模式
在軟件開發(fā)的過程中,經(jīng)常碰到這樣的場景:
某些模塊負責生產(chǎn)數(shù)據(jù),這些數(shù)據(jù)由其他模塊來負責處理(此處的模塊可能是:函數(shù)、線程、進程等)。產(chǎn)生數(shù)據(jù)的模塊稱為生產(chǎn)者,而處理數(shù)據(jù)的模塊稱為消費者。在生產(chǎn)者與消費者之間的緩沖區(qū)稱之為倉庫。生產(chǎn)者負責往倉庫運輸商品,而消費者負責從倉庫里取出商品,這就構(gòu)成了生產(chǎn)者消費者模式。
結(jié)構(gòu)圖如下:
為了大家容易理解,我們舉一個寄信的例子。假設(shè)你要寄一封信,大致過程如下:
你把信寫好——相當于生產(chǎn)者生產(chǎn)數(shù)據(jù)
你把信放入郵箱——相當于生產(chǎn)者把數(shù)據(jù)放入緩沖區(qū)
郵遞員把信從郵箱取出,做相應(yīng)處理——相當于消費者把數(shù)據(jù)取出緩沖區(qū),處理數(shù)據(jù)
生產(chǎn)者消費者模式的優(yōu)點
解耦
假設(shè)生產(chǎn)者和消費者分別是兩個線程。如果讓生產(chǎn)者直接調(diào)用消費者的某個方法,那么生產(chǎn)者對于消費者就會產(chǎn)生依賴(也就是耦合)。如果未來消費者的代碼發(fā)生變化,可能會影響到生產(chǎn)者的代碼。而如果兩者都依賴于某個緩沖區(qū),兩者之間不直接依賴,耦合也就相應(yīng)降低了。
舉個例子,我們?nèi)ム]局投遞信件,如果不使用郵箱(也就是緩沖區(qū)),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他。這就產(chǎn)生了你和郵遞員之間的依賴(相當于生產(chǎn)者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要重新認識一下(相當于消費者變化導致修改生產(chǎn)者代碼)。而郵箱相對來說比較固定,你依賴它的成本就比較低(相當于和緩沖區(qū)之間的弱耦合)。
并發(fā)
由于生產(chǎn)者與消費者是兩個獨立的并發(fā)體,他們之間是用緩沖區(qū)通信的,生產(chǎn)者只需要往緩沖區(qū)里丟數(shù)據(jù),就可以繼續(xù)生產(chǎn)下一個數(shù)據(jù),而消費者只需要從緩沖區(qū)拿數(shù)據(jù)即可,這樣就不會因為彼此的處理速度而發(fā)生阻塞。
繼續(xù)上面的例子,如果我們不使用郵箱,就得在郵局等郵遞員,直到他回來,把信件交給他,這期間我們啥事兒都不能干(也就是生產(chǎn)者阻塞)。或者郵遞員得挨家挨戶問,誰要寄信(相當于消費者輪詢)。
支持忙閑不均
當生產(chǎn)者制造數(shù)據(jù)快的時候,消費者來不及處理,未處理的數(shù)據(jù)可以暫時存在緩沖區(qū)中,慢慢處理掉。而不至于因為消費者的性能造成數(shù)據(jù)丟失或影響生產(chǎn)者生產(chǎn)。
我們再拿寄信的例子,假設(shè)郵遞員一次只能帶走1000封信,萬一碰上情人節(jié)(或是圣誕節(jié))送賀卡,需要寄出去的信超過了1000封,這時候郵箱這個緩沖區(qū)就派上用場了。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走。
通過上面的介紹大家應(yīng)該已經(jīng)明白了生產(chǎn)者消費者模式。
Python中的多線程編程
在實現(xiàn)生產(chǎn)者消費者模式之前,我們先學習下Python中的多線程編程。
線程是操作系統(tǒng)直接支持的執(zhí)行單元,高級語言通常都內(nèi)置多線程的支持,Python也不例外,并且Python的線程是真正的Posix Thread,而不是模擬出來的線程。
Python的標準庫提供了兩個模塊:_thread和threading,_thread是低級模塊,threading是高級模塊,對_thread進行了封裝。絕大多數(shù)情況下,我們只需要使用threading這個高級模塊。
下面我們先看一段在Python中實現(xiàn)多線程的代碼。
import time,threading
#線程代碼
class TaskThread(threading.Thread):
def __init__(self,name):
threading.Thread.__init__(self,name=name)
def run(self):
print('thread %s is running...' % self.getName())
for i in range(6):
print('thread %s >>> %s' % (self.getName(), i))
time.sleep(1)
print('thread %s finished.' % self.getName())
taskthread = TaskThread('TaskThread')
taskthread.start()
taskthread.join()
下面是程序的執(zhí)行結(jié)果:
thread TaskThread is running...
thread TaskThread >>> 0
thread TaskThread >>> 1
thread TaskThread >>> 2
thread TaskThread >>> 3
thread TaskThread >>> 4
thread TaskThread >>> 5
thread TaskThread finished.
TaskThread類繼承自threading模塊中的Thread線程類。構(gòu)造函數(shù)的name參數(shù)指定線程的名字,通過重載基類run函數(shù)實現(xiàn)具體任務(wù)。
在簡單熟悉了Python的線程后,下面我們實現(xiàn)一個生產(chǎn)者消費者模式。
from Queue import Queue
import random,threading,time
#生產(chǎn)者類
class Producer(threading.Thread):
def __init__(self, name,queue):
threading.Thread.__init__(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getName(), i))
self.data.put(i)
time.sleep(random.randrange(10)/5)
print("%s finished!" % self.getName())
#消費者類
class Consumer(threading.Thread):
def __init__(self,name,queue):
threading.Thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("%s is consuming. %d in the queue is consumed!" % (self.getName(),val))
time.sleep(random.randrange(10))
print("%s finished!" % self.getName())
def main():
queue = Queue()
producer = Producer('Producer',queue)
consumer = Consumer('Consumer',queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'All threads finished!'
if __name__ == '__main__':
main()
執(zhí)行結(jié)果可能如下:
Producer is producing 0 to the queue!
Consumer is consuming. 0 in the queue is consumed!
Producer is producing 1 to the queue!
Producer is producing 2 to the queue!
Consumer is consuming. 1 in the queue is consumed!
Consumer is consuming. 2 in the queue is consumed!
Producer is producing 3 to the queue!
Producer is producing 4 to the queue!
Producer finished!
Consumer is consuming. 3 in the queue is consumed!
Consumer is consuming. 4 in the queue is consumed!
Consumer finished!
All threads finished!
因為多線程是搶占式執(zhí)行的,所以打印出的運行結(jié)果不一定和上面的完全一致。
小結(jié)
本例通過Python實現(xiàn)了一個簡單的生產(chǎn)者消費者模型。Python中的Queue模塊已經(jīng)提供了對線程同步的支持,所以本文并沒有涉及鎖、同步、死鎖等多線程問題。
數(shù)據(jù)分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
CDA 數(shù)據(jù)分析師報考條件詳解與準備指南? ? 在數(shù)據(jù)驅(qū)動決策的時代浪潮下,CDA 數(shù)據(jù)分析師認證愈發(fā)受到矚目,成為眾多有志投身數(shù) ...
2025-07-18剛?cè)肼殘龌蚴窃诼殘稣媾R崗位替代、技能更新、人機協(xié)作等焦慮的打工人,想要找到一條破解職場焦慮和升職瓶頸的系統(tǒng)化學習提升 ...
2025-07-182025被稱為“AI元年”,而AI,與數(shù)據(jù)密不可分。網(wǎng)易公司創(chuàng)始人丁磊在《AI思維:從數(shù)據(jù)中創(chuàng)造價值的煉金術(shù) ...
2025-07-18CDA 數(shù)據(jù)分析師:數(shù)據(jù)時代的價值挖掘者 在大數(shù)據(jù)席卷全球的今天,數(shù)據(jù)已成為企業(yè)核心競爭力的重要組成部分。從海量數(shù)據(jù)中提取有 ...
2025-07-18SPSS 賦值后數(shù)據(jù)不顯示?原因排查與解決指南? 在 SPSS( Statistical Package for the Social Sciences)數(shù)據(jù)分析過程中,變量 ...
2025-07-18在 DBeaver 中利用 MySQL 實現(xiàn)表數(shù)據(jù)同步操作指南? ? 在數(shù)據(jù)庫管理工作中,將一張表的數(shù)據(jù)同步到另一張表是常見需求,這有助于 ...
2025-07-18數(shù)據(jù)分析師的技能圖譜:從數(shù)據(jù)到價值的橋梁? 在數(shù)據(jù)驅(qū)動決策的時代,數(shù)據(jù)分析師如同 “數(shù)據(jù)翻譯官”,將冰冷的數(shù)字轉(zhuǎn)化為清晰的 ...
2025-07-17Pandas 寫入指定行數(shù)據(jù):數(shù)據(jù)精細化管理的核心技能? 在數(shù)據(jù)處理的日常工作中,我們常常需要面對這樣的場景:在龐大的數(shù)據(jù)集里精 ...
2025-07-17解碼 CDA:數(shù)據(jù)時代的通行證? 在數(shù)字化浪潮席卷全球的今天,當企業(yè)決策者盯著屏幕上跳動的數(shù)據(jù)曲線尋找增長密碼,當科研人員在 ...
2025-07-17CDA 精益業(yè)務(wù)數(shù)據(jù)分析:數(shù)據(jù)驅(qū)動業(yè)務(wù)增長的實戰(zhàn)方法論 在企業(yè)數(shù)字化轉(zhuǎn)型的浪潮中,“數(shù)據(jù)分析” 已從 “加分項” 成為 “必修課 ...
2025-07-16MySQL 中 ADD KEY 與 ADD INDEX 詳解:用法、差異與優(yōu)化實踐 在 MySQL 數(shù)據(jù)庫表結(jié)構(gòu)設(shè)計中,索引是提升查詢性能的核心手段。無論 ...
2025-07-16解析 MySQL Update 語句中 “query end” 狀態(tài):含義、成因與優(yōu)化指南? 在 MySQL 數(shù)據(jù)庫的日常運維與開發(fā)中,開發(fā)者和 DBA 常會 ...
2025-07-16如何考取數(shù)據(jù)分析師證書:以 CDA 為例? ? 在數(shù)字化浪潮席卷各行各業(yè)的當下,數(shù)據(jù)分析師已然成為企業(yè)挖掘數(shù)據(jù)價值、驅(qū)動決策的 ...
2025-07-15CDA 精益業(yè)務(wù)數(shù)據(jù)分析:驅(qū)動企業(yè)高效決策的核心引擎? 在數(shù)字經(jīng)濟時代,企業(yè)面臨著前所未有的數(shù)據(jù)洪流,如何從海量數(shù)據(jù)中提取有 ...
2025-07-15MySQL 無外鍵關(guān)聯(lián)表的 JOIN 實戰(zhàn):數(shù)據(jù)整合的靈活之道? 在 MySQL 數(shù)據(jù)庫的日常操作中,我們經(jīng)常會遇到需要整合多張表數(shù)據(jù)的場景 ...
2025-07-15Python Pandas:數(shù)據(jù)科學的瑞士軍刀? ? 在數(shù)據(jù)驅(qū)動的時代,面對海量、復雜的數(shù)據(jù),如何高效地進行處理、分析和挖掘成為關(guān)鍵。 ...
2025-07-15用 SQL 生成逆向回滾 SQL:數(shù)據(jù)操作的 “后悔藥” 指南? 在數(shù)據(jù)庫操作中,誤刪數(shù)據(jù)、錯改字段或誤執(zhí)行批量更新等問題時有發(fā)生。 ...
2025-07-14t檢驗與Wilcoxon檢驗的選擇:何時用t.test,何時用wilcox.test? t 檢驗與 Wilcoxon 檢驗的選擇:何時用 t.test,何時用 wilcox. ...
2025-07-14AI 浪潮下的生存與進階: CDA數(shù)據(jù)分析師—開啟新時代職業(yè)生涯的鑰匙(深度研究報告、發(fā)展指導白皮書) 發(fā)布機構(gòu):CDA數(shù)據(jù)科 ...
2025-07-13LSTM 模型輸入長度選擇技巧:提升序列建模效能的關(guān)鍵? 在循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)家族中,長短期記憶網(wǎng)絡(luò)(LSTM)憑借其解決長序列 ...
2025-07-11