99999久久久久久亚洲,欧美人与禽猛交狂配,高清日韩av在线影院,一个人在线高清免费观看,啦啦啦在线视频免费观看www

熱線電話:13121318867

登錄
首頁(yè)精彩閱讀如何管理Java線程池及搭建分布式Hadoop調(diào)度框架
如何管理Java線程池及搭建分布式Hadoop調(diào)度框架
2014-12-26
收藏

如何管理Java線程池及搭建分布式Hadoop調(diào)度框架


平時(shí)的開(kāi)發(fā)中線程是個(gè)少不了的東西,比如tomcat里的servlet就是線程,沒(méi)有線程我們?nèi)绾翁峁┒嘤脩粼L問(wèn)呢?不過(guò)很多剛開(kāi)始接觸線程的開(kāi)發(fā)工程師卻在這個(gè)上面吃了不少苦頭。怎么做一套簡(jiǎn)便的線程開(kāi)發(fā)模式框架讓大家從單線程開(kāi)發(fā)快速轉(zhuǎn)入多線程開(kāi)發(fā),這確實(shí)是個(gè)比較難搞的工程。

那具體什么是線程呢?首先看看進(jìn)程是什么,進(jìn)程就是系統(tǒng)中執(zhí)行的一個(gè)程序,這個(gè)程序可以使用內(nèi)存、處理器、文件系統(tǒng)等相關(guān)資源。例如QQ軟件、Eclipse、Tomcat等就是一個(gè)exe程序,運(yùn)行啟動(dòng)起來(lái)就是一個(gè)進(jìn)程。為什么需要多線程?如果每個(gè)進(jìn)程都是單獨(dú)處理一件事情不能多個(gè)任務(wù)同時(shí)處理,比如我們打開(kāi)qq只能和一個(gè)人聊天,我們用eclipse開(kāi)發(fā)代碼的時(shí)候不能編譯代碼,我們請(qǐng)求tomcat服務(wù)時(shí)只能服務(wù)一個(gè)用戶請(qǐng)求,那我想我們還在原始社會(huì)。多線程的目的就是讓一個(gè)進(jìn)程能夠同時(shí)處理多件事情或者請(qǐng)求。比如現(xiàn)在我們使用的QQ軟件可以同時(shí)和多個(gè)人聊天,我們用eclipse開(kāi)發(fā)代碼時(shí)還可以編譯代碼,tomcat可以同時(shí)服務(wù)多個(gè)用戶請(qǐng)求。

線程這么多好處,怎么把單進(jìn)程程序變成多線程程序呢?不同的語(yǔ)言有不同的實(shí)現(xiàn),這里說(shuō)下java語(yǔ)言的實(shí)現(xiàn)多線程的兩種方式:擴(kuò)展java.lang.Thread類、實(shí)現(xiàn)java.lang.Runnable接口。

先看個(gè)例子,假設(shè)有100個(gè)數(shù)據(jù)需要分發(fā)并且計(jì)算??聪聠尉€程的處理速度:

