99999久久久久久亚洲,欧美人与禽猛交狂配,高清日韩av在线影院,一个人在线高清免费观看,啦啦啦在线视频免费观看www

熱線電話:13121318867

登錄
首頁精彩閱讀R語言并行化基礎(chǔ)與提高
R語言并行化基礎(chǔ)與提高
2018-03-16
收藏

R語言并行化基礎(chǔ)與提高

本文將介紹R中的并行計(jì)算,并給出了一些常見的陷進(jìn)以及避免它們的小技巧。

使用并行計(jì)算的原因就是因?yàn)槌绦蜻\(yùn)行時(shí)間太長。大部分程序都是可以并行化的,它們大部分都是Embarrassingly parallel。這里介紹幾種可以并行化的方法:
   Bootstrapping
    交叉驗(yàn)證(Cross-validation)
    (Multivariate Imputation by Chained Equations ,MICE)相關(guān)介紹:R語言中的缺失值處理
    擬合多元回歸方程
學(xué)習(xí)lapply是關(guān)鍵
沒有早點(diǎn)學(xué)習(xí)lapply是我的遺憾之一。這函數(shù)即優(yōu)美又簡單:它只需要一個(gè)參數(shù)(一個(gè)vector或list),和一個(gè)以該參數(shù)為輸入的函數(shù),最后返回一個(gè)列表。
> lapply(1:3, function(x) c(x, x^2, x^3))
[[1]]
 [1] 1 1 1

[[2]]
 [1] 2 4 8

[[3]]
 [1] 3 9 27

你還可以添加額外的參數(shù):

> lapply(1:3/3, round, digits=3)
[[1]]
[1] 0.333

[[2]]
[1] 0.667

[[3]]
[1] 1

當(dāng)每個(gè)元素都是獨(dú)立地計(jì)算時(shí),這個(gè)任務(wù)就是 Embarrassingly parallel的。當(dāng)你學(xué)習(xí)完使用lapply之后,你會(huì)發(fā)現(xiàn)并行化你的代碼就像喝水一樣簡單。
parallel包

使用 parallel包,首先要初始化一個(gè)集群,這個(gè)集群的數(shù)量最好是你CPU核數(shù)-1。如果一臺(tái)8核的電腦建立了數(shù)量為8的集群,那你的CPU就干不了其他事情了。所以可以這樣啟動(dòng)一個(gè)集群:

library(parallel)

# Calculate the number of cores
no_cores <- detectCores() - 1

# Initiate cluster
cl <- makeCluster(no_cores)
現(xiàn)在只需要使用并行化版本的lapply,parLapply就可以了

parLapply(cl, 2:4,
          function(exponent)
            2^exponent)
