How to Submit a YARN MapReduce Job from a Java Program

2014-12-01
This post shows how to submit a YARN MapReduce job from a Java program. Unlike the usual approach of submitting the job as a packaged jar, submitting it from code requires a few small changes, detailed in the code below.

The MapReduce driver follows. A few points are worth noting:

1. The input format is set to WholeFileInputFormat, so input files are not split.

2. To control how the reduce phase processes the data, the map output key is a composite key: instead of the usual <key, value>, the map emits <TextPair, value>, where a TextPair has the form <key1, key2>.

3. To go with the composite key, a custom grouping comparator (GroupComparator) is defined. The grouping rule is: as long as two TextPairs share the same key1 (key2 may differ), their records are sent to the same reduce call; once records with the same key1 reach the reducer, key2 serves as an identifier for each record.

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

public class GEMIMain {

    public Job job;

    public GEMIMain() {
        job = null;
    }

    /** Partition by the first key of the composite key only. */
    public static class NamePartitioner extends Partitioner<TextPair, BytesWritable> {
        @Override
        public int getPartition(TextPair key, BytesWritable value, int numPartitions) {
            return Math.abs(key.getFirst().hashCode() * 127) % numPartitions;
        }
    }

    /**
     * Grouping comparator: two TextPairs belong to the same group as long as their
     * first keys are equal. Their values are then placed in a single value iterator
     * and passed to one call of the Reducer's reduce() method.
     */
    public static class GroupComparator extends WritableComparator {
        public GroupComparator() {
            super(TextPair.class, true);
        }

        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            TextPair t1 = (TextPair) a;
            TextPair t2 = (TextPair) b;
            // Returns 0 when the first keys are equal, so all records sharing the
            // same first key fall into the same group.
            return t1.getFirst().compareTo(t2.getFirst());
        }
    }

    public boolean runJob(String[] args) throws IOException,
            ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // Store the output path in the configuration so the reduce function can read it back.
        conf.set("outputPath", args[args.length - 1]);
        // HDFS folder holding the quality files produced by each run;
        // the second-to-last element of args is that folder.
        conf.set("qualityFolder", args[args.length - 2]);

        // When running inside a server, the web application's root path is needed here;
        // when debugging as a plain Java application, read the configuration files
        // under /opt/hadoop-2.5.0/etc/hadoop/ instead.
        // MapReduceProgress mprogress = new MapReduceProgress();
        // String rootPath = mprogress.rootPath;
        String rootPath = "/opt/hadoop-2.5.0/etc/hadoop/";
        conf.addResource(new Path(rootPath + "yarn-site.xml"));
        conf.addResource(new Path(rootPath + "core-site.xml"));
        conf.addResource(new Path(rootPath + "hdfs-site.xml"));
        conf.addResource(new Path(rootPath + "mapred-site.xml"));

        this.job = Job.getInstance(conf);
        job.setJobName("Job name:" + args[0]);
        job.setJarByClass(GEMIMain.class);

        job.setMapperClass(GEMIMapper.class);
        job.setMapOutputKeyClass(TextPair.class);
        job.setMapOutputValueClass(BytesWritable.class);
        // Set the partitioner.
        job.setPartitionerClass(NamePartitioner.class);
        // After partitioning, group records by the first key only.
        job.setGroupingComparatorClass(GroupComparator.class);
        job.setReducerClass(GEMIReducer.class);

        job.setInputFormatClass(WholeFileInputFormat.class);
        job.setOutputFormatClass(NullOutputFormat.class);
        // job.setOutputKeyClass(NullWritable.class);
        // job.setOutputValueClass(Text.class);
        job.setNumReduceTasks(8);

        // Input paths: every argument except the first (job name) and the last two.
        for (int i = 1; i < args.length - 2; i++) {
            FileInputFormat.addInputPath(job, new Path(args[i]));
        }
        // The last element of args is the output path.
        FileOutputFormat.setOutputPath(job, new Path(args[args.length - 1]));

        return job.waitForCompletion(true);
    }

    public static void main(String[] args) throws ClassNotFoundException,
            IOException, InterruptedException {
        String[] inputPaths = new String[] { "normalizeJob",
                "hdfs://192.168.168.101:9000/user/hduser/red1/",
                "hdfs://192.168.168.101:9000/user/hduser/nir1/", "quality11111",
                "hdfs://192.168.168.101:9000/user/hduser/test" };
        GEMIMain test = new GEMIMain();
        boolean result = test.runJob(inputPaths);
        System.exit(result ? 0 : 1);
    }
}
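The driver above references GEMIMapper and GEMIReducer, which the original article does not include. The following is a minimal hypothetical skeleton only, written to be consistent with the key/value types and the "outputPath"/"qualityFolder" configuration keys set in runJob(); the tagging logic and the reduce body are placeholders, not the author's implementation.

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical skeletons; the real classes are not shown in this article.
public class GEMIMapper extends Mapper<Text, BytesWritable, TextPair, BytesWritable> {
    @Override
    protected void map(Text key, BytesWritable value, Context context)
            throws IOException, InterruptedException {
        // key is the file name, value is the whole file (WholeFileInputFormat).
        // Emit a composite key so the reducer can group by key1 while key2 tells
        // the different kinds of input apart (placeholder logic).
        String tag = key.toString().contains("nir") ? "nir" : "red";
        context.write(new TextPair(key.toString(), tag), value);
    }
}

class GEMIReducer extends Reducer<TextPair, BytesWritable, NullWritable, NullWritable> {
    @Override
    protected void reduce(TextPair key, Iterable<BytesWritable> values, Context context)
            throws IOException, InterruptedException {
        // The driver stores these paths in the Configuration for use here.
        String outputPath = context.getConfiguration().get("outputPath");
        String qualityFolder = context.getConfiguration().get("qualityFolder");
        // ... process the grouped files and write results directly to HDFS,
        // since the job uses NullOutputFormat and emits nothing via context.write().
    }
}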

The TextPair class follows:

package web.hadoop;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

public class TextPair implements WritableComparable<TextPair> {

    private Text first;
    private Text second;

    public TextPair() {
        set(new Text(), new Text());
    }

    public TextPair(String first, String second) {
        set(new Text(first), new Text(second));
    }

    public TextPair(Text first, Text second) {
        set(first, second);
    }

    public void set(Text first, Text second) {
        this.first = first;
        this.second = second;
    }

    public Text getFirst() {
        return first;
    }

    public Text getSecond() {
        return second;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        first.write(out);
        second.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        first.readFields(in);
        second.readFields(in);
    }

    @Override
    public int hashCode() {
        return first.hashCode() * 163 + second.hashCode();
    }

    @Override
    public boolean equals(Object o) {
        if (o instanceof TextPair) {
            TextPair tp = (TextPair) o;
            return first.equals(tp.first) && second.equals(tp.second);
        }
        return false;
    }

    @Override
    public String toString() {
        return first + "\t" + second;
    }

    /**
     * A.compareTo(B) returns 0 if A and B compare equal,
     * a positive value if A is greater than B, and a negative value if A is less than B.
     */
    @Override
    public int compareTo(TextPair tp) {
        int cmp = first.compareTo(tp.first);
        if (cmp != 0) {
            return cmp;
        }
        // Secondary sort on the second key, in ascending order.
        return second.compareTo(tp.second);
    }
}
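As a quick sanity check on points 2 and 3 above, the small demo below (not from the original article; the class name and sample key values are made up, and it assumes the classes above are on the classpath) contrasts the full TextPair ordering with the grouping comparator, which only looks at the first key:

package web.hadoop;

public class TextPairGroupingDemo {
    public static void main(String[] args) {
        TextPair a = new TextPair("scene_001", "red");
        TextPair b = new TextPair("scene_001", "nir");

        // Full ordering: both keys count, so a and b are distinct (sorted by the second key too).
        System.out.println(a.compareTo(b));                                // non-zero

        // Grouping: only the first key counts, so a and b fall into the same reduce group
        // and their values arrive in a single Iterable in reduce().
        System.out.println(new GEMIMain.GroupComparator().compare(a, b));  // 0
    }
}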

The WholeFileInputFormat class follows; it prevents input files from being split during the MapReduce job:

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {

    @Override
    public RecordReader<Text, BytesWritable> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new WholeFileRecordReader();
    }

    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        // Never split a file: each file becomes exactly one record.
        return false;
    }
}

The WholeFileRecordReader class follows:

package web.hadoop;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {

    private FileSplit fileSplit;
    private FSDataInputStream fis;

    private Text key = null;
    private BytesWritable value = null;

    private boolean processed = false;

    @Override
    public void initialize(InputSplit inputSplit, TaskAttemptContext context)
            throws IOException, InterruptedException {
        fileSplit = (FileSplit) inputSplit;
        Configuration conf = context.getConfiguration();
        Path file = fileSplit.getPath();
        FileSystem fs = file.getFileSystem(conf);
        fis = fs.open(file);
    }

    @Override
    public boolean nextKeyValue() {
        if (key == null) {
            key = new Text();
        }
        if (value == null) {
            value = new BytesWritable();
        }
        if (!processed) {
            // Read the whole file as a single record: the key is the file name,
            // the value is the file's entire contents.
            byte[] content = new byte[(int) fileSplit.getLength()];
            Path file = fileSplit.getPath();
            key.set(file.getName());
            try {
                IOUtils.readFully(fis, content, 0, content.length);
                value.set(content, 0, content.length);
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                IOUtils.closeStream(fis);
            }
            processed = true;
            return true;
        }
        return false;
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return this.key;
    }

    @Override
    public BytesWritable getCurrentValue() throws IOException, InterruptedException {
        return this.value;
    }

    @Override
    public float getProgress() throws IOException, InterruptedException {
        // The single record is either fully read or not read at all.
        return processed ? 1.0f : 0.0f;
    }

    @Override
    public void close() throws IOException {
        // The input stream is already closed in nextKeyValue(); nothing to do here.
    }
}
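For reference, when the four *-site.xml files are not available on the submitting machine, the same effect can usually be achieved by setting the relevant properties directly on the Configuration. The sketch below is an illustration only, assuming the single-node cluster at 192.168.168.101 used in the HDFS paths above and a hypothetical jar path; the class name is made up, and the actual property values must match your cluster.

package web.hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class RemoteSubmitConfigSketch {
    public static Job buildJob() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.168.101:9000");       // normally from core-site.xml
        conf.set("mapreduce.framework.name", "yarn");                  // normally from mapred-site.xml
        conf.set("yarn.resourcemanager.hostname", "192.168.168.101");  // normally from yarn-site.xml

        Job job = Job.getInstance(conf);
        // When the driver is launched from an IDE rather than with "hadoop jar",
        // setJarByClass() cannot locate a jar, so the job jar usually has to be
        // set explicitly (hypothetical path; equivalent to setting mapreduce.job.jar):
        job.setJar("/path/to/gemi-job.jar");
        return job;
    }
}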