package thread;import java.util.Vector;public class OneMain {       
public static void main(String[] args) throws InterruptedException {            
Vector<Integer> list = new Vector<Integer>(100);             
for (int i = 0; i < 100; i++) {                  
list.add(i);            }             
long start = System.currentTimeMillis();             
while (list.size() > 0) {                   
int val = list.remove(0);                  
Thread. sleep(100);//模擬處理                  
System. out.println(val);            }             
long end = System.currentTimeMillis();            
System. out.println("消耗 " + (end - start) + " ms");      }       // 消耗 10063 ms}

再看一下多線程的處理速度,采用了10個(gè)線程分別處理:

package thread;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
public class MultiThread extends Thread {     
static Vector<Integer> 
list = new Vector<Integer>(100);     
static CountDownLatch count = new CountDownLatch(10);     
public void run() {          
while (list.size() > 0) {               
try {                    
int val = list.remove(0);                    
System.out.println(val);                    
Thread.sleep(100);//模擬處理               } 
catch (Exception e) {                    // 可能數(shù)組越界,這個(gè)地方只是為了說(shuō)明問(wèn)題,忽略錯(cuò)誤               }          }                   
count.countDown(); // 刪除成功減一     }     
public static void main(String[] args) throws InterruptedException {                   
for (int i = 0; i < 100; i++) {               
list.add(i);          }                   
long start = System.currentTimeMillis();          
for (int i = 0; i < 10; i++) {               
new MultiThread().start();          }                   
count.await();          
long end = System.currentTimeMillis();          
System.out.println("消耗 " + (end - start) + " ms");     }     // 消耗 1001 ms}

大家看到了線程的好處了吧!單線程需要10S,10個(gè)線程只需要1S。充分利用了系統(tǒng)資源實(shí)現(xiàn)并行計(jì)算。也許這里會(huì)產(chǎn)生一個(gè)誤解,是不是增加的線程個(gè)數(shù)越多效率越高。線程越多處理性能越高這個(gè)是錯(cuò)誤的,范式都要合適,過(guò)了就不好了。需要普及一下計(jì)算機(jī)硬件的一些知識(shí)。我們的cpu是個(gè)運(yùn)算器,線程執(zhí)行就需要這個(gè)運(yùn)算器來(lái)運(yùn)行。不過(guò)這個(gè)資源只有一個(gè),大家就會(huì)爭(zhēng)搶。一般通過(guò)以下幾種算法實(shí)現(xiàn)爭(zhēng)搶cpu的調(diào)度:

  1. 隊(duì)列方式,先來(lái)先服務(wù)。不管是什么任務(wù)來(lái)了都要按照隊(duì)列排隊(duì)先來(lái)后到。
  2. 時(shí)間片輪轉(zhuǎn),這也是最古老的cpu調(diào)度算法。設(shè)定一個(gè)時(shí)間片,每個(gè)任務(wù)使用cpu的時(shí)間不能超過(guò)這個(gè)時(shí)間。如果超過(guò)了這個(gè)時(shí)間就把任務(wù)暫停保存狀態(tài),放到隊(duì)列尾部繼續(xù)等待執(zhí)行。
  3. 優(yōu)先級(jí)方式:給任務(wù)設(shè)定優(yōu)先級(jí),有優(yōu)先級(jí)的先執(zhí)行,沒(méi)有優(yōu)先級(jí)的就等待執(zhí)行。

這三種算法都有優(yōu)缺點(diǎn),實(shí)際操作系統(tǒng)是結(jié)合多種算法,保證優(yōu)先級(jí)的能夠先處理,但是也不能一直處理優(yōu)先級(jí)的任務(wù)。硬件方面為了提高效率也有多核cpu、多線程cpu等解決方案。目前看得出來(lái)線程增多了會(huì)帶來(lái)cpu調(diào)度的負(fù)載增加,cpu需要調(diào)度大量的線程,包括創(chuàng)建線程、銷毀線程、線程是否需要換出cpu、是否需要分配到cpu。這些都是需要消耗系統(tǒng)資源的,由此,我們需要一個(gè)機(jī)制來(lái)統(tǒng)一管理這一堆線程資源。線程池的理念提出解決了頻繁創(chuàng)建、銷毀線程的代價(jià)。線程池指預(yù)先創(chuàng)建好一定大小的線程等待隨時(shí)服務(wù)用戶的任務(wù)處理,不必等到用戶需要的時(shí)候再去創(chuàng)建。特別是在java開(kāi)發(fā)中,盡量減少垃圾回收機(jī)制的消耗就要減少對(duì)象的頻繁創(chuàng)建和銷毀。

之前我們都是自己實(shí)現(xiàn)的線程池,不過(guò)隨之jdk1.5的推出,jdk自帶了java.util.concurrent并發(fā)開(kāi)發(fā)框架,解決了我們大部分線程池框架的重復(fù)工作??梢允褂肊xecutors來(lái)建立線程池,列出以下大概的,后面再介紹。

  • newCachedThreadPool建立具有緩存功能線程池
  • newFixedThreadPool建立固定數(shù)量的線程
  • newScheduledThreadPool建立具有時(shí)間調(diào)度的線程

有了線程池后有以下幾個(gè)問(wèn)題需要考慮:

  1. 線程怎么管理,比如新建任務(wù)線程。
  2. 線程如何停止、啟動(dòng)。
  3. 線程除了scheduled模式的間隔時(shí)間定時(shí)外能否實(shí)現(xiàn)精確時(shí)間啟動(dòng)。比如晚上1點(diǎn)啟動(dòng)。
  4. 線程如何監(jiān)控,如果線程執(zhí)行過(guò)程中死掉了,異常終止我們?cè)趺粗馈?/span>

考慮到這幾點(diǎn),我們需要把線程集中管理起來(lái),用java.util.concurrent是做不到的。需要做以下幾點(diǎn):

  1. 將線程和業(yè)務(wù)分離,業(yè)務(wù)的配置單獨(dú)做成一個(gè)表。
  2. 構(gòu)建基于concurrent的線程調(diào)度框架,包括可以管理線程的狀態(tài)、停止線程的接口、線程存活心跳機(jī)制、線程異常日志記錄模塊。
  3. 構(gòu)建靈活的timer組件,添加quartz定時(shí)組件實(shí)現(xiàn)精準(zhǔn)定時(shí)系統(tǒng)。
  4. 和業(yè)務(wù)配置信息結(jié)合構(gòu)建線程池任務(wù)調(diào)度系統(tǒng)??梢酝ㄟ^(guò)配置管理、添加線程任務(wù)、監(jiān)控、定時(shí)、管理等操作。

組件圖為:

構(gòu)建好線程調(diào)度框架是不是就可以應(yīng)對(duì)大量計(jì)算的需求了呢?答案是否定的。因?yàn)橐粋€(gè)機(jī)器的資源是有限的,上面也提到了cpu是時(shí)間周期的,任務(wù)一多了也會(huì)排隊(duì),就算增加cpu,一個(gè)機(jī)器能承載的cpu也是有限的。所以需要把整個(gè)線程池框架做成分布式的任務(wù)調(diào)度框架才能應(yīng)對(duì)橫向擴(kuò)展,比如一個(gè)機(jī)器上的資源達(dá)到瓶頸了,馬上增加一臺(tái)機(jī)器部署調(diào)度框架和業(yè)務(wù)就可以增加計(jì)算能力了。好了,如何搭建?如下圖:

基于jeeframework我們封裝spring、ibatis、數(shù)據(jù)庫(kù)等操作,并且可以調(diào)用業(yè)務(wù)方法完成業(yè)務(wù)處理。主要組件為:

  1. 任務(wù)集中存儲(chǔ)到數(shù)據(jù)庫(kù)服務(wù)器
  2. 控制中心負(fù)責(zé)管理集群中的節(jié)點(diǎn)狀態(tài),任務(wù)分發(fā)
  3. 線程池調(diào)度集群負(fù)責(zé)控制中心分發(fā)的任務(wù)執(zhí)行
  4. web服務(wù)器通過(guò)可視化操作任務(wù)的分派、管理、監(jiān)控。

一般這個(gè)架構(gòu)可以應(yīng)對(duì)常用的分布式處理需求了,不過(guò)有個(gè)缺陷就是隨著開(kāi)發(fā)人員的增多和業(yè)務(wù)模型的增多,單線程的編程模型也會(huì)變得復(fù)雜。比如需要對(duì)1000w數(shù)據(jù)進(jìn)行分詞,如果這個(gè)放到一個(gè)線程里來(lái)執(zhí)行,不算計(jì)算時(shí)間消耗光是查詢數(shù)據(jù)庫(kù)就需要耗費(fèi)不少時(shí)間。有人說(shuō),那我把1000w數(shù)據(jù)打散放到不同機(jī)器去運(yùn)算,然后再合并不就行了嗎?因?yàn)檫@是個(gè)特例的模式,專為了這個(gè)需求去開(kāi)發(fā)相應(yīng)的程序沒(méi)有問(wèn)題,但是以后又有其他的海量需求如何辦?比如把倒退3年的所有用戶發(fā)的帖子中發(fā)帖子最多的粉絲轉(zhuǎn)發(fā)的最高的用戶作息時(shí)間取出來(lái)。又得編一套程序?qū)崿F(xiàn),太麻煩!分布式云計(jì)算架構(gòu)要解決的就是這些問(wèn)題,減少開(kāi)發(fā)復(fù)雜度并且要高性能,大家會(huì)不會(huì)想到一個(gè)最近很熱的一個(gè)框架,hadoop,沒(méi)錯(cuò)就是這個(gè)玩意。hadoop解決的就是這個(gè)問(wèn)題,把大的計(jì)算任務(wù)分解、計(jì)算、合并,這不就是我們要的東西嗎?不過(guò)玩過(guò)這個(gè)的人都知道他是一個(gè)單獨(dú)的進(jìn)程。不是!他是一堆進(jìn)程,怎么和我們的調(diào)度框架結(jié)合起來(lái)?看圖說(shuō)話:

基本前面的分布式調(diào)度框架組件不變,增加如下組件和功能:

  • 改造分布式調(diào)度框架,可以把本身線程任務(wù)變成mapreduce任務(wù)并提交到hadoop集群。
  • hadoop集群能夠調(diào)用業(yè)務(wù)接口的spring、ibatis處理業(yè)務(wù)邏輯訪問(wèn)數(shù)據(jù)庫(kù)。
  • hadoop需要的數(shù)據(jù)能夠通過(guò)hive查詢。
  • hadoop可以訪問(wèn)hdfs/hbase讀寫操作。
  • 業(yè)務(wù)數(shù)據(jù)要及時(shí)加入hive倉(cāng)庫(kù)。
  • hive處理離線型數(shù)據(jù)、hbase處理經(jīng)常更新的數(shù)據(jù)、hdfs是hive和hbase的底層結(jié)構(gòu)也可以存放常規(guī)文件。

這樣,整個(gè)改造基本完成。不過(guò)需要注意的是架構(gòu)設(shè)計(jì)一定要減少開(kāi)發(fā)程序的復(fù)雜度。這里雖然引入了hadoop模型,但是框架上開(kāi)發(fā)者還是隱藏的。業(yè)務(wù)處理類既可以在單機(jī)模式下運(yùn)行也可以在hadoop上運(yùn)行,并且可以調(diào)用spring、ibatis。減少了開(kāi)發(fā)的學(xué)習(xí)成本,在實(shí)戰(zhàn)中慢慢體會(huì)就學(xué)會(huì)了 一項(xiàng)新技能。

界面截圖:

 

數(shù)據(jù)分析咨詢請(qǐng)掃描二維碼

若不方便掃碼,搜微信號(hào):CDAshujufenxi

數(shù)據(jù)分析師資訊
更多

OK
客服在線
立即咨詢
客服在線
立即咨詢
') } function initGt() { var handler = function (captchaObj) { captchaObj.appendTo('#captcha'); captchaObj.onReady(function () { $("#wait").hide(); }).onSuccess(function(){ $('.getcheckcode').removeClass('dis'); $('.getcheckcode').trigger('click'); }); window.captchaObj = captchaObj; }; $('#captcha').show(); $.ajax({ url: "/login/gtstart?t=" + (new Date()).getTime(), // 加隨機(jī)數(shù)防止緩存 type: "get", dataType: "json", success: function (data) { $('#text').hide(); $('#wait').show(); // 調(diào)用 initGeetest 進(jìn)行初始化 // 參數(shù)1:配置參數(shù) // 參數(shù)2:回調(diào),回調(diào)的第一個(gè)參數(shù)驗(yàn)證碼對(duì)象,之后可以使用它調(diào)用相應(yīng)的接口 initGeetest({ // 以下 4 個(gè)配置參數(shù)為必須,不能缺少 gt: data.gt, challenge: data.challenge, offline: !data.success, // 表示用戶后臺(tái)檢測(cè)極驗(yàn)服務(wù)器是否宕機(jī) new_captcha: data.new_captcha, // 用于宕機(jī)時(shí)表示是新驗(yàn)證碼的宕機(jī) product: "float", // 產(chǎn)品形式,包括:float,popup width: "280px", https: true // 更多配置參數(shù)說(shuō)明請(qǐng)參見(jiàn):http://docs.geetest.com/install/client/web-front/ }, handler); } }); } function codeCutdown() { if(_wait == 0){ //倒計(jì)時(shí)完成 $(".getcheckcode").removeClass('dis').html("重新獲取"); }else{ $(".getcheckcode").addClass('dis').html("重新獲取("+_wait+"s)"); _wait--; setTimeout(function () { codeCutdown(); },1000); } } function inputValidate(ele,telInput) { var oInput = ele; var inputVal = oInput.val(); var oType = ele.attr('data-type'); var oEtag = $('#etag').val(); var oErr = oInput.closest('.form_box').next('.err_txt'); var empTxt = '請(qǐng)輸入'+oInput.attr('placeholder')+'!'; var errTxt = '請(qǐng)輸入正確的'+oInput.attr('placeholder')+'!'; var pattern; if(inputVal==""){ if(!telInput){ errFun(oErr,empTxt); } return false; }else { switch (oType){ case 'login_mobile': pattern = /^1[3456789]\d{9}$/; if(inputVal.length==11) { $.ajax({ url: '/login/checkmobile', type: "post", dataType: "json", data: { mobile: inputVal, etag: oEtag, page_ur: window.location.href, page_referer: document.referrer }, success: function (data) { } }); } break; case 'login_yzm': pattern = /^\d{6}$/; break; } if(oType=='login_mobile'){ } if(!!validateFun(pattern,inputVal)){ errFun(oErr,'') if(telInput){ $('.getcheckcode').removeClass('dis'); } }else { if(!telInput) { errFun(oErr, errTxt); }else { $('.getcheckcode').addClass('dis'); } return false; } } return true; } function errFun(obj,msg) { obj.html(msg); if(msg==''){ $('.login_submit').removeClass('dis'); }else { $('.login_submit').addClass('dis'); } } function validateFun(pat,val) { return pat.test(val); }