Go SDK

最近更新时间: 2024-06-12 15:06:00

操作场景

本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

  1. 在客户端环境引入 pulsar-client-go 库。

    1. 在客户端环境执行如下命令下载 Pulsar 客户端相关的依赖包。

      go get github.com/apache/pulsar-client-go/pulsar@v0.9.0
    2. 安装完成后,即可通过以下代码引用到您的 Go 工程文件中。

      import "github.com/apache/pulsar-client-go/pulsar"
  2. 创建 Pulsar Client。

    // 创建pulsar客户端
    client, err := pulsar.NewClient(pulsar.ClientOptions{
       // 服务接入地址
       URL: serviceUrl,
       // 授权角色密钥
       Authentication:    pulsar.NewAuthenticationToken(authentication),
       ListenerName: RouterID
       OperationTimeout:  30 * time.Second,
       ConnectionTimeout: 30 * time.Second,
    })
    
    if err != nil {
       log.Fatalf("Could not instantiate Pulsar client: %v", err)
    }
    
    defer client.Close()
    参数 说明
    serviceUrl 集群接入地址,可以在控制台 集群管理 接入点页面查看并复制。
    Authentication 角色密钥,在 角色管理 页面复制密钥列复制。
    RouterID
  3. 创建生产者。

    // 使用客户端创建生产者
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
        Topic: "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
    })
    
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    说明:

    Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。

  4. 发送消息。

    // 发送消息
    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        // 消息内容
        Payload: []byte("hello go client, this is a message."),
        // 业务key
        Key: "yourKey",
        // 业务参数
        Properties: map[string]string{"key": "value"},
    })
  5. 创建消费者。

    // 使用客户端创建消费者
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // topic完整路径,格式为persistent://集群(租户)ID/命名空间/Topic名称
        Topic:            "persistent://pulsar-mmqwr5xx9n7g/sdk_go/topic1",
        // 订阅名称
        SubscriptionName: "topic1_sub",
        // 订阅模式
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    说明:

    • subscriptionName 需要写入订阅名,可在消费管理界面查看。
    • Topic 名称需要填入完整路径,即 persistent://clusterid/namespace/Topic,clusterid/namespace/topic 的部分可以从控制台上 Topic管理 页面直接复制。
  6. 消费消息。

    // 获取消息
    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // 模拟业务处理
    fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
               msg.ID(), string(msg.Payload()))
    
    // 消费成功,回复ack,消费失败根据业务需要选择回复nack或ReconsumeLater
    consumer.Ack(msg)
  7. 登录 TDMQ Pulsar 版控制台,依次点击 Topic 管理 > Topic 名称进入消费管理页面,点开订阅名下方右三角号,可查看生产消费记录。

    说明:

    上述是对消息的发布和订阅方式的简单介绍。更多操作可参见 Demo 或 Pulsar 官方文档。