示例工程开发

最近更新时间: 2026-03-13 09:03:00

环境准备

开发环境准备

| 准备项 | 说明 |
| --- | --- |
| 安装 JDK | JDK 8、JDK 11,推荐使用 KonaJDK,下载地址 |
| 安装和配置 IDE | 按需选择,比如 IntelliJ IDEA 或 Eclipse,示例使用 IDEA |
| 安装 Maven | 开发环境基础配置,负责构建 Java 应用程序 |
| Maven 配置准备 | 如果需要本地调试,需要参考「6.开发环境准备」配置 Maven settings.xml,推荐 Maven 3.6.3,下载地址 |

导入示例工程代码

以下以 IntelliJ IDEA 为例,说明如何导入示例工程代码。
2.1 下载样例代码:https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples/git/files/master

克隆仓库或者直接下载 master 分支的代码均可:

git clone https://g-necm8077.coding.net/public/tencentcloud-tbds-examples/tbds-examples

2.2 导入项目,然后选择 JDK、Maven 和 settings.xml 文件。

样例代码

POM 文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the CountWindowApp example: compiles the job and packages it
     as a shaded JAR whose manifest entry point is com.tencent.tbds.CountWindowApp. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.tencent.tbds</groupId>
    <artifactId>CountWindowApp</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Shared versions: the compiler source and target levels both follow java.version. -->
    <properties>
        <flink.version>1.16.1-TBDS-5.3.1.3</flink.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- "provided" scope: needed to compile, but presumably supplied by the
             Flink runtime on the cluster, so it is not meant to ship in the job JAR. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- Default (compile) scope, unlike the two "provided" dependencies. -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.12</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Builds the shaded (fat) JAR during the "package" phase. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>

                            <!-- Artifacts that must not be bundled into the shaded JAR. -->
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <!-- Do not omit this transformer; otherwise the job fails at runtime with an
                                error like "Could not find a suitable table factory
                                for 'org.apache.flink.table.delegation.ExecutorFactory". -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>

                                <!-- Records the main class of the job in the JAR manifest. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>com.tencent.tbds.CountWindowApp</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

示例代码

/**
 * Example Flink streaming job that demonstrates count windows.
 *
 * <p>A single-parallelism source emits one random 8-character string (letters and
 * digits) every 5 seconds. The stream is grouped into non-keyed count windows of
 * 5 elements; each window is aggregated into a list of {@code (value, 1)} tuples
 * and the list is logged by the sink.
 */
public class CountWindowApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(CountWindowApp.class);

    /**
     * Builds the pipeline (source, count window, aggregate, logging sink) and submits it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        DataStreamSource<String> ds =
                env.addSource(new RichParallelSourceFunction<String>() {
                    // Per-instance stop flag. It is volatile because cancel() is invoked from a
                    // different thread than run(). It must not be static: a static flag is shared
                    // by every source instance in the JVM and is never reset, so a source that is
                    // restarted in the same JVM after a cancel would exit immediately.
                    private volatile boolean cancelled = false;

                    @Override
                    public void run(SourceContext<String> sourceContext)
                            throws Exception {
                        while (!cancelled) {
                            String source = RandomStringUtils.random(8, true, true);
                            sourceContext.collect(source);
                            LOGGER.info("generate source: {} ,subtask: {}", source,
                                    getRuntimeContext().getIndexOfThisSubtask());
                            // Throttle emission: one element every 5 seconds.
                            TimeUnit.SECONDS.sleep(5);
                        }
                    }

                    @Override
                    public void cancel() {
                        LOGGER.info("source cancel new ... ");
                        cancelled = true;
                    }
                }).setParallelism(1);
        // Tumbling count window: one window is produced for every 5 elements.
        AllWindowedStream<String, GlobalWindow> countWindowAll = ds.countWindowAll(5);
        // Sliding count window alternative: every 3 elements, produce a window
        // containing the previous 5 elements.
        // AllWindowedStream<String, GlobalWindow> countWindowAll = ds.countWindowAll(5, 3);
        SingleOutputStreamOperator<List<Tuple2<String, Integer>>> aggregate =
                countWindowAll.aggregate(
                        new AggregateFunction<String, List<Tuple2<String, Integer>>, List<Tuple2<String, Integer>>>() {
                            /** Starts each window with an empty list. */
                            @Override
                            public List<Tuple2<String, Integer>> createAccumulator() {
                                return new ArrayList<>();
                            }

                            /** Appends the element to the accumulator, paired with a count of 1. */
                            @Override
                            public List<Tuple2<String, Integer>> add(String s,
                                                                     List<Tuple2<String, Integer>> acc) {
                                acc.add(Tuple2.apply(s, 1));
                                return acc;
                            }

                            /** The window result is the accumulated list itself. */
                            @Override
                            public List<Tuple2<String, Integer>> getResult(
                                    List<Tuple2<String, Integer>> acc) {
                                return acc;
                            }

                            /** Merges two accumulators by appending the second to the first. */
                            @Override
                            public List<Tuple2<String, Integer>> merge(
                                    List<Tuple2<String, Integer>> acc1,
                                    List<Tuple2<String, Integer>> acc2) {
                                acc1.addAll(acc2);
                                return acc1;
                            }
                        });

        // Sink: log every window result.
        aggregate.addSink(new RichSinkFunction<List<Tuple2<String, Integer>>>() {
            @Override
            public void invoke(List<Tuple2<String, Integer>> value,
                               Context context) throws Exception {
                LOGGER.info("sink => {}", value);
            }

        });
        env.execute("CountWindowApp");
    }
}