RabbitMQ 使用
开发环境搭建
docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:3-management
通过 http://localhost:15672 访问管理界面
配置组件
- toml
- env
[rabbitmq]
url = "amqp://guest:guest@localhost:5672/"
RABBITMQ_URL="amqp://guest:guest@localhost:5672/"
基本使用
package rabbitmq_test
import (
"context"
"testing"
"time"
"github.com/infraboard/mcube/v2/ioc/config/rabbitmq"
)
const (
FANOUT_SUBJECT = "event_bus_fanout"
TOPIC_SUBJECT = "event_bus_topic"
DIRECT_SUBJECT = "event_bus_direct"
)
func TestFanoutSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()
msg := rabbitmq.NewFanoutMessage(FANOUT_SUBJECT, []byte("test"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
}
func TestFanoutSubscribe(t *testing.T) {
consumer1, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = consumer1.FanoutSubscribe(t.Context(), FANOUT_SUBJECT, func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("consumer1: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
consumer2, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = consumer2.FanoutSubscribe(t.Context(), FANOUT_SUBJECT, func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("consumer2: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(60 * time.Second)
}
func TestTopicSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()
logMsg := rabbitmq.NewTopicMessage(TOPIC_SUBJECT, "event_bus.logs.info", []byte("log"))
err = publisher.Publish(t.Context(), logMsg)
if err != nil {
t.Fatal(err)
}
alertMsg := rabbitmq.NewTopicMessage(TOPIC_SUBJECT, "event_bus.alerts.info", []byte("alert"))
err = publisher.Publish(t.Context(), alertMsg)
if err != nil {
t.Fatal(err)
}
}
func TestTopicSubscribe(t *testing.T) {
logConsumer, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = logConsumer.TopicSubscribe(t.Context(), TOPIC_SUBJECT, "event_bus.logs.*", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log(string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
alertConsumer, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = alertConsumer.TopicSubscribe(t.Context(), TOPIC_SUBJECT, "event_bus.alerts.*", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log(string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(60 * time.Second)
}
func TestDirectSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()
msg := rabbitmq.NewQueueMessage("orders", []byte("test1"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
msg = rabbitmq.NewQueueMessage("orders", []byte("test2"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
msg = rabbitmq.NewQueueMessage("orders", []byte("test3"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
}
func TestDirectSubscribe(t *testing.T) {
consumer_a, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = consumer_a.DirectSubscribe(t.Context(), DIRECT_SUBJECT, "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("a: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
consumer_b, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}
err = consumer_b.DirectSubscribe(t.Context(), DIRECT_SUBJECT, "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("b: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}
time.Sleep(60 * time.Second)
}