Skip to content

消息队列

更新: 5/9/2025 字数: 0 字 时长: 0 分钟

技术选型

首先我们必须要知道的是,市面上常见的MQ就四个——ActiveMQ,RabbitMQ,RocketMQ,Kafka

其中由于ActiveMQ因为没怎么经受过大规模吞吐量,导致现在很少使用了,而Kafka由于自身的一些设计特点,导致他更加适合大数据领域的实时计算和日志采集

因此我们的项目一般情况下只会设计到RabbitMQ和RocketMQ这两个

其中RocketMQ是阿里开源,后捐献给Apache基金会,社区相对不活跃,但是由于是Java开发,因此可以相对较好的将代码拉下来学习,所以如果公司的技术实力够的话还是很推荐RocketMQ的(而且根据我对阿里开源的观察,阿里系的开源软件一般都是针对秒杀场景的)

而RabbitMQ基于erlang语言开发,这个语言在设计当初就是为了接受分布式而做的语言,因此其并发量也不必多说,且社区一直活跃,所以如果对公司的技术水平信息不够,或许RabbitMQ才是一个更好的选择

一些具体的区别(来源于Doocs)

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
topic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用

为什么要用消息队列

很多小伙伴在写分布式项目的时候就想到要放消息队列进技术栈里,但是没有考虑过为什么,这是很不合理的,一下子就能被面试官看出是市面上抄的项目,

那为什么要用消息队列呢?

消息队列的作用常见的就三个——解耦,削峰,异步

解耦

场景:

用户删除自己博客上的文章,那么首先会经过我们的博客模块,然后去删除数据库中的数据,接着就可以理解返回给用户了,但这样还没完,因为我们的博客文章是在长文本模块控制的Cassandra中,博客图片则在oss模块下的minio(或则其他的什么oss系统)中,则时我们就可以引入消息队列,让消息队列在博客模块完成自己的操作后就发布出一个消息,然后就不用管了,其他的模块自己监听消息,若消息来到就执行自己的操作即可

这里的一个关键点在于 不关心/约定大于配置 ,即消息的发出者和接受者只需要约定好相同的topic和consumer即可,后面的都不用关心了,将与其他模块的关联降到最小

异步

这里的异步和解耦在过程上很像,重点在于关注的地方不同

解耦关注的是如何在A模块不感知到BCD模块的情况下就能完成整套的业务流程,而异步关注的重点是A模块在无需要求BCD返回结果的情况下如何可以快速的调用BCD模块来完成业务

所以有时候你会看到一些项目中一个模块内有一整个配对的消息发出者和接受者,这有可能就是在实现异步

那么什么场景需要异步呢?

其实只要你想到多线程实现异步的时候基本都可以用消息队列来实现异步

比如博客系统中,用户想要修改自己的文章,由于我们的论坛系统随着用户的人数上升,这个操作的耗时可能也会增加,因此这个过程可以直接丢给消息队列让他去发布消息异步修改,而接口立即返回结果给用户

那么这里就很自然的引出第二个问题

消息队列和多线程在异步上有什么区别?

  • 消息队列:

    • 可靠性高(优点): 消息队列引入持久化机制,且自带重试功能
    • 复杂度高(缺点): 消息队列因为引入了一个新的中间件,复杂度显著增加(缺点)
    • 可用性降低(缺点): 消息队列相当于多引入了一个中间件,他挂了就联不通了,可用性自然下降了
  • 多线程:

    • 复杂度低: 由于没有引入多余中间件,因此复杂度较低
    • 资源开销大: 由于线程的销毁,创建,切换都会占用系统资源,因此过多的线程会导致性能的下降

削峰

有的系统在某个时间的请求量会大幅上升(比如秒杀场景),这时如果没有一个合理的预防方案就很容易将我们的系统打崩

而消息队列由于可以储存部分消息,并且可以让消息以一定的流速流出,因此常常被用作削峰

如何保证消息队列的高可用

首先我们得先定义什么叫可用性

可用性:系统服务不中断运行时间占实际运行时间的比例

业界有时后会用几个9来说一个系统的可用性有多高

比如四个9就是99.99%,代表着全年不可用时间在52.6分钟以内,表示该系统在连续运行1年时间里最多可能的业务中断时间是52.6分钟(一年总时间的0.01%的时长)

