RabbitMQ-如何保证消息不丢失
我们当时MYSQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的,这里面就要求了消息的高可用性,我们要保证消息的不丢失。主要从三个层面考虑
生产者确认机制
第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果报错可以先记录到日志中,再去修复数据
1 2 3 4 5 6 7
| rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功到达交换机: {}", correlationData); } else { log.error("消息未到达交换机,原因: {}", cause); } });
|
持久化功能
第二个是开启持久化功能,确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化
交换机持久化:
1 2 3 4
| @Bean public DirectExchange orderExchange() { return new DirectExchange(ORDER_EXCHANGE, true, false); }
|
队列持久化:
1 2 3 4 5 6 7
| @Bean public Queue orderQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-dead-letter-exchange", DEAD_EXCHANGE); args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY); return new Queue(ORDER_QUEUE, true, false, false, args); }
|
消息持久化:
1 2 3 4 5 6 7 8 9 10
| rabbitTemplate.convertAndSend( ORDER_EXCHANGE, ORDER_ROUTING_KEY, order.getId(), message -> { message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000)); message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; } )
|
消费者确认机制
第三个是开启消费者确认机制,消息处理成功后完成ack,当然也需要设置一定的重试次数,我们当时设置了3次,如果重试3次还没有收到消息,就将失败后的消息投递到异常交换机,交由人工处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| @Component @RequiredArgsConstructor @Slf4j public class RabbitMQListener {
private final OrderMapper orderMapper; private final RabbitTemplate rabbitTemplate;
@RabbitListener(queues = DEAD_QUEUE) public void listenDeadQueue(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); try { Order order = orderMapper.selectOneById(msg); if ("103000".equals(order.getStatus())) { UpdateChain.of(Order.class) .set(ORDER.STATUS, "103020") .where(ORDER.ID.eq(order.getId())) .update(); log.info("订单{}已取消", msg); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count"); if (retryCount == null) { retryCount = 0; }
if (retryCount < MAX_RETRY_COUNT) { retryCount++; message.getMessageProperties().getHeaders().put("x-retry-count", retryCount); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } else { Map<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders()); headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey()); rabbitTemplate.convertAndSend(EXCEPTION_EXCHANGE, EXCEPTION_ROUTING_KEY, message.getBody(), m -> { m.getMessageProperties().getHeaders().putAll(headers); return m; }); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); } } }
@RabbitListener(queues = EXCEPTION_QUEUE) public void listenExceptionQueue(Message message) { String msg = new String(message.getBody()); log.error("处理异常消息: {}", msg); try { MailUtil.sendEmail(ADMIN_EMAIL, "异常消息", msg); } catch (Exception e) { throw new BaseUnCheckedException("发送异常邮件失败"); } }
}
|
RabbitMQ消息的重复消费问题如何解决的
嗯,这个我们还真遇到过,是这样的,我们当时消费者是设置了自动确认机制,当服务还没来得及给MQ确认的时候,服务宕机了,导致服务重启之后,又消费了一次消息。这样就重复消费了
因为我们当时处理的支付(订单|业务唯一标识),它有一个业务的唯一标识,我们再处理消息时,先到数据库查询一下,这个数据是否存在,如果不存在,说明没有处理过,这个时候就可以正常处理这个消息了。如果已经存在这个数据了,就说明消息重复消费了,我们就不需要再消费了(在业务上解决)
面试官:那你还知道其他的解决方案吗?
候选人:
其实这个就是典型的幂等的问题,比如,redis分布式锁、数据库的锁都是可以的
RabbitMQ中死信交换机 ? (RabbitMQ延迟队列有了解过嘛)
我们当时的xx项目有一个xx业务,需要用到延迟队列,其中就是使用RabbitMQ来实现的。
延迟队列就是用到了死信交换机和TTL(消息存活时间)实现的。
如果消息超时未消费就会变成死信,在RabbitMQ中如果消息成为死信,队列可以绑定一个死信交换机,在死信交换机上可以绑定其他队列,在我们发消息的时候可以按照需求指定TTL的时间,这样就实现了延迟队列的功能了。
我记得RabbitMQ还有一种方式可以实现延迟队列,在RabbitMQ中安装一个死信插件,这样更方便一些,我们只需要在声明交互机的时候,指定这个就是死信交换机,然后在发送消息的时候直接指定超时时间就行了,相对于死信交换机+TTL要省略了一些步骤
面试官:如果有100万消息堆积在MQ , 如何解决 ?
候选人:
我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的
第一:提高消费者的消费能力 ,可以使用多线程消费任务
第二:增加更多消费者,提高消费速度
使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息
第三:扩大队列容积,提高堆积上限
可以使用RabbitMQ惰性队列,惰性队列的好处主要是
①接收到消息后直接存入磁盘而非内存
②消费者要消费消息时才会从磁盘中读取并加载到内存
③支持数百万条的消息存储
RabbitMQ的高可用机制有了解过嘛
我们当时项目在生产环境下,使用的集群,当时搭建是镜像模式集群,使用了3台机器。
镜像队列结构是一主多从,所有操作都是主节点完成,然后同步给镜像节点,如果主节点宕机后,镜像节点会替代成新的主节点,不过在主从同步完成前,主节点就已经宕机,可能出现数据丢失
面试官:那出现丢数据怎么解决呢?
候选人:
我们可以采用仲裁队列,与镜像队列一样,都是主从模式,支持主从数据同步,主从同步基于Raft协议,强一致。
并且使用起来也非常简单,不需要额外的配置,在声明队列的时候只要指定这个是仲裁队列即可
RocketMQ
https://www.bilibili.com/video/BV1GY4y1F7og/?spm_id_from=333.999.0.0&vd_source=fa7ba4ae353f08f1d08d1bb24528e96c
三种消息队列的对比及适用场景
中间件 |
适用场景 |
RabbitMQ |
- 需要多种消息模式的应用:由于RabbitMQ支持发布/订阅、工作队列、路由、请求/响应等多种消息传递模式,因此适合需要灵活消息交互的应用场景。 - 对易用性和稳定性有较高要求的系统:对于那些更注重部署简便性和维护简单性的应用,RabbitMQ是一个很好的选择。 |
RocketMQ |
- 大规模消息处理系统:RocketMQ适用于需要处理大量消息的应用场景,特别是在金融交易、电商等对消息可靠性和性能有极高要求的行业。 - 高并发、高性能要求的场景:RocketMQ能够提供非常高的消息吞吐量和低延迟,适合对性能敏感的应用。 |
Kafka |
- 实时流处理:Kafka非常适合用于实时数据流处理场景,如实时分析、监控数据等。 - 日志收集:由于Kafka的数据持久性和高可用性特点,它经常被用来作为日志收集系统的基础组件。 - 事件驱动架构:对于构建基于事件驱动的应用程序,Kafka可以作为一个可靠的事件总线。 |