RabbitMQ 客户端模块

基于 amqp091-go 的 RabbitMQ 客户端配置模块,集成到 mcube IoC 容器中,支持连接自动重连、Publisher/Consumer 自动恢复、OpenTelemetry 链路追踪,以及静态凭证和 Vault KV 静态凭证两种凭证加载模式。

开发环境搭建

docker run -d \
  --name rabbitmq \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=guest \
  -e RABBITMQ_DEFAULT_PASS=guest \
  rabbitmq:3-management

通过 http://localhost:15672 访问管理界面(默认账号 guest / guest)。

快速开始

1. 导入包(触发自动注册)

import (
    "github.com/infraboard/mcube/v2/ioc"
    _ "github.com/infraboard/mcube/v2/ioc/config/rabbitmq"
)

2. 初始化并获取客户端

func main() {
    // 从配置文件或环境变量加载
    ioc.DevelopmentSetupWithPath("etc/application.toml")

    // 创建 Publisher
    pub, err := rabbitmq.NewPublisher()
    if err != nil {
        panic(err)
    }
    defer pub.Close()

    // 创建 Consumer
    sub, err := rabbitmq.NewConsumer()
    if err != nil {
        panic(err)
    }
    defer sub.Close()
}

3. 最简 TOML 配置(etc/application.toml

[rabbitmq]
  url = "amqp://guest:guest@localhost:5672/"

配置方式

环境变量

所有配置项均可通过 RABBITMQ_ 前缀的环境变量设置。

export RABBITMQ_URL=amqp://guest:guest@localhost:5672/
export RABBITMQ_TIMEOUT=5s
export RABBITMQ_HEART_BEAT=10s
export RABBITMQ_RECONNECT_INTERVAL=10s
export RABBITMQ_MAX_RECONNECT_ATTEMPTS=0   # 0 表示无限重连
export RABBITMQ_TRACE=true

# 凭证模式相关
export RABBITMQ_CREDENTIAL_MODE=static     # static | vault-secret
export RABBITMQ_VAULT_PATH=myapp/rabbitmq
export RABBITMQ_VAULT_URL_FIELD=url

TOML 文件

[rabbitmq]
  url                    = "amqp://guest:guest@localhost:5672/"
  timeout                = "5s"
  heart_beat             = "10s"
  reconect_interval      = "10s"      # 重连间隔
  max_reconnect_attempts = 0          # 最大重连次数,0 表示无限重连
  trace                  = true       # 启用 OpenTelemetry 链路追踪

  # 凭证模式(可选,默认 static)
  credential_mode = "static"
  vault_path      = ""
  vault_url_field = "url"

凭证模式

通过 credential_mode 字段(或 RABBITMQ_CREDENTIAL_MODE 环境变量)控制连接凭证的来源。

静态凭证(static)

默认模式。直接从配置文件或环境变量中读取 url,URL 中包含用户名和密码。

[rabbitmq]
  credential_mode = "static"   # 默认,可省略
  url             = "amqp://guest:guest@localhost:5672/"
  • ✅ 简单直接,适合开发和小规模部署
  • ❌ 密码明文存储在配置/环境变量中,安全性较低

Vault KV 静态凭证(vault-secret)

从 HashiCorp Vault 的 KV v2 引擎中读取完整的 RabbitMQ 连接 URL,密码由 Vault 统一管理,应用配置文件中不再出现密码明文。

配置示例:

[rabbitmq]
  credential_mode = "vault-secret"
  vault_path      = "myapp/rabbitmq"   # KV 路径(相对于挂载点)
  vault_url_field = "url"              # Vault 返回数据中的 URL 字段名,默认 "url"

[vault]
  address     = "http://127.0.0.1:8200"
  auth_method = "token"
  token       = "hvs.xxxxxx"

Vault 侧配置(参考):

# 1. 启动 Vault 开发服务
docker run -itd --name=vault \
  --cap-add=IPC_LOCK \
  -p 8200:8200 \
  -e 'VAULT_DEV_ROOT_TOKEN_ID=myroot' \
  -e 'VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200' \
  hashicorp/vault:latest

# 2. 写入 RabbitMQ 连接 URL
docker exec \
  -e VAULT_ADDR='http://127.0.0.1:8200' \
  -e VAULT_TOKEN='myroot' \
  vault vault kv put secret/myapp/rabbitmq \
    url="amqp://user:strongpassword@localhost:5672/vhost"

# 3. 验证读取
docker exec \
  -e VAULT_ADDR='http://127.0.0.1:8200' \
  -e VAULT_TOKEN='myroot' \
  vault vault kv get secret/myapp/rabbitmq

vault_path 是相对于挂载点的路径,默认挂载点为 secret(对应 vault 模块的 kv_mount_path),实际请求路径为 secret/data/myapp/rabbitmq,此处只需填 myapp/rabbitmq

工作流程:

应用启动 └─→ 连接 Vault(依赖 vault 模块已初始化) └─→ 读取 KV 路径中的 url 字段 └─→ 使用读取到的 URL 建立 RabbitMQ 连接
  • ✅ 连接 URL(含密码)不出现在应用配置文件中
  • ✅ 可以在 Vault 中集中管理和审计凭证访问
  • ❌ 当前仅支持静态凭证,Vault 中密码轮换后需重启服务生效

消息模式

模块封装了三种常用消息模式,通过 Publisher 发布,通过 Consumer 订阅。

队列模式(Direct)

多个消费者竞争消费同一队列,常用于任务分发。每条消息只会被一个消费者实例处理。

发布:

publisher, err := rabbitmq.NewPublisher()
if err != nil {
    panic(err)
}
defer publisher.Close()

msg := rabbitmq.NewQueueMessage("orders", []byte(`{"order_id":1}`))
err = publisher.Publish(ctx, msg)

订阅(竞争消费):

consumer_a, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}

