99999久久久久久亚洲,欧美人与禽猛交狂配,高清日韩av在线影院,一个人在线高清免费观看,啦啦啦在线视频免费观看www

熱線電話:13121318867

登錄
首頁(yè)精彩閱讀加快python算法的四個(gè)方法:Dask篇
加快python算法的四個(gè)方法:Dask篇
2020-06-08
收藏

CDA數(shù)據(jù)分析師 出品

相信大家在做一些算法經(jīng)常會(huì)被龐大的數(shù)據(jù)量所造成的超多計(jì)算量需要的時(shí)間而折磨的痛苦不已,接下來我們圍繞四個(gè)方法來幫助大家加快一下python的計(jì)算時(shí)間,減少大家在算法上的等待時(shí)間。今天給大家講述最后一方面的內(nèi)容,關(guān)于Dask的方法運(yùn)用。

1.簡(jiǎn)介

隨著對(duì)機(jī)器學(xué)習(xí)算法并行化的需求不斷增加,由于數(shù)據(jù)大小甚至模型大小呈指數(shù)級(jí)增長(zhǎng),如果我們擁有一個(gè)工具,可以幫助我們并行化處理Pandas的DataFrame,可以并行化處理Numpy的計(jì)算,甚至并行化我們的機(jī)器學(xué)習(xí)算法(可能是來自sklearn和Tensorflow的算法)也沒有太多的麻煩,那它對(duì)我們會(huì)非常有幫助。

好消息是確實(shí)存在這樣的庫(kù),其名稱為Dask。Dask是一個(gè)并行計(jì)算庫(kù),它不僅有助于并行化現(xiàn)有的機(jī)器學(xué)習(xí)工具(Pandas和Numpy)(即使用高級(jí)集合),而且還有助于并行化低級(jí)任務(wù)/功能,并且可以通過制作任務(wù)圖來處理這些功能之間的復(fù)雜交互。[ 即使用低級(jí)調(diào)度程序 ]這類似于Python的線程或多處理模塊。

他們也有一個(gè)單獨(dú)的機(jī)器學(xué)習(xí)庫(kù)dask-ml,這與如現(xiàn)有的庫(kù)集成如sklearn,xgboost和tensorflow。

Dask通過繪制任務(wù)之間的交互圖來并行化分配給它的任務(wù)。使用Dask的.visualize方法來可視化你的工作將非常有幫助,該方法可用于所有數(shù)據(jù)類型以及你計(jì)算的復(fù)雜任務(wù)鏈。此方法將輸出你的任務(wù)圖,并且如果你的任務(wù)在每個(gè)級(jí)別具有多個(gè)節(jié)點(diǎn)(即,你的任務(wù)鏈結(jié)構(gòu)在多個(gè)層次上具有許多獨(dú)立的任務(wù),例如數(shù)據(jù)塊上的并行任務(wù)),然后Dask將能夠并行化它們。

注意: Dask仍是一個(gè)相對(duì)較新的項(xiàng)目。它還有很長(zhǎng)的路要走。不過,如果你不想學(xué)習(xí)全新的API(例如PySpark),Dask是你的最佳選擇,將來肯定會(huì)越來越好。Spark / PySpark仍然遙遙領(lǐng)先,并且仍將繼續(xù)改進(jìn)。這是一個(gè)完善的Apache項(xiàng)目。

2.數(shù)據(jù)類型

Dask中的每種數(shù)據(jù)類型都提供現(xiàn)有數(shù)據(jù)類型的分布式版本,例如pandas中的DataFrame、numpy中的ndarray和Python中的list。這些數(shù)據(jù)類型可以大于你的內(nèi)存,Dask將以Blocked方式對(duì)數(shù)據(jù)并行(y)運(yùn)行計(jì)算。Blocked從某種意義上說,它們是通過執(zhí)行許多小的計(jì)算(即,以塊為單位)來執(zhí)行大型計(jì)算的,而塊的數(shù)量為chunks的總數(shù)。

a)數(shù)組:

網(wǎng)格中的許多Numpy數(shù)組作為Dask數(shù)組

Dask Array對(duì)非常大的數(shù)組進(jìn)行操作,將它們劃分為塊并并行執(zhí)行這些塊。它有許多可用的numpy方法,你可以使用這些方法來加快速度。但是其中一些沒有實(shí)現(xiàn)。

只要支持numpy切片,Dask Array就可以從任何類似數(shù)組結(jié)構(gòu)中讀取數(shù)據(jù),并且可以通過使用并且通過使用Dask . Array .from_array方法具有.shape屬性。它還可以讀取.npy和.zarr文件。