[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16
當(dāng)我們結(jié)束后,要記得關(guān)閉集群,否則你電腦的內(nèi)存會(huì)始終被R占用

stopCluster(cl)

變量作用域

在Mac/Linux中你可以使用 makeCluster(no_core, type="FORK")這一選項(xiàng)從而當(dāng)你并行運(yùn)行的時(shí)候可以包含所有環(huán)境變量。
在Windows中由于使用的是Parallel Socket Cluster (PSOCK),所以每個(gè)集群只會(huì)加載base包,所以你運(yùn)行時(shí)要指定加載特定的包或變量:

cl<-makeCluster(no_cores)

base <- 2
clusterExport(cl, "base")
parLapply(cl,
          2:4,
          function(exponent)
            base^exponent)

stopCluster(cl)

[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16

注意到你需要用clusterExport(cl, "base")把base這一個(gè)變量加載到集群當(dāng)中。如果你在函數(shù)中使用了一些其他的包就要使用clusterEvalQ加載進(jìn)去,比如說,使用rms包,那么就用clusterEvalQ(cl, library(rms))。要注意的是,在clusterExport 加載某些變量后,這些變量的任何變化都會(huì)被忽略:

cl<-makeCluster(no_cores)
clusterExport(cl, "base")
base <- 4
# Run
parLapply(cl,
          2:4,
          function(exponent)
            base^exponent)

# Finish
stopCluster(cl)
[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16
使用parSapply

如果你想程序返回一個(gè)向量或者矩陣。而不是一個(gè)列表,那么就應(yīng)該使用sapply,他同樣也有并行版本parSapply:

> parSapply(cl, 2:4,
          function(exponent)
            base^exponent)
[1]  4  8 16

輸出矩陣并顯示行名和列名(因此才需要使用as.character)

> parSapply(cl, as.character(2:4),
          function(exponent){
            x <- as.numeric(exponent)
            c(base = base^x, self = x^x)
          })
2  3   4
base 4  8  16
self 4 27 256

foreach包

設(shè)計(jì)foreach包的思想可能想要?jiǎng)?chuàng)建一個(gè)lapply和for循環(huán)的標(biāo)準(zhǔn),初始化的過程有些不同,你需要register注冊集群:

library(foreach)
library(doParallel)

cl<-makeCluster(no_cores)
registerDoParallel(cl)

要記得最后要結(jié)束集群(不是用stopCluster()):

stopImplicitCluster()

foreach函數(shù)可以使用參數(shù).combine控制你匯總結(jié)果的方法:

> foreach(exponent = 2:4,
        .combine = c)  %dopar%  
  base^exponent
  [1]  4  8 16

> foreach(exponent = 2:4,
        .combine = rbind)  %dopar%  
  base^exponent
    [,1]
result.1    4
result.2    8
result.3   16

foreach(exponent = 2:4,
        .combine = list,
        .multicombine = TRUE)  %dopar%  
  base^exponent
[[1]]
[1] 4

[[2]]
[1] 8

[[3]]
[1] 16
注意到最后list的combine方法是默認(rèn)的。在這個(gè)例子中用到一個(gè).multicombine參數(shù),他可以幫助你避免嵌套列表。比如說list(list(result.1, result.2), result.3) :

> foreach(exponent = 2:4,
        .combine = list)  %dopar%  
  base^exponent
[[1]]
[[1]][[1]]
[1] 4

[[1]][[2]]
[1] 8


[[2]]
[1] 16
變量作用域

在foreach中,變量作用域有些不同,它會(huì)自動(dòng)加載本地的環(huán)境到函數(shù)中:

> base <- 2
> cl<-makeCluster(2)
> registerDoParallel(cl)
> foreach(exponent = 2:4,
        .combine = c)  %dopar%  
  base^exponent
stopCluster(cl)
 [1]  4  8 16

但是,對于父環(huán)境的變量則不會(huì)加載,以下這個(gè)例子就會(huì)拋出錯(cuò)誤:

test <- function (exponent) {
  foreach(exponent = 2:4,
          .combine = c)  %dopar%  
    base^exponent
}
test()

 Error in base^exponent : task 1 failed - "object 'base' not found"

為解決這個(gè)問題你可以使用.export這個(gè)參數(shù)而不需要使用clusterExport。注意的是,他可以加載最終版本的變量,在函數(shù)運(yùn)行前,變量都是可以改變的:

base <- 2
cl<-makeCluster(2)
registerDoParallel(cl)

base <- 4
test <- function (exponent) {
  foreach(exponent = 2:4,
          .combine = c,
          .export = "base")  %dopar%  
    base^exponent
}
test()

stopCluster(cl)

 [1]  4  8 16
類似的你可以使用.packages參數(shù)來加載包,比如說:.packages = c("rms", "mice")
使用Fork還是sock?

我在windows上做了很多分析,也習(xí)慣了使用PSOCK系統(tǒng)。對于使用其他系統(tǒng)的人要意識到這兩個(gè)的區(qū)別:

    FORK:”to divide in branches and go separate ways”
    系統(tǒng):Unix/Mac (not Windows)
    環(huán)境: 所有
    PSOCK:并行socket集群
    系統(tǒng): All (including Windows)
    環(huán)境: 空

內(nèi)存控制

如果你不打算使用windows的話,建議你嘗試FORK模式,它可以實(shí)現(xiàn)內(nèi)存共享,節(jié)省你的內(nèi)存。
PSOCK:

library(pryr) # Used for memory analyses
cl<-makeCluster(no_cores)
clusterExport(cl, "a")
clusterEvalQ(cl, library(pryr))
parSapply(cl, X = 1:10, function(x) {address(a)}) == address(a)
[1] FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE FALSE
FORK :

cl<-makeCluster(no_cores, type="FORK")
parSapply(cl, X = 1:10, function(x) address(a)) == address(a)
 [1] TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE TRUE

你不需要花費(fèi)太多時(shí)間去配置你的環(huán)境,有趣的是,你不需要擔(dān)心變量沖突:

b <- 0
parSapply(cl, X = 1:10, function(x) {b <- b + 1; b})
# [1] 1 1 1 1 1 1 1 1 1 1
parSapply(cl, X = 1:10, function(x) {b <<- b + 1; b})
# [1] 1 2 3 4 5 1 2 3 4 5
b
# [1] 0

調(diào)試

當(dāng)你在并行環(huán)境中工作是,debug是很困難的,你不能使用browser/cat/print等函數(shù)來發(fā)現(xiàn)你的問題。
tryCatch-list方法

使用stop()函數(shù)這不是一個(gè)好方法,因?yàn)楫?dāng)你收到一個(gè)錯(cuò)誤信息時(shí),很可能這個(gè)錯(cuò)誤信息你在很久之前寫的,都快忘掉了,但是當(dāng)你的程序跑了1,2天后,突然彈出這個(gè)錯(cuò)誤,就只因?yàn)檫@一個(gè)錯(cuò)誤,你的程序終止了,并把你之前的做的計(jì)算全部扔掉了,這是很討厭的。為此,你可以嘗試使用tryCatch去捕捉那些錯(cuò)誤,從而使得出現(xiàn)錯(cuò)誤后程序還能繼續(xù)執(zhí)行:

