网易首页 > 网易号 > 正文 申请入驻

SpringBoot+Nacos+Kafka简单实现微服务流编排

0
分享至

前言

最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排。

学习了 SpringCloud Data Flow 等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能。

简单的说,我们希望自己的流编排就是微服务可插拔,微服务数据入口及输出可不停机修改。

准备工作

Nacos 安装及使用入门

自己学习的话推荐使用 docker 安装,命令如下:

拉取镜像:

docker pull nacos/nacos-server


创建服务:

docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

然后在浏览器输入 ip:8848/nacos,账号 nacos;密码 nacos。

docker 能够帮助我们快速安装服务,减少再环境准备花的时间。关于docker面试题,公众 号Java精选,回复java面试,获取面试资料。

准备三个 SpringBoot 服务,引入 Nacos 及 Kafka


org.springframework.bootgroupId>
spring-boot-starter-parentartifactId>
2.1.0.RELEASEversion>
parent>


org.springframework.kafkagroupId>
spring-kafkaartifactId>
dependency>


com.alibaba.bootgroupId>
nacos-config-spring-boot-starterartifactId>
0.2.1version>
dependency>

配置文件:

spring:
kafka:
bootstrap-servers: kafka-server:9092
producer:
acks: all
consumer:
group-id: node1-group #三个服务分别为node1 node2 node3
enable-auto- commit: false
# 部署的nacos服务
nacos:
config:
server-addr: nacos- server: 8848

建议配置本机 host 就可以填写 xxx-server 不用填写服务 ip。

业务解读 我们现在需要对三个服务进行编排,保障每个服务可以插拔,也可以调整服务的位置。 示意图如上:

  • node1 服务监听前置服务发送的数据流,输入的 topic 为前置数据服务输出 topic

  • node2 监听 node1 处理后的数据,所以 node2 监听的 topic 为 node1 输出的 topic,node3 同理,最终 node3 处理完成后将数据发送到数据流终点

  • 我们现在要调整流程移除 node2-server,我们只需要把 node1-sink 改变成 node2-sink 即可,这样我们这几个服务就可以灵活的嵌入的不同项目的数据流处理业务中,做到即插即用(当然,数据格式这些业务层面的都是需要约定好的)

  • 动态可调还可以保证服务某一节点出现问题时候,即时改变数据流向,比如发送到数暂存服务,避免 Kafka 中积累太多数据,吞吐不平衡

Nacos 配置 ①创建配置 通常流编排里面每个服务都有一个输入及输出,分别为 input 及 sink,所以每个服务我们需要配置两个 topic,分别是 input-topic output-topic,我们就在 nacos 里面添加输入输出配置。 nacos 配置项需要配置 groupId,dataId,通常我们用服务名称作为 groupId,配置项的名称作为 dataId。 如 node1-server 服务有一个 input 配置项,配置如下:
完成其中一个服务的配置,其它服务参考下图配置即可:
②读取配置

代码如下:

@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
// autoRefreshed=true指的是nacos中配置发生改变后会刷新,false代表只会使用服务启动时候读取到的值
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

@NacosValue(value = "${input:}", autoRefreshed = true)
private String input;

@NacosValue(value = "${sink:}", autoRefreshed = true)
private String sink;

public String getInput() {
return input;
}

public String getSink() {
return sink;
}
}


③监听配置改变 服务的输入需要在服务启动时候创建消费者,在 topic 发生改变时候重新创建消费者,移除旧 topic 的消费者,输出是业务驱动的,无需监听改变,在每次发送时候读取到的都是最新配置的 topic。 因为在上面的配置类中 autoRefreshed = true,这个只会刷新 nacosConfig 中的配置值,服务需要知道配置改变去驱动消费的创建业务,需要创建 nacos 配置监听。

