1. Running programs with Hadoop Streaming
Hadoop Streaming can run programs written in languages other than Java. A sample launch script follows:
    #!/bin/sh

    # Validate the argument count
    if [ $# -ne 7 ]; then
        echo "./bin/avp_platform_startup.sh [USER_NAME] [INPUT_PAT] [OUTPUT_PAT] [MAP_TASKS] [REDUCE_TASKS] [CLASS_ID] [CODE_TYPE]"
        exit 1
    fi

    # GLOBAL VARS
    USER_NAME=$1
    INPUT_PAT=$2
    OUTPUT_PAT=$3
    MAP_TASKS=$4
    REDUCE_TASKS=$5
    CLASS_ID=$6
    CODE_TYPE=$7

    # Hadoop Start Area
    # Each -file option ships one local file to the task nodes.
    # Note: the *.conf glob must match exactly one file, because every
    # shipped file needs its own -file option.
    /home/work/software/hadoop/bin/hadoop jar /home/work/software/hadoop/contrib/streaming/hadoop-streaming.jar \
        -input /home/$USER/$USER_NAME/output/webpage/$INPUT_PAT/ \
        -output /home/$USER/$USER_NAME/output/avp/avp-extract-${USER_NAME}_nlp_$OUTPUT_PAT-$(date +%F-%H-%M-%S) \
        -mapper "avp_extract_mapper.sh . . $CODE_TYPE" \
        -reducer "avp_extract_reducer.sh" \
        -file ./script/avp_extract_mapper.sh \
        -file ./script/avp_extract_reducer.sh \
        -file ./script/extract_tools/tidy_page.py \
        -file ./script/decode.pl \
        -file ./script/extract_tools/format.py \
        -file ./script/extract_tools/extract_tool.py \
        -file ./class/$CLASS_ID/site.list \
        -file ./class/$CLASS_ID/*.conf \
        -jobconf mapred.job.name=${USER_NAME}_avp_platform_extract_tools_$OUTPUT_PAT \
        -jobconf mapred.map.tasks=$MAP_TASKS \
        -jobconf mapred.reduce.tasks=$REDUCE_TASKS
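The mapper and reducer scripts themselves are not shown in this article. As a rough illustration of the contract they have to follow, here is a minimal sketch: a Streaming mapper reads input lines from stdin and writes one record per line to stdout, with the key separated from the value by a tab. The sketch is written in Java only to match the rest of the article (Streaming is normally used with scripts), and the class name StreamingMapperSketch and its word-counting logic are hypothetical, not part of the original job.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical Streaming mapper: emits "<word>\t1" for every word on a line.
class StreamingMapperSketch {

    // Turns one input line into zero or more "key<TAB>value" output records.
    static List<String> map(String line) {
        List<String> out = new ArrayList<>();
        String trimmed = line.trim();
        if (trimmed.isEmpty()) {
            return out; // blank lines produce no output
        }
        for (String word : trimmed.split("\\s+")) {
            out.add(word + "\t1");
        }
        return out;
    }

    // Streaming feeds the input split on stdin and collects whatever is
    // printed on stdout as the mapper's output.
    public static void main(String[] args) throws IOException {
        BufferedReader in = new BufferedReader(
                new InputStreamReader(System.in, StandardCharsets.UTF_8));
        String line;
        while ((line = in.readLine()) != null) {
            for (String record : map(line)) {
                System.out.println(record);
            }
        }
    }
}
```

Because the contract is just stdin and stdout, a mapper like this can be tried locally by piping a text file into it before submitting a job.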
2. Writing Hadoop programs with the native Java API
Step 1: package the Java code into a JAR.
MaxTemperatureMapper.java

    package oldapi;

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.Mapper;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    public class MaxTemperatureMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;

      public void map(LongWritable key, Text value,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {

        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
          output.collect(new Text(year), new IntWritable(airTemperature));
        }
      }
    }
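To make the fixed-width offsets used by the mapper easier to follow, the sketch below applies the same substring positions to a single record from the sample input at the end of this article, without any Hadoop dependency. The class name RecordParseSketch and its helper methods are hypothetical and exist only for illustration.

```java
// Hypothetical stand-alone helper that mirrors the field offsets
// used in MaxTemperatureMapper.
class RecordParseSketch {

    static final int MISSING = 9999;

    // Characters 15..18 hold the four-digit year.
    static String year(String line) {
        return line.substring(15, 19);
    }

    // Characters 87..91 hold a sign followed by four digits.
    static int airTemperature(String line) {
        int start = (line.charAt(87) == '+') ? 88 : 87; // skip a leading '+'
        return Integer.parseInt(line.substring(start, 92));
    }

    // Character 92 is the quality code; only some codes are accepted.
    static boolean isValid(String line) {
        String quality = line.substring(92, 93);
        return airTemperature(line) != MISSING && quality.matches("[01459]");
    }

    public static void main(String[] args) {
        // Second record of the sample input file
        String line = "0043011990999991950051512004+68750+023550FM-12"
                + "+038299999V0203201N00671220001CN9999999N9+00221+99999999999";
        // prints: 1950	22	true
        System.out.println(year(line) + "\t" + airTemperature(line) + "\t" + isValid(line));
    }
}
```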
MaxTemperatureReducer.java

    package oldapi;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class MaxTemperatureReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

      public void reduce(Text key, Iterator<IntWritable> values,
          OutputCollector<Text, IntWritable> output, Reporter reporter)
          throws IOException {

        int maxValue = Integer.MIN_VALUE;
        while (values.hasNext()) {
          maxValue = Math.max(maxValue, values.next().get());
        }
        output.collect(key, new IntWritable(maxValue));
      }
    }
MaxTemperature.java

    package oldapi;

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.FileOutputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import org.apache.hadoop.conf.Configured;
    import oldapi.MaxTemperatureMapper;
    import oldapi.MaxTemperatureReducer;

    public class MaxTemperature extends Configured implements Tool {

      public int run(String[] args) throws IOException {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        JobConf conf = new JobConf(MaxTemperature.class);
        conf.setJobName("Max temperature");

        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        conf.setMapperClass(MaxTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        JobClient.runJob(conf);
        return 0;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperature(), args);
        System.exit(exitCode);
      }
    }
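The driver registers MaxTemperatureReducer as both the combiner and the reducer. That is safe here because taking a maximum is commutative and associative: the maximum of the per-map-task maxima equals the maximum over all values. The sketch below checks this on made-up temperature values; the class name CombinerSketch and the numbers are assumptions for illustration only.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demonstration of why a max-reducer can double as a combiner.
class CombinerSketch {

    // Same logic as the reduce step: the maximum of a group of values.
    static int max(List<Integer> values) {
        int maxValue = Integer.MIN_VALUE;
        for (int v : values) {
            maxValue = Math.max(maxValue, v);
        }
        return maxValue;
    }

    public static void main(String[] args) {
        // Made-up values for one key, emitted by two separate map tasks
        List<Integer> mapTask1 = Arrays.asList(0, 20, 10);
        List<Integer> mapTask2 = Arrays.asList(25, 15);

        // Without a combiner the reducer sees every value
        int direct = max(Arrays.asList(0, 20, 10, 25, 15));

        // With a combiner each map task pre-aggregates its own output
        int combined = max(Arrays.asList(max(mapTask1), max(mapTask2)));

        // prints: 25 25
        System.out.println(direct + " " + combined);
    }
}
```

An aggregate such as a mean would not have this property, so the reducer could not be reused as a combiner unchanged in that case.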
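Packaging command (this produces MaxTemperature.jar; the directory passed to -C is laid out as MaxTemperature/oldapi/*.java, and the compiled .class files must be there as well, since hadoop jar runs compiled classes):

    jar cvf MaxTemperature.jar -C MaxTemperature/ .

Step 2: launch the job.

    sudo -u sniper hadoop jar MaxTemperature.jar oldapi.MaxTemperature /home/sniper/zhuliang/sample.txt /home/sniper/zhuliang/max-temp-output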
Appendix: sample input file

    0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999
    0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999
    0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999
    0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999
    0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999
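As a sanity check on what the job should produce for this sample, the following sketch runs the same parse-then-take-the-maximum logic locally over the five records, with no cluster involved. It is a plain-Java simulation under the assumption that the mapper and reducer behave as listed above; the class name LocalMaxTemperatureSketch is hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical local simulation of the MaxTemperature job on the sample input.
class LocalMaxTemperatureSketch {

    static final int MISSING = 9999;

    // The five records of the sample input file
    static final String[] SAMPLE = {
        "0067011990999991950051507004+68750+023550FM-12+038299999V0203301N00671220001CN9999999N9+00001+99999999999",
        "0043011990999991950051512004+68750+023550FM-12+038299999V0203201N00671220001CN9999999N9+00221+99999999999",
        "0043011990999991950051518004+68750+023550FM-12+038299999V0203201N00261220001CN9999999N9-00111+99999999999",
        "0043012650999991949032412004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+01111+99999999999",
        "0043012650999991949032418004+62300+010750FM-12+048599999V0202701N00461220001CN0500001N9+00781+99999999999"
    };

    // Map step (parse and filter) fused with the reduce step (max per year).
    static Map<String, Integer> maxByYear(String[] lines) {
        Map<String, Integer> max = new TreeMap<>(); // sorted by year
        for (String line : lines) {
            String year = line.substring(15, 19);
            int start = (line.charAt(87) == '+') ? 88 : 87;
            int temperature = Integer.parseInt(line.substring(start, 92));
            String quality = line.substring(92, 93);
            if (temperature != MISSING && quality.matches("[01459]")) {
                max.merge(year, temperature, Math::max);
            }
        }
        return max;
    }

    public static void main(String[] args) {
        // prints:
        // 1949	111
        // 1950	22
        for (Map.Entry<String, Integer> e : maxByYear(SAMPLE).entrySet()) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```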
Original article: http://www.cnblogs.com/bjzhuliang/p/3618724.html
Title of this article: Summary of ways to run distributed programs on Apache Hadoop (Streaming and the native Java API)
