网易首页 > 网易号 > 正文 申请入驻

【110期】面试官:说说 RabbitMQ 消费端限流、TTL、死信队列?

0
分享至

点击上方“Java精选”,选择“设为星标”

别问别人为什么,多问自己凭什么!

下方留言必回,有问必答!

每天08:00更新文章,每天进步一点点...

1、为什么要对消费端限流

假设一个场景,首先,我们 Rabbitmq 服务器积压了有上万条未处理的消息,我们随便打开一个消费者客户端,会出现这样情况: 巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,我们无法约束生产端,这是用户的行为。所以我们应该对消费端限流,用于保持消费端的稳定,当消息数量激增的时候很有可能造成资源耗尽,以及影响服务的性能,导致系统的卡顿甚至直接崩溃。

2、限流的 api 讲解

RabbitMQ 提供了一种 qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于 consume 或者 channel 设置 Qos 的值)未被确认前,不进行消费新的消息。


* Request specific "quality of service" settings.
* These settings impose limits on the amount of data the server
* will deliver to consumers before requiring acknowledgements.
* Thus they provide a means of consumer-initiated flow control.
* @param prefetchSize maximum amount of content (measured in
* octets) that the server will deliver, 0 if unlimited
* @param prefetchCount maximum number of messages that the server
* will deliver, 0 if unlimited
* @param global true if the settings should be applied to the
* entire channel rather than each consumer
* @throws java.io.IOException if an error is encountered
void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException;


  • prefetchSize:0,单条消息大小限制,0代表不限制



  • prefetchCount:一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于 N 个消息,即一旦有 N 个消息还没有 ack,则该 consumer 将 block 掉,直到有消息 ack。



  • global:true、false 是否将上面设置应用于 channel,简单点说,就是上面限制是 channel 级别的还是 consumer 级别。当我们设置为 false 的时候生效,设置为 true 的时候没有了限流功能,因为 channel 级别尚未实现。



  • 注意:prefetchSize 和 global 这两项,rabbitmq 没有实现,暂且不研究。特别注意一点,prefetchCount 在 no_ask=false 的情况下才生效,即在自动应答的情况下这两个值是不生效的。


3、如何对消费端进行限流

  • 首先第一步,我们既然要使用消费端限流,我们需要关闭自动 ack,将 autoAck 设置为 falsechannel.basicConsume(queueName, false, consumer);



  • 第二步我们来设置具体的限流大小以及数量。channel.basicQos(0, 15, false);



  • 第三步在消费者的 handleDelivery 消费方法中手动 ack,并且设置批量处理 ack 回应为 truechannel.basicAck(envelope.getDeliveryTag(), true);


import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class QosProducer {
public static void main(String[] args) throws Exception {
//1\. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");

//2\. 通过连接工厂来创建连接
Connection connection = factory.newConnection();

//3\. 通过 Connection 来创建 Channel
Channel channel = connection.createChannel();

//4\. 声明
String exchangeName = "test_qos_exchange";
String routingKey = "item.add";

//5\. 发送
String msg = "this is qos msg";
for (int i = 0; i < 10; i++) {
String tem = msg + " : " + i;
channel.basicPublish(exchangeName, routingKey, null, tem.getBytes());
System.out.println("Send message : " + tem);
}

//6\. 关闭连接
channel.close();
connection.close();
}
}

这里我们创建一个消费者,通过以下代码来验证限流效果以及global参数设置为true时不起作用.。我们通过Thread.sleep(5000);来让 ack 即处理消息的过程慢一些,这样我们就可以从后台管理工具中清晰观察到限流情况。

import com.rabbitmq.client.*;
import java.io.IOException;
public class QosConsumer {
public static void main(String[] args) throws Exception {
//1\. 创建一个 ConnectionFactory 并进行设置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);

//2\. 通过连接工厂来创建连接
Connection connection = factory.newConnection();

//3\. 通过 Connection 来创建 Channel
final Channel channel = connection.createChannel();

//4\. 声明
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "item.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);

channel.basicQos(0, 3, false);

//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);

//5\. 创建消费者并接收消息
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String message = new String(body, "UTF-8");
System.out.println("[x] Received '" + message + "'");

channel.basicAck(envelope.getDeliveryTag(), true);
}
};
//6\. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, false, consumer);
channel.basicConsume(queueName, false, consumer1);
}
}

我们从下图中发现Unacked值一直都是 3 ,每过 5 秒 消费一条消息即 Ready 和 Total 都减少 3,而Unacked的值在这里代表消费者正在处理的消息,通过我们的实验发现了消费者一次性最多处理 3 条消息,达到了消费者限流的预期功能。

当我们将void basicQos(int prefetchSize, int prefetchCount, boolean global)中的 global 设置为true的时候我们发现并没有了限流的作用。