foreach(x=list(1, 2, "a"))  %dopar%  
{
  tryCatch({
    c(1/x, x, 2^x)
  }, error = function(e) return(paste0("The variable '", x, "'",
                                      " caused the error: '", e, "'")))
}
[[1]]
[1] 1 1 2

[[2]]
[1] 0.5 2.0 4.0

[[3]]
[1] "The variable 'a' caused the error: 'Error in 1/x: non-numeric argument to binary operator\n'"

這也正是我喜歡list的原因,它可以方便的將所有相關(guān)的數(shù)據(jù)輸出,而不是只輸出一個(gè)錯(cuò)誤信息。這里有一個(gè)使用rbind在lapply進(jìn)行conbine的例子:

`out <- lapply(1:3, function(x) c(x, 2^x, x^x))
do.call(rbind, out)
 [,1] [,2] [,3]
[1,]    1    2    1
[2,]    2    4    4
[3,]    3    8   27
創(chuàng)建一個(gè)文件輸出

當(dāng)我們無法在控制臺(tái)觀測每個(gè)工作時(shí),我們可以設(shè)置一個(gè)共享文件,讓結(jié)果輸出到文件當(dāng)中,這是一個(gè)想當(dāng)舒服的解決方案:

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  print(x)
}
stopCluster(cl)
starting worker pid=7392 on localhost:11411 at 00:11:21.077
starting worker pid=7276 on localhost:11411 at 00:11:21.319
starting worker pid=7576 on localhost:11411 at 00:11:21.762
[1] 2]

[1] "a"

創(chuàng)建一個(gè)結(jié)點(diǎn)專用文件

一個(gè)或許更為有用的選擇是創(chuàng)建一個(gè)結(jié)點(diǎn)專用的文件,如果你的數(shù)據(jù)集存在一些問題的時(shí)候,可以方便觀測:

cl<-makeCluster(no_cores, outfile = "debug.txt")
registerDoParallel(cl)
foreach(x=list(1, 2, "a"))  %dopar%  
{
  cat(dput(x), file = paste0("debug_file_", x, ".txt"))
}
stopCluster(cl)
partools包

partools這個(gè)包有一個(gè)dbs()函數(shù)或許值得一看(使用非windows系統(tǒng)值得一看),他允許你聯(lián)合多個(gè)終端給每個(gè)進(jìn)程進(jìn)行debug。
Caching

當(dāng)做一個(gè)大型計(jì)算時(shí),我強(qiáng)烈推薦使用一些緩存。這或許有多個(gè)原因你想要結(jié)束計(jì)算,但是要遺憾地浪費(fèi)了計(jì)算的寶貴的時(shí)間。這里有一個(gè)包可以做緩存,R.cache,但是我發(fā)現(xiàn)自己寫個(gè)函數(shù)來實(shí)現(xiàn)更加簡單。你只需要嵌入digest包就可以。digest()函數(shù)是一個(gè)散列函數(shù),把一個(gè)R對象輸入進(jìn)去可以輸出一個(gè)md5值或sha1等從而得到一個(gè)唯一的key值,當(dāng)你key匹配到你保存的cache中的key時(shí),你就可以繼續(xù)你的計(jì)算了,而不需要將算法重新運(yùn)行,以下是一個(gè)使用例子:

cacheParallel <- function(){
  vars <- 1:2
  tmp <- clusterEvalQ(cl,
                      library(digest))

  parSapply(cl, vars, function(var){
    fn <- function(a) a^2
    dg <- digest(list(fn, var))
    cache_fn <-
      sprintf("Cache_%s.Rdata",
              dg)
    if (file.exists(cache_fn)){
      load(cache_fn)
    }else{
      var <- fn(var);
      Sys.sleep(5)
      save(var, file = cache_fn)
    }
    return(var)
  })
}
這個(gè)例子很顯然在第二次運(yùn)行的時(shí)候并沒有啟動(dòng)Sys.sleep,而是檢測到了你的cache文件,加載了上一次計(jì)算后的cache,你就不必再計(jì)算Sys.sleep了,因?yàn)樵谏弦淮我呀?jīng)計(jì)算過了。

system.time(out <- cacheParallel())
# user system elapsed
# 0.003 0.001 5.079
out
# [1] 1 4
system.time(out <- cacheParallel())
# user system elapsed
# 0.001 0.004 0.046
out
# [1] 1 4

# To clean up the files just do:
file.remove(list.files(pattern = "Cache.+\.Rdata"))

載入平衡
任務(wù)載入

需要注意的是,無論parLapply還是foreach都是一個(gè)包裝(wrapper)的函數(shù)。這意味著他們不是直接執(zhí)行并行計(jì)算的代碼,而是依賴于其他函數(shù)實(shí)現(xiàn)的。在parLapply中的定義如下:

parLapply <- function (cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)
    do.call(c, clusterApply(cl, x = splitList(X, length(cl)),
        fun = lapply, fun, ...), quote = TRUE)
}
注意到splitList(X, length(cl)) ,他會(huì)將任務(wù)分割成多個(gè)部分,然后將他們發(fā)送到不同的集群中。如果你有很多cache或者存在一個(gè)任務(wù)比其他worker中的任務(wù)都大,那么在這個(gè)任務(wù)結(jié)束之前,其他提前結(jié)束的worker都會(huì)處于空閑狀態(tài)。為了避免這一情況,你需要將你的任務(wù)盡量平均分配給每個(gè)worker。舉個(gè)例子,你要計(jì)算優(yōu)化神經(jīng)網(wǎng)絡(luò)的參數(shù),這一過程你可以并行地以不同參數(shù)來訓(xùn)練神經(jīng)網(wǎng)絡(luò),你應(yīng)該將如下代碼:

# From the nnet example
parLapply(cl, c(10, 20, 30, 40, 50), function(neurons)
  nnet(ir[samp,], targets[samp,],
       size = neurons))
改為:

# From the nnet example
parLapply(cl, c(10, 50, 30, 40, 20), function(neurons)
  nnet(ir[samp,], targets[samp,],
       size = neurons))
內(nèi)存載入

在大數(shù)據(jù)的情況下使用并行計(jì)算會(huì)很快的出現(xiàn)問題。因?yàn)槭褂貌⑿杏?jì)算會(huì)極大的消耗內(nèi)存,你必須要注意不要讓你的R運(yùn)行內(nèi)存到達(dá)內(nèi)存的上限,否則這將會(huì)導(dǎo)致崩潰或非常緩慢。使用Forks是一個(gè)控制內(nèi)存上限的一個(gè)重要方法。Fork是通過內(nèi)存共享來實(shí)現(xiàn),而不需要額外的內(nèi)存空間,這對性能的影響是很顯著的(我的系統(tǒng)時(shí)16G內(nèi)存,8核心):

> rm(list=ls())
> library(pryr)
> library(magrittr)
> a <- matrix(1, ncol=10^4*2, nrow=10^4)
> object_size(a)
1.6 GB
> system.time(mean(a))
   user  system elapsed
  0.338   0.000   0.337
> system.time(mean(a + 1))
   user  system elapsed
  0.490   0.084   0.574
> library(parallel)
> cl <- makeCluster(4, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
   user  system elapsed
  5.253   0.544   7.289
> system.time(parSapply(cl, 1:8,
                        function(x) mean(a + 1)))
   user  system elapsed
  0.008   0.008   3.365
> stopCluster(cl); gc();
> cl <- makeCluster(4, type = "FORK")
> system.time(parSapply(cl, 1:8,
                        function(x) mean(a + 1)))
   user  system elapsed
  0.009   0.008   3.123
> stopCluster(cl)

FORKs可以讓你并行化從而不用崩潰:

> cl <- makeCluster(8, type = "PSOCK")
> system.time(clusterExport(cl, "a"))
   user  system elapsed
 10.576   1.263  15.877
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
Error in checkForRemoteErrors(val) :
  8 nodes produced errors; first error: cannot allocate vector of size 1.5 Gb
Timing stopped at: 0.004 0 0.389
> stopCluster(cl)
> cl <- makeCluster(8, type = "FORK")
> system.time(parSapply(cl, 1:8, function(x) mean(a + 1)))
   user  system elapsed
  0.014   0.016   3.735
> stopCluster(cl)

當(dāng)然,他并不能讓你完全解放,如你所見,當(dāng)我們創(chuàng)建一個(gè)中間變量時(shí)也是需要消耗內(nèi)存的:

> a <- matrix(1, ncol=10^4*2.1, nrow=10^4)
> cl <- makeCluster(8, type = "FORK")
> parSapply(cl, 1:8, function(x) {
+   b <- a + 1
+   mean(b)
+   })
Error in unserialize(node$con) : error reading from connection

內(nèi)存建議

    盡量使用rm()避免無用的變量
    盡量使用gc()釋放內(nèi)存。即使這在R中是自動(dòng)執(zhí)行的,但是當(dāng)它沒有及時(shí)執(zhí)行,在一個(gè)并行計(jì)算的情況下,如果沒有及時(shí)釋放內(nèi)存,那么它將不會(huì)將內(nèi)存返回給操作系統(tǒng),從而影響了其他worker的執(zhí)行。
    通常并行化在大規(guī)模運(yùn)算下很有用,但是,考慮到R中的并行化存在內(nèi)存的初始化成本,所以考慮到內(nèi)存的情況下,顯然小規(guī)模的并行化可能會(huì)更有用。
    有時(shí)候在并行計(jì)算時(shí),不斷做緩存,當(dāng)達(dá)到上限時(shí),換回串行計(jì)算。
    你也可以手動(dòng)的控制每個(gè)核所使用的內(nèi)存數(shù)量,一個(gè)簡單的方法就是:memory.limit()/memory.size() = max cores

其他建議

    一個(gè)常用的CPU核數(shù)檢測函數(shù):

max(1, detectCores() - 1)

    1

    永遠(yuǎn)不要使用set.seed(),使用clusterSetRNGStream()來代替設(shè)置種子,如果你想重現(xiàn)結(jié)果。
    如果你有Nvidia 顯卡,你可以嘗試使用gputools 包進(jìn)行GPU加速(警告:安裝可能會(huì)很困難)
    當(dāng)使用mice并行化時(shí)記得使用ibind()來合并項(xiàng)。

數(shù)據(jù)分析咨詢請掃描二維碼

若不方便掃碼,搜微信號:CDAshujufenxi

數(shù)據(jù)分析師資訊
更多

OK
客服在線
立即咨詢
客服在線
立即咨詢
') } function initGt() { var handler = function (captchaObj) { captchaObj.appendTo('#captcha'); captchaObj.onReady(function () { $("#wait").hide(); }).onSuccess(function(){ $('.getcheckcode').removeClass('dis'); $('.getcheckcode').trigger('click'); }); window.captchaObj = captchaObj; }; $('#captcha').show(); $.ajax({ url: "/login/gtstart?t=" + (new Date()).getTime(), // 加隨機(jī)數(shù)防止緩存 type: "get", dataType: "json", success: function (data) { $('#text').hide(); $('#wait').show(); // 調(diào)用 initGeetest 進(jìn)行初始化 // 參數(shù)1:配置參數(shù) // 參數(shù)2:回調(diào),回調(diào)的第一個(gè)參數(shù)驗(yàn)證碼對象,之后可以使用它調(diào)用相應(yīng)的接口 initGeetest({ // 以下 4 個(gè)配置參數(shù)為必須,不能缺少 gt: data.gt, challenge: data.challenge, offline: !data.success, // 表示用戶后臺(tái)檢測極驗(yàn)服務(wù)器是否宕機(jī) new_captcha: data.new_captcha, // 用于宕機(jī)時(shí)表示是新驗(yàn)證碼的宕機(jī) product: "float", // 產(chǎn)品形式,包括:float,popup width: "280px", https: true // 更多配置參數(shù)說明請參見:http://docs.geetest.com/install/client/web-front/ }, handler); } }); } function codeCutdown() { if(_wait == 0){ //倒計(jì)時(shí)完成 $(".getcheckcode").removeClass('dis').html("重新獲取"); }else{ $(".getcheckcode").addClass('dis').html("重新獲取("+_wait+"s)"); _wait--; setTimeout(function () { codeCutdown(); },1000); } } function inputValidate(ele,telInput) { var oInput = ele; var inputVal = oInput.val(); var oType = ele.attr('data-type'); var oEtag = $('#etag').val(); var oErr = oInput.closest('.form_box').next('.err_txt'); var empTxt = '請輸入'+oInput.attr('placeholder')+'!'; var errTxt = '請輸入正確的'+oInput.attr('placeholder')+'!'; var pattern; if(inputVal==""){ if(!telInput){ errFun(oErr,empTxt); } return false; }else { switch (oType){ case 'login_mobile': pattern = /^1[3456789]\d{9}$/; if(inputVal.length==11) { $.ajax({ url: '/login/checkmobile', type: "post", dataType: "json", data: { mobile: inputVal, etag: oEtag, page_ur: window.location.href, page_referer: document.referrer }, success: function (data) { } }); } break; case 'login_yzm': pattern = /^\d{6}$/; break; } if(oType=='login_mobile'){ } if(!!validateFun(pattern,inputVal)){ errFun(oErr,'') if(telInput){ $('.getcheckcode').removeClass('dis'); } }else { if(!telInput) { errFun(oErr, errTxt); }else { $('.getcheckcode').addClass('dis'); } return false; } } return true; } function errFun(obj,msg) { obj.html(msg); if(msg==''){ $('.login_submit').removeClass('dis'); }else { $('.login_submit').addClass('dis'); } } function validateFun(pat,val) { return pat.test(val); }