A Tutorial on Using Python Multiprocessing to Fit Large Amounts of Data into Limited Memory
2018-01-31

This is a brief tutorial on how to fit a large amount of data into a limited amount of memory.

When working with clients, you sometimes find that their database is really just a warehouse of csv or Excel files. You have to make do with what is there, and you often need to finish the job without updating their data warehouse. In most cases it would be better to store these files in even a simple database framework, but time may not allow it. How well the approach below works depends on your time budget, your machine's hardware, and the environment you are working in.

Here is a good example. Suppose you have a pile of tables (not in Neo4j, MongoDB, or any other kind of database, just tables stored as csvs, tsvs, and so on). If you combined them all, the resulting dataframe would be too big to fit in memory. So the first idea is to split the data into pieces and process them one at a time. That plan looks fine, but it is slow, unless we use multiple cores.
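As a minimal sketch of the chunked idea on its own (the file name, chunk size, and column names here are hypothetical), pandas can stream a large csv in pieces so that no single dataframe ever holds the whole file:

import pandas as pd

# read a hypothetical oversized csv in 100,000-row chunks; each chunk is
# an ordinary dataframe that fits comfortably in memory
partial_results = []
for chunk in pd.read_csv('big_table.csv', chunksize=100000):
  # keep only the small per-chunk summary, never the full data
  partial_results.append(chunk.groupby('state')['salary'].median())

The script below applies the same principle, but farms the per-piece work out to multiple processes.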
Goal

The goal here is to take all of the job titles (about 10,000 of them) and find the related titles for each. Those titles are then combined with the government's occupation codes, and that result is combined with the corresponding state information. Finally, attributes generated with word2vec are used to augment the existing attributes in our client's pipeline.

The task had to be completed quickly; nobody wants to sit around waiting. Think of it as doing multi-table joins without a standard relational database.
Data

Example Script

Below is an example script showing how to use multiprocessing to speed up the work within a limited memory budget. The first part of the script is specific to this particular task and can be skipped freely. Please focus on the second part, which concentrates on the multiprocessing engine.


#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool, cpu_count, Manager
 
# the data in one particular column was a number in that horrible excel form
# where '12000' is '12,000', with that beautiful useless comma in there.
# did I mention that excel bothers me?
# instead of converting the numbers right away, we only convert them when we need to
def median_maker(column):
  return np.median([int(x.replace(',','')) for x in column])
 
# dictionary_of_dataframes contains a dataframe with information for each title; e.g. title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score']
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)
 
def job_title_location_matcher(title,location):
  try:
    related_title_score_df = dictionary_of_dataframes[title]
    # we limit dataframe1 to only those related_titles that are above
    # a previously established threshold
    related_title_score_df = related_title_score_df[related_title_score_df['score']>80]
 
    #we merge the related titles with another table and its codes
    codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
    codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
 
    # merge the two dataframes by the codes
    merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
    #limit the BLS data to the state we want
    all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]
 
    #calculate some summary statistics for the time we want
    group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
    row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
    #convert it all to strings so we can combine them all when writing to file
    row_string = [str(x) for x in row]
    return row_string
  except Exception:
    # if it doesn't work for a particular title/state pair, just throw it out;
    # there are enough pairs that the loss is insignificant
    return None
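For context, the function above assumes a few module-level tables already exist. A minimal sketch of their shape (every name and value below is hypothetical, standing in for the real csv/tsv dumps) might look like:

# hypothetical stand-ins for the real lookup tables described in the
# comments above; the actual data comes from the csv/tsv warehouse
dictionary_of_dataframes = {
  'Data Scientist': pd.DataFrame({'title': ['Data Analyst'], 'score': [87.1]})
}
code_title_df = pd.DataFrame({'code': ['15-2051'], 'title': ['Data Analyst']})
oes_data_df = pd.DataFrame({'code': ['15-2051'], 'area_title': ['New York'],
                            'tot_emp': ['12,000'], 'a_mean': ['95,000'],
                            'a_pct10': ['60,000'], 'a_pct25': ['75,000'],
                            'a_median': ['92,000'], 'a_pct75': ['110,000'],
                            'a_pct90': ['130,000']})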

Here is where the magic happens:
#runs the function and puts the answers in the queue
def worker(row, q):
  ans = job_title_location_matcher(row[0],row[1])
  # skip pairs that failed inside the matcher, so the listener never
  # receives a None
  if ans is not None:
    q.put(ans)
 
# this writes to the file while there are still things that could be in the queue
# this allows multiple processes to write to the same file without blocking each other
def listener(q, filename):
  f = open(filename,'w')
  while 1:
    m = q.get()
    if m == 'kill':
      break
    f.write(','.join(m) + '\n')
    f.flush()
  f.close()
 
def main():
  #load all your data, then throw out all unnecessary tables/columns
  filename = 'skill_TEST_POOL.txt'
 
  #sets up the necessary multiprocessing tasks
  manager = Manager()
  q = manager.Queue()
  pool = Pool(cpu_count() + 2)
  watcher = pool.apply_async(listener, (q, filename))
 
  jobs = []
  #titles_states is a dataframe of millions of job titles and states they were found in
  for i in titles_states.values:
    job = pool.apply_async(worker, (i, q))
    jobs.append(job)
 
  for job in jobs:
    job.get()
  q.put('kill')
  pool.close()
  pool.join()
 
if __name__ == "__main__":
  main()


Since the dataframes all differ in size (about 100Gb in total), it is impossible to put all the data in memory at once. By writing the final dataframe to the output file one line at a time, we never hold the complete dataframe in memory, yet we can still finish all of the computation and joining. The "standard approach" here would be to simply add a "write_line" method at the end of "job_title_location_matcher", but then only one instance would be processed at a time. Given the number of titles/states we had to process, that would have taken roughly 2 days. With multiprocessing, it took only 2 hours.
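For comparison, the single-process "standard approach" described above would look roughly like this (a sketch only; no pool, no queue, one title/state pair at a time):

# single-process baseline (sketch): each pair is handled sequentially,
# which is why it takes days instead of hours
f = open(filename,'w')
for row in titles_states.values:
  line = job_title_location_matcher(row[0], row[1])
  if line is not None:
    f.write(','.join(line) + '\n')
f.close()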

Even if you never face the exact task environment of this tutorial, multiprocessing can push past many hardware limitations. The working environment for this example was a c3.8xl ubuntu ec2 instance with 32 cores and 60Gb of RAM (a lot of memory, but still not enough to hold all the data at once). The key point is that we effectively processed about 100Gb of data on a 60Gb-RAM machine, with a speedup of roughly 25x. By letting multiprocessing automatically manage large numbers of processes on a multicore machine, you can make far better use of the hardware. Some readers will already know this technique, but for everyone else, multiprocessing can bring very large gains.


