环境准备
开发环境准备
| 准备项 | 说明 |
| 安装 JDK | JDK 8 或 JDK 11,推荐使用 Tencent Kona JDK(下载地址:https://github.com/Tencent/TencentKona-8 、https://github.com/Tencent/TencentKona-11) |
| 安装和配置 IDE | 按需选择,例如 IntelliJ IDEA 或 Eclipse,本文示例使用 IntelliJ IDEA |
| 安装 Maven | 开发环境的基础工具,用于构建 Java 应用程序 |
| Maven 配置准备 | 如需本地调试,请参考第 6 节“开发环境准备”配置 Maven 的 settings.xml;推荐使用 Maven 3.6.3(下载地址:https://archive.apache.org/dist/maven/maven-3/3.6.3/) |
导入示例工程代码
以下以 IntelliJ IDEA 举例,将示例工程代码导入进行说明。
2.1 下载样例代码:https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples/git/files/master
可以使用 git 克隆仓库,也可以直接下载 master 分支的代码:
git clone https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples
2.2 在 IntelliJ IDEA 中导入项目,然后配置 JDK、Maven 以及 Maven 的 settings.xml 文件。



样例代码
POM 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Coordinates of the CountWindowApp example job. -->
    <groupId>com.tencent.tbds</groupId>
    <artifactId>CountWindowApp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <!-- Flink version (TBDS build) shared by the Flink dependencies below. -->
        <flink.version>1.16.1-TBDS-5.3.1.3</flink.version>
        <!-- Source/target level for the compiler. -->
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Flink client APIs; "provided" scope keeps them out of the shaded jar. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Scala standard library; default (compile) scope, so it is packaged. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <!-- Logging backend; "provided" scope, so it is not packaged. -->
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Produces a self-contained (shaded) jar during the package phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                            <!-- Artifacts that must not be copied into the shaded jar. -->
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Do not omit this transformer: it merges META-INF/services files.
                                     Without it, running the job may fail with an error like
                                     "Could not find a suitable table factory for
                                     'org.apache.flink.table.delegation.ExecutorFactory'". -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                <!-- Writes the Main-Class entry into the jar manifest. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.tencent.tbds.CountWindowApp</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
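示例代码(以下代码省略了 package 与 import 语句:该类应位于 com.tencent.tbds 包下,以与 POM 中配置的 mainClass 一致;代码中调用 Tuple2.apply 的 Tuple2 是 scala.Tuple2,对应 POM 中的 scala-library 依赖)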
/**
 * Flink DataStream example of count windows over a non-keyed stream.
 *
 * <p>A source with parallelism 1 emits one random 8-character alphanumeric string
 * every 5 seconds. {@code countWindowAll(5)} groups the stream into tumbling windows
 * of 5 elements each. Every element becomes a {@code (value, 1)} pair, the pairs of
 * one window are collected into a list, and the sink logs that list.
 */
public class CountWindowApp {
    private static final Logger LOGGER = LoggerFactory.getLogger(CountWindowApp.class);

    /**
     * Builds the job graph and submits it under the name "CountWindowApp".
     *
     * @param args unused
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Optional: let Flink choose batch/streaming execution automatically.
        // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStreamSource<String> ds =
                env.addSource(new RichParallelSourceFunction<String>() {
                    // Stop flag for this source instance. cancel() runs on a different
                    // thread than run(), so the field is volatile to make the write
                    // visible to the emitting loop. It is an instance field (not static)
                    // so a source created after a cancel/restart in the same class loader
                    // starts running instead of inheriting a stale "cancelled" state.
                    private volatile boolean running = true;

                    @Override
                    public void run(SourceContext<String> sourceContext)
                            throws Exception {
                        while (running) {
                            String source = RandomStringUtils.random(8, true, true);
                            sourceContext.collect(source);
                            LOGGER.info("generate source: {} ,subtask: {}", source,
                                    getRuntimeContext().getIndexOfThisSubtask());
                            TimeUnit.SECONDS.sleep(5);
                        }
                    }

                    @Override
                    public void cancel() {
                        LOGGER.info("source cancel new ... ");
                        running = false;
                    }
                }).setParallelism(1);

        // Tumbling count window: one window per 5 elements.
        AllWindowedStream<String, GlobalWindow> countWindowAll = ds.countWindowAll(5);
        // Sliding alternative: every 3 elements, a window holding the latest 5 elements.
        // AllWindowedStream<String, GlobalWindow> countWindowAll = ds.countWindowAll(5, 3);

        // Collects each window's elements as (value, 1) pairs into a single list.
        SingleOutputStreamOperator<List<Tuple2<String, Integer>>> aggregate =
                countWindowAll.aggregate(
                        new AggregateFunction<String, List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>() {
                            @Override
                            public List<Tuple2<String, Integer>> createAccumulator() {
                                return new ArrayList<>();
                            }

                            @Override
                            public List<Tuple2<String, Integer>> add(String s,
                                    List<Tuple2<String, Integer>> acc) {
                                acc.add(Tuple2.apply(s, 1));
                                return acc;
                            }

                            @Override
                            public List<Tuple2<String, Integer>> getResult(
                                    List<Tuple2<String, Integer>> acc) {
                                return acc;
                            }

                            @Override
                            public List<Tuple2<String, Integer>> merge(
                                    List<Tuple2<String, Integer>> acc1,
                                    List<Tuple2<String, Integer>> acc2) {
                                acc1.addAll(acc2);
                                return acc1;
                            }
                        });

        // Sink: logs each window's list of pairs.
        aggregate.addSink(new RichSinkFunction<List<Tuple2<String, Integer>>>() {
            @Override
            public void invoke(List<Tuple2<String, Integer>> value,
                    Context context) throws Exception {
                LOGGER.info("sink => {}", value);
            }
        });

        env.execute("CountWindowApp");
    }
}