Go SDK

最近更新时间: 2026-06-30 15:06:00

操作场景

本文以调用 Go SDK 为例介绍通过开源 SDK 实现消息收发的操作过程,帮助您更好地理解消息收发的完整过程。

前提条件

操作步骤

  1. 执行如下命令在客户端环境安装所需包。

    go get "github.com/rabbitmq/amqp091-go"
    
  2. 安装完成后,即可引入到您的 GO 工程文件中。

    import (amqp "github.com/rabbitmq/amqp091-go")
    
  3. 引入之后即可在您的项目中使用客户端。

使用示例

  1. 建立连接和通信信道。

    // 所需参数
    const (
        host     = "amqp-xx.rabbitmq.x.xxxxxtdmq.com" // 服务接入地址
        username = "roleName" // 角色控制台对应的角色名称
        password = "eyJrZX..." // 角色对应的密钥
        vhost    = "amqp-xx|Vhost" // 要使用的Vhost全称
    )
    // 创建连接
    conn, err := amqp.Dial("amqp://" + username + ":" + password + "@" + host + ":5672/" + vhost)
    failOnError(err, "Failed to connect to RabbitMQ")
    defer func(conn *amqp.Connection) {
        err := conn.Close()
        if err != nil {
        }
    }(conn)
    
    // 建立通道
    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer func(ch *amqp.Channel) {
        err := ch.Close()
        if err != nil {
        }
    }(ch)
    
    参数 说明
    host 集群接入地址,在集群基本信息页面的客户端接入模块获取。
    username 用户名称,填写在控制台创建的用户名称。
    password 用户密码,填写在控制台创建用户时填写的密码。
    vhost Vhost 名称,在控制台 Vhost 列表获取。
  2. 声明交换机。

    // 声明交换机 (名称和类型需要与存在的交换机保持一致)
        err = ch.ExchangeDeclare(
            "logs-exchange",   // 交换机名称
            "fanout", // 交换机类型
            true,     // durable
            false,    // auto-deleted
            false,    // internal
            false,    // no-wait
            nil,      // arguments
        )
        failOnError(err, "Failed to declare a exchange")
    
  3. 发布消息。
    消息可发给交换机,也可以直接发到指定队列 ( hello world 和 work queues 消息模型)。

  • 发布消息到交换机:

    // 消息内容
    body := "this is new message."
    // 发布消息到交换机   
    err = ch.Publish(
        "logs-exchange",         // exchange
        "", // routing key   (根据使用的交换机类型可选择的是否需要routing key),如果不选择交换机,该参数为消息队列名称
        false,      // mandatory
        false,      // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        []byte(body),
        })
    failOnError(err, "Failed to publish a message")
    
  • 发布消息到指定队列:

    // 发布消息到指定的消息队列
    err = ch.Publish(
    "",         // exchange
    queue.Name, // routing key
    false,      // mandatory
    false,      // immediate
    amqp.Publishing{
        ContentType: "text/plain",
        Body:        []byte(body),
    })
    failOnError(err, "Failed to publish a message")
    
  1. 订阅消息。

    // 创建消费者并消费指定消息队列中的消息
    msgs, err := ch.Consume(
        "message-queue", // message-queue
        "",           // consumer
        false,        // 设置为非自动确认(可根据需求自己选择)
        false,        // exclusive
        false,        // no-local
        false,        // no-wait
        nil,          // args
    )
    failOnError(err, "Failed to register a consumer")
    
    // 获取消息队列中的消息
    forever := make(chan bool)
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            t := time.Duration(1)
            time.Sleep(t * time.Second)
            // 手动回复ack
            d.Ack(false)
        }
    }()
    log.Printf(" [Consumer] Waiting for messages.")
    <-forever
    
  2. 消费者使用 routing key。

    // 需要在消息队列中指定 交换机 和 routing key
    err = ch.QueueBind(
        q.Name, // queue name
        "routing_key",     // routing key
        "topic_demo", // exchange
        false,
        nil,
    )
    failOnError(err, "Failed to bind a queue")
    

说明

详细使用示例可参见 DemoRabbitMQ 官网