import dask.array as daimport numpy as nparr = numpy.random.randint(1, 1000, (10000, 10000))darr = da.from_array(arr, chunks=(1000, 1000))# 它會(huì)生成大小為(1000,1000)的塊darr.npartitioins# 100

當(dāng)你的數(shù)組真的很重時(shí)(即它們無法放入內(nèi)存)并且numpy對(duì)此無能為力時(shí),可以使用它。因此,Dask將它們分為數(shù)組塊并為你并行處理它們。

現(xiàn)在,Dask對(duì)每種方法進(jìn)行惰性評(píng)估。因此,要實(shí)際計(jì)算函數(shù)的值,必須使用.compute方法。它將以塊為單位并行計(jì)算結(jié)果,同時(shí)并行化每個(gè)獨(dú)立任務(wù)。

result = darr.compute

1)元素?cái)?shù)量較少時(shí),Numpy比Dask快;2)Dask接管了Numpy,耗時(shí)約1e7個(gè)元素;3)Numpy無法產(chǎn)生更多元素的結(jié)果,因?yàn)樗鼰o法將它們存儲(chǔ)在內(nèi)存中。

b)DataFrame

5個(gè)Pandas DataFrame在一個(gè)Dask DataFrame中提供每月數(shù)據(jù)(可以來自diff文件)

與Dask Arrays相似,Dask DataFrames通過將文件劃分為塊并將這些塊的計(jì)算函數(shù)并行化,從而對(duì)不適合內(nèi)存非常大的數(shù)據(jù)文件進(jìn)行并行計(jì)算。

import dask.dataframe as dddf = dd.read_csv("BigFile(s).csv", blocksize=50e6)

現(xiàn)在,你可以應(yīng)用/使用pandas庫(kù)中可用的大多數(shù)功能,并在此處應(yīng)用。

agg = df.groupby(["column"]).aggregate(["sum", "mean", "max", "min"])agg.columns = new_column_names #請(qǐng)查看notebookdf_new = df.merge(agg.reset_index, on="column", how="left")df_new.compute.head

c)Bag:

Dask Bag包并行處理包含多個(gè)數(shù)據(jù)類型元素Python的list相似對(duì)象。當(dāng)你嘗試處理一些半結(jié)構(gòu)化數(shù)據(jù)(例如JSON Blob或日志文件)時(shí),此功能很有用。

import dask.bag as dbb = db.from_txt("BigSemiStructuredData.txt")b.take(1)

Daskbag逐行讀取,.take方法輸出指定行數(shù)的元組。

Dask Bag在這樣的Python對(duì)象集合上實(shí)現(xiàn)例如map,filter,fold,和groupby等操作。它使用Python迭代器并行地完成這個(gè)任務(wù),占用的內(nèi)存很小。它類似于PyToolz的并行版本或PySpark RDD的Python版本。

filtered = b.filter(lambda x: x["Name"]=="James")\ .map(lambda x: x["Address"] = "New_Address")filtered.compute

3.延時(shí)

如果你的任務(wù)有點(diǎn)簡(jiǎn)單,并且你不能或不想使用這些高級(jí)集合來執(zhí)行操作,則可以使用低級(jí)調(diào)度程序,該程序可幫助你使用dask.delayed接口并行化代碼/算法。dask.delayed也可以進(jìn)行延遲計(jì)算。

import dask.delayed as delay@delaydef sq(x): return x**2@delay def add(x, y): return x+y@delay def sum(arr): sum=0 for i in range(len(arr)): sum+=arr[i] return sum

你可以根據(jù)需要在這些函數(shù)之間添加復(fù)雜的交互,使用上一個(gè)任務(wù)的結(jié)果作為下一個(gè)任務(wù)的參數(shù)。Dask不會(huì)立即計(jì)算這些函數(shù),而是會(huì)為你的任務(wù)繪制圖形,有效地合并你使用的函數(shù)之間的交互。

inputs = list(np.arange(1, 11))#將外接程序 dask.delayed 加入到列表temp = for i in range(len(inputs)): temp.append(sq(inputs[i])) # 計(jì)算輸入的sq并保存 # 延遲計(jì)算在列表inputs=temp; temp = for i in range(0, len(inputs)-1, 2): temp.append(add(inputs[i]+inputs[i+1])) # 添加兩個(gè)連續(xù) # 結(jié)果從prev步驟inputs = tempresult = sum(inputs) # 將所有prev步驟的結(jié)果相加results.compute

