RabbitMQ延时队列
延时队列是分布式事务最终一致性的一种解决方案,延时队列的核心是死信路由机制
- 死信路由:即当消息过期仍无人消费、消息被拒收、队列已满等情况发生后,消息会被丢弃;但是可以为队列指定死信路由,将这些消息重新路由到死信队列
- 延时队列:顾名思义,消息不是即时进入目标队列的。实现方式就是通过死信路由:我们可以设置一个不被任何消费者监听的队列,当该队列里面的消息过期后,就会被死信路由到指定的死信队列,再由消费者消费死信队列里面的消息,这样就达到了延时的效果
- 下面是代码演示
- RabbitConfig
// ---- Holding side: messages wait here until their TTL expires ----

/** Name of the holding queue; it is declared with dead-letter arguments. */
public static final String HELLO_QUEUE = "HELLO_QUEUE";

/** Name of the exchange producers publish delayed messages to. */
public static final String HELLO_EXCHANGE = "HELLO_EXCHANGE";

/** Routing key that binds {@link #HELLO_QUEUE} to {@link #HELLO_EXCHANGE}. */
public static final String HELLO_ROUTING_KEY = "HELLO_ROUTING";

// ---- Dead-letter side: expired messages are re-routed here and consumed ----

/** Name of the queue that receives dead-lettered messages. */
public static final String DEAD_QUEUE = "DEAD_QUEUE";

/** Name of the exchange used as the dead-letter exchange of {@link #HELLO_QUEUE}. */
public static final String DEAD_EXCHANGE = "DEAD_EXCHANGE";

/** Routing key that binds {@link #DEAD_QUEUE} to {@link #DEAD_EXCHANGE}. */
public static final String DEAD_ROUTING_KEY = "DEAD_ROUTING_KEY";
/**
 * Declares the direct exchange that dead-lettered messages are re-published to.
 *
 * @return a durable, non-auto-delete direct exchange named {@code DEAD_EXCHANGE}
 */
@Bean
public Exchange deadExchange() {
    // durable = true, autoDelete = false: the same values the one-argument
    // constructor applies, written out explicitly.
    return new DirectExchange(DEAD_EXCHANGE, true, false);
}
/**
 * Declares the direct exchange that producers publish delayed messages to.
 *
 * @return a durable, non-auto-delete direct exchange named {@code HELLO_EXCHANGE}
 */
@Bean
public Exchange helloExchange() {
    // durable = true, autoDelete = false: the same values the one-argument
    // constructor applies, written out explicitly.
    return new DirectExchange(HELLO_EXCHANGE, true, false);
}
/**
 * Declares the durable holding queue whose dead letters are forwarded to the
 * dead-letter exchange with the dead-letter routing key.
 *
 * @return the {@code HELLO_QUEUE} definition carrying the
 *         {@code x-dead-letter-exchange} and {@code x-dead-letter-routing-key} arguments
 */
@Bean
public Queue helloQueue() {
    return QueueBuilder.durable(HELLO_QUEUE)
            // Where the broker re-publishes messages that die in this queue.
            .withArgument("x-dead-letter-exchange", DEAD_EXCHANGE)
            // Routing key used for that re-publish.
            .withArgument("x-dead-letter-routing-key", DEAD_ROUTING_KEY)
            .build();
}
/**
 * Declares the durable queue that receives dead-lettered messages.
 *
 * @return the {@code DEAD_QUEUE} definition, with no extra arguments
 */
@Bean
public Queue deadQueue() {
    QueueBuilder durableDeadQueue = QueueBuilder.durable(DEAD_QUEUE);
    return durableDeadQueue.build();
}
/**
 * Binds the holding queue to the hello exchange under {@code HELLO_ROUTING_KEY}.
 *
 * @param helloQueue    the holding queue bean
 * @param helloExchange the exchange producers publish to
 * @return the binding, with no binding arguments
 */
@Bean
public Binding helloQueueAndHelloExchange(Queue helloQueue, Exchange helloExchange) {
    BindingBuilder.GenericArgumentsConfigurer routed =
            BindingBuilder.bind(helloQueue)
                    .to(helloExchange)
                    .with(HELLO_ROUTING_KEY);
    return routed.noargs();
}
/**
 * Binds the dead-letter queue to the dead-letter exchange under {@code DEAD_ROUTING_KEY}.
 *
 * @param deadQueue    the queue that receives dead-lettered messages
 * @param deadExchange the dead-letter exchange
 * @return the binding, with no binding arguments
 */
@Bean
public Binding deadQueueAndDeadExchange(Queue deadQueue, Exchange deadExchange) {
    BindingBuilder.GenericArgumentsConfigurer routed =
            BindingBuilder.bind(deadQueue)
                    .to(deadExchange)
                    .with(DEAD_ROUTING_KEY);
    return routed.noargs();
}
|
/**
 * Publishes a demo message with a 10 000 ms expiration to the hello exchange.
 * The hello queue is declared with dead-letter arguments, so once the message
 * expires there it is re-routed to the dead-letter queue.
 */
@GetMapping("dead")
@ResponseBody
public void deadLetter() {
    // Per-message TTL, in milliseconds, expressed as a string.
    MessageProperties props = new MessageProperties();
    props.setExpiration("10000");

    byte[] body = "10s后收到这条消息".getBytes();
    Message message = new Message(body, props);

    CorrelationData correlation = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend(
            RabbitConfig.HELLO_EXCHANGE,
            RabbitConfig.HELLO_ROUTING_KEY,
            message,
            correlation);
}
|
package com.whitegoo.rabbitmq.consumer;
import com.rabbitmq.client.Channel; import com.whitegoo.rabbitmq.config.RabbitConfig; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Listens on the dead-letter queue and prints every message that arrives there.
 */
