white-goo
文章7
标签6
分类1
RabbitMQ消息模型

RabbitMQ消息模型

官方文档:RabbitMQ Tutorials — RabbitMQ

首先是环境搭建

pom.xml

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /

“Hello World!”

  • 这是最简单的一种消息模型,是点对点的形式,没有交换机

  • 生产者

@Autowired
private RabbitTemplate rabbitTemplate;


@GetMapping("/hello")
@ResponseBody
public void HelloWord(){

/**
*
* convertAndSend(String exchange,
* String routingKey,
* Object message,
* MessagePostProcessor messagePostProcessor,
* CorrelationData correlationData)
* 这个方法最多可以有5个参数
* exchange: 交换机
* routingKey: 路由键,跟队列绑定,没有队列会自动创建
* message: 消息,这里会自动转换成byte数组
* messagePostProcessor: 转换成Message类型对象后可以做一些处理
* correlationData: 消息的唯一id,用来标识一个消息
*
* 本案例只使用 2 和 3
*/
rabbitTemplate.convertAndSend("hello", "hello word");
}

​ 消费者

@Component
//创建一个队列取名 hello
//创建队列时可以声明一些参数
/**
* name: 队列的名字
* durable: 是否开启持久化
* autoDelete: 是否自动删除
* exclusive: 是否独占
* arguments: 额外的参数
* ignoreDeclarationExceptions: 忽略的异常
*/
@RabbitListener(queuesToDeclare = @Queue(name="hello"))
public class HelloConsumer {

@RabbitHandler
public void HelloWord(String message){
System.out.println(message);
}

}

WorkQueues

  • 工作队列,可有多个消费者监听队列,同样没有交换机,默认是采用轮询的方式分发消息
  • 生产者
@GetMapping("work")
@ResponseBody
public void Work(){
rabbitTemplate.convertAndSend("work", "work模型");
}
  • 消费者
@Component
public class WorkConsumer {

@RabbitListener(queuesToDeclare = @Queue("work"))
public void Work1(String massage){
System.out.println("work1: "+ massage);
}

@RabbitListener(queuesToDeclare = @Queue("work"))
public void Work2(String massage){
System.out.println("work2: " + massage);
}
}

Publish/Subscribe

  • 发布订阅模式,也叫广播模式,有交换机,可以不写路由键
  • 生产者
/**
* fanout 是广播模型,在这个模型里,routingKey是没有意义的,所以可以不写
*/
@GetMapping("fanout")
@ResponseBody
public void fanout(){
rabbitTemplate.convertAndSend("fanout","","fanout模型");
}
  • 消费者
@Component
public class PublishConsumer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(),//不起名为临时队列
exchange = @Exchange(name="fanout",type = "fanout") //绑定交换机
)
})
public void fanout1(String message){
System.out.println("fanout1: "+message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(),//不起名为临时队列
exchange = @Exchange(name="fanout",type = "fanout") //绑定交换机
)
})
public void fanout2(String message){
System.out.println("fanout2: "+message);
}
}

Routing

  • 路由模式,可以用路由键让
    队列和交换机进行绑定
  • 生产者
@GetMapping("route")
@ResponseBody
public void routing(){
rabbitTemplate.convertAndSend("route", "user", "route模型");
rabbitTemplate.convertAndSend("route","error","error信息");
rabbitTemplate.convertAndSend("route","info","info信息");
}
  • 消费者
@Component
public class RouteConsumer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name="route"),
exchange = @Exchange(name = "route",type = "direct"),
key = {"user","error"}//绑定路由键,消费者只能接收到指定路由键的消息
)
})
public void route(String message){
System.out.println("route: "+message);
}

}

Topics

  • 动态路由,在路由模式的基础上,路由键可以进行动态匹配
  • 匹配规则:
    • ‘*’ 匹配一个单词,单词与单词之间用 ‘.’ 分割,如: “user.id”
    • ‘#’ 匹配零个或多个单词
  • 生产者
@GetMapping("topic")
@ResponseBody
public void topic(){
rabbitTemplate.convertAndSend("topic","user.id","topic模型");
rabbitTemplate.convertAndSend("topic","user.id.message","user.id.message消息");
}
  • 消费者
@Component
public class TopicConsumer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "topic1"),
exchange = @Exchange(name = "topic",type = "topic"),
key = {"user.*"}

)
})
public void topic1(String message){
System.out.println("topic1: "+ message);
}


@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "topic2"),
exchange = @Exchange(name = "topic",type = "topic"),
key = {"user.#"}

)
})
public void topic2(String message){
System.out.println("topic2: "+ message);
}

}

RPC

  • RPC:远程过程调用,RabbitMQ的RPC模式指的是用RabbitMQ来实现RPC

    • 过程其实很简单,如果 A服务器 想调用 B服务器 的某个方法或功能,A就可以向队列发送一个消息,表示想调用服务器的某个功能,并且可以提携带参数,B服务器再监听这个队列,一旦接收到消息,就表示有人想调用B的某个功能,然后B拿着A提供的参数去执行目标方法,并把返回值放到另一个队列,A再去监听这个队列,一旦接收到消息,就表示自己的远程调用已经返回了结果,就可以对结果进行其他操作.
    • 这样就可以用RabbitMQ去实现RPC
    • 需要注意的是,A在接收到结果的时候,必须要确认该结果是否是给自己的,因为可以有多个服务器向B进行远程调用,所以必须要在发消息的时候给消息带一个唯一ID
  • 客户端

