环境准备
开发环境准备
| 准备项 | 说明 |
| --- | --- |
| 安装 JDK | JDK 8、JDK 11,推荐使用 Kona JDK,下载地址 |
| 安装和配置 IDE | 按需选择,比如 IntelliJ IDEA 或 Eclipse,示例使用IDEA |
| 安装 Maven | 开发环境基础配置,负责构建 Java 应用程序 |
| Maven 配置准备 | 如果需要本地调试,需要参考 6.开发环境准备 配置 Maven settings.xml,推荐 Maven 3.6.3,下载地址 |
导入示例工程代码
以下以 IntelliJ IDEA 举例,将示例工程代码导入进行说明。
2.1 下载样例代码:https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples/git/files/master
克隆仓库或者直接下载 master 分支的代码均可:
git clone https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples
2.2 导入项目,然后选择 JDK、Maven 和 settings.xml 文件。



样例代码
POM文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the MapReduce WordCount example: declares the Hadoop and logging
dependencies and packages everything into a single runnable jar via maven-shade-plugin. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mapreduce-test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<!-- Version shared by all org.apache.hadoop dependencies below. -->
<hadoop.version>3.2.2-TBDS-5.3.1.3</hadoop.version>
<!-- NOTE(review): hive.version is not referenced anywhere in this POM; confirm whether it is still needed. -->
<hive.version>3.1.3-TBDS-5.3.1.3</hive.version>
</properties>
<dependencies>
<!-- Hadoop Core Dependencies -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${hadoop.version}</version>
</dependency>
<!-- Logging: SLF4J API plus the Log4j 2 binding and implementation. -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.32</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Builds the shaded (fat) jar during the package phase. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
<artifactSet>
<!-- Artifacts left out of the shaded jar (patterns are groupId:artifactId).
NOTE(review): "log4j:*" matches only the log4j groupId; the org.apache.logging.log4j
artifacts declared above are not covered by it. Confirm this is intended. -->
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<!-- Do not omit this transformer; otherwise a runtime error similar to "Could not find a
suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory" is reported.
NOTE(review): the quoted error is Flink-specific and looks carried over from a Flink example;
confirm whether it applies to this MapReduce job. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<!-- Sets the Main-Class entry of the shaded jar's manifest. -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.tencent.tbds.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
示例代码
public class WordCount {
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
private static final Logger logger = LoggerFactory.getLogger(WordCountMapper.class);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] parts = line.split(",");
logger.info("Processing line: " + line);
String[] words = line.split("\\s+");
for (String w : words) {
word.set(w);
context.write(word, one);
logger.debug("Emitting: (" + w + ", 1)");
}
}
}
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private static final Logger logger = LoggerFactory.getLogger(WordCountReducer.class);
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
String formattedOutput = String.format("%-30s %d", key.toString(), sum);
context.write(new Text(formattedOutput), null);
logger.info("Reducing: (" + key.toString() + ", " + sum + ")");
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: WordCount <input path> <output path>");
System.exit(-1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "Word Count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}