
Python利用多進(jìn)程將大量數(shù)據(jù)放入有限內(nèi)存的教程
這是一篇有關(guān)如何將大量的數(shù)據(jù)放入有限的內(nèi)存中的簡略教程。
與客戶工作時(shí),有時(shí)會(huì)發(fā)現(xiàn)他們的數(shù)據(jù)庫實(shí)際上只是一個(gè)csv或Excel文件倉庫,你只能將就著用,經(jīng)常需要在不更新他們的數(shù)據(jù)倉庫的情況下完成工作。大部分情況下,如果將這些文件存儲(chǔ)在一個(gè)簡單的數(shù)據(jù)庫框架中或許更好,但時(shí)間可能不允許。這種方法對(duì)時(shí)間、機(jī)器硬件和所處環(huán)境都有要求。
下面介紹一個(gè)很好的例子:假設(shè)有一堆表格(沒有使用Neo4j、MongoDB或其他類型的數(shù)據(jù)庫,僅僅使用csvs、tsvs等格式存儲(chǔ)的表格),如果將所有表格組合在一起,得到的數(shù)據(jù)幀太大,無法放入內(nèi)存。所以第一個(gè)想法是:將其拆分成不同的部分,逐個(gè)存儲(chǔ)。這個(gè)方案看起來不錯(cuò),但處理起來很慢。除非我們使用多核處理器。
目標(biāo)
這里的目標(biāo)是從所有職位中(大約1萬個(gè)),找出相關(guān)的的職位。將這些職位與政府給的職位代碼組合起來。接著將組合的結(jié)果與對(duì)應(yīng)的州(行政單位)信息組合起來。然后用通過word2vec生成的屬性信息在我們的客戶的管道中增強(qiáng)已有的屬性。
這個(gè)任務(wù)要求在短時(shí)間內(nèi)完成,誰也不愿意等待。想象一下,這就像在不使用標(biāo)準(zhǔn)的關(guān)系型數(shù)據(jù)庫的情況下進(jìn)行多個(gè)表的連接。
數(shù)據(jù)
示例腳本
下面的是一個(gè)示例腳本,展示了如何使用multiprocessing來在有限的內(nèi)存空間中加速操作過程。腳本的第一部分是和特定任務(wù)相關(guān)的,可以自由跳過。請(qǐng)著重關(guān)注第二部分,這里側(cè)重的是multiprocessing引擎。
#import the necessary packages
import pandas as pd
import us
import numpy as np
from multiprocessing import Pool,cpu_count,Queue,Manager
# the data in one particular column was number in the form that horrible excel version
# of a number where '12000' is '12,000' with that beautiful useless comma in there.
# did I mention I excel bothers me?
# instead of converting the number right away, we only convert them when we need to
def median_maker(column):
return np.median([int(x.replace(',','')) for x in column])
# dictionary_of_dataframes contains a dataframe with information for each title; e.g title is 'Data Scientist'
# related_title_score_df is the dataframe of information for the title; columns = ['title','score']
### where title is a similar_title and score is how closely the two are related, e.g. 'Data Analyst', 0.871
# code_title_df contains columns ['code','title']
# oes_data_df is a HUGE dataframe with all of the Bureau of Labor Statistics(BLS) data for a given time period (YAY FREE DATA, BOO BAD CENSUS DATA!)
def job_title_location_matcher(title,location):
try:
related_title_score_df = dictionary_of_dataframes[title]
# we limit dataframe1 to only those related_titles that are above
# a previously established threshold
related_title_score_df = related_title_score_df[title_score_df['score']>80]
#we merge the related titles with another table and its codes
codes_relTitles_scores = pd.merge(code_title_df,related_title_score_df)
codes_relTitles_scores = codes_relTitles_scores.drop_duplicates()
# merge the two dataframes by the codes
merged_df = pd.merge(codes_relTitles_scores, oes_data_df)
#limit the BLS data to the state we want
all_merged = merged_df[merged_df['area_title']==str(us.states.lookup(location).name)]
#calculate some summary statistics for the time we want
group_med_emp,group_mean,group_pct10,group_pct25,group_median,group_pct75,group_pct90 = all_merged[['tot_emp','a_mean','a_pct10','a_pct25','a_median','a_pct75','a_pct90']].apply(median_maker)
row = [title,location,group_med_emp,group_mean,group_pct10,group_pct25, group_median, group_pct75, group_pct90]
#convert it all to strings so we can combine them all when writing to file
row_string = [str(x) for x in row]
return row_string
except:
# if it doesnt work for a particular title/state just throw it out, there are enough to make this insignificant
'do nothing'
這里發(fā)生了神奇的事情:
#runs the function and puts the answers in the queue
def worker(row, q):
ans = job_title_location_matcher(row[0],row[1])
q.put(ans)
# this writes to the file while there are still things that could be in the queue
# this allows for multiple processes to write to the same file without blocking eachother
def listener(q):
f = open(filename,'wb')
while 1:
m = q.get()
if m =='kill':
break
f.write(','.join(m) + 'n')
f.flush()
f.close()
def main():
#load all your data, then throw out all unnecessary tables/columns
filename = 'skill_TEST_POOL.txt'
#sets up the necessary multiprocessing tasks
manager = Manager()
q = manager.Queue()
pool = Pool(cpu_count() + 2)
watcher = pool.map_async(listener,(q,))
jobs = []
#titles_states is a dataframe of millions of job titles and states they were found in
for i in titles_states.iloc:
job = pool.map_async(worker, (i, q))
jobs.append(job)
for job in jobs:
job.get()
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()
由于每個(gè)數(shù)據(jù)幀的大小都不同(總共約有100Gb),所以將所有數(shù)據(jù)都放入內(nèi)存是不可能的。通過將最終的數(shù)據(jù)幀逐行寫入內(nèi)存,但從來不在內(nèi)存中存儲(chǔ)完整的數(shù)據(jù)幀。我們可以完成所有的計(jì)算和組合任務(wù)。這里的“標(biāo)準(zhǔn)方法”是,我們可以僅僅在“job_title_location_matcher”的末尾編寫一個(gè)“write_line”方法,但這樣每次只會(huì)處理一個(gè)實(shí)例。根據(jù)我們需要處理的職位/州的數(shù)量,這大概需要2天的時(shí)間。而通過multiprocessing,只需2個(gè)小時(shí)。
雖然讀者可能接觸不到本教程處理的任務(wù)環(huán)境,但通過multiprocessing,可以突破許多計(jì)算機(jī)硬件的限制。本例的工作環(huán)境是c3.8xl ubuntu ec2,硬件為32核60Gb內(nèi)存(雖然這個(gè)內(nèi)存很大,但還是無法一次性放入所有數(shù)據(jù))。這里的關(guān)鍵之處是我們?cè)?0Gb的內(nèi)存的機(jī)器上有效的處理了約100Gb的數(shù)據(jù),同時(shí)速度提升了約25倍。通過multiprocessing在多核機(jī)器上自動(dòng)處理大規(guī)模的進(jìn)程,可以有效提高機(jī)器的利用率。也許有些讀者已經(jīng)知道了這個(gè)方法,但對(duì)于其他人,可以通過multiprocessing能帶來非常大的收益。
數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼
若不方便掃碼,搜微信號(hào):CDAshujufenxi
LSTM 模型輸入長度選擇技巧:提升序列建模效能的關(guān)鍵? 在循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)家族中,長短期記憶網(wǎng)絡(luò)(LSTM)憑借其解決長序列 ...
2025-07-11CDA 數(shù)據(jù)分析師報(bào)考條件詳解與準(zhǔn)備指南? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代浪潮下,CDA 數(shù)據(jù)分析師認(rèn)證愈發(fā)受到矚目,成為眾多有志投身數(shù) ...
2025-07-11數(shù)據(jù)透視表中兩列相乘合計(jì)的實(shí)用指南? 在數(shù)據(jù)分析的日常工作中,數(shù)據(jù)透視表憑借其強(qiáng)大的數(shù)據(jù)匯總和分析功能,成為了 Excel 用戶 ...
2025-07-11尊敬的考生: 您好! 我們誠摯通知您,CDA Level I和 Level II考試大綱將于 2025年7月25日 實(shí)施重大更新。 此次更新旨在確保認(rèn) ...
2025-07-10BI 大數(shù)據(jù)分析師:連接數(shù)據(jù)與業(yè)務(wù)的價(jià)值轉(zhuǎn)化者? ? 在大數(shù)據(jù)與商業(yè)智能(Business Intelligence,簡稱 BI)深度融合的時(shí)代,BI ...
2025-07-10SQL 在預(yù)測(cè)分析中的應(yīng)用:從數(shù)據(jù)查詢到趨勢(shì)預(yù)判? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代,預(yù)測(cè)分析作為挖掘數(shù)據(jù)潛在價(jià)值的核心手段,正被廣泛 ...
2025-07-10數(shù)據(jù)查詢結(jié)束后:分析師的收尾工作與價(jià)值深化? ? 在數(shù)據(jù)分析的全流程中,“query end”(查詢結(jié)束)并非工作的終點(diǎn),而是將數(shù) ...
2025-07-10CDA 數(shù)據(jù)分析師考試:從報(bào)考到取證的全攻略? 在數(shù)字經(jīng)濟(jì)蓬勃發(fā)展的今天,數(shù)據(jù)分析師已成為各行業(yè)爭搶的核心人才,而 CDA(Certi ...
2025-07-09【CDA干貨】單樣本趨勢(shì)性檢驗(yàn):捕捉數(shù)據(jù)背后的時(shí)間軌跡? 在數(shù)據(jù)分析的版圖中,單樣本趨勢(shì)性檢驗(yàn)如同一位耐心的偵探,專注于從單 ...
2025-07-09year_month數(shù)據(jù)類型:時(shí)間維度的精準(zhǔn)切片? ? 在數(shù)據(jù)的世界里,時(shí)間是最不可或缺的維度之一,而year_month數(shù)據(jù)類型就像一把精準(zhǔn) ...
2025-07-09CDA 備考干貨:Python 在數(shù)據(jù)分析中的核心應(yīng)用與實(shí)戰(zhàn)技巧? ? 在 CDA 數(shù)據(jù)分析師認(rèn)證考試中,Python 作為數(shù)據(jù)處理與分析的核心 ...
2025-07-08SPSS 中的 Mann-Kendall 檢驗(yàn):數(shù)據(jù)趨勢(shì)與突變分析的有力工具? ? ? 在數(shù)據(jù)分析的廣袤領(lǐng)域中,準(zhǔn)確捕捉數(shù)據(jù)的趨勢(shì)變化以及識(shí)別 ...
2025-07-08備戰(zhàn) CDA 數(shù)據(jù)分析師考試:需要多久?如何規(guī)劃? CDA(Certified Data Analyst)數(shù)據(jù)分析師認(rèn)證作為國內(nèi)權(quán)威的數(shù)據(jù)分析能力認(rèn)證 ...
2025-07-08LSTM 輸出不確定的成因、影響與應(yīng)對(duì)策略? 長短期記憶網(wǎng)絡(luò)(LSTM)作為循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)的一種變體,憑借獨(dú)特的門控機(jī)制,在 ...
2025-07-07統(tǒng)計(jì)學(xué)方法在市場調(diào)研數(shù)據(jù)中的深度應(yīng)用? 市場調(diào)研是企業(yè)洞察市場動(dòng)態(tài)、了解消費(fèi)者需求的重要途徑,而統(tǒng)計(jì)學(xué)方法則是市場調(diào)研數(shù) ...
2025-07-07CDA數(shù)據(jù)分析師證書考試全攻略? 在數(shù)字化浪潮席卷全球的當(dāng)下,數(shù)據(jù)已成為企業(yè)決策、行業(yè)發(fā)展的核心驅(qū)動(dòng)力,數(shù)據(jù)分析師也因此成為 ...
2025-07-07剖析 CDA 數(shù)據(jù)分析師考試題型:解鎖高效備考與答題策略? CDA(Certified Data Analyst)數(shù)據(jù)分析師考試作為衡量數(shù)據(jù)專業(yè)能力的 ...
2025-07-04SQL Server 字符串截取轉(zhuǎn)日期:解鎖數(shù)據(jù)處理的關(guān)鍵技能? 在數(shù)據(jù)處理與分析工作中,數(shù)據(jù)格式的規(guī)范性是保證后續(xù)分析準(zhǔn)確性的基礎(chǔ) ...
2025-07-04CDA 數(shù)據(jù)分析師視角:從數(shù)據(jù)迷霧中探尋商業(yè)真相? 在數(shù)字化浪潮席卷全球的今天,數(shù)據(jù)已成為企業(yè)決策的核心驅(qū)動(dòng)力,CDA(Certifie ...
2025-07-04CDA 數(shù)據(jù)分析師:開啟數(shù)據(jù)職業(yè)發(fā)展新征程? ? 在數(shù)據(jù)成為核心生產(chǎn)要素的今天,數(shù)據(jù)分析師的職業(yè)價(jià)值愈發(fā)凸顯。CDA(Certified D ...
2025-07-03