Kafka 的事务功能是为了支持在分布式环境中实现原子性操作而设计的。它允许生产者在发送消息时确保消息的完整性和一致性,特别是在需要多条消息作为一个整体进行处理的场景中。以下是 Kafka 事务的主要概念和功能介绍。
事务相关概念
事务的基本概念
**原子性:**事务中的所有操作要么全部成功,要么全部失败。Kafka 确保在事务中发送的消息要么被成功写入到主题中,要么不写入。
**一致性:**事务在执行前后,数据的状态应该保持一致。
**隔离性:**事务之间的操作是相互独立的,一个事务的执行不应影响其他事务的执行。
**持久性:**一旦事务被提交,其结果是永久性的,即使系统崩溃也不会丢失。
事务的工作流程
Kafka 的事务工作流程主要包括以下几个步骤:
- **启动事务:**生产者在发送消息之前调用
initTransactions()方法来初始化事务。 - **发送消息:**生产者可以发送多条消息到一个或多个主题,这些消息会被标记为事务性消息。
- 提交或中止事务:
- **提交事务:**如果所有消息都成功发送,生产者调用
commitTransaction()方法来提交事务,所有消息将被写入到 Kafka。 - **中止事务:**如果在发送过程中发生错误,生产者可以调用
abortTransaction()方法来中止事务,所有消息将不会被写入。
- **提交事务:**如果所有消息都成功发送,生产者调用
事务的配置
要使用 Kafka 的事务功能,您需要在生产者配置中设置以下参数:
transactional.id:每个事务性生产者都需要一个唯一的标识符。这个 ID 用于标识事务的所有消息。acks:设置为 all 以确保所有副本都确认消息。enable.idempotence:设置为true以启用幂等性,确保消息不会被重复发送。
事务的限制
- 性能开销:使用事务会引入额外的性能开销,因为需要进行更多的协调和确认。
- 事务超时:Kafka 对事务有超时限制,默认情况下为 60 秒。如果事务在此时间内未提交或中止,将会被自动中止。
- 消费者的处理:消费者在处理事务性消息时需要注意,只有在事务提交后,消费者才能看到这些消息。
事务使用示例
producer
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;
public class TransactionalProducerDemo {
public static void main(String[] args) {
// Kafka 配置
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id"); // 事务 ID
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); // 启用幂等性
// 创建 Kafka 生产者
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务
producer.initTransactions();
try {
// 开始事务
producer.beginTransaction();
// 发送消息
for (int i = 0; i < 10; i++) {
ProducerRecord<String, String> record = new ProducerRecord<>("my-topic", "key-" + i, "value-" + i);
RecordMetadata metadata = producer.send(record).get(); // 发送消息并等待确认
System.out.printf("Sent message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), metadata.partition(), metadata.offset());
}
// 提交事务
producer.commitTransaction();
System.out.println("Transaction committed successfully.");
} catch (Exception e) {
// 如果发生异常,回滚事务
producer.abortTransaction();
System.err.println("Transaction aborted due to an error: " + e.getMessage());
} finally {
// 关闭生产者
producer.close();
}
}
}
consumer
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class TransactionalConsumerDemo {
public static void main(String[] args) {
// Kafka 配置
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Kafka broker 地址
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group"); // 消费者组 ID
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // 只读取已提交的事务消息
// 创建 Kafka 消费者
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 订阅主题
consumer.subscribe(Collections.singletonList("my-topic"));
try {
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Consumed message: key=%s, value=%s, partition=%d, offset=%d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
// 关闭消费者
consumer.close();
}
}
}
Kafka 事务管理
在 Kafka 中,事务管理涉及到多个组件和数据结构,以确保事务的原子性和一致性。事务信息的内存占用主要与以下几个方面有关:
事务 ID 和 Producer ID
事务 ID:每个事务都有一个唯一的事务 ID,用于标识该事务。事务 ID 是由生产者在发送消息时指定的,通常是一个字符串。
Producer ID:每个生产者在连接到 Kafka 时会被分配一个唯一的 Producer ID。这个 ID 用于标识生产者的消息,并确保消息的顺序性和幂等性。
事务状态管理
Kafka 使用一个称为 事务状态日志 的内部主题来管理事务的状态。这个日志记录了每个事务的状态(如进行中、已提交、已中止)以及与该事务相关的消息。事务状态日志的管理涉及以下几个方面:
- 内存中的数据结构:Kafka 在内存中维护一个数据结构(例如哈希表或映射),用于存储当前活动的事务信息。这些信息包括事务 ID、Producer ID、事务状态、时间戳等。
- 持久化存储:事务状态日志会被持久化到磁盘,以确保在 Kafka 服务器重启或故障恢复时能够恢复事务状态。
事务信息的内存占用
事务信息的内存占用主要取决于以下两个因素:
- 活动事务的数量:当前正在进行的事务数量直接影响内存占用。每个活动事务都会在内存中占用一定的空间。
- 事务的元数据:每个事务的元数据(例如事务 ID、Producer ID、状态等)也会占用内存。具体的内存占用量取决于这些元数据的大小。
事务的清理
为了防止内存占用过高,Kafka 会根据配置的过期时间定期检查并清理已完成的事务,默认保留 7 天,过期删除。
事务常见的 FullGC / OOM 问题
从事务管理可以看出,事务信息会占用大量内存。其中影响事务信息占用内存大小的最直接的两个因素就是:事务 ID 的数量和 Producer ID 的数量。
- 其中事务 ID 的数量指的是客户端往 Broker 初始化、提交事务的数量,这个与客户端的事务新增提交频率强相关。
- Producer ID 指的是 Broker 内每个 Topic 分区存储的 producer 状态信息,因此 Producer ID 的数量与 broker 的分区数量强相关。
在事务场景中,事务 ID和 Producer ID 强绑定,如果同一个和事务 ID 绑定的 Producer ID 往 broker 内所有的分区都发送消息,那么一个 broker 内的 Producer ID 的数量理论上最多能达到事务 ID 数量与 broker 内分区数量的乘积。假设一个实例下的事务 ID 数量为 t,一个 broker 下的分区数量为 p,那么 Producer ID 的数量最大能达到 t * p。说明:
因此,假设一个 broker 下的事务 ID 数量为 t,平均事务内存占用大小为 tb,一个 broker 下的分区数量为 p,平均一个 Producer ID 占用大小为 pb,那么该 broker 内存中关于事务信息占用的内存大小为:t * tb + t * p * pb。
可以看出有两种场景可能会导致内存占用暴涨:
- 客户端频繁往实例初始化新增提交新的事务 ID。
- 同一个事务 ID 往多个分区发送数据,Producer ID 的叉乘数量会上涨的非常恐怖,很容易将内存打满。
说明:
因此,无论是对 Flink 客户端还是自己实现的事务 producer,都尽量避免这两种场景。例如对于 Flink,可以适当降低 checkpoint 的频率,以减小由于事务 ID 前缀+随机串计算的事务 ID 变化的频率。另外就是尽量保证同一个事务 ID 往同一个分区发送数据。
Flink 使用事务注意事项
对于 Flink 有以下优化手段,来保证事务信息不会急剧膨胀:
- 客户端优化参数:Flink 加大
checkpoint间隔(详情可参见 社区 ISSUE)。 - Flink 生产任务可优化
sink.partitioner为Fixed模式。
flink 参数说明:https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/