/**
* 监听Nacos配置改变,创建消费者,更新消费
*/
@Component
public class ConsumerManager {

@Value("${spring.kafka.bootstrap-servers}")
private String servers;

@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;

@Value("${spring.kafka.consumer.group-id}")
private boolean groupId;

@Autowired
private NacosConfig nacosConfig;

@Autowired
private KafkaTemplate kafkaTemplate;

// 用于存放当前消费者使用的topic
private String topic;

// 用于执行消费者线程,公众 号Java精选,有惊喜
private ExecutorService executorService;

/**
* 监听input
*/
@NacosConfigListener(dataId = "node1-server", groupId = "input")
public void inputListener(String input) {
// 这个监听触发的时候 实际NacosConfig中input的值已经是最新的值了 我们只是需要这个监听触发我们更新消费者的业务
String inputTopic = nacosConfig.getInput();
// 我使用nacosConfig中读取的原因是因为监听到内容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的内容框架会处理好,大家看一下第一张图的配置内容就明白了
// 先检查当前局部变量topic是否有值,有值代表是更新消费者,没有值只需要创建即可
if(topic != null) {
// 停止旧的消费者线程
executorService.shutdownNow();
executorService == null;
}
// 根据为新的topic创建消费者
topic = inputTopic;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2), threadFactory);
// 执行消费业务
executorService.execute(() -> consumer(topic));
}

/**
* 创建消费者
*/
public void consumer(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", servers);
properties.put("enable.auto.commit", enableAutoCommit);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", groupId);
KafkaConsumer consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
while (!Thread.currentThread().isInterrupted()) {
Duration duration = Duration.ofSeconds(1L);
ConsumerRecords records = consumer.poll(duration);
for (ConsumerRecord record : records) {
String message = record.value();
// 执行数据处理业务 省略业务实现
String handleMessage = handle(message);
// 处理完成后发送到下一个节点
kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
}
}
consumer.commitAsync();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}

总结

流编排的思路整体来说就是数据流方向可调,我们以此为需求,根据一些主流框架提供的 api 实现自己的动态调整方案,可以帮助自己更好的理解流编码思想及原理。 在实际业务中,还有许多业务问题需要去突破,我们这样处理更多是因为服务可插拔,便于流处理微服务在项目灵活搭配。 因为我现在工作是在传统公司,由于一些原因很难去推动新框架的使用,经常会用一些现有技术栈组合搞一些 sao 操作,供大家参考,希望大家多多指教。

作者:热黄油啤酒 https://juejin.cn/post/6997704312835047438

公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!

最近有很多人问,有没有读者交流群!加入方式很简单,公众号Java精选,回复“加群”,即可入群!

(微信小程序):3000+道面试题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计等,在线随时刷题!

------ 特别推荐 ------

特别推荐:专注分享最前沿的技术与资讯,为弯道超车做好准备及各种开源项目与高效率软件的公众号,「大咖笔记」,专注挖掘好东西,非常值得大家关注。点击下方公众号卡片关注

文章有帮助的话,点在看,转发吧!

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
燃油车4s店被挤爆,新能源4s店却只进来一个人

燃油车4s店被挤爆,新能源4s店却只进来一个人

水滴汽车App
2025-12-22 20:00:07
陪睡陪玩只是冰山一角!万达蒸发800亿后,王思聪再次传出大丑闻

陪睡陪玩只是冰山一角!万达蒸发800亿后,王思聪再次传出大丑闻

振华观史
2025-12-24 12:21:20
性能力与寿命关系被发现!男性40岁后,睾酮越高,死亡风险越低

性能力与寿命关系被发现!男性40岁后,睾酮越高,死亡风险越低

药师说健康
2025-12-05 09:47:10
女孩打翻水杯后续:母女道歉,正脸被扒已社死,女孩状态让人担忧

女孩打翻水杯后续:母女道歉,正脸被扒已社死,女孩状态让人担忧

奇思妙想草叶君
2025-12-24 15:17:36
就在今天!12月24日上午,广东男篮传来徐杰、萨林杰新消息!

就在今天!12月24日上午,广东男篮传来徐杰、萨林杰新消息!

皮皮观天下
2025-12-24 11:15:35
李璇:徐正源在中甲薪资税前70万美元,后涨至280万美元左右

李璇:徐正源在中甲薪资税前70万美元,后涨至280万美元左右

懂球帝
2025-12-24 11:55:31
外国游客将婴儿“寄存”给保安 中国宝妈帮忙换尿不湿哄娃 景区回应:证明咱们工作人员认真负责

外国游客将婴儿“寄存”给保安 中国宝妈帮忙换尿不湿哄娃 景区回应:证明咱们工作人员认真负责

