Springboot整合RabbitMQ



基本介绍

MQ全称为Message Queue,消息队列是应用程序和应用程序之间的通信方法。

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。更准确的说是一种binary wire-level protocol(链接协议)。这是其和JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

RabbitMQ是一个 Erlang 开发的AMQP(Advanced Message Queuing Protocol )的开源实现。

RabbitMQ安装和基本配置

1、安装RabbitMQ服务

1
docker pull rabbitmq:3-management //注意docker pull rabbitmq如果安装这个是后面不能访问管理网页的

2、查看安装的RabbitMQ的镜像id

1
docker images

3、启动RabbitMQ

1
docker run -d -p 5672:5672 -p 15672:15672 --name myrabbitmq 镜像id

4、访问RabbitMQ管理平台

1
2
通过:[服务器ip:15672访问]
默认账号密码为:guest

5、添加队列queues

目前只需要用到blue,blue.emps,blue.news,coderblue.news

6、添加Exchanges交换机,并绑定队列

  • exchange.direct:类型 direct

  • exchange.topic:类型 topic

  • exchange.fanout:类型 fanout

注意:

  • Exchange一共有四种类型:direct、topic、headers 和fanout

  • 效率:fanout > direct > topic

  • Direct Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行比较,如果相等,则发送到该Binding对应的Queue中。(全路径匹配)

  • Topic Exchange:将消息中的Routing key与该Exchange关联的所有Binding中的Routing key进行对比,如果匹配上了,则发送到该Binding对应的Queue中。(与关联的对比匹配,可模糊匹配)。匹配规则如下:

    • *:匹配一个单词
    • #:匹配0个或多个字符
    • *,#:只能写在.号左右,且不能挨着字符
    • 单词和单词之间需要用.隔开
  • Fanout Exchange:直接将消息转发到所有binding的对应queue中,这种exchange在路由转发的时候,忽略Routing key。(广播到所有)

  • Headers Exchange:将消息中的headers与该Exchange相关联的所有Binging中的参数进行匹配,如果匹配上了,则发送到该Binding对应的Queue中。

感谢:CSDN博主-做个好人好吗

RabbitMQ基本使用

下面以Springboot工程(版本2.3.3.RELEASE)为例,操作rabbitmq

1、配置properties

1
2
3
4
spring.rabbitmq.host=服务器ip
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#spring.rabbitmq.port=5672

2、添加MQConfig配置文件,更改默认的序列化规则

1
2
3
4
5
6
7
8
@Configuration
public class MQConfig {

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

3、创建UserInfo实体类

1
2
3
4
5
6
7
8
9
10
/**
* 无参、有参构造都要、toString、set、get
*/
@Data
public class UserInfo {

private String userName;

private Integer age;
}

4、使用Junit5测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
@SpringBootTest
class RabbitmqApplicationTests {

@Autowired
RabbitTemplate rabbitTemplate;

@Autowired
AmqpAdmin amqpAdmin;

/**
* 单播:点对点发送map格式
*/
@Test
void contextLoads() {
// Message需要自己构造一个;定义消息体内容和消息头
// rabbitTemplate.send(exchage, routeKey, message);

// object默认当成消息体,只需传入要发送的对象,自动序列化发送给rabbitmq;
// rabbitTemplate.converAndSend(exchage,routeKey,object)
HashMap<String, Object> map = new HashMap<>();
map.put("msg", "这是converAndSend发送的消息");
map.put("data", Arrays.asList(2, "helloWorld", false));
// 默认使用java的序列化消息:application/x-java-serialized-object
rabbitTemplate.convertAndSend("exchange.direct", "blue.news", map);
}

/**
* 单播:点对点发送对象格式
*/
@Test
void contextLoadsObject() {
rabbitTemplate.convertAndSend("exchange.direct", "blue.users", new UserInfo("Kity", 20));
}

/**
* 广播
*/
@Test
void sendMsg() {
rabbitTemplate.convertAndSend("exchanges.fanout", "", new UserInfo("fanout", 20));
}

@Test
public void createExchange() {
// 创建交换器
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("创建完成");
// 创建队列
amqpAdmin.declareQueue(new Queue("amqpadmin.queue", true));
// 创建绑定规则
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,
"amqpadmin.exchange", "创建绑定规则", null));
}

/**
* 删除
*/
@Test
public void deleteQueue() {
// 删除队列
amqpAdmin.deleteQueue("amqpadmin.queue");
// 删除交换机
amqpAdmin.deleteExchange("blue.users");
}

/**
* 接收数据: 接收一个队列里就少一个
*/
@Test
public void receive() {
// 注意有参无参构造器都要
Object o = rabbitTemplate.receiveAndConvert("blue.users");
System.out.println(o.getClass());
System.err.println(o);
}
}

5、监听接收对应匹配队列的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 记得在启动类添加:@EnableRabbit,使用Rabbit注解
*/
@Service
public class UserInfoService {

@RabbitListener(queues = "blue.users",containerFactory = "rabbitListenerContainerFactory")
public void receive(UserInfo userInfo) {
System.out.println("接收到的userInfo:" + userInfo);
}

@RabbitListener(queues = "blue")
public void receiveMsg(Message message) {
System.out.println(Arrays.toString(message.getBody()));
System.out.println(message.getMessageProperties());
}

}

常用工作模式

Spring Boot整合引入依赖

