RabbitMQ-如何保证消息不丢失

我们当时MYSQL和Redis的数据双写一致性就是采用RabbitMQ实现同步的,这里面就要求了消息的高可用性,我们要保证消息的不丢失。主要从三个层面考虑

生产者确认机制

第一个是开启生产者确认机制,确保生产者的消息能到达队列,如果报错可以先记录到日志中,再去修复数据

1
2
3
4
5
6
7
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息成功到达交换机: {}", correlationData);
} else {
log.error("消息未到达交换机,原因: {}", cause);
}
});

持久化功能

第二个是开启持久化功能,确保消息未消费前在队列中不会丢失,其中的交换机、队列、和消息都要做持久化

交换机持久化:

1
2
3
4
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE, true, false);
}

队列持久化:

1
2
3
4
5
6
7
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", DEAD_EXCHANGE); // 队列消息过期后发送的交换器
args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY); // 队列消息过期后发送的路由键
return new Queue(ORDER_QUEUE, true, false, false, args);
}

消息持久化:

1
2
3
4
5
6
7
8
9
10
rabbitTemplate.convertAndSend(
ORDER_EXCHANGE,
ORDER_ROUTING_KEY,
order.getId(),
message -> {
message.getMessageProperties().setExpiration(String.valueOf(15 * 60 * 1000));// 15分钟
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);// 持久化消息
return message;
}
)

消费者确认机制

