Skip to main content

消息总线

当前支持的消息总线类型:

  • kafka
  • nats
  • rabbitmq

配置组件


基本使用

基于nats

基于nats的消息总线, 导入nats驱动即可, nats具体配置 见nats组件

import (
// nats总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/nats"
)

具体样例

import (
"context"
"fmt"
"time"

"github.com/infraboard/mcube/v2/ioc/config/bus"
// nats总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/nats"

"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/server"
)

const (
TEST_SUBJECT = "event_bus"
)

func main() {
ioc.DevelopmentSetup()

// 消息生产者
bus.GetService().TopicSubscribe(context.Background(), TEST_SUBJECT, func(e *bus.Event) {
fmt.Println(string(e.Data))
})

// 发布消息
go func() {
for {
time.Sleep(1 * time.Second)
err := bus.GetService().Publish(context.Background(), &bus.Event{
Subject: TEST_SUBJECT,
Data: []byte("test"),
})
if err != nil {
fmt.Println(err)
}
}
}()

// 消息消费者
// 启动应用
err := server.Run(context.Background())
if err != nil {
panic(err)
}
}

基于kafka

基于kafka的消息总线, 导入kafka驱动即可, nats具体配置 见kafka组件

import (
// kafka总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/kafka"
)

具体样例

package main

import (
"context"
"fmt"
"time"

"github.com/infraboard/mcube/v2/ioc/config/bus"
// kafka总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/kafka"

"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/server"
)

const (
TEST_SUBJECT = "event_bus"
)

func main() {
ioc.DevelopmentSetup()

// 消息生产者
bus.GetService().TopicSubscribe(context.Background(), TEST_SUBJECT, func(e *bus.Event) {
fmt.Println(string(e.Data))
})

// 发布消息
go func() {
for {
time.Sleep(1 * time.Second)
err := bus.GetService().Publish(context.Background(), &bus.Event{
Subject: TEST_SUBJECT,
Data: []byte("test"),
})
if err != nil {
fmt.Println(err)
}
}
}()

// 消息消费者
// 启动应用
err := server.Run(context.Background())
if err != nil {
panic(err)
}
}

基于RabbitMQ

基于rabbitmq的消息总线, 导入rabbitmq驱动即可, rabbitmq具体配置 见rabbitmq组件

import (
// rabbitmq总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/rabbitmq"
)

具体样例

package main

import (
"context"
"fmt"
"time"

"github.com/infraboard/mcube/v2/ioc/config/bus"
// rabbitmq总线
_ "github.com/infraboard/mcube/v2/ioc/config/bus/rabbitmq"

"github.com/infraboard/mcube/v2/ioc"
"github.com/infraboard/mcube/v2/ioc/server"
)

const (
TEST_SUBJECT = "event_bus"
)

func main() {
ioc.DevelopmentSetup()

// 消息生产者
bus.GetService().TopicSubscribe(context.Background(), TEST_SUBJECT, func(e *bus.Event) {
fmt.Println(string(e.Data))
})

// 发布消息
go func() {
for {
time.Sleep(1 * time.Second)
err := bus.GetService().Publish(context.Background(), &bus.Event{
Subject: TEST_SUBJECT,
Data: []byte("test"),
})
if err != nil {
fmt.Println(err)
}
}
}()

// 消息消费者
// 启动应用
err := server.Run(context.Background())
if err != nil {
panic(err)
}
}