Connecting Storm to CKafka
Last updated: 2024-10-17 17:10:00
Storm is a distributed real-time computing framework. It processes data as streams and provides general-purpose distributed RPC, achieving sub-second event-processing latency, which makes it well suited to real-time data processing scenarios with strict latency requirements.
How Storm Works
A Storm cluster contains two kinds of nodes: the master node (Master Node) and the worker nodes (Worker Node). The master node runs the Nimbus process, which is responsible for resource allocation and status monitoring. Each worker node runs a Supervisor process, which listens for assigned work and launches executors to carry it out. The whole Storm cluster relies on ZooKeeper for shared data storage, cluster state monitoring, task assignment, and related coordination.
A data processing program submitted to Storm is called a topology. The smallest unit of data it processes is the tuple, an array of arbitrary objects. A topology is made up of spouts and bolts: a spout is a source that produces tuples, and a bolt can subscribe to the tuples emitted by any spout or bolt and process them.
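As a minimal, self-contained sketch of the tuple model (the class and field names here are only for illustration and are not used in the CKafka examples below):
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

//Sketch: Fields declares a tuple's schema; Values holds one tuple's payload in the same order.
public class TupleSketch {
    public static void main(String[] args) {
        Fields schema = new Fields("key", "message");   //field names a component declares
        Values tuple = new Values("k1", "hello storm"); //one tuple matching that schema
        //prints "message -> hello storm"
        System.out.println(schema.get(1) + " -> " + tuple.get(1));
    }
}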
Storm with CKafka
Storm can use CKafka as a spout and consume its messages for processing, or as a bolt target that stores processed data for other components to consume.
Test Environment
OS: CentOS 6.8
| package | version |
|---------|---------|
| maven | 3.5.0 |
| storm | 2.1.0 |
| ssh | 5.3 |
| Java | 1.8 |
Prerequisites
Download and install JDK 8. For details, see Download JDK 8.
Download and install Storm. For details, see Apache Storm downloads.
A CKafka instance has been created.
Directions
Step 1. Obtain the access address of the CKafka instance
Log in to the [CKafka console].
In the left sidebar, select 【Instance List】, then click the ID of your instance to enter its basic information page.
In the 【Access Mode】 module of the instance's basic information page, you can obtain the instance's access address.
Step 2. Create a topic
On the instance's basic information page, select the 【Topic Management】 tab at the top.
On the Topic Management page, click 【Create】 to create a topic.
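Alternatively, the topic can be created programmatically. The sketch below is a hedged example using the Kafka AdminClient API (available in kafka-clients 0.11 and later, which is newer than the 0.10.2.1 client declared in the pom below); the bootstrap address, partition count, and replication factor are placeholders to adjust for your instance:
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

//Sketch: create the test topic through the AdminClient API instead of the console.
//Assumes kafka-clients 0.11+; the address and replication factor are placeholders.
public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "xx.xx.xx.xx:xxxx"); //access address from step 1
        try (AdminClient admin = AdminClient.create(props)) {
            //topic name, partition count, replication factor
            NewTopic topic = new NewTopic("storm_test", 1, (short) 1);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}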
Step 3. Add Maven dependencies
Configure pom.xml as follows. Note that the mainClass configured for the assembly plugin is ExclamationTopology; point it at the topology class you actually intend to run, such as one of the example classes below:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>storm</groupId>
    <artifactId>storm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>storm</name>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-core</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.storm</groupId>
            <artifactId>storm-kafka-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.2.1</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>ExclamationTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
Step 4. Produce messages
Using spout/bolt
Topology code:
//TopologyKafkaProducerSpout.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.bolt.KafkaBolt;
import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerSpout {
    //ip:port of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    //topic to write messages to
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        //set producer properties
        //functions reference: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        //properties reference: http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //create the bolt that writes to Kafka; by default fields("key", "message") are used as the
        //key and message of the produced record, which can also be specified in FieldNameBasedTupleToKafkaMapper()
        KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(properties)
                .withTopicSelector(new DefaultTopicSelector(TOPIC))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());

        TopologyBuilder builder = new TopologyBuilder();
        //a spout class that generates messages sequentially; its output field is "sentence"
        SerialSentenceSpout spout = new SerialSentenceSpout();
        AddMessageKeyBolt bolt = new AddMessageKeyBolt();
        builder.setSpout("kafka-spout", spout, 1);
        //add the fields required for producing to CKafka to each tuple
        builder.setBolt("add-key", bolt, 1).shuffleGrouping("kafka-spout");
        //write to CKafka
        builder.setBolt("sendToKafka", kafkaBolt, 8).shuffleGrouping("add-key");

        Config config = new Config();
        if (args != null && args.length > 0) {
            //cluster mode: package as a jar and submit it to a Storm cluster
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            //local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Create a spout class that generates messages sequentially:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class SerialSentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector spoutOutputCollector;

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.spoutOutputCollector = spoutOutputCollector;
    }

    @Override
    public void nextTuple() {
        Utils.sleep(1000);
        //generate a UUID string and send it to the next component
        spoutOutputCollector.emit(new Values(UUID.randomUUID().toString()));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }
}
Add the key and message fields to each tuple. When the key is null, produced messages are distributed evenly across partitions; when a key is specified, messages are hashed by key onto a particular partition:
//AddMessageKeyBolt.java
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class AddMessageKeyBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //take the value of the first field
        String message = tuple.getString(0);
        //System.out.println(message);
        //send it to the next component
        basicOutputCollector.emit(new Values(null, message));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        //declare the schema sent to the next component
        outputFieldsDeclarer.declare(new Fields("key", "message"));
    }
}
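To send keyed messages instead, emit a non-null key. For example, the execute() method of AddMessageKeyBolt above could be replaced with the following variant, which derives the key from the message itself (the same keyed emit appears commented out in the Trident example below):
//Variant of AddMessageKeyBolt.execute(): derive the key from the message so that
//identical messages are always hashed to the same partition.
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
    String message = tuple.getString(0);
    basicOutputCollector.emit(new Values(Integer.toString(message.hashCode()), message));
}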
Using Trident
Use the Trident classes to generate the topology:
//TopologyKafkaProducerTrident.java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Properties;

public class TopologyKafkaProducerTrident {
    //ip:port of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    //topic to write messages to
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        //set producer properties
        //functions reference: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
        //properties reference: http://kafka.apache.org/0102/documentation.html
        Properties properties = new Properties();
        properties.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        properties.put("acks", "1");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        //set up Trident
        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withProducerProperties(properties)
                .withKafkaTopicSelector(new DefaultTopicSelector(TOPIC))
                //use fields("key", "value") for the messages to write; unlike
                //FieldNameBasedTupleToKafkaMapper, there are no default values here
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("key", "value"));

        TridentTopology builder = new TridentTopology();
        //a spout that produces sentences in batches; its output field is "sentence"
        builder.newStream("kafka-spout", new TridentSerialSentenceSpout(5))
                .each(new Fields("sentence"), new AddMessageKey(), new Fields("key", "value"))
                .partitionPersist(stateFactory, new Fields("key", "value"), new TridentKafkaStateUpdater(), new Fields());

        Config config = new Config();
        if (args != null && args.length > 0) {
            //cluster mode: package as a jar and submit it to a Storm cluster
            config.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
        } else {
            //local mode
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.build());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

    private static class AddMessageKey extends BaseFunction {
        @Override
        public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
            //take the value of the first field
            String message = tridentTuple.getString(0);
            //System.out.println(message);
            //send it to the next component
            //tridentCollector.emit(new Values(Integer.toString(message.hashCode()), message));
            tridentCollector.emit(new Values(null, message));
        }
    }
}
Create a spout class that generates messages in batches:
//TridentSerialSentenceSpout.java
import org.apache.storm.Config;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.UUID;

public class TridentSerialSentenceSpout implements IBatchSpout {
    private final int batchCount;

    public TridentSerialSentenceSpout(int batchCount) {
        this.batchCount = batchCount;
    }

    @Override
    public void open(Map map, TopologyContext topologyContext) {
    }

    @Override
    public void emitBatch(long l, TridentCollector tridentCollector) {
        Utils.sleep(1000);
        for (int i = 0; i < batchCount; i++) {
            tridentCollector.emit(new Values(UUID.randomUUID().toString()));
        }
    }

    @Override
    public void ack(long l) {
    }

    @Override
    public void close() {
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        Config conf = new Config();
        conf.setMaxTaskParallelism(1);
        return conf;
    }

    @Override
    public Fields getOutputFields() {
        return new Fields("sentence");
    }
}
Step 5. Consume messages
Using spout/bolt
//TopologyKafkaConsumerSpout.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.spout.*;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;
import java.util.Map;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerSpout {
    //ip:port of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    //topic to consume messages from
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        //set the retry policy
        KafkaSpoutRetryService kafkaSpoutRetryService = new KafkaSpoutRetryExponentialBackoff(
                KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(500),
                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2),
                Integer.MAX_VALUE,
                KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10)
        );
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        //set consumer parameters
        //functions reference: http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        //parameters reference: http://kafka.apache.org/0102/documentation.html
        KafkaSpoutConfig spoutConfig = KafkaSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //consumer group
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //session timeout
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //request timeout
                }})
                .setOffsetCommitPeriodMs(10_000) //offset commit period
                .setFirstPollOffsetStrategy(LATEST) //start from the latest messages
                .setRetry(kafkaSpoutRetryService)
                .setRecordTranslator(trans)
                .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt", new BaseRichBolt() {
            private OutputCollector outputCollector;

            @Override
            public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            }

            @Override
            public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
                this.outputCollector = outputCollector;
            }

            @Override
            public void execute(Tuple tuple) {
                System.out.println(tuple.getStringByField("value"));
                outputCollector.ack(tuple);
            }
        }, 1).shuffleGrouping("kafka-spout");

        Config config = new Config();
        config.setMaxSpoutPending(20);
        if (args != null && args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", config, builder.createTopology());
            Utils.sleep(20000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Using Trident
//TopologyKafkaConsumerTrident.java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutConfig;
import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.BaseFunction;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.HashMap;

import static org.apache.storm.kafka.spout.FirstPollOffsetStrategy.LATEST;

public class TopologyKafkaConsumerTrident {
    //ip:port of your CKafka instance
    private final static String BOOTSTRAP_SERVERS = "xx.xx.xx.xx:xxxx";
    //topic to consume messages from
    private final static String TOPIC = "storm_test";

    public static void main(String[] args) throws Exception {
        ByTopicRecordTranslator<String, String> trans = new ByTopicRecordTranslator<>(
                (r) -> new Values(r.topic(), r.partition(), r.offset(), r.key(), r.value()),
                new Fields("topic", "partition", "offset", "key", "value"));
        //set consumer parameters
        //functions reference: http://storm.apache.org/releases/1.1.0/javadocs/org/apache/storm/kafka/spout/KafkaSpoutConfig.Builder.html
        //parameters reference: http://kafka.apache.org/0102/documentation.html
        KafkaTridentSpoutConfig spoutConfig = KafkaTridentSpoutConfig.builder(BOOTSTRAP_SERVERS, TOPIC)
                .setProp(new HashMap<String, Object>(){{
                    put(ConsumerConfig.GROUP_ID_CONFIG, "test-group1"); //consumer group
                    put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //enable auto commit
                    put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "50000"); //session timeout
                    put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000"); //request timeout
                }})
                .setFirstPollOffsetStrategy(LATEST) //start from the latest messages
                .setRecordTranslator(trans)
                .build();

        TridentTopology builder = new TridentTopology();
        //Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutTransactional(spoutConfig)); //transactional
        Stream spoutStream = builder.newStream("spout", new KafkaTridentSpoutOpaque(spoutConfig));
        spoutStream.each(spoutStream.getOutputFields(), new BaseFunction() {
            @Override
            public void execute(TridentTuple tridentTuple, TridentCollector tridentCollector) {
                System.out.println(tridentTuple.getStringByField("value"));
                tridentCollector.emit(new Values(tridentTuple.getStringByField("value")));
            }
        }, new Fields("message"));

        Config conf = new Config();
        conf.setMaxSpoutPending(20);
        conf.setNumWorkers(1);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.build());
        } else {
            StormTopology stormTopology = builder.build();
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, stormTopology);
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
            stormTopology.clear();
        }
    }
}
Step 6. Submit to Storm
After compiling with mvn package, you can submit the topology to a local cluster for debugging or to a production cluster to run. In the examples above, running the main class with no trailing argument starts local mode, while passing a topology name as the final argument submits the topology to the cluster:
storm jar your_jar_name.jar topology_class
storm jar your_jar_name.jar topology_class topology_name
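For example, with the jar-with-dependencies artifact built by the pom above and the producer example class (the topology name storm-producer-test here is only an illustration):
storm jar storm-0.0.1-SNAPSHOT-jar-with-dependencies.jar TopologyKafkaProducerSpout storm-producer-test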