//消息的唯一ID,用来确定返回的结果是否是给自己的
private static final String MessageID = UUID.randomUUID().toString();

//这是一个客户端
@GetMapping("RPC")
@ResponseBody
public void rpc(){
//向hello队列发送一个消息,表示要调用hello方法,可以用MessageProperties来携带参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.getHeaders().put("携带的请求参数", "hello");
rabbitTemplate.convertAndSend("dohello", new Message("调用Hello方法".getBytes(), messageProperties), new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
return message;
}

//把id放进去,方便消费者获取
@Override
public Message postProcessMessage(Message message, Correlation correlation) {
if(correlation instanceof CorrelationData){
message.getMessageProperties().setCorrelationId(((CorrelationData) correlation).getId());
}
return message;
}
}, new CorrelationData(MessageID));
}

//客户端监听result队列,等待调用方法的返回值
@RabbitListener(queuesToDeclare = @Queue(name="result"))
public void callBack(Message message){
//确定是给自己的
if(MessageID.equals(message.getMessageProperties().getCorrelationId())){
System.out.println("RPCServer发过来的消息是: " + new String(message.getBody()));
System.out.println("Hello方法的返回值是: " + message.getMessageProperties().getHeaders().get("返回的结果"));
}
}
  • 服务端
@Component
public class RPCServer {

@Autowired
private RabbitTemplate rabbitTemplate;

//RPC远程服务器,监听hello队列,收到消息表示要调用Hello方法
@RabbitListener(queuesToDeclare = @Queue(name = "dohello"))
public void RPCServer(Channel channel, Message message){
System.out.println("RPCClient发过来的消息是: " + new String(message.getBody()));

MessageProperties messageProperties = message.getMessageProperties();
String result = Hello(messageProperties.getHeaders().get("携带的请求参数").toString());
//服务端处理完之后把返回结果发送到指定队列
MessageProperties messageProperties1 = new MessageProperties();
messageProperties1.getHeaders().put("返回的结果",result);
rabbitTemplate.convertAndSend("result", new Message("返回的结果".getBytes(), messageProperties1), new MessagePostProcessor() {

@Override
public Message postProcessMessage(Message message) throws AmqpException {
return message;
}

//把id再传回去
@Override
public Message postProcessMessage(Message message, Correlation correlation) {
if(correlation instanceof CorrelationData){
message.getMessageProperties().setCorrelationId(((CorrelationData) correlation).getId());
}
return message;
}

},new CorrelationData(messageProperties.getCorrelationId()));
}

public static String Hello(String message){
return message.toUpperCase();
}

}

Publisher Confirms

  • 消息确认机制,分为三块

    • publisher confirmCallback 生产者确认回调,生产者的消息成功发送到交换机,MQ就会触发回调函数
    • publisher returnCallback 交换机未投递到队列时触发的回调
    • consumer ack机制 消费者成功消费消息是的确认机制
  • yml配置

spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 开启发送端确认
publisher-confirm-type: correlated
# 开启发送端抵达队列确认
publisher-returns: true
# 只要到达队列,以异步的方式优先调用returnConfirm
template:
mandatory: true
# 开启手动ack模式
listener:
simple:
acknowledge-mode: manual

  • rabbitConfig
package com.whitegoo.rabbitmq.config;

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

import javax.annotation.PostConstruct;

@Configuration
public class RabbitConfig {

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void initRabbit(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {

/**
*
* @param correlationData 消息的唯一关联数据(id)
* @param ack 消息是否成功抵达MQ服务器
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {

if(ack){
//此处没有设置id的话会抛空指针异常
System.out.println("消息发送成功,id为: " + correlationData.getId());
}else {
System.out.println(cause);
System.out.println("发送失败的消息是: " + new String(correlationData.getReturned().getMessage().getBody()));
}
}
});

/*

//可以用lambda表达式
rabbitTemplate.setReturnsCallback(returned->{
System.out.println("投递失败的消息: " + returned.getMessage().getBody());
});
*/
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
//这个方法的触发时机是消息投递到队列失败的时候
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println("投递失败的消息: " + new String(returned.getMessage().getBody()));
}
});
}

}

  • 生产者
@GetMapping("confirm")
@ResponseBody
public void confirm(){
rabbitTemplate.convertAndSend("confirm",new Message("confirm".getBytes()),new CorrelationData(UUID.randomUUID().toString()));
}
  • 消费者
package com.whitegoo.rabbitmq.consumer;

import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
//创建一个临时队列 取名 hello
//创建队列时可以声明一些参数
/**
* name: 队列的名字
* durable: 是否开启持久化
* autoDelete: 是否自动删除
* exclusive: 是否独占
* arguments: 额外的参数
* ignoreDeclarationExceptions: 忽略的异常
*/
@RabbitListener(queuesToDeclare = @Queue(name="hello"))
public class HelloConsumer {

@RabbitHandler
public void HelloWord(String message){
System.out.println(message);
}

}

到这,rabbitmq的7种消息模型就介绍完毕了.

本文作者:white-goo
本文链接:https://white-goo.github.io/2021/08/02/RabbitMQ%E6%B6%88%E6%81%AF%E6%A8%A1%E5%9E%8B/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可