第三个是开启消费者确认机制,消息处理成功后完成ack,当然也需要设置一定的重试次数,我们当时设置了3次,如果重试3次还没有收到消息,就将失败后的消息投递到异常交换机,交由人工处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQListener {

private final OrderMapper orderMapper;
private final RabbitTemplate rabbitTemplate;

/**
* 监听死信队列
*/
@RabbitListener(queues = DEAD_QUEUE)
public void listenDeadQueue(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
try {
Order order = orderMapper.selectOneById(msg);
if ("103000".equals(order.getStatus())) {
UpdateChain.of(Order.class)
.set(ORDER.STATUS, "103020")
.where(ORDER.ID.eq(order.getId()))
.update();
log.info("订单{}已取消", msg);
}
// 手动确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 获取重试次数
Integer retryCount = (Integer) message.getMessageProperties().getHeaders().get("x-retry-count");
if (retryCount == null) {
retryCount = 0;
}

if (retryCount < MAX_RETRY_COUNT) {
// 重试次数加1
retryCount++;
message.getMessageProperties().getHeaders().put("x-retry-count", retryCount);
// 否认消息,并重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} else {
// 重试次数达到最大值,将消息投递到异常交换机
Map<String, Object> headers = new HashMap<>(message.getMessageProperties().getHeaders());
headers.put("x-original-routingKey", message.getMessageProperties().getReceivedRoutingKey());
rabbitTemplate.convertAndSend(EXCEPTION_EXCHANGE, EXCEPTION_ROUTING_KEY, message.getBody(), m -> {
m.getMessageProperties().getHeaders().putAll(headers);
return m;
});
// 拒绝消息,不重新入队
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
}
}

/**
* 监听异常队列
*/
@RabbitListener(queues = EXCEPTION_QUEUE)
public void listenExceptionQueue(Message message) {
String msg = new String(message.getBody());
// 记录日志,发送邮件通知
log.error("处理异常消息: {}", msg);
// 邮件通知
try {
MailUtil.sendEmail(ADMIN_EMAIL, "异常消息", msg);
} catch (Exception e) {
throw new BaseUnCheckedException("发送异常邮件失败");
}
}

}

RabbitMQ消息的重复消费问题如何解决的

嗯,这个我们还真遇到过,是这样的,我们当时消费者是设置了自动确认机制,当服务还没来得及给MQ确认的时候,服务宕机了,导致服务重启之后,又消费了一次消息。这样就重复消费了

因为我们当时处理的支付(订单|业务唯一标识),它有一个业务的唯一标识,我们再处理消息时,先到数据库查询一下,这个数据是否存在,如果不存在,说明没有处理过,这个时候就可以正常处理这个消息了。如果已经存在这个数据了,就说明消息重复消费了,我们就不需要再消费了(在业务上解决)

面试官:那你还知道其他的解决方案吗?

候选人

其实这个就是典型的幂等的问题,比如,redis分布式锁、数据库的锁都是可以的

RabbitMQ中死信交换机 ? (RabbitMQ延迟队列有了解过嘛)

我们当时的xx项目有一个xx业务,需要用到延迟队列,其中就是使用RabbitMQ来实现的。

延迟队列就是用到了死信交换机和TTL(消息存活时间)实现的。

如果消息超时未消费就会变成死信,在RabbitMQ中如果消息成为死信,队列可以绑定一个死信交换机,在死信交换机上可以绑定其他队列,在我们发消息的时候可以按照需求指定TTL的时间,这样就实现了延迟队列的功能了。

我记得RabbitMQ还有一种方式可以实现延迟队列,在RabbitMQ中安装一个死信插件,这样更方便一些,我们只需要在声明交互机的时候,指定这个就是死信交换机,然后在发送消息的时候直接指定超时时间就行了,相对于死信交换机+TTL要省略了一些步骤

面试官:如果有100万消息堆积在MQ , 如何解决 ?

候选人

我在实际的开发中,没遇到过这种情况,不过,如果发生了堆积的问题,解决方案也所有很多的

第一:提高消费者的消费能力 ,可以使用多线程消费任务

第二:增加更多消费者,提高消费速度

​ 使用工作队列模式, 设置多个消费者消费消费同一个队列中的消息

第三:扩大队列容积,提高堆积上限

可以使用RabbitMQ惰性队列,惰性队列的好处主要是

①接收到消息后直接存入磁盘而非内存

②消费者要消费消息时才会从磁盘中读取并加载到内存

③支持数百万条的消息存储

RabbitMQ的高可用机制有了解过嘛

我们当时项目在生产环境下,使用的集群,当时搭建是镜像模式集群,使用了3台机器。

镜像队列结构是一主多从,所有操作都是主节点完成,然后同步给镜像节点,如果主节点宕机后,镜像节点会替代成新的主节点,不过在主从同步完成前,主节点就已经宕机,可能出现数据丢失

面试官:那出现丢数据怎么解决呢?

候选人

我们可以采用仲裁队列,与镜像队列一样,都是主从模式,支持主从数据同步,主从同步基于Raft协议,强一致。

并且使用起来也非常简单,不需要额外的配置,在声明队列的时候只要指定这个是仲裁队列即可

RocketMQ

https://www.bilibili.com/video/BV1GY4y1F7og/?spm_id_from=333.999.0.0&vd_source=fa7ba4ae353f08f1d08d1bb24528e96c

三种消息队列的对比及适用场景

中间件 适用场景
RabbitMQ - 需要多种消息模式的应用:由于RabbitMQ支持发布/订阅、工作队列、路由、请求/响应等多种消息传递模式,因此适合需要灵活消息交互的应用场景。
- 对易用性和稳定性有较高要求的系统:对于那些更注重部署简便性和维护简单性的应用,RabbitMQ是一个很好的选择。
RocketMQ - 大规模消息处理系统:RocketMQ适用于需要处理大量消息的应用场景,特别是在金融交易、电商等对消息可靠性和性能有极高要求的行业。
- 高并发、高性能要求的场景:RocketMQ能够提供非常高的消息吞吐量和低延迟,适合对性能敏感的应用。
Kafka - 实时流处理:Kafka非常适合用于实时数据流处理场景,如实时分析、监控数据等。
- 日志收集:由于Kafka的数据持久性和高可用性特点,它经常被用来作为日志收集系统的基础组件。
- 事件驱动架构:对于构建基于事件驱动的应用程序,Kafka可以作为一个可靠的事件总线。