err = consumer_a.DirectSubscribe(ctx, "event_bus_direct", "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("a received:", string(msg.Body))
    return nil
})

consumer_b, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}

err = consumer_b.DirectSubscribe(ctx, "event_bus_direct", "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("b received:", string(msg.Body))
    return nil
})

多个消费者绑定同一队列时,消息在消费者间轮询分发,每条消息只被处理一次。

发布订阅模式(Fanout)

消息广播到所有已绑定队列,每个订阅者都能收到完整消息副本,常用于事件通知。

发布:

publisher, err := rabbitmq.NewPublisher()
if err != nil {
    panic(err)
}
defer publisher.Close()

msg := rabbitmq.NewFanoutMessage("event_bus_fanout", []byte(`{"event":"login"}`))
err = publisher.Publish(ctx, msg)

订阅(广播,每个实例都收到):

consumer1, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}
err = consumer1.FanoutSubscribe(ctx, "event_bus_fanout", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("consumer1:", string(msg.Body))
    return nil
})

consumer2, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}
err = consumer2.FanoutSubscribe(ctx, "event_bus_fanout", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("consumer2:", string(msg.Body))
    return nil
})

主题模式(Topic)

基于通配符路由键匹配,灵活过滤消息,常用于日志分类、分级告警等场景。

通配符含义
*匹配一个单词
#匹配零或多个单词

发布:

publisher, err := rabbitmq.NewPublisher()
if err != nil {
    panic(err)
}
defer publisher.Close()

// 路由键 event_bus.logs.info
logMsg := rabbitmq.NewTopicMessage("event_bus_topic", "event_bus.logs.info", []byte("log message"))
err = publisher.Publish(ctx, logMsg)

// 路由键 event_bus.alerts.info
alertMsg := rabbitmq.NewTopicMessage("event_bus_topic", "event_bus.alerts.info", []byte("alert message"))
err = publisher.Publish(ctx, alertMsg)

订阅(按路由键过滤):

// 只收 logs 类消息
logConsumer, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}
err = logConsumer.TopicSubscribe(ctx, "event_bus_topic", "event_bus.logs.*", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("log:", string(msg.Body))
    return nil
})

// 只收 alerts 类消息
alertConsumer, err := rabbitmq.NewConsumer()
if err != nil {
    panic(err)
}
err = alertConsumer.TopicSubscribe(ctx, "event_bus_topic", "event_bus.alerts.*", func(ctx context.Context, msg *rabbitmq.Message) error {
    fmt.Println("alert:", string(msg.Body))
    return nil
})

链路追踪

tracetrue 且 mcube 的 trace 模块启用了 OpenTelemetry 时,每次 Publish 和消息消费都会自动创建 Span,并通过消息 Headers 传播 TraceContext。

[rabbitmq]
  trace = true

[trace]
  enable   = true
  endpoint = "http://jaeger:14268/api/traces"

完整配置参数

参数环境变量类型默认值说明
urlRABBITMQ_URLstringamqp://guest:guest@localhost:5672/RabbitMQ 连接 URL
timeoutRABBITMQ_TIMEOUTduration5s连接超时时间
heart_beatRABBITMQ_HEART_BEATduration10s心跳间隔
reconect_intervalRABBITMQ_RECONNECT_INTERVALduration10s断线重连间隔
max_reconnect_attemptsRABBITMQ_MAX_RECONNECT_ATTEMPTSint0最大重连次数,0 为无限重连
traceRABBITMQ_TRACEboolfalse启用 OpenTelemetry 链路追踪
credential_modeRABBITMQ_CREDENTIAL_MODEstringstatic凭证模式:static / vault-secret
vault_pathRABBITMQ_VAULT_PATHstringVault KV 路径(相对于挂载点)
vault_url_fieldRABBITMQ_VAULT_URL_FIELDstringurlVault 返回数据中的连接 URL 字段名