@Component
public class DeadLetterConsumer {

    /**
     * Prints the message body and acknowledges the delivery.
     *
     * <p>NOTE(review): the explicit {@code basicAck} presumes the listener
     * container is configured for manual acknowledgement; confirm.
     *
     * @param message the dead-lettered message
     * @param channel the channel the message was delivered on
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(RabbitConfig.DEAD_QUEUE),
            exchange = @Exchange(name = RabbitConfig.DEAD_EXCHANGE, type = "direct"),
            key = RabbitConfig.DEAD_ROUTING_KEY))
    public void deadLetter(Message message, Channel channel) {
        String text = new String(message.getBody());
        System.out.println("deadLetter: " + text);

        long tag = message.getMessageProperties().getDeliveryTag();
        try {
            // multiple = false: acknowledge only this delivery.
            channel.basicAck(tag, false);
        } catch (IOException ackFailure) {
            ackFailure.printStackTrace();
        }
    }
}
|
- 这样,我们就实现了一个简单的延时队列,利用它来解决最终一致性问题也很简单
- 代码演示:
- 生产者
// In-memory stand-in for a stock-lock log: key is a generated log ID, value is
// a description of the lock. lock() adds entries, finish() removes one, and
// DeadLetterConsumer reads it through MessageProducer.map.
// NOTE(review): this is a plain HashMap reached from both request handlers and
// a @RabbitListener method, presumably on different threads; confirm whether a
// thread-safe map is needed.
public static Map<String,String> map = new HashMap<>();
/**
 * Simulates placing an order: locks stock, then continues with the remaining steps.
 */
@GetMapping("dead")
@ResponseBody
public void deadLetter() {
    System.out.println("下订单");

    // Records the stock lock and publishes the expiring unlock message.
    this.lock();

    System.out.println("执行其他远程服务");
}
/**
 * Simulates completing an order by removing one lock record from {@code map}.
 *
 * <p>When no lock record exists (nothing was locked, or the record has already
 * been removed), this reports that and returns instead of failing with
 * {@code NoSuchElementException}.
 */
@GetMapping("finish")
@ResponseBody
public void finish() {
    if (map.isEmpty()) {
        // Nothing to complete; calling iterator().next() here would throw.
        System.out.println("没有待完成的订单");
        return;
    }
    // The demo does not track which order is being finished, so any one
    // remaining record is removed.
    String next = map.keySet().iterator().next();
    map.remove(next);
    System.out.println("订单已完成");
}
/**
 * Records a stock lock under a fresh log ID and publishes an expiring
 * "unlock" message that carries that ID in the {@code 日志ID} header.
 *
 * <p>The message is sent to the hello exchange with a 10 000 ms expiration.
 * If the send call itself throws, the lock record is removed again, because
 * no delayed unlock check would ever arrive for it.
 *
 * @throws RuntimeException whatever the send call throws, rethrown unchanged
 */
public void lock() {
    System.out.println("执行锁库存操作");

    String s = UUID.randomUUID().toString();
    map.put(s, "锁定了1个库存");

    Message message = new Message("需要解锁库存".getBytes());
    MessageProperties messageProperties = message.getMessageProperties();
    // The consumer uses this header to look the lock record up again.
    messageProperties.getHeaders().put("日志ID", s);
    // Per-message TTL in milliseconds.
    messageProperties.setExpiration("10000");

    try {
        rabbitTemplate.convertAndSend(
                RabbitConfig.HELLO_EXCHANGE,
                RabbitConfig.HELLO_ROUTING_KEY,
                message,
                new CorrelationData(UUID.randomUUID().toString()));
    } catch (RuntimeException e) {
        // Roll back the record so a failed publish does not leave stock locked
        // with no unlock message scheduled.
        map.remove(s);
        throw e;
    }
}
|
package com.whitegoo.rabbitmq.consumer;
import com.rabbitmq.client.Channel; import com.whitegoo.rabbitmq.config.RabbitConfig; import com.whitegoo.rabbitmq.producer.MessageProducer; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
/**
 * Listens on the dead-letter queue for expired "unlock stock" messages and
 * checks whether the matching lock record still exists.
 */
@Component
public class DeadLetterConsumer {

    /**
     * Looks up the lock record named by the {@code 日志ID} header. If the
     * record is gone, the order is reported as completed; if it is still
     * present, it is removed to release the lock. The delivery is then
     * acknowledged.
     *
     * <p>NOTE(review): the explicit {@code basicAck} presumes the listener
     * container is configured for manual acknowledgement; confirm.
     *
     * @param message the dead-lettered message carrying the log ID header
     * @param channel the channel the message was delivered on
     */
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(RabbitConfig.DEAD_QUEUE),
                    exchange = @Exchange(name = RabbitConfig.DEAD_EXCHANGE, type = "direct"),
                    key = {RabbitConfig.DEAD_ROUTING_KEY}
            )
    })
    public void deadLetter(Message message, Channel channel) {
        System.out.println("收到解锁库存的消息");

        // Read the header once so the lookup and the removal use the same key.
        Object logId = message.getMessageProperties().getHeaders().get("日志ID");
        String s = MessageProducer.map.get(logId);

        if (s == null || s.isEmpty()) {
            System.out.println("日志已被删除,表明订单成功完成");
        } else {
            System.out.println("日志还在,订单出现故障,需要解锁库存...");
            // Remove by log ID. The previous key "需要解锁库存" is the message
            // body text, not a header, so the record was never removed.
            MessageProducer.map.remove(logId);
        }

        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // multiple = false: acknowledge only this delivery.
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
|
关于RabbitMQ的延时队列及分布式事务最终一致性的解决方案的讲解就到这里了.