而这里提到MQ的高可用,其实就是说如何保证MQ不会轻易挂掉

RocketMQ

RocketMQ的整个运行流程有下面几个重要的概念:

  • Producer:消息生产者
  • Consumer:消息消费者
  • CommitQueue:逻辑队列,内部仅存储一个物理地址,用来指向一个CommitLog,每一个Topic下的每个MessageQueue都一个对应的ConsumerQueue
  • CommitLog:提交日志,也就是真实的存储文件,被发送的消息本身
  • BrokerServer:消息存储,投递,查询以及服务的高可用,可以姐搜Producer发布过来的消息,同时也可以将CommitLog持久化
  • NameServer:Topic的注册中心,支持对Broker的动态注册与发现

其中NameServer是无状态的,也就是说只要使用集群部署,有一个NameServer可用,那么整体就是可用的,多部署几个NameServer就好,Broker,Consumer和Producer会自动的向对用的NameServer上去注册和连接

无状态: 指各节点之间平等,不存在主从关系

由于NameServer的无状态性,因此RockerMQ的可用性关键就放在了Broker上

一般我们会推荐多主多从的方式进行部署 (多Slave多Master)

这里首先看一下Mater和Slave的区别

Mater级别的Broker支持读和写,而Slave级别的Broker仅支持读,也就是说Producer只能与Mater Broker进行交互,而Consumer可以即和Master Broker交互也可以和Slave Broker交互

我们可以给每个Master去配置至少一个的Slave,进而产生多对的Mater-Slave,这样即时一个Master挂掉,也可以其他的Master顶上。

Slave的作用:

  • 可以在Master压力大的时候主动的去承担一部分的发出消息的责任
  • 当Master出现消息丢失甚至硬件损坏时Slave中仍然会保有一份CommitLog

这里还有一个同步双写和异步双写的问题,其实就是如何去同步Master和Slave之间的关系

同步双写就是说当Mater和Slave完成了同步后才会向Producer发出消息已经发送成功的提示

异步双写就是说不等待这个同步过程,Master接收到消息就认为消息发送成功

集群部署下的RocketMQ的运行流程

首先所有的Producer,Consumer,Broker会连接上NameServer,然后Producer发送消息时会根据负载均衡策略选择一个Master Broker来接受消息,这时Consumer就可以来接受消息了,同时Master还会根据双写策略来同步消息给到Slave

消息如何不被消费?如何实现幂等性?

这俩问题就是一个问题,因为消息不被重复消费(或者说是让重复消费不会影响系统)的解决方案就是实现幂等

首先就是我们得知道什么时候出现重复消费

  • 首先就是广播模式,当你的模块被以集群的形式多个部署的时候,就很有可能出现重复消费的情况
  • 一个Topic被多个ConsumerGroup消费时也会出现重复消费的情况
  • 在发送批量消息的情况下,如果存在部分消息没有成功的处理,也会发生重复消费

那么如何解决重复消费的问题呢?就是实现幂等

Redis幂等

由于Redis的Set本身就是幂等的,因此不需要特别关注

MySQL幂等

  • 版本号:我们可以设计一个版本号,当版本号存在的时候就说明已经被消费过一次,那就不再处理
  • ON DUPLICATE KEY UPDATE:MySQL自带的一种方案,如果存在则update,又因为重复消费的数据一样,因此也不会对我们的系统带来影响

如何保证消息的可靠性传输/如何避免消息丢失

可靠性:数据要完整准确,也就是说尽可能减少数据的丢失或者出错

能出问题的总共就三个阶段

  • 生产阶段:Producer新建消息,然后通过网络将消息投递给Broker的阶段
  • 存储阶段:Broker将消息存储到磁盘的阶段
  • 消息阶段:Consumer向Broker拉取消息的阶段

这里根据rocketmq-SpringBoot-stater来讲一下,

生产阶段

首先就是在yml文件中,应该区去配置发送失败的重试机制

yaml
rocketmq:
  producer:
    group: your_producer_group
    namesrv-addr: 127.0.0.1:9876 # NameServer 地址
    # 同步/异步发送失败的重试次数
    retry-times-when-send-failed: 3
    # 异步发送失败的重试次数 (如果设置,会覆盖上面的配置)
    # retry-times-when-send-async-failed: 3
    # 在同步发送失败时,是否尝试发送到另一个 Broker
    retry-another-broker-when-not-store-ok: true
    # 发送消息的超时时间 (毫秒)
    send-message-timeout: 3000

