Skip to main content

RabbitMQ 使用

开发环境搭建

docker run -d \
--name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=guest \
-e RABBITMQ_DEFAULT_PASS=guest \
rabbitmq:3-management

通过 http://localhost:15672 访问管理界面

配置组件

[rabbitmq]
url = "amqp://guest:guest@localhost:5672/"

基本使用

package rabbitmq_test

import (
"context"
"testing"
"time"

"github.com/infraboard/mcube/v2/ioc/config/rabbitmq"
)

const (
FANOUT_SUBJECT = "event_bus_fanout"
TOPIC_SUBJECT = "event_bus_topic"
DIRECT_SUBJECT = "event_bus_direct"
)

func TestFanoutSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()

msg := rabbitmq.NewFanoutMessage(FANOUT_SUBJECT, []byte("test"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
}

func TestFanoutSubscribe(t *testing.T) {
consumer1, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = consumer1.FanoutSubscribe(t.Context(), FANOUT_SUBJECT, func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("consumer1: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

consumer2, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = consumer2.FanoutSubscribe(t.Context(), FANOUT_SUBJECT, func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("consumer2: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

time.Sleep(60 * time.Second)
}

func TestTopicSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()

logMsg := rabbitmq.NewTopicMessage(TOPIC_SUBJECT, "event_bus.logs.info", []byte("log"))
err = publisher.Publish(t.Context(), logMsg)
if err != nil {
t.Fatal(err)
}

alertMsg := rabbitmq.NewTopicMessage(TOPIC_SUBJECT, "event_bus.alerts.info", []byte("alert"))
err = publisher.Publish(t.Context(), alertMsg)
if err != nil {
t.Fatal(err)
}
}

func TestTopicSubscribe(t *testing.T) {
logConsumer, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = logConsumer.TopicSubscribe(t.Context(), TOPIC_SUBJECT, "event_bus.logs.*", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log(string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

alertConsumer, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = alertConsumer.TopicSubscribe(t.Context(), TOPIC_SUBJECT, "event_bus.alerts.*", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log(string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

time.Sleep(60 * time.Second)
}

func TestDirectSubscribePublish(t *testing.T) {
publisher, err := rabbitmq.NewPublisher()
if err != nil {
t.Fatal(err)
}
defer publisher.Close()

msg := rabbitmq.NewQueueMessage("orders", []byte("test1"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}

msg = rabbitmq.NewQueueMessage("orders", []byte("test2"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}

msg = rabbitmq.NewQueueMessage("orders", []byte("test3"))
err = publisher.Publish(t.Context(), msg)
if err != nil {
t.Fatal(err)
}
}

func TestDirectSubscribe(t *testing.T) {
consumer_a, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = consumer_a.DirectSubscribe(t.Context(), DIRECT_SUBJECT, "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("a: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

consumer_b, err := rabbitmq.NewConsumer()
if err != nil {
t.Fatal(err)
}

err = consumer_b.DirectSubscribe(t.Context(), DIRECT_SUBJECT, "orders", func(ctx context.Context, msg *rabbitmq.Message) error {
t.Log("b: " + string(msg.Body))
return nil
})
if err != nil {
t.Fatal(err)
}

time.Sleep(60 * time.Second)
}