你可以將延遲添加到具有許多可能的小塊的任何可并行化代碼中,從而獲得加速的效果。它可以是你想計(jì)算的許多函數(shù),例如上面的示例,或者可以使用并行讀取多個(gè)文件pandas.read_csv。

4.分布式

首先,到目前為止,我們一直使用Dask的默認(rèn)調(diào)度器來計(jì)算任務(wù)的結(jié)果。但是你可以根據(jù)需要從Dask提供的選項(xiàng)中更改它們。

Dask 帶有四個(gè)可用的調(diào)度程序:

· threaded:由線程池支持的調(diào)度程序

· processes:由進(jìn)程池支持的調(diào)度程序

· single-threaded(又名" sync"):同步調(diào)度程序,用于調(diào)試

· distributed:用于在多臺(tái)計(jì)算機(jī)上執(zhí)行圖形的分布式調(diào)度程序

result.compute(scheduler="single-threaded") #用于調(diào)試# 或者dask.config.set(scheduler="single-threaded")result.compute#注:(從官方網(wǎng)頁(yè))#當(dāng)被稱為GIL的函數(shù)釋放時(shí),線程任務(wù)將工作得很好,而多處理總是啟動(dòng)時(shí)間較慢,并且在任務(wù)之間需要大量的通信。# 你可以通過其中一個(gè)得到調(diào)度程序 commands:dask.threaded.get, dask.multiprocessing.get, dask.local.get_sync#單線程的最后一個(gè)

但是,Dask還有一個(gè)調(diào)度器,dask.distributed由于以下原因它可能是首選使用的:

1. 它提供了異步API的訪問,尤其是Future,

1. 它提供了一個(gè)診斷儀表板,可以提供有關(guān)性能和進(jìn)度的寶貴見解

1. 它可以更復(fù)雜地處理數(shù)據(jù)位置,因此在需要多個(gè)流程的工作負(fù)載上,它比多處理調(diào)度程序更有效。

你可以創(chuàng)建一個(gè)Dask的dask.distributed調(diào)度程序,通過導(dǎo)入和創(chuàng)建客戶端實(shí)現(xiàn)分布式調(diào)度器

from dask.distributed import Clientclient = Client # Set up a local cluster# 你可以導(dǎo)航到http://localhost:8787/status 查看# 診斷儀表板,如果你有Bokeh安裝的話

現(xiàn)在,你可以使用client.submit方法,將函數(shù)和參數(shù)作為其參數(shù),從而將任務(wù)提交到此集群。然后我們可以使用client.gather或.result方法來收集結(jié)果。

sent = client.submit(sq, 4) # sq: square 函數(shù)result = client.gather(sent) # 或者 sent.result

你也可以僅使用dask.distributed.progress來查看當(dāng)前單元格中任務(wù)的進(jìn)度。你還可以明確選擇使用dask.distributed.wait來等待任務(wù)完成。

Note: (Local Cluster)有時(shí)您會(huì)注意到Dask正在超出內(nèi)存使用,即使它正在劃分任務(wù)。它可能發(fā)生在您身上,因?yàn)槟噲D在數(shù)據(jù)集上使用的函數(shù)需要您的大部分?jǐn)?shù)據(jù)進(jìn)行處理,而多重處理可能使情況變得更糟,因?yàn)樗泄ぷ魅藛T都可能試圖將數(shù)據(jù)集復(fù)制到內(nèi)存中。這可能發(fā)生在聚合的情況下?;蛘吣赡芟胂拗艱ask只使用特定數(shù)量的內(nèi)存。

在這些情況下,您可以使用Dask.distributed。LocalCluster參數(shù),并將它們傳遞給Client,從而使用本地機(jī)器的核心構(gòu)建LocalCluster。

from dask.distributed import Client, LocalClusterclient = Client(n_workers=1, threads_per_worker=1, processes=False, memory_limit='25GB', scheduler_port=0, silence_logs=True, diagnostics_port=0)client

'scheduler_port=0'和' stics_port=0'將為這個(gè)特定的客戶端選擇隨機(jī)端口號(hào)。在'process =False'的情況下,dask的客戶端不會(huì)復(fù)制數(shù)據(jù)集,這可能發(fā)生在您所創(chuàng)建的每個(gè)進(jìn)程中。您可以根據(jù)自己的需要或限制對(duì)客戶機(jī)進(jìn)行調(diào)優(yōu),要了解更多信息,可以查看LocalCluster的參數(shù)。您還可以在同一臺(tái)機(jī)器的不同端口上使用多個(gè)客戶機(jī)。

5.機(jī)器學(xué)習(xí)

