
Python多進程multiprocessing用法實例分析
本文實例講述了Python多進程multiprocessing用法。分享給大家供大家參考,具體如下:
mutilprocess簡介
像線程一樣管理進程,這個是mutilprocess的核心,他與threading很是相像,對多核CPU的利用率會比threading好的多。
簡單的創(chuàng)建進程:
import multiprocessing
def worker(num):
"""thread worker function"""
print 'Worker:', num
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
確定當前的進程,即是給進程命名,方便標識區(qū)分,跟蹤
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(2)
print name, 'Exiting'
def my_service():
name = multiprocessing.current_process().name
print name, 'Starting'
time.sleep(3)
print name, 'Exiting'
if __name__ == '__main__':
service = multiprocessing.Process(name='my_service',
target=my_service)
worker_1 = multiprocessing.Process(name='worker 1',
target=worker)
worker_2 = multiprocessing.Process(target=worker) # default name
worker_1.start()
worker_2.start()
service.start()
守護進程就是不阻擋主程序退出,自己干自己的 mutilprocess.setDaemon(True)就這句等待守護進程退出,要加上join,join可以傳入浮點數(shù)值,等待n久就不等了
守護進程:
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
time.sleep(2)
print 'Exiting :', name
def non_daemon():
name = multiprocessing.current_process().name
print 'Starting:', name
print 'Exiting :', name
if __name__ == '__main__':
d = multiprocessing.Process(name='daemon',
target=daemon)
d.daemon = True
n = multiprocessing.Process(name='non-daemon',
target=non_daemon)
n.daemon = False
d.start()
n.start()
d.join(1)
print 'd.is_alive()', d.is_alive()
n.join()
最好使用 poison pill,強制的使用terminate()注意 terminate之后要join,使其可以更新狀態(tài)
終止進程:
import multiprocessing
import time
def slow_worker():
print 'Starting worker'
time.sleep(0.1)
print 'Finished worker'
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print 'BEFORE:', p, p.is_alive()
p.start()
print 'DURING:', p, p.is_alive()
p.terminate()
print 'TERMINATED:', p, p.is_alive()
p.join()
print 'JOINED:', p, p.is_alive()
①. == 0 未生成任何錯誤
②. 0 進程有一個錯誤,并以該錯誤碼退出
③. < 0 進程由一個-1 * exitcode信號結(jié)束
進程的退出狀態(tài):
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('There was an error!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
for f in [exit_error, exit_ok, return_value, raises, terminated]:
print 'Starting process for', f.func_name
j = multiprocessing.Process(target=f, name=f.func_name)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print '%15s.exitcode = %s' % (j.name, j.exitcode)
方便的調(diào)試,可以用logging
日志:
import multiprocessing
import logging
import sys
def worker():
print 'Doing some work'
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
利用class來創(chuàng)建進程,定制子類
派生進程:
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print 'In %s' % self.name
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
python進程間傳遞消息:
import multiprocessing
class MyFancyClass(object):
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print 'Doing something fancy in %s for %s!' % \
(proc_name, self.name)
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Poison pill means shutdown
print '%s: Exiting' % proc_name
self.task_queue.task_done()
break
print '%s: %s' % (proc_name, next_task)
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
# Establish communication queues
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Start consumers
num_consumers = multiprocessing.cpu_count() * 2
print 'Creating %d consumers' % num_consumers
consumers = [ Consumer(tasks, results)
for i in xrange(num_consumers) ]
for w in consumers:
w.start()
# Enqueue jobs
num_jobs = 10
for i in xrange(num_jobs):
tasks.put(Task(i, i))
# Add a poison pill for each consumer
for i in xrange(num_consumers):
tasks.put(None)
# Wait for all of the tasks to finish
tasks.join()
# Start printing results
while num_jobs:
result = results.get()
print 'Result:', result
num_jobs -= 1
Event提供一種簡單的方法,可以在進程間傳遞狀態(tài)信息。事件可以切換設(shè)置和未設(shè)置狀態(tài)。通過使用一個可選的超時值,時間對象的用戶可以等待其狀態(tài)從未設(shè)置變?yōu)樵O(shè)置。
進程間信號傳遞:
import multiprocessing
import time
def wait_for_event(e):
"""Wait for the event to be set before doing anything"""
print 'wait_for_event: starting'
e.wait()
print 'wait_for_event: e.is_set()->', e.is_set()
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
print 'wait_for_event_timeout: starting'
e.wait(t)
print 'wait_for_event_timeout: e.is_set()->', e.is_set()
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(name='block',
target=wait_for_event,
args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock',
target=wait_for_event_timeout,
args=(e, 2))
w2.start()
print 'main: waiting before calling Event.set()'
time.sleep(3)
e.set()
print 'main: event is set'
Python多進程,一般的情況是Queue來傳遞。
Queue:
from multiprocessing import Process, Queue
def f(q):
q.put([42, None, 'hello'])
if __name__ == '__main__':
q = Queue()
p = Process(target=f, args=(q,))
p.start()
print q.get() # prints "[42, None, 'hello']"
p.join()
多線程優(yōu)先隊列Queue:
import Queue
import threading
import time
exitFlag = 0
class myThread (threading.Thread):
def __init__(self, threadID, name, q):
threading.Thread.__init__(self)
self.threadID = threadID
self.name = name
self.q = q
def run(self):
print "Starting " + self.name
process_data(self.name, self.q)
print "Exiting " + self.name
def process_data(threadName, q):
while not exitFlag:
queueLock.acquire()
if not workQueue.empty():
data = q.get()
queueLock.release()
print "%s processing %s" % (threadName, data)
else:
queueLock.release()
time.sleep(1)
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []
threadID = 1
# Create new threads
for tName in threadList:
thread = myThread(threadID, tName, workQueue)
thread.start()
threads.append(thread)
threadID += 1
# Fill the queue
queueLock.acquire()
for word in nameList:
workQueue.put(word)
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
pass
# Notify threads it's time to exit
exitFlag = 1
# Wait for all threads to complete
for t in threads:
t.join()
print "Exiting Main Thread"
多進程使用Queue通信的例子
import time
from multiprocessing import Process,Queue
MSG_QUEUE = Queue(5)
def startA(msgQueue):
while True:
if msgQueue.empty() > 0:
print ('queue is empty %d' % (msgQueue.qsize()))
else:
msg = msgQueue.get()
print( 'get msg %s' % (msg,))
time.sleep(1)
def startB(msgQueue):
while True:
msgQueue.put('hello world')
print( 'put hello world queue size is %d' % (msgQueue.qsize(),))
time.sleep(3)
if __name__ == '__main__':
processA = Process(target=startA,args=(MSG_QUEUE,))
processB = Process(target=startB,args=(MSG_QUEUE,))
processA.start()
print( 'processA start..')
主進程定義了一個Queue類型的變量,并作為Process的args參數(shù)傳給子進程processA和processB,兩個進程一個向隊列中寫數(shù)據(jù),一個讀數(shù)據(jù)。
數(shù)據(jù)分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
訓練與驗證損失驟升:機器學習訓練中的異常診斷與解決方案 在機器學習模型訓練過程中,“損失曲線” 是反映模型學習狀態(tài)的核心指 ...
2025-09-19解析 DataHub 與 Kafka:數(shù)據(jù)生態(tài)中兩類核心工具的差異與協(xié)同 在數(shù)字化轉(zhuǎn)型加速的今天,企業(yè)對數(shù)據(jù)的需求已從 “存儲” 轉(zhuǎn)向 “ ...
2025-09-19CDA 數(shù)據(jù)分析師:讓統(tǒng)計基本概念成為業(yè)務(wù)決策的底層邏輯 統(tǒng)計基本概念是商業(yè)數(shù)據(jù)分析的 “基礎(chǔ)語言”—— 從描述數(shù)據(jù)分布的 “均 ...
2025-09-19CDA 數(shù)據(jù)分析師:表結(jié)構(gòu)數(shù)據(jù) “獲取 - 加工 - 使用” 全流程的賦能者 表結(jié)構(gòu)數(shù)據(jù)(如數(shù)據(jù)庫表、Excel 表、CSV 文件)是企業(yè)數(shù)字 ...
2025-09-19SQL Server 中 CONVERT 函數(shù)的日期轉(zhuǎn)換:從基礎(chǔ)用法到實戰(zhàn)優(yōu)化 在 SQL Server 的數(shù)據(jù)處理中,日期格式轉(zhuǎn)換是高頻需求 —— 無論 ...
2025-09-18MySQL 大表拆分與關(guān)聯(lián)查詢效率:打破 “拆分必慢” 的認知誤區(qū) 在 MySQL 數(shù)據(jù)庫管理中,“大表” 始終是性能優(yōu)化繞不開的話題。 ...
2025-09-18DSGE 模型中的 Et:理性預期算子的內(nèi)涵、作用與應(yīng)用解析 動態(tài)隨機一般均衡(Dynamic Stochastic General Equilibrium, DSGE)模 ...
2025-09-17Python 提取 TIF 中地名的完整指南 一、先明確:TIF 中的地名有哪兩種存在形式? 在開始提取前,需先判斷 TIF 文件的類型 —— ...
2025-09-17CDA 數(shù)據(jù)分析師:解鎖表結(jié)構(gòu)數(shù)據(jù)特征價值的專業(yè)核心 表結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 規(guī)范存儲的結(jié)構(gòu)化數(shù)據(jù),如數(shù)據(jù)庫表、Excel 表、 ...
2025-09-17Excel 導入數(shù)據(jù)含缺失值?詳解 dropna 函數(shù)的功能與實戰(zhàn)應(yīng)用 在用 Python(如 pandas 庫)處理 Excel 數(shù)據(jù)時,“缺失值” 是高頻 ...
2025-09-16深入解析卡方檢驗與 t 檢驗:差異、適用場景與實踐應(yīng)用 在數(shù)據(jù)分析與統(tǒng)計學領(lǐng)域,假設(shè)檢驗是驗證研究假設(shè)、判斷數(shù)據(jù)差異是否 “ ...
2025-09-16CDA 數(shù)據(jù)分析師:掌控表格結(jié)構(gòu)數(shù)據(jù)全功能周期的專業(yè)操盤手 表格結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 存儲的結(jié)構(gòu)化數(shù)據(jù),如 Excel 表、數(shù)據(jù) ...
2025-09-16MySQL 執(zhí)行計劃中 rows 數(shù)量的準確性解析:原理、影響因素與優(yōu)化 在 MySQL SQL 調(diào)優(yōu)中,EXPLAIN執(zhí)行計劃是核心工具,而其中的row ...
2025-09-15解析 Python 中 Response 對象的 text 與 content:區(qū)別、場景與實踐指南 在 Python 進行 HTTP 網(wǎng)絡(luò)請求開發(fā)時(如使用requests ...
2025-09-15CDA 數(shù)據(jù)分析師:激活表格結(jié)構(gòu)數(shù)據(jù)價值的核心操盤手 表格結(jié)構(gòu)數(shù)據(jù)(如 Excel 表格、數(shù)據(jù)庫表)是企業(yè)最基礎(chǔ)、最核心的數(shù)據(jù)形態(tài) ...
2025-09-15Python HTTP 請求工具對比:urllib.request 與 requests 的核心差異與選擇指南 在 Python 處理 HTTP 請求(如接口調(diào)用、數(shù)據(jù)爬取 ...
2025-09-12解決 pd.read_csv 讀取長浮點數(shù)據(jù)的科學計數(shù)法問題 為幫助 Python 數(shù)據(jù)從業(yè)者解決pd.read_csv讀取長浮點數(shù)據(jù)時的科學計數(shù)法問題 ...
2025-09-12CDA 數(shù)據(jù)分析師:業(yè)務(wù)數(shù)據(jù)分析步驟的落地者與價值優(yōu)化者 業(yè)務(wù)數(shù)據(jù)分析是企業(yè)解決日常運營問題、提升執(zhí)行效率的核心手段,其價值 ...
2025-09-12用 SQL 驗證業(yè)務(wù)邏輯:從規(guī)則拆解到數(shù)據(jù)把關(guān)的實戰(zhàn)指南 在業(yè)務(wù)系統(tǒng)落地過程中,“業(yè)務(wù)邏輯” 是連接 “需求設(shè)計” 與 “用戶體驗 ...
2025-09-11塔吉特百貨孕婦營銷案例:數(shù)據(jù)驅(qū)動下的精準零售革命與啟示 在零售行業(yè) “流量紅利見頂” 的當下,精準營銷成為企業(yè)突圍的核心方 ...
2025-09-11