
CDA數(shù)據(jù)分析師 出品
相信大家在做一些算法經(jīng)常會被龐大的數(shù)據(jù)量所造成的超多計(jì)算量需要的時(shí)間而折磨的痛苦不已,接下來我們圍繞四個方法來幫助大家加快一下python的計(jì)算時(shí)間,減少大家在算法上的等待時(shí)間。今天給大家講述最后一方面的內(nèi)容,關(guān)于Dask的方法運(yùn)用。
隨著對機(jī)器學(xué)習(xí)算法并行化的需求不斷增加,由于數(shù)據(jù)大小甚至模型大小呈指數(shù)級增長,如果我們擁有一個工具,可以幫助我們并行化處理Pandas的DataFrame,可以并行化處理Numpy的計(jì)算,甚至并行化我們的機(jī)器學(xué)習(xí)算法(可能是來自sklearn和Tensorflow的算法)也沒有太多的麻煩,那它對我們會非常有幫助。
好消息是確實(shí)存在這樣的庫,其名稱為Dask。Dask是一個并行計(jì)算庫,它不僅有助于并行化現(xiàn)有的機(jī)器學(xué)習(xí)工具(Pandas和Numpy)(即使用高級集合),而且還有助于并行化低級任務(wù)/功能,并且可以通過制作任務(wù)圖來處理這些功能之間的復(fù)雜交互。[ 即使用低級調(diào)度程序 ]這類似于Python的線程或多處理模塊。
他們也有一個單獨(dú)的機(jī)器學(xué)習(xí)庫dask-ml,這與如現(xiàn)有的庫集成如sklearn,xgboost和tensorflow。
Dask通過繪制任務(wù)之間的交互圖來并行化分配給它的任務(wù)。使用Dask的.visualize方法來可視化你的工作將非常有幫助,該方法可用于所有數(shù)據(jù)類型以及你計(jì)算的復(fù)雜任務(wù)鏈。此方法將輸出你的任務(wù)圖,并且如果你的任務(wù)在每個級別具有多個節(jié)點(diǎn)(即,你的任務(wù)鏈結(jié)構(gòu)在多個層次上具有許多獨(dú)立的任務(wù),例如數(shù)據(jù)塊上的并行任務(wù)),然后Dask將能夠并行化它們。
注意: Dask仍是一個相對較新的項(xiàng)目。它還有很長的路要走。不過,如果你不想學(xué)習(xí)全新的API(例如PySpark),Dask是你的最佳選擇,將來肯定會越來越好。Spark / PySpark仍然遙遙領(lǐng)先,并且仍將繼續(xù)改進(jìn)。這是一個完善的Apache項(xiàng)目。
Dask中的每種數(shù)據(jù)類型都提供現(xiàn)有數(shù)據(jù)類型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。這些數(shù)據(jù)類型可以大于你的內(nèi)存,Dask將以Blocked方式對數(shù)據(jù)并行(y)運(yùn)行計(jì)算。Blocked從某種意義上說,它們是通過執(zhí)行許多小的計(jì)算(即,以塊為單位)來執(zhí)行大型計(jì)算的,而塊的數(shù)量為chunks的總數(shù)。
a)數(shù)組:
網(wǎng)格中的許多Numpy數(shù)組作為Dask數(shù)組
Dask Array對非常大的數(shù)組進(jìn)行操作,將它們劃分為塊并并行執(zhí)行這些塊。它有許多可用的numpy方法,你可以使用這些方法來加快速度。但是其中一些沒有實(shí)現(xiàn)。
只要支持numpy切片,Dask Array就可以從任何類似數(shù)組結(jié)構(gòu)中讀取數(shù)據(jù),并且可以通過使用并且通過使用Dask . Array .from_array方法具有.shape屬性。它還可以讀取.npy和.zarr文件。
import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它會生成大小為(1000,1000)的塊darr.npartitioins# 100
當(dāng)你的數(shù)組真的很重時(shí)(即它們無法放入內(nèi)存)并且numpy對此無能為力時(shí),可以使用它。因此,Dask將它們分為數(shù)組塊并為你并行處理它們。
現(xiàn)在,Dask對每種方法進(jìn)行惰性評估。因此,要實(shí)際計(jì)算函數(shù)的值,必須使用.compute方法。它將以塊為單位并行計(jì)算結(jié)果,同時(shí)并行化每個獨(dú)立任務(wù)。
result = darr.compute
1)元素?cái)?shù)量較少時(shí),Numpy比Dask快;2)Dask接管了Numpy,耗時(shí)約1e7個元素;3)Numpy無法產(chǎn)生更多元素的結(jié)果,因?yàn)樗鼰o法將它們存儲在內(nèi)存中。
b)DataFrame:
5個Pandas DataFrame在一個Dask DataFrame中提供每月數(shù)據(jù)(可以來自diff文件)
與Dask Arrays相似,Dask DataFrames通過將文件劃分為塊并將這些塊的計(jì)算函數(shù)并行化,從而對不適合內(nèi)存非常大的數(shù)據(jù)文件進(jìn)行并行計(jì)算。
import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)
現(xiàn)在,你可以應(yīng)用/使用pandas庫中可用的大多數(shù)功能,并在此處應(yīng)用。
agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #請查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head
c)Bag:
Dask Bag包并行處理包含多個數(shù)據(jù)類型元素Python的list相似對象。當(dāng)你嘗試處理一些半結(jié)構(gòu)化數(shù)據(jù)(例如JSON Blob或日志文件)時(shí),此功能很有用。
import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)
Daskbag逐行讀取,.take方法輸出指定行數(shù)的元組。
Dask Bag在這樣的Python對象集合上實(shí)現(xiàn)例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成這個任務(wù),占用的內(nèi)存很小。它類似于PyToolz的并行版本或PySpark RDD的Python版本。
filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute
如果你的任務(wù)有點(diǎn)簡單,并且你不能或不想使用這些高級集合來執(zhí)行操作,則可以使用低級調(diào)度程序,該程序可幫助你使用dask.delayed接口并行化代碼/算法。dask.delayed也可以進(jìn)行延遲計(jì)算。
import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum
你可以根據(jù)需要在這些函數(shù)之間添加復(fù)雜的交互,使用上一個任務(wù)的結(jié)果作為下一個任務(wù)的參數(shù)。Dask不會立即計(jì)算這些函數(shù),而是會為你的任務(wù)繪制圖形,有效地合并你使用的函數(shù)之間的交互。
inputs = list(np.arange(1, 11))#將外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 計(jì)算輸入的sq并保存 # 延遲計(jì)算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加兩個連續(xù) # 結(jié)果從prev步驟inputs = tempresult = sum(inputs) # 將所有prev步驟的結(jié)果相加results.compute
你可以將延遲添加到具有許多可能的小塊的任何可并行化代碼中,從而獲得加速的效果。它可以是你想計(jì)算的許多函數(shù),例如上面的示例,或者可以使用并行讀取多個文件pandas.read_csv。
首先,到目前為止,我們一直使用Dask的默認(rèn)調(diào)度器來計(jì)算任務(wù)的結(jié)果。但是你可以根據(jù)需要從Dask提供的選項(xiàng)中更改它們。
Dask 帶有四個可用的調(diào)度程序:
· threaded:由線程池支持的調(diào)度程序
· processes:由進(jìn)程池支持的調(diào)度程序
· single-threaded(又名" sync"):同步調(diào)度程序,用于調(diào)試
· distributed:用于在多臺計(jì)算機(jī)上執(zhí)行圖形的分布式調(diào)度程序
result.compute(scheduler="single-threaded") #用于調(diào)試# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(從官方網(wǎng)頁)#當(dāng)被稱為GIL的函數(shù)釋放時(shí),線程任務(wù)將工作得很好,而多處理總是啟動時(shí)間較慢,并且在任務(wù)之間需要大量的通信。# 你可以通過其中一個得到調(diào)度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#單線程的最后一個
但是,Dask還有一個調(diào)度器,dask.distributed由于以下原因它可能是首選使用的:
1. 它提供了異步API的訪問,尤其是Future,
1. 它提供了一個診斷儀表板,可以提供有關(guān)性能和進(jìn)度的寶貴見解
1. 它可以更復(fù)雜地處理數(shù)據(jù)位置,因此在需要多個流程的工作負(fù)載上,它比多處理調(diào)度程序更有效。
你可以創(chuàng)建一個Dask的dask.distributed調(diào)度程序,通過導(dǎo)入和創(chuàng)建客戶端實(shí)現(xiàn)分布式調(diào)度器
from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以導(dǎo)航到http://localhost:8787/status 查看# 診斷儀表板,如果你有Bokeh安裝的話
現(xiàn)在,你可以使用client.submit方法,將函數(shù)和參數(shù)作為其參數(shù),從而將任務(wù)提交到此集群。然后我們可以使用client.gather或.result方法來收集結(jié)果。
sent = client.submit(sq, 4) # sq: square 函數(shù)result = client.gather(sent) # 或者 sent.result
你也可以僅使用dask.distributed.progress來查看當(dāng)前單元格中任務(wù)的進(jìn)度。你還可以明確選擇使用dask.distributed.wait來等待任務(wù)完成。
Note: (Local Cluster)有時(shí)您會注意到Dask正在超出內(nèi)存使用,即使它正在劃分任務(wù)。它可能發(fā)生在您身上,因?yàn)槟噲D在數(shù)據(jù)集上使用的函數(shù)需要您的大部分?jǐn)?shù)據(jù)進(jìn)行處理,而多重處理可能使情況變得更糟,因?yàn)樗泄ぷ魅藛T都可能試圖將數(shù)據(jù)集復(fù)制到內(nèi)存中。這可能發(fā)生在聚合的情況下?;蛘吣赡芟胂拗艱ask只使用特定數(shù)量的內(nèi)存。
在這些情況下,您可以使用Dask.distributed。LocalCluster參數(shù),并將它們傳遞給Client,從而使用本地機(jī)器的核心構(gòu)建LocalCluster。
from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client
'scheduler_port=0'和' stics_port=0'將為這個特定的客戶端選擇隨機(jī)端口號。在'process =False'的情況下,dask的客戶端不會復(fù)制數(shù)據(jù)集,這可能發(fā)生在您所創(chuàng)建的每個進(jìn)程中。您可以根據(jù)自己的需要或限制對客戶機(jī)進(jìn)行調(diào)優(yōu),要了解更多信息,可以查看LocalCluster的參數(shù)。您還可以在同一臺機(jī)器的不同端口上使用多個客戶機(jī)。
Dask也有一個庫,可以幫助并允許大多數(shù)流行的機(jī)器學(xué)習(xí)庫,例如sklearn,tensorflow和xgboost。
在機(jī)器學(xué)習(xí)中,你可能會遇到幾個不同的問題。而具體的策略取決于你面臨的問題:
1. 大型模型:數(shù)據(jù)適合放入RAM,但是訓(xùn)練時(shí)間太長。許多超參數(shù)組合,許多模型的大型集合等。
1. 大型數(shù)據(jù)集:數(shù)據(jù)大于RAM,并且不能選擇抽樣。
因此,你應(yīng)該:
· 對于內(nèi)存中適合的問題,只需使用scikit-learn(或你最喜歡的ML庫)即可;
· 對于大型模型,請使用dask_ml.joblib和你最喜歡的scikit-learn估算器
· 對于大型數(shù)據(jù)集,請使用dask_ml估算器。
a)預(yù)處理:
dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(穩(wěn)健標(biāo)量),StandardScalar(標(biāo)準(zhǔn)標(biāo)量),LabelEncoder(標(biāo)簽編碼器),OneHotEncoder(獨(dú)熱編碼),PolynomialFeatures(多項(xiàng)式特性)等等,以及它的一些自己的如Categorizer(分類器),DummyEncoder(虛擬編碼),OrdinalEncoder(序數(shù)編碼器)等。
你可以像使用PandasDataFrame一樣使用它們。
from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])
你可以使用Dask的DataFrame上的預(yù)處理方法,從Sklearn的Make_pipeline方法生成一個管道。
b)超參數(shù)搜索:
Dask具有sklearn用于進(jìn)行超參數(shù)搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。
from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)
而且,如果要partial_fit與估算器一起使用,則可以使用dask-ml的IncrementalSearchCV。
注意:(來自Dask)如果要使用后擬合任務(wù)(如評分和預(yù)測),則使用基礎(chǔ)估計(jì)量評分方法。如果你的估算器(可能來自sklearn )無法處理大型數(shù)據(jù)集,則將估算器包裝在" dask_ml.wrappers.ParallelPostFit" 周圍。它可以并行化" predict"," predict_proba"," transform"等方法。
c)模型/估計(jì)器:
Dask具有一些線性模型(的LinearRegression,LogisticRegression等),一些聚類模型(Kmeans和SpectralClustering),一種使用Tensorflow 的方法,使用Dask訓(xùn)練XGBoost模型的方法。
如果訓(xùn)練數(shù)據(jù)較小,則可以將sklearn的模型與結(jié)合使用Dask,如果ParallelPostFit包裝數(shù)據(jù)較大,則可以與包裝器一起使用(如果測試數(shù)據(jù)較大)。
from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)
如果數(shù)據(jù)集不大但模型很大,則可以使用joblib。sklearns編寫了許多用于并行執(zhí)行的算法(你可能使用過n_jobs=-1參數(shù)),joblib該算法利用線程和進(jìn)程來并行化工作負(fù)載。要用于Dask并行化,你可以創(chuàng)建一個Client(客戶端)(必須),然后使用with joblib.parallel_backend('dask'):包裝代碼。
import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代碼
注意:DASK JOBLIB后端對于擴(kuò)展CPU綁定的工作負(fù)載非常有用; 在RAM中包含數(shù)據(jù)集的工作負(fù)載,但具有許多可以并行完成的單獨(dú)操作。要擴(kuò)展到受RAM約束的工作負(fù)載(大于內(nèi)存的數(shù)據(jù)集),你應(yīng)該使用Dask的內(nèi)置模型和方法。
而且,如果你訓(xùn)練的數(shù)據(jù)太大而無法容納到內(nèi)存中,那么你應(yīng)該使用Dask的內(nèi)置估算器來加快速度。你也可以使用Dask的wrapper.Incremental它使用基礎(chǔ)估算器的partial_fit方法對整個數(shù)據(jù)集進(jìn)行訓(xùn)練,但實(shí)際上是連續(xù)的。
Dask的內(nèi)置估計(jì)器很好地?cái)U(kuò)展用于大型數(shù)據(jù)集與多種優(yōu)化算法,如admm,lbfgs,gradient_descent等,并且正則化器如 L1,L2,ElasticNet等。
from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")
經(jīng)過4期的內(nèi)容講解,你學(xué)會加快Python算法的四種方法了么?
數(shù)據(jù)分析咨詢請掃描二維碼
若不方便掃碼,搜微信號:CDAshujufenxi
SQL Server 中 CONVERT 函數(shù)的日期轉(zhuǎn)換:從基礎(chǔ)用法到實(shí)戰(zhàn)優(yōu)化 在 SQL Server 的數(shù)據(jù)處理中,日期格式轉(zhuǎn)換是高頻需求 —— 無論 ...
2025-09-18MySQL 大表拆分與關(guān)聯(lián)查詢效率:打破 “拆分必慢” 的認(rèn)知誤區(qū) 在 MySQL 數(shù)據(jù)庫管理中,“大表” 始終是性能優(yōu)化繞不開的話題。 ...
2025-09-18CDA 數(shù)據(jù)分析師:表結(jié)構(gòu)數(shù)據(jù) “獲取 - 加工 - 使用” 全流程的賦能者 表結(jié)構(gòu)數(shù)據(jù)(如數(shù)據(jù)庫表、Excel 表、CSV 文件)是企業(yè)數(shù)字 ...
2025-09-18DSGE 模型中的 Et:理性預(yù)期算子的內(nèi)涵、作用與應(yīng)用解析 動態(tài)隨機(jī)一般均衡(Dynamic Stochastic General Equilibrium, DSGE)模 ...
2025-09-17Python 提取 TIF 中地名的完整指南 一、先明確:TIF 中的地名有哪兩種存在形式? 在開始提取前,需先判斷 TIF 文件的類型 —— ...
2025-09-17CDA 數(shù)據(jù)分析師:解鎖表結(jié)構(gòu)數(shù)據(jù)特征價(jià)值的專業(yè)核心 表結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 規(guī)范存儲的結(jié)構(gòu)化數(shù)據(jù),如數(shù)據(jù)庫表、Excel 表、 ...
2025-09-17Excel 導(dǎo)入數(shù)據(jù)含缺失值?詳解 dropna 函數(shù)的功能與實(shí)戰(zhàn)應(yīng)用 在用 Python(如 pandas 庫)處理 Excel 數(shù)據(jù)時(shí),“缺失值” 是高頻 ...
2025-09-16深入解析卡方檢驗(yàn)與 t 檢驗(yàn):差異、適用場景與實(shí)踐應(yīng)用 在數(shù)據(jù)分析與統(tǒng)計(jì)學(xué)領(lǐng)域,假設(shè)檢驗(yàn)是驗(yàn)證研究假設(shè)、判斷數(shù)據(jù)差異是否 “ ...
2025-09-16CDA 數(shù)據(jù)分析師:掌控表格結(jié)構(gòu)數(shù)據(jù)全功能周期的專業(yè)操盤手 表格結(jié)構(gòu)數(shù)據(jù)(以 “行 - 列” 存儲的結(jié)構(gòu)化數(shù)據(jù),如 Excel 表、數(shù)據(jù) ...
2025-09-16MySQL 執(zhí)行計(jì)劃中 rows 數(shù)量的準(zhǔn)確性解析:原理、影響因素與優(yōu)化 在 MySQL SQL 調(diào)優(yōu)中,EXPLAIN執(zhí)行計(jì)劃是核心工具,而其中的row ...
2025-09-15解析 Python 中 Response 對象的 text 與 content:區(qū)別、場景與實(shí)踐指南 在 Python 進(jìn)行 HTTP 網(wǎng)絡(luò)請求開發(fā)時(shí)(如使用requests ...
2025-09-15CDA 數(shù)據(jù)分析師:激活表格結(jié)構(gòu)數(shù)據(jù)價(jià)值的核心操盤手 表格結(jié)構(gòu)數(shù)據(jù)(如 Excel 表格、數(shù)據(jù)庫表)是企業(yè)最基礎(chǔ)、最核心的數(shù)據(jù)形態(tài) ...
2025-09-15Python HTTP 請求工具對比:urllib.request 與 requests 的核心差異與選擇指南 在 Python 處理 HTTP 請求(如接口調(diào)用、數(shù)據(jù)爬取 ...
2025-09-12解決 pd.read_csv 讀取長浮點(diǎn)數(shù)據(jù)的科學(xué)計(jì)數(shù)法問題 為幫助 Python 數(shù)據(jù)從業(yè)者解決pd.read_csv讀取長浮點(diǎn)數(shù)據(jù)時(shí)的科學(xué)計(jì)數(shù)法問題 ...
2025-09-12CDA 數(shù)據(jù)分析師:業(yè)務(wù)數(shù)據(jù)分析步驟的落地者與價(jià)值優(yōu)化者 業(yè)務(wù)數(shù)據(jù)分析是企業(yè)解決日常運(yùn)營問題、提升執(zhí)行效率的核心手段,其價(jià)值 ...
2025-09-12用 SQL 驗(yàn)證業(yè)務(wù)邏輯:從規(guī)則拆解到數(shù)據(jù)把關(guān)的實(shí)戰(zhàn)指南 在業(yè)務(wù)系統(tǒng)落地過程中,“業(yè)務(wù)邏輯” 是連接 “需求設(shè)計(jì)” 與 “用戶體驗(yàn) ...
2025-09-11塔吉特百貨孕婦營銷案例:數(shù)據(jù)驅(qū)動下的精準(zhǔn)零售革命與啟示 在零售行業(yè) “流量紅利見頂” 的當(dāng)下,精準(zhǔn)營銷成為企業(yè)突圍的核心方 ...
2025-09-11CDA 數(shù)據(jù)分析師與戰(zhàn)略 / 業(yè)務(wù)數(shù)據(jù)分析:概念辨析與協(xié)同價(jià)值 在數(shù)據(jù)驅(qū)動決策的體系中,“戰(zhàn)略數(shù)據(jù)分析”“業(yè)務(wù)數(shù)據(jù)分析” 是企業(yè) ...
2025-09-11Excel 數(shù)據(jù)聚類分析:從操作實(shí)踐到業(yè)務(wù)價(jià)值挖掘 在數(shù)據(jù)分析場景中,聚類分析作為 “無監(jiān)督分組” 的核心工具,能從雜亂數(shù)據(jù)中挖 ...
2025-09-10統(tǒng)計(jì)模型的核心目的:從數(shù)據(jù)解讀到?jīng)Q策支撐的價(jià)值導(dǎo)向 統(tǒng)計(jì)模型作為數(shù)據(jù)分析的核心工具,并非簡單的 “公式堆砌”,而是圍繞特定 ...
2025-09-10