white-goo
文章7
标签6
分类1
RabbitMQ延时队列

RabbitMQ延时队列

​ 延时队列是分布式事务最终一致性的一种解决方案,延时队列的核心是死信路由机制

  • 死信路由:及当消息过期没人消费,消息被拒收,队列满了,之类的情况发生后,消息将被抛弃,但是可以指定死信路由,将消息重新路由到死信队列
  • 延时队列:顾名思义,消息进入目标队列不是及时的,实现方式就是通过死信路由,我们可以设置一个队列,该队列不被任何人监听,那么,当队列里面的消息过期后,就会被死信路由到指定的死信队列,再由消费者消费死信队列里面的消息,这样就达到了延时的效果
  • 下面是代码演示
  • RabbitConfig
	//死信队列
public static final String DEAD_QUEUE = "DEAD_QUEUE";
//普通队列
public static final String HELLO_QUEUE = "HELLO_QUEUE";
//死信交换机
public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";
//普通交换机
public static final String HELLO_EXCHANGE = "HELLO_EXCHANGE";
//死信路由
public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY";
//普通路由
public static final String HELLO_ROUTING_KEY = "HELLO_ROUTING";

//创建死信交换机
@Bean
public Exchange deadExchange(){
return new DirectExchange(DEAD_EXCHANGE);
}

//创建普通交换机
@Bean
public Exchange helloExchange(){
return new DirectExchange(HELLO_EXCHANGE);
}


//创建普通队列
@Bean
public Queue helloQueue(){
Map<String, Object> map = new HashMap<>();
//设置死信交换机
map.put("x-dead-letter-exchange", DEAD_EXCHANGE);
//设置私信路由
map.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);
//这里可以设置队列中消息的失效时间,但是建议在发送消息的时候对每一条消息进行单独设置,这样更加灵活
//map.put("x-message-ttl","10000");

return QueueBuilder.durable(HELLO_QUEUE).withArguments(map).build();
// return QueueBuilder.durable(HELLO_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();
}

//创建死信队列
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_QUEUE).build();
}

//创建普通队列和普通交换机之间的绑定
@Bean
public Binding helloQueueAndHelloExchange(Queue helloQueue,Exchange helloExchange){
return BindingBuilder.bind(helloQueue).to(helloExchange).with(HELLO_ROUTING_KEY).noargs();
}

//创建死信队列和死信交换机之间的绑定
@Bean
public Binding deadQueueAndDeadExchange(Queue deadQueue,Exchange deadExchange){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
  • 生产者
@GetMapping("dead")
@ResponseBody
public void deadLetter(){

Message message = new Message("10s后收到这条消息".getBytes());
MessageProperties messageProperties = message.getMessageProperties();
//设置过期时间,单位 毫秒
messageProperties.setExpiration("10000");

rabbitTemplate.convertAndSend(RabbitConfig.HELLO_EXCHANGE,RabbitConfig.HELLO_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));
}
  • 消费者
package com.whitegoo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.whitegoo.rabbitmq.config.RabbitConfig;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DeadLetterConsumer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(RabbitConfig.DEAD_QUEUE),
exchange = @Exchange(name = RabbitConfig.DEAD_EXCHANGE,type = "direct"),
key = {RabbitConfig.DEAD_ROUTING_KEY}
)
})
public void deadLetter(Message message, Channel channel){
System.out.println("deadLetter: " + new String(message.getBody()));
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}

  • 这样,我们就实现了一个简单的延时队列,利用它来解决最终一致性问题也很简单
  • 代码演示:
  • 生产者
/**
* 模拟数据库日志,如果锁了库存,就生成对应日志
*/
public static Map<String,String> map = new HashMap<>();

@GetMapping("dead")
@ResponseBody
public void deadLetter(){

//执行下定单操作
System.out.println("下订单");
//执行远程锁库存操作
lock();
//执行其他远程服务
System.out.println("执行其他远程服务");
}

@GetMapping("finish")
@ResponseBody
public void finish(){
//偷个懒,这里应该传入完成的订单的ID,同时也是日志的ID
String next = map.keySet().iterator().next();
//订单完成就删除该日志
/**
* 只有完成订单才能删除这条日志,而解锁库存的微服务接收到延时队列的消息的时候会查询该日志,
* 如果存在,就解锁库存,不存在,就表示订单顺利完成,下订单的任意一个环节出问题,都不能删除该日志,
* 等解锁库存的微服务收到来自延时队列的消息时,就会自动解锁库存,
* 从而实现分布式事务的最终一致性方案.
*/
map.remove(next);
System.out.println("订单已完成");
}

public void lock(){

System.out.println("执行所库存操作");

//日志ID,用于解锁库存
String s = UUID.randomUUID().toString();
map.put(s, "锁定了1个库存");

Message message = new Message("需要解锁库存".getBytes());
MessageProperties messageProperties = message.getMessageProperties();
messageProperties.getHeaders().put("日志ID", s);
//设置过期时间,单位 毫秒
messageProperties.setExpiration("10000");

rabbitTemplate.convertAndSend(RabbitConfig.HELLO_EXCHANGE,RabbitConfig.HELLO_ROUTING_KEY,message,new CorrelationData(UUID.randomUUID().toString()));
}
  • 消费者
package com.whitegoo.rabbitmq.consumer;

import com.rabbitmq.client.Channel;
import com.whitegoo.rabbitmq.config.RabbitConfig;
import com.whitegoo.rabbitmq.producer.MessageProducer;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
public class DeadLetterConsumer {

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(RabbitConfig.DEAD_QUEUE),
exchange = @Exchange(name = RabbitConfig.DEAD_EXCHANGE,type = "direct"),
key = {RabbitConfig.DEAD_ROUTING_KEY}
)
})
public void deadLetter(Message message, Channel channel){
System.out.println("收到解锁库存的消息");
String s = MessageProducer.map.get(message.getMessageProperties().getHeaders().get("日志ID"));
if("".equals(s) || s == null){
System.out.println("日志已被删除,表明订单成功完成");
}else {
System.out.println("日志还在,订单出现故障,需要解锁库存...");
//为了操作方便,解锁完库存就把日志删除
MessageProducer.map.remove(message.getMessageProperties().getHeaders().get("需要解锁库存"));
}

long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
e.printStackTrace();
}
}
}

关于RabbitMQ的延时队列及分布式事务最终一致性的解决方案的讲解就到这里了.

本文作者:white-goo
本文链接:https://white-goo.github.io/2021/08/08/DeadLetter/
版权声明:本文采用 CC BY-NC-SA 3.0 CN 协议进行许可