闪电新闻
2025-12-24 14:34:08
李彦宏急了,百度大变天

李彦宏急了,百度大变天

中国企业家杂志
2025-12-23 18:31:16
这四个生肖的人,注定命苦一生,一辈子忙忙碌碌却无所成

这四个生肖的人,注定命苦一生,一辈子忙忙碌碌却无所成

屏儿爱读书
2025-02-18 09:11:18
郑爽第一次公开露脸!她安全了?

郑爽第一次公开露脸!她安全了?

八卦疯叔
2025-12-24 11:06:14
美军悍然扣押中方油轮!大陆拦截美对台岛军售的船只,时机已到了

美军悍然扣押中方油轮!大陆拦截美对台岛军售的船只,时机已到了

大国观察眼
2025-12-22 00:10:52
不到24小时,高市连迎3大噩耗,火箭熄火,中日46条航线取消

不到24小时,高市连迎3大噩耗,火箭熄火,中日46条航线取消

碧珠映红香
2025-12-23 16:15:33
朱孝天回应阿信:场面话,听听就算了吧

朱孝天回应阿信:场面话,听听就算了吧

枫尘余往逝
2025-12-23 20:11:43
马斯克个人净资产飙升至7490亿美元,约合5.28万亿元人民币,活到80岁每天要花1.8亿元才能花完

马斯克个人净资产飙升至7490亿美元,约合5.28万亿元人民币,活到80岁每天要花1.8亿元才能花完

观威海
2025-12-24 15:46:29
吕良伟70岁大寿众星云集!向太憨笑,甄子丹岳母比女儿还显年轻

吕良伟70岁大寿众星云集!向太憨笑,甄子丹岳母比女儿还显年轻

裕丰娱间说
2025-12-24 18:41:45
辽宁为莫兰德庆生!杨鸣露肩,本人举蛋糕笑开花,已成球队支柱!

辽宁为莫兰德庆生!杨鸣露肩,本人举蛋糕笑开花,已成球队支柱!

篮球资讯达人
2025-12-24 18:36:01
用脚投票,全球开发者最喜欢的大模型排行榜,Grok遥遥领先,DeepSeek 第五

用脚投票,全球开发者最喜欢的大模型排行榜,Grok遥遥领先,DeepSeek 第五

机器学习与Python社区
2025-12-22 20:09:40
公主远嫁波斯,中途突然怀孕,就地建国,如今此地是中国领土!

公主远嫁波斯,中途突然怀孕,就地建国,如今此地是中国领土!

铭记历史呀
2025-12-13 17:03:07
郑丽文韩国瑜联手清党渣,侯友宜卢秀燕罕见求和,国民党或将翻盘

郑丽文韩国瑜联手清党渣,侯友宜卢秀燕罕见求和,国民党或将翻盘

书纪文谭
2025-12-24 15:34:03
全国最好的医院排名,建议收藏!

全国最好的医院排名,建议收藏!

霹雳炮
2025-11-24 22:55:34
2025-12-24 21:51:00
Java精选
Java精选
一场永远也演不完的戏
1766文章数 3859关注度
往期回顾 全部

科技要闻

智谱和MiniMax拿出了“血淋淋”的账本

头条要闻

韩国财阀千金在柬埔寨被捕 被指涉嫌参与性交易和贩毒

头条要闻

韩国财阀千金在柬埔寨被捕 被指涉嫌参与性交易和贩毒

体育要闻

26岁广西球王,在质疑声中成为本土得分王

娱乐要闻

怀孕增重30斤!阚清子惊传诞一女夭折?

财经要闻

北京进一步放松限购 沪深是否会跟进?

汽车要闻

“运动版库里南”一月份亮相   或命名极氪9S

态度原创

亲子
时尚
健康
房产
旅游

亲子要闻

52岁王小骞泪谈早产经历,孕8个月羊水早破,住院保胎10天剖宫产

快点告别“妈妈装”!50+女性的冬季穿搭灵感,每一套都超美

这些新疗法,让化疗不再那么痛苦

房产要闻

硬核!央企海口一线江景顶流红盘,上演超预期交付!

旅游要闻

北海市海丝首港将上演跨年烟花秀

无障碍浏览 进入关怀版