TTL

TTL是Time To Live的缩写,也就是生存时间。RabbitMQ支持消息的过期时间,在消息发送时可以进行指定。RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。

RabbitMQ allows you to set TTL (time to live) for both messages and queues. This can be done using optional queue arguments or policies (the latter option is recommended). Message TTL can be enforced for a single queue, a group of queues or applied for individual messages. RabbitMQ允许您为消息和队列设置TTL(生存时间)。这可以使用可选的队列参数或策略来完成(建议使用后一个选项)。可以对单个队列,一组队列强制执行消息TTL,也可以为单个消息应用消息TTL。 ——摘自 RabbitMQ 官方文档
1、消息的 TTL

我们在生产端发送消息的时候可以在 properties 中指定expiration属性来对消息过期时间进行设置,单位为毫秒(ms)。


* deliverMode 设置为 2 的时候代表持久化消息
* expiration 意思是设置消息的有效期,超过10秒没有被消费者接收后会被自动删除
* headers 自定义的一些属性
//5\. 发送
Map headers = new HashMap();
headers.put("myhead1", "111");
headers.put("myhead2", "222");

AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("100000")
.headers(headers)
.build();
String msg = "test message";
channel.basicPublish("", queueName, properties, msg.getBytes());

我们也可以后台管理页面中进入 Exchange 发送消息指定expiration


2、队列的 TTL

我们也可以在后台管理界面中新增一个 queue,创建时可以设置 ttl,对于队列中超过该时间的消息将会被移除。


死信队列

死信队列:没有被及时消费的消息存放的队列

消息没有被及时消费的原因:


  • 消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

  • TTL(time-to-live) 消息超时未消费

  • 达到最大队列长度


实现死信队列步骤
  • 首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

