
Python多進程并行編程實踐: mpi4py的使用
在高性能計算的項目中我們通常都會使用效率更高的編譯型的語言例如C、C++、Fortran等,但是由于Python的靈活性和易用性使得它在發(fā)展和驗證算法方面?zhèn)涫苋藗兊那嗖A于是在高性能計算領域也經(jīng)常能看到Python的身影了。本文簡單介紹在Python環(huán)境下使用MPI接口在集群上進行多進程并行計算的方法。
MPI(Message Passing Interface)
這里我先對MPI進行一下簡單的介紹,MPI的全稱是Message Passing Interface,即消息傳遞接口。
它并不是一門語言,而是一個庫,我們可以用Fortran、C、C++結合MPI提供的接口來將串行的程序進行并行化處理,也可以認為Fortran+MPI或者C+MPI是一種再原來串行語言的基礎上擴展出來的并行語言。
它是一種標準而不是特定的實現(xiàn),具體的可以有很多不同的實現(xiàn),例如MPICH、OpenMPI等。
它是一種消息傳遞編程模型,顧名思義,它就是專門服務于進程間通信的。
MPI的工作方式很好理解,我們可以同時啟動一組進程,在同一個通信域中不同的進程都有不同的編號,程序員可以利用MPI提供的接口來給不同編號的進程分配不同的任務和幫助進程相互交流最終完成同一個任務。就好比包工頭給工人們編上了工號然后指定一個方案來給不同編號的工人分配任務并讓工人相互溝通完成任務。
Python中的并行
由于CPython中的GIL的存在我們可以暫時不奢望能在CPython中使用多線程利用多核資源進行并行計算了,因此我們在Python中可以利用多進程的方式充分利用多核資源。
Python中我們可以使用很多方式進行多進程編程,例如os.fork()來創(chuàng)建進程或者通過multiprocessing模塊來更方便的創(chuàng)建進程和進程池等。在上一篇《Python多進程并行編程實踐-multiprocessing模塊》中我們使用進程池來方便的管理Python進程并且通過multiprocessing模塊中的Manager管理分布式進程實現(xiàn)了計算的多機分布式計算。
與多線程的共享式內(nèi)存不同,由于各個進程都是相互獨立的,因此進程間通信再多進程中扮演這非常重要的角色,Python中我們可以使用multiprocessing模塊中的pipe、queue、Array、Value等等工具來實現(xiàn)進程間通訊和數(shù)據(jù)共享,但是在編寫起來仍然具有很大的不靈活性。而這一方面正是MPI所擅長的領域,因此如果能夠在Python中調用MPI的接口那真是太完美了不是么。
MPI與mpi4py
mpi4py是一個構建在MPI之上的Python庫,主要使用Cython編寫。mpi4py使得Python的數(shù)據(jù)結構可以方便的在多進程中傳遞。
mpi4py是一個很強大的庫,它實現(xiàn)了很多MPI標準中的接口,包括點對點通信,組內(nèi)集合通信、非阻塞通信、重復非阻塞通信、組間通信等,基本上我能想到用到的MPI接口mpi4py中都有相應的實現(xiàn)。不僅是Python對象,mpi4py對numpy也有很好的支持并且傳遞效率很高。同時它還提供了SWIG和F2PY的接口能夠讓我們將自己的Fortran或者C/C++程序在封裝成Python后仍然能夠使用mpi4py的對象和接口來進行并行處理??梢妋pi4py的作者的功力的確是非常了得。
mpi4py
這里我開始對在Python環(huán)境中使用mpi4py的接口進行并行編程進行介紹。
MPI環(huán)境管理
mpi4py提供了相應的接口Init()和Finalize()來初始化和結束mpi環(huán)境。但是mpi4py通過在__init__.py中寫入了初始化的操作,因此在我們from mpi4py import MPI的時候就已經(jīng)自動初始化mpi環(huán)境。
MPI_Finalize()被注冊到了Python的C接口Py_AtExit(),這樣在Python進程結束時候就會自動調用MPI_Finalize(), 因此不再需要我們顯式的去掉用Finalize()。
通信域(Communicator)
mpi4py直接提供了相應的通信域的Python類,其中Comm是通信域的基類,Intracomm和Intercomm是其派生類,這根MPI的C++實現(xiàn)中是相同的。
同時它也提供了兩個預定義的通信域對象:
包含所有進程的COMM_WORLD
只包含調用進程本身的COMM_SELF
In[1]:frommpi4pyimportMPI
In[2]:MPI.COMM_SELF
Out[2]: <mpi4py.MPI.Intracommat0x7f2fa2fd59d0>
In[3]:MPI.COMM_WORLD
Out[3]: <mpi4py.MPI.Intracommat0x7f2fa2fd59f0>
通信域對象則提供了與通信域相關的接口,例如獲取當前進程號、獲取通信域內(nèi)的進程數(shù)、獲取進程組、對進程組進行集合運算、分割合并等等。
In[4]:comm=MPI.COMM_WORLD
In[5]:comm.Get_rank()
Out[5]:0
In[6]:comm.Get_size()
Out[6]:1
In[7]:comm.Get_group()
Out[7]: <mpi4py.MPI.Groupat0x7f2fa40fec30>
In[9]:comm.Split(0,0)
Out[9]: <mpi4py.MPI.Intracommat0x7f2fa2fd5bd0>
關于通信域與進程組的操作這里就不細講了,可以參考Introduction to Groups and Communicators
點對點通信
mpi4py提供了點對點通信的接口使得多個進程間能夠互相傳遞Python的內(nèi)置對象(基于pickle序列化),同時也提供了直接的數(shù)組傳遞(numpy數(shù)組,接近C語言的效率)。
如果我們需要傳遞通用的Python對象,則需要使用通信域對象的方法中小寫的接口,例如send(),recv(),isend()等。
如果需要直接傳遞數(shù)據(jù)對象,則需要調用大寫的接口,例如Send(),Recv(),Isend()等,這與C++接口中的拼寫是一樣的。
MPI中的點到點通信有很多中,其中包括標準通信,緩存通信,同步通信和就緒通信,同時上面這些通信又有非阻塞的異步版本等等。這些在mpi4py中都有相應的Python版本的接口來讓我們更靈活的處理進程間通信。這里我只用標準通信的阻塞和非阻塞版本來做個舉例:
阻塞標準通信
這里我嘗試使用mpi4py的接口在兩個進程中傳遞Python list對象。
frommpi4pyimportMPI
importnumpyasnp
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
size=comm.Get_size()
ifrank==0:
data=range(10)
comm.send(data,dest=1,tag=11)
print("process {} send {}...".format(rank,data))
else:
data=comm.recv(source=0,tag=11)
print("process {} recv {}...".format(rank,data))
執(zhí)行效果:
zjshao@vaio:~/temp_codes/mpipy$mpiexec-np2pythontemp.py
process0send[0,1,2,3,4,5,6,7,8,9]...
process1recv[0,1,2,3,4,5,6,7,8,9]...
非阻塞標準通信
所有的阻塞通信mpi都提供了一個非阻塞的版本,類似與我們編寫異步程序不阻塞在耗時的IO上是一樣的,MPI的非阻塞通信也不會阻塞消息的傳遞過程中,這樣能夠充分利用處理器資源提升整個程序的效率。
來張圖看看阻塞通信與非阻塞通信的對比:
非阻塞通信的消息發(fā)送和接受:
同樣的,我們也可以寫一個上面例子的非阻塞版本。
frommpi4pyimportMPI
importnumpyasnp
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
size=comm.Get_size()
ifrank==0:
data=range(10)
comm.isend(data,dest=1,tag=11)
print("process {} immediate send {}...".format(rank,data))
else:
data=comm.recv(source=0,tag=11)
print("process {} recv {}...".format(rank,data))
執(zhí)行結果,注意非阻塞發(fā)送也可以用阻塞接收來接收消息:
zjshao@vaio:~/temp_codes/mpipy$mpiexec-np2pythontemp.py
process0immediatesend[0,1,2,3,4,5,6,7,8,9]...
process1recv[0,1,2,3,4,5,6,7,8,9]...
支持Numpy數(shù)組
mpi4py的一個很好的特點就是他對Numpy數(shù)組有很好的支持,我們可以通過其提供的接口來直接傳遞數(shù)據(jù)對象,這種方式具有很高的效率,基本上和C/Fortran直接調用MPI接口差不多(方式和效果)
例如我想傳遞長度為10的int數(shù)組,MPI的C++接口是:
void Comm::Send(const void * buf, int count, const Datatype & datatype, int dest, int tag) const
在mpi4py的接口中也及其類似, Comm.Send()中需要接收一個Python list作為參數(shù),其中包含所傳數(shù)據(jù)的地址,長度和類型。
來個阻塞標準通信的例子:
frommpi4pyimportMPI
importnumpyasnp
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
size=comm.Get_size()
ifrank==0:
data=np.arange(10,dtype='i')
comm.Send([data,MPI.INT],dest=1,tag=11)
print("process {} Send buffer-like array {}...".format(rank,data))
else:
data=np.empty(10,dtype='i')
comm.Recv([data,MPI.INT],source=0,tag=11)
print("process {} recv buffer-like array {}...".format(rank,data))
執(zhí)行效果:
zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np2pythontemp.py
process0Sendbuffer-likearray[0123456789]...
process1recvbuffer-likearray[0123456789]...
組通信
MPI組通信和點到點通信的一個重要區(qū)別就是,在某個進程組內(nèi)所有的進程同時參加通信,mpi4py提供了方便的接口讓我們完成Python中的組內(nèi)集合通信,方便編程同時提高程序的可讀性和可移植性。
下面就幾個常用的集合通信來小試牛刀吧。
廣播
廣播操作是典型的一對多通信,將跟進程的數(shù)據(jù)復制到同組內(nèi)其他所有進程中。
在Python中我想將一個列表廣播到其他進程中:
frommpi4pyimportMPI
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
size=comm.Get_size()
ifrank==0:
data=range(10)
print("process {} bcast data {} to other processes".format(rank,data))
else:
data=None
data=comm.bcast(data,root=0)
print("process {} recv data {}...".format(rank,data))
執(zhí)行結果:
zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np5pythontemp.py
process0bcastdata[0,1,2,3,4,5,6,7,8,9]toother processes
process0recvdata[0,1,2,3,4,5,6,7,8,9]...
process1recvdata[0,1,2,3,4,5,6,7,8,9]...
process3recvdata[0,1,2,3,4,5,6,7,8,9]...
process2recvdata[0,1,2,3,4,5,6,7,8,9]...
process4recvdata[0,1,2,3,4,5,6,7,8,9]...
發(fā)散
與廣播不同,發(fā)散可以向不同的進程發(fā)送不同的數(shù)據(jù),而不是完全復制。
例如我想將0-9發(fā)送到不同的進程中:
frommpi4pyimportMPI
importnumpyasnp
comm=MPI.COMM_WORLD
rank=comm.Get_rank()
size=comm.Get_size()
recv_data=None
ifrank==0:
send_data=range(10)
print("process {} scatter data {} to other processes".format(rank,send_data))
else:
send_data=None
recv_data=comm.scatter(send_data,root=0)
print("process {} recv data {}...".format(rank,recv_data))
發(fā)散結果:
zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np10pythontemp.py
process0scatterdata[0,1,2,3,4,5,6,7,8,9]toother processes
process0recvdata0...
process3recvdata3...
process5recvdata5...
process8recvdata8...
process2recvdata2...
process7recvdata7...
process4recvdata4...
process1recvdata1...
process9recvdata9...
process6recvdata6..
收集
收集過程是發(fā)散過程的逆過程,每個進程將發(fā)送緩沖區(qū)的消息發(fā)送給根進程,根進程根據(jù)發(fā)送進程的進程號將各自的消息存放到自己的消息緩沖區(qū)中。
收集結果:
zjshao@vaio:~/temp_codes/mpipy$/usr/bin/mpiexec-np5pythontemp.py
process2senddata2toroot...
process3senddata3toroot...
process0senddata0toroot...
process4senddata4toroot...
process1senddata1toroot...
process0gather alldata[0,1,2,3,4]...
其他的組內(nèi)通信還有歸約操作等等由于篇幅限制就不多講了,有興趣的可以去看看MPI的官方文檔和相應的教材。
mpi4py并行編程實踐
這里我就上篇《Python 多進程并行編程實踐: multiprocessing 模塊》中的二重循環(huán)繪制map的例子來使用mpi4py進行并行加速處理。
我打算同時啟動10個進程來將每個0軸需要計算和繪制的數(shù)據(jù)發(fā)送到不同的進程進行并行計算。
因此我需要將pO2s數(shù)組發(fā)散到10個進程中:
之后我需要在每個進程中根據(jù)接受到的pO2s的數(shù)據(jù)再進行一次pCOs循環(huán)來進行計算。
最終將每個進程計算的結果(TOF)進行收集操作:
comm.gather(tofs_1d, root=0)
由于代碼都是涉及的專業(yè)相關的東西我就不全列出來了,將mpi4py改過的并行版本放到10個進程中執(zhí)行可見:
效率提升了10倍左右。
總結
本文簡單介紹了mpi4py的接口在python中進行多進程編程的方法,MPI的接口非常龐大,相應的mpi4py也非常龐大,mpi4py還有實現(xiàn)了相應的SWIG和F2PY的封裝文件和類型映射,能夠幫助我們將Python同真正的C/C++以及Fortran程序在消息傳遞上實現(xiàn)統(tǒng)一。
數(shù)據(jù)分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
SQL Server 中 CONVERT 函數(shù)的日期轉換:從基礎用法到實戰(zhàn)優(yōu)化 在 SQL Server 的數(shù)據(jù)處理中,日期格式轉換是高頻需求 —— 無論 ...
2025-09-18MySQL 大表拆分與關聯(lián)查詢效率:打破 “拆分必慢” 的認知誤區(qū) 在 MySQL 數(shù)據(jù)庫管理中,“大表” 始終是性能優(yōu)化繞不開的話題。 ...
2025-09-18CDA 數(shù)據(jù)分析師:表結構數(shù)據(jù) “獲取 - 加工 - 使用” 全流程的賦能者 表結構數(shù)據(jù)(如數(shù)據(jù)庫表、Excel 表、CSV 文件)是企業(yè)數(shù)字 ...
2025-09-18DSGE 模型中的 Et:理性預期算子的內(nèi)涵、作用與應用解析 動態(tài)隨機一般均衡(Dynamic Stochastic General Equilibrium, DSGE)模 ...
2025-09-17Python 提取 TIF 中地名的完整指南 一、先明確:TIF 中的地名有哪兩種存在形式? 在開始提取前,需先判斷 TIF 文件的類型 —— ...
2025-09-17CDA 數(shù)據(jù)分析師:解鎖表結構數(shù)據(jù)特征價值的專業(yè)核心 表結構數(shù)據(jù)(以 “行 - 列” 規(guī)范存儲的結構化數(shù)據(jù),如數(shù)據(jù)庫表、Excel 表、 ...
2025-09-17Excel 導入數(shù)據(jù)含缺失值?詳解 dropna 函數(shù)的功能與實戰(zhàn)應用 在用 Python(如 pandas 庫)處理 Excel 數(shù)據(jù)時,“缺失值” 是高頻 ...
2025-09-16深入解析卡方檢驗與 t 檢驗:差異、適用場景與實踐應用 在數(shù)據(jù)分析與統(tǒng)計學領域,假設檢驗是驗證研究假設、判斷數(shù)據(jù)差異是否 “ ...
2025-09-16CDA 數(shù)據(jù)分析師:掌控表格結構數(shù)據(jù)全功能周期的專業(yè)操盤手 表格結構數(shù)據(jù)(以 “行 - 列” 存儲的結構化數(shù)據(jù),如 Excel 表、數(shù)據(jù) ...
2025-09-16MySQL 執(zhí)行計劃中 rows 數(shù)量的準確性解析:原理、影響因素與優(yōu)化 在 MySQL SQL 調優(yōu)中,EXPLAIN執(zhí)行計劃是核心工具,而其中的row ...
2025-09-15解析 Python 中 Response 對象的 text 與 content:區(qū)別、場景與實踐指南 在 Python 進行 HTTP 網(wǎng)絡請求開發(fā)時(如使用requests ...
2025-09-15CDA 數(shù)據(jù)分析師:激活表格結構數(shù)據(jù)價值的核心操盤手 表格結構數(shù)據(jù)(如 Excel 表格、數(shù)據(jù)庫表)是企業(yè)最基礎、最核心的數(shù)據(jù)形態(tài) ...
2025-09-15Python HTTP 請求工具對比:urllib.request 與 requests 的核心差異與選擇指南 在 Python 處理 HTTP 請求(如接口調用、數(shù)據(jù)爬取 ...
2025-09-12解決 pd.read_csv 讀取長浮點數(shù)據(jù)的科學計數(shù)法問題 為幫助 Python 數(shù)據(jù)從業(yè)者解決pd.read_csv讀取長浮點數(shù)據(jù)時的科學計數(shù)法問題 ...
2025-09-12CDA 數(shù)據(jù)分析師:業(yè)務數(shù)據(jù)分析步驟的落地者與價值優(yōu)化者 業(yè)務數(shù)據(jù)分析是企業(yè)解決日常運營問題、提升執(zhí)行效率的核心手段,其價值 ...
2025-09-12用 SQL 驗證業(yè)務邏輯:從規(guī)則拆解到數(shù)據(jù)把關的實戰(zhàn)指南 在業(yè)務系統(tǒng)落地過程中,“業(yè)務邏輯” 是連接 “需求設計” 與 “用戶體驗 ...
2025-09-11塔吉特百貨孕婦營銷案例:數(shù)據(jù)驅動下的精準零售革命與啟示 在零售行業(yè) “流量紅利見頂” 的當下,精準營銷成為企業(yè)突圍的核心方 ...
2025-09-11CDA 數(shù)據(jù)分析師與戰(zhàn)略 / 業(yè)務數(shù)據(jù)分析:概念辨析與協(xié)同價值 在數(shù)據(jù)驅動決策的體系中,“戰(zhàn)略數(shù)據(jù)分析”“業(yè)務數(shù)據(jù)分析” 是企業(yè) ...
2025-09-11Excel 數(shù)據(jù)聚類分析:從操作實踐到業(yè)務價值挖掘 在數(shù)據(jù)分析場景中,聚類分析作為 “無監(jiān)督分組” 的核心工具,能從雜亂數(shù)據(jù)中挖 ...
2025-09-10統(tǒng)計模型的核心目的:從數(shù)據(jù)解讀到?jīng)Q策支撐的價值導向 統(tǒng)計模型作為數(shù)據(jù)分析的核心工具,并非簡單的 “公式堆砌”,而是圍繞特定 ...
2025-09-10