RabbitMQ消息模型
官方文档:RabbitMQ Tutorials — RabbitMQ
首先是环境搭建
pom.xml
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
application.yml
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
|
“Hello World!”
@Autowired private RabbitTemplate rabbitTemplate;
@GetMapping("/hello") @ResponseBody public void HelloWord(){
rabbitTemplate.convertAndSend("hello", "hello word"); }
|
消费者
@Component
@RabbitListener(queuesToDeclare = @Queue(name="hello")) public class HelloConsumer {
@RabbitHandler public void HelloWord(String message){ System.out.println(message); }
}
|
WorkQueues
- 工作队列,可有多个消费者监听队列,同样没有交换机,默认是采用轮询的方式分发消息
- 生产者
@GetMapping("work") @ResponseBody public void Work(){ rabbitTemplate.convertAndSend("work", "work模型"); }
|
@Component public class WorkConsumer {
@RabbitListener(queuesToDeclare = @Queue("work")) public void Work1(String massage){ System.out.println("work1: "+ massage); }
@RabbitListener(queuesToDeclare = @Queue("work")) public void Work2(String massage){ System.out.println("work2: " + massage); } }
|
Publish/Subscribe
- 发布订阅模式,也叫广播模式,有交换机,可以不写路由键
- 生产者
@GetMapping("fanout") @ResponseBody public void fanout(){ rabbitTemplate.convertAndSend("fanout","","fanout模型"); }
|
@Component public class PublishConsumer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue(),//不起名为临时队列 exchange = @Exchange(name="fanout",type = "fanout") //绑定交换机 ) }) public void fanout1(String message){ System.out.println("fanout1: "+message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue(),//不起名为临时队列 exchange = @Exchange(name="fanout",type = "fanout") //绑定交换机 ) }) public void fanout2(String message){ System.out.println("fanout2: "+message); } }
|
Routing
- 路由模式,可以用路由键让
队列和交换机进行绑定
- 生产者
@GetMapping("route") @ResponseBody public void routing(){ rabbitTemplate.convertAndSend("route", "user", "route模型"); rabbitTemplate.convertAndSend("route","error","error信息"); rabbitTemplate.convertAndSend("route","info","info信息"); }
|
@Component public class RouteConsumer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue(name="route"), exchange = @Exchange(name = "route",type = "direct"), key = {"user","error"}//绑定路由键,消费者只能接收到指定路由键的消息 ) }) public void route(String message){ System.out.println("route: "+message); }
}
|
Topics
- 动态路由,在路由模式的基础上,路由键可以进行动态匹配
- 匹配规则:
- ‘*’ 匹配一个单词,单词与单词之间用 ‘.’ 分割,如: “user.id”
- ‘#’ 匹配零个或多个单词
- 生产者
@GetMapping("topic") @ResponseBody public void topic(){ rabbitTemplate.convertAndSend("topic","user.id","topic模型"); rabbitTemplate.convertAndSend("topic","user.id.message","user.id.message消息"); }
|
@Component public class TopicConsumer {
@RabbitListener(bindings = { @QueueBinding( value = @Queue(name = "topic1"), exchange = @Exchange(name = "topic",type = "topic"), key = {"user.*"}
) }) public void topic1(String message){ System.out.println("topic1: "+ message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue(name = "topic2"), exchange = @Exchange(name = "topic",type = "topic"), key = {"user.#"}
) }) public void topic2(String message){ System.out.println("topic2: "+ message); }
}
|
RPC
private static final String MessageID = UUID.randomUUID().toString();
@GetMapping("RPC") @ResponseBody public void rpc(){ MessageProperties messageProperties = new MessageProperties(); messageProperties.getHeaders().put("携带的请求参数", "hello"); rabbitTemplate.convertAndSend("dohello", new Message("调用Hello方法".getBytes(), messageProperties), new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { return message; }
@Override public Message postProcessMessage(Message message, Correlation correlation) { if(correlation instanceof CorrelationData){ message.getMessageProperties().setCorrelationId(((CorrelationData) correlation).getId()); } return message; } }, new CorrelationData(MessageID)); }
@RabbitListener(queuesToDeclare = @Queue(name="result")) public void callBack(Message message){ if(MessageID.equals(message.getMessageProperties().getCorrelationId())){ System.out.println("RPCServer发过来的消息是: " + new String(message.getBody())); System.out.println("Hello方法的返回值是: " + message.getMessageProperties().getHeaders().get("返回的结果")); } }
|
@Component public class RPCServer {
@Autowired private RabbitTemplate rabbitTemplate;
@RabbitListener(queuesToDeclare = @Queue(name = "dohello")) public void RPCServer(Channel channel, Message message){ System.out.println("RPCClient发过来的消息是: " + new String(message.getBody()));
MessageProperties messageProperties = message.getMessageProperties(); String result = Hello(messageProperties.getHeaders().get("携带的请求参数").toString()); MessageProperties messageProperties1 = new MessageProperties(); messageProperties1.getHeaders().put("返回的结果",result); rabbitTemplate.convertAndSend("result", new Message("返回的结果".getBytes(), messageProperties1), new MessagePostProcessor() {
@Override public Message postProcessMessage(Message message) throws AmqpException { return message; }
@Override public Message postProcessMessage(Message message, Correlation correlation) { if(correlation instanceof CorrelationData){ message.getMessageProperties().setCorrelationId(((CorrelationData) correlation).getId()); } return message; }
},new CorrelationData(messageProperties.getCorrelationId())); }
public static String Hello(String message){ return message.toUpperCase(); }
}
|
Publisher Confirms
消息确认机制,分为三块
- publisher confirmCallback 生产者确认回调,生产者的消息成功发送到交换机,MQ就会触发回调函数
- publisher returnCallback 交换机未投递到队列时触发的回调
- consumer ack机制 消费者成功消费消息是的确认机制
yml配置
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: /
publisher-confirm-type: correlated
publisher-returns: true
template: mandatory: true
listener: simple: acknowledge-mode: manual
|
package com.whitegoo.rabbitmq.config;
import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
@Configuration public class RabbitConfig {
@Autowired private RabbitTemplate rabbitTemplate;
@PostConstruct public void initRabbit(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){ System.out.println("消息发送成功,id为: " + correlationData.getId()); }else { System.out.println(cause); System.out.println("发送失败的消息是: " + new String(correlationData.getReturned().getMessage().getBody())); } } });
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { System.out.println("投递失败的消息: " + new String(returned.getMessage().getBody())); } }); }
}
|
@GetMapping("confirm") @ResponseBody public void confirm(){ rabbitTemplate.convertAndSend("confirm",new Message("confirm".getBytes()),new CorrelationData(UUID.randomUUID().toString())); }
|
package com.whitegoo.rabbitmq.consumer;
import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component
@RabbitListener(queuesToDeclare = @Queue(name="hello")) public class HelloConsumer {
@RabbitHandler public void HelloWord(String message){ System.out.println(message); }
}
|
到这,rabbitmq的7种消息模型就介绍完毕了.