Dask也有一個(gè)庫(kù),可以幫助并允許大多數(shù)流行的機(jī)器學(xué)習(xí)庫(kù),例如sklearn,tensorflow和xgboost。

機(jī)器學(xué)習(xí)中,你可能會(huì)遇到幾個(gè)不同的問題。而具體的策略取決于你面臨的問題:

1. 大型模型:數(shù)據(jù)適合放入RAM,但是訓(xùn)練時(shí)間太長(zhǎng)。許多超參數(shù)組合,許多模型的大型集合等。

1. 大型數(shù)據(jù)集:數(shù)據(jù)大于RAM,并且不能選擇抽樣。

因此,你應(yīng)該:

· 對(duì)于內(nèi)存中適合的問題,只需使用scikit-learn(或你最喜歡的ML庫(kù))即可;

· 對(duì)于大型模型,請(qǐng)使用dask_ml.joblib和你最喜歡的scikit-learn估算器

· 對(duì)于大型數(shù)據(jù)集,請(qǐng)使用dask_ml估算器。

a)預(yù)處理:

dask_ml.preprocessing包含一些sklearn的一些功能,如RobustScalar(穩(wěn)健標(biāo)量),StandardScalar(標(biāo)準(zhǔn)標(biāo)量),LabelEncoder(標(biāo)簽編碼器),OneHotEncoder(獨(dú)熱編碼),PolynomialFeatures(多項(xiàng)式特性)等等,以及它的一些自己的如Categorizer(分類器),DummyEncoder(虛擬編碼),OrdinalEncoder(序數(shù)編碼器)等。

你可以像使用PandasDataFrame一樣使用它們。

from dask_ml.preprocessing import RobustScalardf = da.read_csv("BigFile.csv", chunks=50000)rsc = RobustScalardf["column"] = rsc.fit_transform(df["column"])

你可以使用Dask的DataFrame上的預(yù)處理方法,從Sklearn的Make_pipeline方法生成一個(gè)管道。

b)超參數(shù)搜索:

Dask具有sklearn用于進(jìn)行超參數(shù)搜索的方法,例如GridSearchCV,RandomizedSearchCV等等。

from dask_ml.datasets import make_regressionfrom dask_ml.model_selection import train_test_split, GridSearchCVX, y = make_regression(chunks=50000)xtr, ytr, xval, yval = test_train_split(X, y)gsearch = GridSearchCV(estimator, param_grid, cv=10)gsearch.fit(xtr, ytr)

而且,如果要partial_fit與估算器一起使用,則可以使用dask-ml的IncrementalSearchCV。

注意:(來自Dask)如果要使用后擬合任務(wù)(如評(píng)分和預(yù)測(cè)),則使用基礎(chǔ)估計(jì)量評(píng)分方法。如果你的估算器(可能來自sklearn )無法處理大型數(shù)據(jù)集,則將估算器包裝在" dask_ml.wrappers.ParallelPostFit" 周圍。它可以并行化" predict"," predict_proba"," transform"等方法。

c)模型/估計(jì)器:

Dask具有一些線性模型(的LinearRegression,LogisticRegression等),一些聚類模型(Kmeans和SpectralClustering),一種使用Tensorflow 的方法,使用Dask訓(xùn)練XGBoost模型的方法。

如果訓(xùn)練數(shù)據(jù)較小,則可以將sklearn的模型與結(jié)合使用Dask,如果ParallelPostFit包裝數(shù)據(jù)較大,則可以與包裝器一起使用(如果測(cè)試數(shù)據(jù)較大)。

from sklearn.linear_model import ElasticNetfrom dask_ml.wrappers import ParallelPostFitel = ParallelPostFit(estimator=ElasticNet)el.fit(Xtrain, ytrain)preds = el.predict(Xtest)

如果數(shù)據(jù)集不大但模型很大,則可以使用joblib。sklearns編寫了許多用于并行執(zhí)行的算法(你可能使用過n_jobs=-1參數(shù)),joblib該算法利用線程和進(jìn)程來并行化工作負(fù)載。要用于Dask并行化,你可以創(chuàng)建一個(gè)Client(客戶端)(必須),然后使用with joblib.parallel_backend('dask'):包裝代碼。

import dask_ml.joblibfrom sklearn.externals import joblibclient = Clientwith joblib.parallel_backend('dask'): # 你的 scikit-learn 代碼

注意:DASK JOBLIB后端對(duì)于擴(kuò)展CPU綁定的工作負(fù)載非常有用; 在RAM中包含數(shù)據(jù)集的工作負(fù)載,但具有許多可以并行完成的單獨(dú)操作。要擴(kuò)展到受RAM約束的工作負(fù)載(大于內(nèi)存的數(shù)據(jù)集),你應(yīng)該使用Dask的內(nèi)置模型和方法。