`Exchange: dlx.exchange
Queue: dlx.queue
RoutingKey: # 代表接收所有路由 key

  • 然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可:arguments.put("x-dead-letter-exchange",' dlx.exchange' )



  • 这样消息在过期、requeue失败、 队列在达到最大长度时,消息就可以直接路由到死信队列!


import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class DlxProducer {
public static void main(String[] args) throws Exception {
//设置连接以及创建 channel 湖绿
String exchangeName = "test_dlx_exchange";
String routingKey = "item.update";

String msg = "this is dlx msg";

//我们设置消息过期时间,10秒后再消费 让消息进入死信队列
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.build();

channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());
System.out.println("Send message : " + msg);

channel.close();
connection.close();
}
}
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class DlxConsumer {
public static void main(String[] args) throws Exception {
//创建连接、创建channel忽略 内容可以在上面代码中获取
String exchangeName = "test_dlx_exchange";
String queueName = "test_dlx_queue";
String routingKey = "item.#";

//必须设置参数到 arguments 中
Map arguments = new HashMap();
arguments.put("x-dead-letter-exchange", "dlx.exchange");

channel.exchangeDeclare(exchangeName, "topic", true, false, null);
//将 arguments 放入队列的声明中
channel.queueDeclare(queueName, true, false, false, arguments);

//一般不用代码绑定,在管理界面手动绑定
channel.queueBind(queueName, exchangeName, routingKey);

//声明死信队列
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
//路由键为 # 代表可以路由到所有消息
channel.queueBind("dlx.queue", "dlx.exchange", "#");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {

String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");

}
};

//6\. 设置 Channel 消费者绑定队列
channel.basicConsume(queueName, true, consumer);
}
}
总结

DLX也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理。

作者:Hai Xiang cnblogs.com/haixiang/p/10905189.html

精品资料,超赞福利!

- 小程序,3000+ 道面试题在线刷,最新、最全 Java 面试题!

期往精选 点击标题可跳转

文章有帮助的话,在看,转发吧!

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
1973年开国上将静坐角落起身让座,偶遇秦基伟,随即被请上主席台

1973年开国上将静坐角落起身让座,偶遇秦基伟,随即被请上主席台

磊子讲史
2026-06-02 17:41:02
CBA重磅转会即将达成!曝顶级前锋加盟山西男篮,曾单场狂砍26+7

CBA重磅转会即将达成!曝顶级前锋加盟山西男篮,曾单场狂砍26+7

老叶评球
2026-06-30 18:56:34
周深演唱会上热到双眼皮贴脱落,发文调侃:郑州把我热回单眼皮,根本粘不住

周深演唱会上热到双眼皮贴脱落,发文调侃:郑州把我热回单眼皮,根本粘不住

鲁中晨报
2026-06-30 10:30:33
Lisa回到韩国,但无任何粉丝去接机,韩国网友希望她摘到KPOP标签

Lisa回到韩国,但无任何粉丝去接机,韩国网友希望她摘到KPOP标签

芊手若
2026-06-30 09:13:22
这个一妻多夫制的民族,晚上怎么过?女人直言:简直就是受罪

这个一妻多夫制的民族,晚上怎么过?女人直言:简直就是受罪

哄动一时啊
2026-06-24 14:26:23
严打升级!8个部门联手严查,这次可不是吓唬人!

严打升级!8个部门联手严查,这次可不是吓唬人!

细说职场
2026-06-30 16:03:10
高市早苗要气炸了:当着众人的面,洪森对中国做出一项永久承诺!

高市早苗要气炸了:当着众人的面,洪森对中国做出一项永久承诺!

蜉蝣说
2026-06-30 09:38:22
苏提达巴黎出尽风头!73岁布丽吉特发型翻车,泰王冷脸看女儿下跪

苏提达巴黎出尽风头!73岁布丽吉特发型翻车,泰王冷脸看女儿下跪

白露文娱志
2026-06-30 15:19:46
iPhone 18 Pro 发布时间来了,售价首次过万!

iPhone 18 Pro 发布时间来了,售价首次过万!

XCiOS俱乐部
2026-06-29 20:31:18
吴月娘:我这浪肉,被男人摸一下真好

吴月娘:我这浪肉,被男人摸一下真好

老达子
2026-06-26 06:50:03
杨瀚森新队友来了!开拓者灰熊达成2换1交易 莫兰特加盟波特兰

杨瀚森新队友来了!开拓者灰熊达成2换1交易 莫兰特加盟波特兰

罗说NBA
2026-06-30 04:39:53
出事了!菲律宾爆发混乱,多人被抓,总统府被围,马科斯乱了

出事了!菲律宾爆发混乱,多人被抓,总统府被围,马科斯乱了

孤城落叶
2026-06-30 13:32:49
确认不打了!CBA最强主教练正式卸任,或加盟广东队取代杜锋?

确认不打了!CBA最强主教练正式卸任,或加盟广东队取代杜锋?

绯雨儿
2026-06-30 12:03:15
1935年刘文辉故意不炸泸定桥,临终含泪说出真相:那十三根铁索是川康百姓的命根子!

1935年刘文辉故意不炸泸定桥,临终含泪说出真相:那十三根铁索是川康百姓的命根子!

白驹谈人机
2026-06-29 11:19:27
爽了爽了!恭喜杨瀚森!曝28岁中锋告别开拓者

爽了爽了!恭喜杨瀚森!曝28岁中锋告别开拓者

篮球实战宝典
2026-06-30 18:13:27
苏提达王后与布丽吉特同穿浅粉造型,同框互动尽显优雅默契

苏提达王后与布丽吉特同穿浅粉造型,同框互动尽显优雅默契

墨薷桃桃
2026-06-30 12:37:05
日本饮料之王:用汉字伪装身份,年收入超60亿,一度被误认是国货

日本饮料之王:用汉字伪装身份,年收入超60亿,一度被误认是国货

小兰聊历史
2026-06-29 12:13:29
山西男篮确定补强,接近签下广厦2米03冠军锋线,常规赛场均6.5分

山西男篮确定补强,接近签下广厦2米03冠军锋线,常规赛场均6.5分

中国篮坛快讯
2026-06-30 19:20:46
世界杯高下立判!巴萨锁定世界最佳中锋!实力碾压阿尔瓦雷斯!

世界杯高下立判!巴萨锁定世界最佳中锋!实力碾压阿尔瓦雷斯!

澜归序
2026-06-30 02:24:47
台湾问题即将突破临界点,两大迹象表明,大陆或要准备出手了?

台湾问题即将突破临界点,两大迹象表明,大陆或要准备出手了?

铭记历史呀
2026-06-29 16:49:31
2026-06-30 22:04:49
Java精选
Java精选
一场永远也演不完的戏
1796文章数 3859关注度
往期回顾 全部

科技要闻

iPhone18 Pro遭泄密!印度代工商惹祸

头条要闻

坎贝尔承认:中国是最成功渡过难关的国家

头条要闻

坎贝尔承认:中国是最成功渡过难关的国家

体育要闻

大热倒灶压力给到法国 王楚揭法国队隐患

娱乐要闻

韩红称要退出公益,多位名人挽留

财经要闻

万亿“寒王”,历史时刻

汽车要闻

奇瑞风云A9探店 五个理由一定来看看

态度原创

健康
游戏
房产
公开课
军事航空

狂吃“糯叽叽”小心肠梗阻!

大的来了!《仙剑4重制版》版号获批:离发售不远了

房产要闻

等了三年!改善顶流实景交付,海口标杆的的答卷来了!

公开课

李玫瑾:为什么性格比能力更重要?

军事要闻

以色列防长:穆杰塔巴已被列入死亡名单

无障碍浏览 进入关怀版