
使用Storm實(shí)現(xiàn)實(shí)時(shí)大數(shù)據(jù)分析
簡(jiǎn)單和明了,Storm讓大變得輕松加愉快。
當(dāng)今世界,公司的日常運(yùn)營(yíng)經(jīng)常會(huì)生成TB級(jí)別的數(shù)據(jù)。數(shù)據(jù)來(lái)源囊括了互聯(lián)網(wǎng)裝置可以捕獲的任何類型數(shù)據(jù),網(wǎng)站、社交媒體、交易型商業(yè)數(shù)據(jù)以及其它商業(yè)環(huán)境中創(chuàng)建的數(shù)據(jù)。考慮到數(shù)據(jù)的生成量,實(shí)時(shí)處理成為了許多機(jī)構(gòu)需要面對(duì)的首要挑戰(zhàn)。我們經(jīng)常用的一個(gè)非常有效的開(kāi)源實(shí)時(shí)計(jì)算工具就是Storm —— Twitter開(kāi)發(fā),通常被比作“實(shí)時(shí)的Hadoop”。然而Storm遠(yuǎn)比Hadoop來(lái)的簡(jiǎn)單,因?yàn)橛盟幚泶髷?shù)據(jù)不會(huì)帶來(lái)新老技術(shù)的交替。
Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分別從事技術(shù)分析和研發(fā)工作。本文詳述了Storm的使用方法,例子中的項(xiàng)目名稱為“超速報(bào)警系統(tǒng)(Speeding Alert System)”。我們想實(shí)現(xiàn)的功能是:實(shí)時(shí)分析過(guò)往車(chē)輛的數(shù)據(jù),一旦車(chē)輛數(shù)據(jù)超過(guò)預(yù)設(shè)的臨界值 —— 便觸發(fā)一個(gè)trigger并把相關(guān)的數(shù)據(jù)存入數(shù)據(jù)庫(kù)。
Storm
對(duì)比Hadoop的批處理,Storm是個(gè)實(shí)時(shí)的、分布式以及具備高容錯(cuò)的計(jì)算系統(tǒng)。同Hadoop一樣Storm也可以處理大批量的數(shù)據(jù),然而Storm在保證高可靠性的前提下還可以讓處理進(jìn)行的更加實(shí)時(shí);也就是說(shuō),所有的信息都會(huì)被處理。Storm同樣還具備容錯(cuò)和分布計(jì)算這些特性,這就讓Storm可以擴(kuò)展到不同的機(jī)器上進(jìn)行大批量的數(shù)據(jù)處理。他同樣還有以下的這些特性:
易于擴(kuò)展。對(duì)于擴(kuò)展,你只需要添加機(jī)器和改變對(duì)應(yīng)的topology(拓?fù)?設(shè)置。Storm使用Hadoop Zookeeper進(jìn)行集群協(xié)調(diào),這樣可以充分的保證大型集群的良好運(yùn)行。
每條信息的處理都可以得到保證。
Storm集群管理簡(jiǎn)易。
Storm的容錯(cuò)機(jī)能:一旦topology遞交,Storm會(huì)一直運(yùn)行它直到topology被廢除或者被關(guān)閉。而在執(zhí)行中出現(xiàn)錯(cuò)誤時(shí),也會(huì)由Storm重新分配任務(wù)。
盡管通常使用Java,Storm中的topology可以用任何語(yǔ)言設(shè)計(jì)。
當(dāng)然為了更好的理解文章,你首先需要安裝和設(shè)置Storm。需要通過(guò)以下幾個(gè)簡(jiǎn)單的步驟:
從Storm官方下載Storm安裝文件
將bin/directory解壓到你的PATH上,并保證bin/storm腳本是可執(zhí)行的。
Storm組件
Storm集群主要由一個(gè)主節(jié)點(diǎn)和一群工作節(jié)點(diǎn)(worker node)組成,通過(guò) Zookeeper進(jìn)行協(xié)調(diào)。
主節(jié)點(diǎn):
主節(jié)點(diǎn)通常運(yùn)行一個(gè)后臺(tái)程序 —— Nimbus,用于響應(yīng)分布在集群中的節(jié)點(diǎn),分配任務(wù)和監(jiān)測(cè)故障。這個(gè)很類似于Hadoop中的Job Tracker。
工作節(jié)點(diǎn):
工作節(jié)點(diǎn)同樣會(huì)運(yùn)行一個(gè)后臺(tái)程序 —— Supervisor,用于收聽(tīng)工作指派并基于要求運(yùn)行工作進(jìn)程。每個(gè)工作節(jié)點(diǎn)都是topology中一個(gè)子集的實(shí)現(xiàn)。而Nimbus和Supervisor之間的協(xié)調(diào)則通過(guò)Zookeeper系統(tǒng)或者集群。
Zookeeper
Zookeeper是完成Supervisor和Nimbus之間協(xié)調(diào)的服務(wù)。而應(yīng)用程序?qū)崿F(xiàn)實(shí)時(shí)的邏輯則被封裝進(jìn)Storm中的“topology”。topology則是一組由Spouts(數(shù)據(jù)源)和Bolts(數(shù)據(jù)操作)通過(guò)Stream Groupings進(jìn)行連接的圖。下面對(duì)出現(xiàn)的術(shù)語(yǔ)進(jìn)行更深刻的解析。
Spout:
簡(jiǎn)而言之,Spout從來(lái)源處讀取數(shù)據(jù)并放入topology。Spout分成可靠和不可靠?jī)煞N;當(dāng)Storm接收失敗時(shí),可靠的Spout會(huì)對(duì)tuple(元組,數(shù)據(jù)項(xiàng)組成的列表)進(jìn)行重發(fā);而不可靠的Spout不會(huì)考慮接收成功與否只發(fā)射一次。而Spout中最主要的方法就是nextTuple(),該方法會(huì)發(fā)射一個(gè)新的tuple到topology,如果沒(méi)有新tuple發(fā)射則會(huì)簡(jiǎn)單的返回。
Bolt:
Topology中所有的處理都由Bolt完成。Bolt可以完成任何事,比如:連接的過(guò)濾、聚合、訪問(wèn)文件/數(shù)據(jù)庫(kù)、等等。Bolt從Spout中接收數(shù)據(jù)并進(jìn)行處理,如果遇到復(fù)雜流的處理也可能將tuple發(fā)送給另一個(gè)Bolt進(jìn)行處理。而B(niǎo)olt中最重要的方法是execute(),以新的tuple作為參數(shù)接收。不管是Spout還是Bolt,如果將tuple發(fā)射成多個(gè)流,這些流都可以通過(guò)declareStream()來(lái)聲明。
Stream Groupings:
Stream Grouping定義了一個(gè)流在Bolt任務(wù)間該如何被切分。這里有Storm提供的6個(gè)Stream Grouping類型:
1. 隨機(jī)分組(Shuffle grouping):隨機(jī)分發(fā)tuple到Bolt的任務(wù),保證每個(gè)任務(wù)獲得相等數(shù)量的tuple。
2. 字段分組(Fields grouping):根據(jù)指定字段分割數(shù)據(jù)流,并分組。例如,根據(jù)“user-id”字段,相同“user-id”的元組總是分發(fā)到同一個(gè)任務(wù),不同“user-id”的元組可能分發(fā)到不同的任務(wù)。
3. 全部分組(All grouping):tuple被復(fù)制到bolt的所有任務(wù)。這種類型需要謹(jǐn)慎使用。
4. 全局分組(Global grouping):全部流都分配到bolt的同一個(gè)任務(wù)。明確地說(shuō),是分配給ID最小的那個(gè)task。
5. 無(wú)分組(None grouping):你不需要關(guān)心流是如何分組。目前,無(wú)分組等效于隨機(jī)分組。但最終,Storm將把無(wú)分組的Bolts放到Bolts或Spouts訂閱它們的同一線程去執(zhí)行(如果可能)。
6. 直接分組(Direct grouping):這是一個(gè)特別的分組類型。元組生產(chǎn)者決定tuple由哪個(gè)元組處理者任務(wù)接收。
當(dāng)然還可以實(shí)現(xiàn)CustomStreamGroupimg接口來(lái)定制自己需要的分組。
項(xiàng)目實(shí)施
當(dāng)下情況我們需要給Spout和Bolt設(shè)計(jì)一種能夠處理大量數(shù)據(jù)(日志文件)的topology,當(dāng)一個(gè)特定數(shù)據(jù)值超過(guò)預(yù)設(shè)的臨界值時(shí)促發(fā)警報(bào)。使用Storm的topology,逐行讀入日志文件并且監(jiān)視輸入數(shù)據(jù)。在Storm組件方面,Spout負(fù)責(zé)讀入輸入數(shù)據(jù)。它不僅從現(xiàn)有的文件中讀入數(shù)據(jù),同時(shí)還監(jiān)視著新文件。文件一旦被修改Spout會(huì)讀入新的版本并且覆蓋之前的tuple(可以被Bolt讀入的格式),將tuple發(fā)射給Bolt進(jìn)行臨界分析,這樣就可以發(fā)現(xiàn)所有可能超臨界的記錄。
下一節(jié)將對(duì)用例進(jìn)行詳細(xì)介紹。
臨界分析
這一節(jié),將主要聚焦于臨界值的兩種分析類型:瞬間臨界(instant thershold)和時(shí)間序列臨界(time series threshold)。
瞬間臨界值監(jiān)測(cè):一個(gè)字段的值在那個(gè)瞬間超過(guò)了預(yù)設(shè)的臨界值,如果條件符合的話則觸發(fā)一個(gè)trigger。舉個(gè)例子當(dāng)車(chē)輛超越80公里每小時(shí),則觸發(fā)trigger。
時(shí)間序列臨界監(jiān)測(cè):字段的值在一個(gè)給定的時(shí)間段內(nèi)超過(guò)了預(yù)設(shè)的臨界值,如果條件符合則觸發(fā)一個(gè)觸發(fā)器。比如:在5分鐘類,時(shí)速超過(guò)80KM兩次及以上的車(chē)輛。
Listing One顯示了我們將使用的一個(gè)類型日志,其中包含的車(chē)輛數(shù)據(jù)信息有:車(chē)牌號(hào)、車(chē)輛行駛的速度以及數(shù)據(jù)獲取的位置。
AB 12360North city
BC 12370South city
CD 23440South city
DE 12340East city
EF 12390South city
GH 12350West city
這里將創(chuàng)建一個(gè)對(duì)應(yīng)的XML文件,這將包含引入數(shù)據(jù)的模式。這個(gè)XML將用于日志文件的解析。XML的設(shè)計(jì)模式和對(duì)應(yīng)的說(shuō)明請(qǐng)見(jiàn)下表。
XML文件和日志文件都存放在Spout可以隨時(shí)監(jiān)測(cè)的目錄下,用以關(guān)注文件的實(shí)時(shí)更新。而這個(gè)用例中的topology請(qǐng)見(jiàn)下圖。
Figure 1:Storm中建立的topology,用以實(shí)現(xiàn)數(shù)據(jù)實(shí)時(shí)處理
如圖所示:FilelistenerSpout接收輸入日志并進(jìn)行逐行的讀入,接著將數(shù)據(jù)發(fā)射給ThresoldCalculatorBolt進(jìn)行更深一步的臨界值處理。一旦處理完成,被計(jì)算行的數(shù)據(jù)將發(fā)送給DBWriterBolt,然后由DBWriterBolt存入給數(shù)據(jù)庫(kù)。下面將對(duì)這個(gè)過(guò)程的實(shí)現(xiàn)進(jìn)行詳細(xì)的解析。
Spout的實(shí)現(xiàn)
Spout以日志文件和XML描述文件作為接收對(duì)象。XML文件包含了與日志一致的設(shè)計(jì)模式。不妨設(shè)想一下一個(gè)示例日志文件,包含了車(chē)輛的車(chē)牌號(hào)、行駛速度、以及數(shù)據(jù)的捕獲位置。(看下圖)
Figure2:數(shù)據(jù)從日志文件到Spout的流程圖
Listing Two顯示了tuple對(duì)應(yīng)的XML,其中指定了字段、將日志文件切割成字段的定界符以及字段的類型。XML文件以及數(shù)據(jù)都被保存到Spout指定的路徑。
Listing Two:用以描述日志文件的XML文件。
<
TUPLEINFO
>
<
FIELDLIST
>
<
FIELD
>
<
COLUMNNAME
>
vehicle_number
COLUMNNAME
>
<
COLUMNTYPE
>
string
COLUMNTYPE
>
FIELD
>
<
FIELD
>
<
COLUMNNAME
>
speed
COLUMNNAME
>
<
COLUMNTYPE
>
int
COLUMNTYPE
>
FIELD
>
<
FIELD
>
<
COLUMNNAME
>
location
COLUMNNAME
>
<
COLUMNTYPE
>
string
COLUMNTYPE
>
FIELD
>
FIELDLIST
>
<
DELIMITER
>
,
DELIMITER
>
TUPLEINFO
>
通過(guò)構(gòu)造函數(shù)及它的參數(shù)Directory、PathSpout和TupleInfo對(duì)象創(chuàng)建Spout對(duì)象。TupleInfo儲(chǔ)存了日志文件的字段、定界符、字段的類型這些很必要的信息。這個(gè)對(duì)象通過(guò)XSTream序列化XML時(shí)建立。
Spout的實(shí)現(xiàn)步驟:
對(duì)文件的改變進(jìn)行分開(kāi)的監(jiān)聽(tīng),并監(jiān)視目錄下有無(wú)新日志文件添加。
在數(shù)據(jù)得到了字段的說(shuō)明后,將其轉(zhuǎn)換成tuple。
聲明Spout和Bolt之間的分組,并決定tuple發(fā)送給Bolt的途徑。
Spout的具體編碼在Listing Three中顯示。
Listing Three:Spout中open、nextTuple和delcareOutputFields方法的邏輯。
public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )
{
_collector = collector;
try
{
fileReader = new BufferedReader(new FileReader(new File(file)));
}
catch (FileNotFoundException e)
{
System.exit(1);
}
}
public void nextTuple()
{
protected void ListenFile(File file)
{
Utils.sleep(2000);
RandomAccessFile access = null;
String line = null;
try
{
while ((line = access.readLine()) != null)
{
if (line !=null)
{
String[] fields=null;
if (tupleInfo.getDelimiter().equals("|")) fields = line.split("\\"+tupleInfo.getDelimiter());
else
fields = line.split (tupleInfo.getDelimiter());
if (tupleInfo.getFieldList().size() == fields.length) _collector.emit(new Values(fields));
}
}
}
catch (IOException ex){ }
}
}
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
String[] fieldsArr = new String [tupleInfo.getFieldList().size()];
for(int i=0; i
{
fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();
}
declarer.declare(new Fields(fieldsArr));
}
declareOutputFileds()決定了tuple發(fā)射的格式,這樣的話Bolt就可以用類似的方法將tuple譯碼。Spout持續(xù)對(duì)日志文件的數(shù)據(jù)的變更進(jìn)行監(jiān)聽(tīng),一旦有添加Spout就會(huì)進(jìn)行讀入并且發(fā)送給Bolt進(jìn)行處理。
Bolt的實(shí)現(xiàn)
Spout的輸出結(jié)果將給予Bolt進(jìn)行更深一步的處理。經(jīng)過(guò)對(duì)用例的思考,我們的topology中需要如Figure 3中的兩個(gè)Bolt。
Figure 3:Spout到Bolt的數(shù)據(jù)流程。
ThresholdCalculatorBolt
Spout將tuple發(fā)出,由ThresholdCalculatorBolt接收并進(jìn)行臨界值處理。在這里,它將接收好幾項(xiàng)輸入進(jìn)行檢查;分別是:
臨界值檢查
臨界值欄數(shù)檢查(拆分成字段的數(shù)目)
臨界值數(shù)據(jù)類型(拆分后字段的類型)
臨界值出現(xiàn)的頻數(shù)
臨界值時(shí)間段檢查
Listing Four中的類,定義用來(lái)保存這些值。
Listing Four:ThresholdInfo類
public class ThresholdInfo implementsSerializable
{
private String action;
private String rule;
private Object thresholdValue;
private int thresholdColNumber;
private Integer timeWindow;
private int frequencyOfOccurence;
}
基于字段中提供的值,臨界值檢查將被Listing Five中的execute()方法執(zhí)行。代碼大部分的功能是解析和接收值的檢測(cè)。
Listing Five:臨界值檢測(cè)代碼段
public void execute(Tuple tuple, BasicOutputCollector collector)
{
if(tuple!=null)
{
List
數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼
若不方便掃碼,搜微信號(hào):CDAshujufenxi
LSTM 模型輸入長(zhǎng)度選擇技巧:提升序列建模效能的關(guān)鍵? 在循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)家族中,長(zhǎng)短期記憶網(wǎng)絡(luò)(LSTM)憑借其解決長(zhǎng)序列 ...
2025-07-11CDA 數(shù)據(jù)分析師報(bào)考條件詳解與準(zhǔn)備指南? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代浪潮下,CDA 數(shù)據(jù)分析師認(rèn)證愈發(fā)受到矚目,成為眾多有志投身數(shù) ...
2025-07-11數(shù)據(jù)透視表中兩列相乘合計(jì)的實(shí)用指南? 在數(shù)據(jù)分析的日常工作中,數(shù)據(jù)透視表憑借其強(qiáng)大的數(shù)據(jù)匯總和分析功能,成為了 Excel 用戶 ...
2025-07-11尊敬的考生: 您好! 我們誠(chéng)摯通知您,CDA Level I和 Level II考試大綱將于 2025年7月25日 實(shí)施重大更新。 此次更新旨在確保認(rèn) ...
2025-07-10BI 大數(shù)據(jù)分析師:連接數(shù)據(jù)與業(yè)務(wù)的價(jià)值轉(zhuǎn)化者? ? 在大數(shù)據(jù)與商業(yè)智能(Business Intelligence,簡(jiǎn)稱 BI)深度融合的時(shí)代,BI ...
2025-07-10SQL 在預(yù)測(cè)分析中的應(yīng)用:從數(shù)據(jù)查詢到趨勢(shì)預(yù)判? ? 在數(shù)據(jù)驅(qū)動(dòng)決策的時(shí)代,預(yù)測(cè)分析作為挖掘數(shù)據(jù)潛在價(jià)值的核心手段,正被廣泛 ...
2025-07-10數(shù)據(jù)查詢結(jié)束后:分析師的收尾工作與價(jià)值深化? ? 在數(shù)據(jù)分析的全流程中,“query end”(查詢結(jié)束)并非工作的終點(diǎn),而是將數(shù) ...
2025-07-10CDA 數(shù)據(jù)分析師考試:從報(bào)考到取證的全攻略? 在數(shù)字經(jīng)濟(jì)蓬勃發(fā)展的今天,數(shù)據(jù)分析師已成為各行業(yè)爭(zhēng)搶的核心人才,而 CDA(Certi ...
2025-07-09【CDA干貨】單樣本趨勢(shì)性檢驗(yàn):捕捉數(shù)據(jù)背后的時(shí)間軌跡? 在數(shù)據(jù)分析的版圖中,單樣本趨勢(shì)性檢驗(yàn)如同一位耐心的偵探,專注于從單 ...
2025-07-09year_month數(shù)據(jù)類型:時(shí)間維度的精準(zhǔn)切片? ? 在數(shù)據(jù)的世界里,時(shí)間是最不可或缺的維度之一,而year_month數(shù)據(jù)類型就像一把精準(zhǔn) ...
2025-07-09CDA 備考干貨:Python 在數(shù)據(jù)分析中的核心應(yīng)用與實(shí)戰(zhàn)技巧? ? 在 CDA 數(shù)據(jù)分析師認(rèn)證考試中,Python 作為數(shù)據(jù)處理與分析的核心 ...
2025-07-08SPSS 中的 Mann-Kendall 檢驗(yàn):數(shù)據(jù)趨勢(shì)與突變分析的有力工具? ? ? 在數(shù)據(jù)分析的廣袤領(lǐng)域中,準(zhǔn)確捕捉數(shù)據(jù)的趨勢(shì)變化以及識(shí)別 ...
2025-07-08備戰(zhàn) CDA 數(shù)據(jù)分析師考試:需要多久?如何規(guī)劃? CDA(Certified Data Analyst)數(shù)據(jù)分析師認(rèn)證作為國(guó)內(nèi)權(quán)威的數(shù)據(jù)分析能力認(rèn)證 ...
2025-07-08LSTM 輸出不確定的成因、影響與應(yīng)對(duì)策略? 長(zhǎng)短期記憶網(wǎng)絡(luò)(LSTM)作為循環(huán)神經(jīng)網(wǎng)絡(luò)(RNN)的一種變體,憑借獨(dú)特的門(mén)控機(jī)制,在 ...
2025-07-07統(tǒng)計(jì)學(xué)方法在市場(chǎng)調(diào)研數(shù)據(jù)中的深度應(yīng)用? 市場(chǎng)調(diào)研是企業(yè)洞察市場(chǎng)動(dòng)態(tài)、了解消費(fèi)者需求的重要途徑,而統(tǒng)計(jì)學(xué)方法則是市場(chǎng)調(diào)研數(shù) ...
2025-07-07CDA數(shù)據(jù)分析師證書(shū)考試全攻略? 在數(shù)字化浪潮席卷全球的當(dāng)下,數(shù)據(jù)已成為企業(yè)決策、行業(yè)發(fā)展的核心驅(qū)動(dòng)力,數(shù)據(jù)分析師也因此成為 ...
2025-07-07剖析 CDA 數(shù)據(jù)分析師考試題型:解鎖高效備考與答題策略? CDA(Certified Data Analyst)數(shù)據(jù)分析師考試作為衡量數(shù)據(jù)專業(yè)能力的 ...
2025-07-04SQL Server 字符串截取轉(zhuǎn)日期:解鎖數(shù)據(jù)處理的關(guān)鍵技能? 在數(shù)據(jù)處理與分析工作中,數(shù)據(jù)格式的規(guī)范性是保證后續(xù)分析準(zhǔn)確性的基礎(chǔ) ...
2025-07-04CDA 數(shù)據(jù)分析師視角:從數(shù)據(jù)迷霧中探尋商業(yè)真相? 在數(shù)字化浪潮席卷全球的今天,數(shù)據(jù)已成為企業(yè)決策的核心驅(qū)動(dòng)力,CDA(Certifie ...
2025-07-04CDA 數(shù)據(jù)分析師:開(kāi)啟數(shù)據(jù)職業(yè)發(fā)展新征程? ? 在數(shù)據(jù)成為核心生產(chǎn)要素的今天,數(shù)據(jù)分析師的職業(yè)價(jià)值愈發(fā)凸顯。CDA(Certified D ...
2025-07-03