如果不使用SpringBoot集成的包,我们也可以监控一下SendResult

kotlin
fun sendMsg(msg: Message) {  
    var sendResult:SendResult?=null  
    while (sendResult==null||sendResult.sendStatus!= SendStatus.SEND_OK){  
        sendResult = rocketMQProducer.send(msg)  
    }  
}

存储阶段

默认的情况下,只要消息到了Broker端,那么就会优先将消息保存到内存中,然后立刻返回相应给生产者,随后Broker定期将一组消息从内存异步刷新到磁盘中

若想保证 Broker 端不丢消息,保证消息的可靠性,我们需要将消息保存机制修改为同步刷盘方式,即消息存储磁盘成功,才会返回响应。

在集群部署下,消息写入 master 成功,就可以返回确认响应给生产者,接着消息将会异步复制到 slave 节点

我们可以通过配置同步的方式来让master 节点将会同步等待 slave 节点复制完成,才会返回确认响应。

也就是使用同步双写的机制

但是有得必有失,这种同步双写的机制会导致响应速度变慢

消费阶段

yaml
rocketmq:
  consumer:
    group: your_consumer_group
    namesrv-addr: 127.0.0.1:9876 # NameServer 地址
    topic: your_topic
    # 消费消息的最大重试次数
    max-reconsume-times: 16
    # 消费线程数量等其他配置...

或者我们可以对我们的消费者代码进行一个异常的检测,如果检测到了异常,那么就向Broker返回消息发送失败

如何实现消息的顺序性

RocketMQ的SpringBoot-Stater很自然的就集成了顺序发送

kotlin
fun sendMsg(msg: Message<String>) {  
    rocketMQTemplate.syncSendOrderly("s",msg,"alian_sync_ordered")  
}

这里主要是说一下RockerMQ是如何保证的消息顺序性

首先必须要知道的是RocketMQ保证的是局部消息的顺序性,也就是说,在一个MessageQueue中,消息是严格有序的

当然,也可以通过一些花招来实现全局有效,比如一个Topic下只有一个MessageQueue,也就做到全局有效了,但是这样的效率太低,且Consumer只能有一个线程消费,因此就不提了

而分区有序就是依赖上面代码的syncSendOrderly的第三个参数hashKey/shardingKey,由于MessageQueue自己是能保证FIFO的,因此只要属于同一个ShardingKey就能保证顺序消费

如何解决消息队列的延时与过期问题/消息爆满怎么办

当问到这个问题的时候就是相对底层了,面试官已经试图拷问你有没有经历过真正使用RocketMQ的场景

在RocketMQ中,消息被挤压并不会丢失,而是会存放起来,因此不会出现消息过期的问题,但这个过程会导致很严重的延时

解决思路就是提高Consumer的消费能力,尽可能快速的解决掉挤压的消息

  • 提高Consumer的数量/提高消费并行数量: 针对出错的Queue写一个专门的消费程序,然后紧急征用多台服务器去部署这个程序的实例来共同消费Queue
  • 提高单个Consumer的并发线程数/批量消费: 修改消费者配置文件中的consumeThreadMin和ConsumeMessageBatch参数,增加线程可以提高单个实例的处理能力,但是由于是单机问题,要留意系统资源是否够用的问题
  • 紧急方案:
    1. 修复Consumer代码
    2. 暂停所有的Consumer
    3. 创建一个临时的Topic,拥有更多的Queue
    4. 编写一个临时程序来消费源Topic挤压的消息,这个临时程序的作用仅仅是将原挤压的Topic的消息转发到新的Topic中
    5. 临时启动大量的Consumer实例,然后消费新Topic中的消息
    6. 当挤压消息消费完后,恢复原架构

对于RabbitMQ,是存在消息过期这个说法的

你可以设置一个TTL,然后超过TTL的时间就会被Rabbit给清理掉,这样就不会像Rocket一样出现消息挤压的问题,但是会引入一个新的问题——消息丢失了怎么办?

这时你只能等到高峰期借宿,然后写一个程序,将Rabbit丢失的那批程序重新写回队列中,然后又再让Consumer重新的消费

本站访客数 人次      本站总访问量