01 VPC 网络接入

最近更新时间: 2025-01-15 17:01:00

操作背景

该任务以 Go 客户端为例指导您使用VPC网络接入消息队列 CKafka 并收发消息。

前提条件

操作步骤

步骤一:准备配置:

  1. 将下载的demo中的gokafkademo上传至linux服务器。
  2. 登陆linux系统,进入gokafkademo目录,执行以下命令添加依赖库。
go get -v gopkg.in/confluentinc/confluent-kafka-go.v1/kafka
  1. 修改配置文件 kafka.json。
{
  "topic": [
      "test"
  ],
  "bootstrapServers": [
      "xxx-.ap-changsha-ec.ckafka.tencentcloudmq.com:6000"
  ],
  "consumerGroupId": "yourConsumerId"
 }  
参数描述
topicTopic名称,您可以在控制台上【topic管理】页面复制。
img
bootstrapServers接入网络,在控制台的实例详情页面【接入方式】模块的网络列复制。
img
consumerGroupId您可以自定义设置,demo运行成功后可以在【Consumer Group】页面看到该消费者。

步骤二:发送消息

  1. 编写生产消息程序。
   package main
   import (
   "fmt"
   "gokafkademo/config"
   "log"
   "strings"
       "github.com/confluentinc/confluent-kafka-go/kafka"
   )
   func main() {
       cfg, err := config.ParseConfig("../../config/kafka.json")
   if err != nil {
       log.Fatal(err)
   }
       p, err := kafka.NewProducer(&kafka.ConfigMap{
       // 设置接入点,请通过控制台获取对应Topic的接入点。
       "bootstrap.servers": strings.Join(cfg.Servers, ","),
       // 用户不显示配置时,默认值为1。用户根据自己的业务情况进行设置
       "acks": 1,
       // 请求发生错误时重试次数,建议将该值设置为大于0,失败重试最大程度保证消息不丢失
       "retries": 0,
       // 发送请求失败时到下一次重试请求之间的时间
       "retry.backoff.ms": 100,
       // producer 网络请求的超时时间。
       "socket.timeout.ms": 6000,
       // 设置客户端内部重试间隔。
       "reconnect.backoff.max.ms": 3000,
   })
   if err != nil {
       log.Fatal(err)
   }
       defer p.Close()
       // 产生的消息 传递至报告处理程序
   go func() {
       for e := range p.Events() {
           switch ev := e.(type) {
           case *kafka.Message:
               if ev.TopicPartition.Error != nil {
                   fmt.Printf("Delivery failed: %v\n", ev.TopicPartition)
               } else {
                   fmt.Printf("Delivered message to %v\n", ev.TopicPartition)
               }
           }
       }
   }()
       // 异步发送消息
   topic := cfg.Topic[0]
   for _, word := range []string{"Confluent-Kafka", "Golang Client Message"} {
       _ = p.Produce(&kafka.Message{
           TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
           Value:          []byte(word),
       }, nil)
   }
       // 等待消息传递
   p.Flush(10 * 1000)
  1. 编译并运行程序发送消息。
   go run main.go
  1. 查看运行结果,示例如下。
   Delivered message to test[0]@628
   Delivered message to test[0]@629
  1. 在 CKafka 控制台的【topic管理】页面,选择对应的 Topic , 点击【更多】>【消息查询】,查看刚刚发送的消息。

步骤三:消费消息

  1. 编写消费消息程序。。
package main

import (
    "fmt"
    "gokafkademo/config"
    "log"
    "strings"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

    cfg, err := config.ParseConfig("../../config/kafka.json")
    if err != nil {
        log.Fatal(err)
    }

    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        // 设置接入点,请通过控制台获取对应Topic的接入点。
        "bootstrap.servers": strings.Join(cfg.Servers, ","),
        // 设置的消息消费组
        "group.id":          cfg.ConsumerGroupId,
        "auto.offset.reset": "earliest",

        // 使用 Kafka 消费分组机制时,消费者超时时间。当 Broker 在该时间内没有收到消费者的心跳时,认为该消费者故障失败,Broker
        // 发起重新 Rebalance 过程。目前该值的配置必须在 Broker 配置group.min.session.timeout.ms=6000和group.max.session.timeout.ms=300000 之间
        "session.timeout.ms": 10000,
    })

    if err != nil {
        log.Fatal(err)
    }
    // 订阅的消息topic 列表
    err = c.SubscribeTopics(cfg.Topic, nil)
    if err != nil {
        log.Fatal(err)
    }

    for {
        msg, err := c.ReadMessage(-1)
        if err == nil {
            fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        } else {
            // 客户端将自动尝试恢复所有的 error
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
        }
    }

    c.Close()
}
  1. 编译并运行程序消费消息。
go run main.go
  1. 查看运行结果,示例如下。
Message on test[0]@628: Confluent-Kafka
Message on test[0]@629: Golang Client Message
  1. 在CKafka 控制台的【Consumer Group】页面,选择对应的消费组名称,在主题名称输入topic名称,点击【查询详情】,查看消费详情。