而且,如果你訓(xùn)練的數(shù)據(jù)太大而無法容納到內(nèi)存中,那么你應(yīng)該使用Dask的內(nèi)置估算器來加快速度。你也可以使用Dask的wrapper.Incremental它使用基礎(chǔ)估算器的partial_fit方法對(duì)整個(gè)數(shù)據(jù)集進(jìn)行訓(xùn)練,但實(shí)際上是連續(xù)的。

Dask的內(nèi)置估計(jì)器很好地?cái)U(kuò)展用于大型數(shù)據(jù)集與多種優(yōu)化算法,如admm,lbfgs,gradient_descent等,并且正則化器如 L1,L2,ElasticNet等。

from dask_ml.linear_model import LogisticRegressionlr = LogisticRegressionlr.fit(X, y, solver="lbfgs")

經(jīng)過4期的內(nèi)容講解,你學(xué)會(huì)加快Python算法的四種方法了么?

數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼

若不方便掃碼,搜微信號(hào):CDAshujufenxi

數(shù)據(jù)分析師資訊
更多

OK
客服在線
立即咨詢
客服在線
立即咨詢
') } function initGt() { var handler = function (captchaObj) { captchaObj.appendTo('#captcha'); captchaObj.onReady(function () { $("#wait").hide(); }).onSuccess(function(){ $('.getcheckcode').removeClass('dis'); $('.getcheckcode').trigger('click'); }); window.captchaObj = captchaObj; }; $('#captcha').show(); $.ajax({ url: "/login/gtstart?t=" + (new Date()).getTime(), // 加隨機(jī)數(shù)防止緩存 type: "get", dataType: "json", success: function (data) { $('#text').hide(); $('#wait').show(); // 調(diào)用 initGeetest 進(jìn)行初始化 // 參數(shù)1:配置參數(shù) // 參數(shù)2:回調(diào),回調(diào)的第一個(gè)參數(shù)驗(yàn)證碼對(duì)象,之后可以使用它調(diào)用相應(yīng)的接口 initGeetest({ // 以下 4 個(gè)配置參數(shù)為必須,不能缺少 gt: data.gt, challenge: data.challenge, offline: !data.success, // 表示用戶后臺(tái)檢測(cè)極驗(yàn)服務(wù)器是否宕機(jī) new_captcha: data.new_captcha, // 用于宕機(jī)時(shí)表示是新驗(yàn)證碼的宕機(jī) product: "float", // 產(chǎn)品形式,包括:float,popup width: "280px", https: true // 更多配置參數(shù)說明請(qǐng)參見:http://docs.geetest.com/install/client/web-front/ }, handler); } }); } function codeCutdown() { if(_wait == 0){ //倒計(jì)時(shí)完成 $(".getcheckcode").removeClass('dis').html("重新獲取"); }else{ $(".getcheckcode").addClass('dis').html("重新獲取("+_wait+"s)"); _wait--; setTimeout(function () { codeCutdown(); },1000); } } function inputValidate(ele,telInput) { var oInput = ele; var inputVal = oInput.val(); var oType = ele.attr('data-type'); var oEtag = $('#etag').val(); var oErr = oInput.closest('.form_box').next('.err_txt'); var empTxt = '請(qǐng)輸入'+oInput.attr('placeholder')+'!'; var errTxt = '請(qǐng)輸入正確的'+oInput.attr('placeholder')+'!'; var pattern; if(inputVal==""){ if(!telInput){ errFun(oErr,empTxt); } return false; }else { switch (oType){ case 'login_mobile': pattern = /^1[3456789]\d{9}$/; if(inputVal.length==11) { $.ajax({ url: '/login/checkmobile', type: "post", dataType: "json", data: { mobile: inputVal, etag: oEtag, page_ur: window.location.href, page_referer: document.referrer }, success: function (data) { } }); } break; case 'login_yzm': pattern = /^\d{6}$/; break; } if(oType=='login_mobile'){ } if(!!validateFun(pattern,inputVal)){ errFun(oErr,'') if(telInput){ $('.getcheckcode').removeClass('dis'); } }else { if(!telInput) { errFun(oErr, errTxt); }else { $('.getcheckcode').addClass('dis'); } return false; } } return true; } function errFun(obj,msg) { obj.html(msg); if(msg==''){ $('.login_submit').removeClass('dis'); }else { $('.login_submit').addClass('dis'); } } function validateFun(pat,val) { return pat.test(val); }