1
2
3
4
5
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>版本号</version>
</dependency>

第一种模型(直连)

在上图的模型中,有以下概念

  • P:是我们的生产者

  • C:是我们的消费者

  • 中间的框(queue):是一个队列-RabbitMQ代表使用者保留的消息缓冲区

1、定义消费者

1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello", durable = "false", autoDelete = "true"))
public class HelloCustomer {

@RabbitHandler
public void receive(String message) {
System.out.println("message=" + message);
}
}

其中参数解释:

  • queuesToDeclare:如果队列不存在就会先创建
  • value:队列名
  • durable:是否持久化,false为不支持
  • autoDelete:是否自动删除,true为是

2、定义生产者

1
2
3
4
5
6
7
@Resource
private RabbitTemplate rabbitTemplate;

public void testHello() {
// 发送给 hello 队列,消息为 hello world
rabbitTemplate.convertAndSend("hello", "hello world");
}

第二种模型(work queue)

1、定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Component
public class WorkCustomer {

/**
* 默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
* @param message
*/
@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive1(String message) {
System.out.println("work message1 = " + message);
}

@RabbitListener(queuesToDeclare = @Queue(value = "work"))
public void receive2(String message) {
System.out.println("work message2 = " + message);
}
}

2、定义生产者

1
2
3
4
5
public void testWork() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend("work", "work模型" + i);
}
}

注意:ack为false,手动确认。防止宕机后,没有被消费的消息丢失,而且实现能者多劳。

第三种模型(fanout)

1、定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
*
* fanout 模式就是广播模式~
* 消息来了,会发给所有的队列
**/
@Component
public class FanoutCustomer {

@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建的临时队列
exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机
)
})
public void receive1(String message) {
System.out.println("message1=fanout - " + message);
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建的临时队列
exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机
)
})
public void receive2(String message) {
System.out.println("message2=fanout - " + message);
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue("fanout_queue"),
exchange = @Exchange(value = "fanout_mine", type = "fanout") //绑定的交换机
)
})
public void receive3(String message) {
System.out.println("message3=fanout - " + message);
}
}

2、定义生产者

1
2
3
public void testFanout() {
rabbitTemplate.convertAndSend("fanout_mine", "", "Fanout的模型发送的消息");
}

如下图所示,会生成一个指定队列和两个临时队列

第四种模型(Routing)

Routing之订阅模型-Direct直连

1、定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Component
public class RouteCustomer {

@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), //自定交换机名称和类型
key = {"info", "error", "warn"})
})
public void receive1(String message) {
System.out.println("message1=" + message);
}

@RabbitListener(bindings = {
@QueueBinding(value = @Queue, //创建临时队列
exchange = @Exchange(value = "directs", type = "direct"), //自定交换机名称和类型
key = {"error"})
})
public void receive2(String message) {
System.out.println("message2=" + message);
}
}

2、定义生产者

1
2
3
4
public void testRoute() {
// 只让directs队列的key为info的接收消息
rabbitTemplate.convertAndSend("directs", "info", "发送info的key的路由信息");
}

Routing之订阅模型-Topic

注意:

*(星号)可以代替一个单词。
#(哈希)可以替代零个或多个单词。

1、定义消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class TopicCustomer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, exchange = @Exchange(type = "topic", name = "topics"),
key = {"user.save", "user.*"}
)
})
public void receive1(String message) {
System.out.println("message1=" + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue, exchange = @Exchange(type = "topic", name = "topics"),
key = {"order.#", "produce.#", "user.*"}
)
})
public void receive2(String message) {
System.out.println("message2=" + message);
}
}

2、定义生产者

1
2
3
public void testTopic() {
rabbitTemplate.convertAndSend("topics", "produce.use.hello", "user.save 路由消息");
}

基于配置绑定交换机

1
2
3
4
5
6
sudo groupadd docker
sudo usermod -aG docker $(whoami)
先退出一下再重新登录来保证有了正确的权限
sudo service docker start

vi /etc/sysconfig/docker
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@Configuration
public class RabbitMQConfig {
public static final String MIAOSHA_QUEUE = "hello";
public static final String QUEUE = "queue";

public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";

public static final String HEADER_QUEUE = "header.queue";

public static final String TOPIC_EXCHANGE = "topicExchage";
public static final String FANOUT_EXCHANGE = "fanoutExchage";

public static final String HEADERS_EXCHANGE = "headersExchage";

/**
* Direct模式 交换机 Exchange
* */
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}

/**
* Topic模式 交换机Exchange
* */
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1, true);
}
@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2, true);
}
@Bean
public TopicExchange topicExchage(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
}
/**
* Fanout模式(广播模式) 交换机Exchange
* */
@Bean
public FanoutExchange fanoutExchage(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
}
/**
* Header模式 交换机Exchange
* */
@Bean
public HeadersExchange headersExchage(){
return new HeadersExchange(HEADERS_EXCHANGE);
}
@Bean
public Queue headerQueue1() {
return new Queue(HEADER_QUEUE, true);
}
@Bean
public Binding headerBinding() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("header1", "value1");
map.put("header2", "value2");
return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
}

}
打赏
  • 版权声明: 本博客所有文章除特别声明外,均采用 Apache License 2.0 许可协议。转载请注明出处!
  1. © 2020-2021 Lauy    湘ICP备20003709号

请我喝杯咖啡吧~

支付宝
微信