网易首页 > 网易号 > 正文 申请入驻

SpringBoot+Nacos+Kafka简单实现微服务流编排

0
分享至

前言

最近一直在做微服务开发,涉及了一些数据处理模块的开发,每个处理业务都会开发独立的微服务,便于后面拓展和流编排。

学习了 SpringCloud Data Flow 等框架,感觉这个框架对于我们来说太重了,维护起来也比较麻烦,于是根据流编排的思想,基于我们目前的技术栈实现简单的流编排功能。

简单的说,我们希望自己的流编排就是微服务可插拔,微服务数据入口及输出可不停机修改。

准备工作

Nacos 安装及使用入门

自己学习的话推荐使用 docker 安装,命令如下:

拉取镜像:

docker pull nacos/nacos-server


创建服务:

docker run --env MODE=standalone --name nacos -d -p 8848:8848 nacos/nacos-server

然后在浏览器输入 ip:8848/nacos,账号 nacos;密码 nacos。

docker 能够帮助我们快速安装服务,减少再环境准备花的时间。关于docker面试题,公众 号Java精选,回复java面试,获取面试资料。

准备三个 SpringBoot 服务,引入 Nacos 及 Kafka


org.springframework.bootgroupId>
spring-boot-starter-parentartifactId>
2.1.0.RELEASEversion>
parent>


org.springframework.kafkagroupId>
spring-kafkaartifactId>
dependency>


com.alibaba.bootgroupId>
nacos-config-spring-boot-starterartifactId>
0.2.1version>
dependency>

配置文件:

spring:
kafka:
bootstrap-servers: kafka-server:9092
producer:
acks: all
consumer:
group-id: node1-group #三个服务分别为node1 node2 node3
enable-auto- commit: false
# 部署的nacos服务
nacos:
config:
server-addr: nacos- server: 8848

建议配置本机 host 就可以填写 xxx-server 不用填写服务 ip。

业务解读 我们现在需要对三个服务进行编排,保障每个服务可以插拔,也可以调整服务的位置。 示意图如上:

  • node1 服务监听前置服务发送的数据流,输入的 topic 为前置数据服务输出 topic

  • node2 监听 node1 处理后的数据,所以 node2 监听的 topic 为 node1 输出的 topic,node3 同理,最终 node3 处理完成后将数据发送到数据流终点

  • 我们现在要调整流程移除 node2-server,我们只需要把 node1-sink 改变成 node2-sink 即可,这样我们这几个服务就可以灵活的嵌入的不同项目的数据流处理业务中,做到即插即用(当然,数据格式这些业务层面的都是需要约定好的)

  • 动态可调还可以保证服务某一节点出现问题时候,即时改变数据流向,比如发送到数暂存服务,避免 Kafka 中积累太多数据,吞吐不平衡

Nacos 配置 ①创建配置 通常流编排里面每个服务都有一个输入及输出,分别为 input 及 sink,所以每个服务我们需要配置两个 topic,分别是 input-topic output-topic,我们就在 nacos 里面添加输入输出配置。 nacos 配置项需要配置 groupId,dataId,通常我们用服务名称作为 groupId,配置项的名称作为 dataId。 如 node1-server 服务有一个 input 配置项,配置如下:
完成其中一个服务的配置,其它服务参考下图配置即可:
②读取配置

代码如下:

@Configuration
@NacosPropertySource(dataId = "input", groupId = "node1-server", autoRefreshed = true)
// autoRefreshed=true指的是nacos中配置发生改变后会刷新,false代表只会使用服务启动时候读取到的值
@NacosPropertySource(dataId = "sink", groupId = "node1-server", autoRefreshed = true)
public class NacosConfig {

@NacosValue(value = "${input:}", autoRefreshed = true)
private String input;

@NacosValue(value = "${sink:}", autoRefreshed = true)
private String sink;

public String getInput() {
return input;
}

public String getSink() {
return sink;
}
}


③监听配置改变 服务的输入需要在服务启动时候创建消费者,在 topic 发生改变时候重新创建消费者,移除旧 topic 的消费者,输出是业务驱动的,无需监听改变,在每次发送时候读取到的都是最新配置的 topic。 因为在上面的配置类中 autoRefreshed = true,这个只会刷新 nacosConfig 中的配置值,服务需要知道配置改变去驱动消费的创建业务,需要创建 nacos 配置监听。

/**
* 监听Nacos配置改变,创建消费者,更新消费
*/
@Component
public class ConsumerManager {

@Value("${spring.kafka.bootstrap-servers}")
private String servers;

@Value("${spring.kafka.consumer.enable-auto-commit}")
private boolean enableAutoCommit;

@Value("${spring.kafka.consumer.group-id}")
private boolean groupId;

@Autowired
private NacosConfig nacosConfig;

@Autowired
private KafkaTemplate kafkaTemplate;

// 用于存放当前消费者使用的topic
private String topic;

// 用于执行消费者线程,公众 号Java精选,有惊喜
private ExecutorService executorService;

/**
* 监听input
*/
@NacosConfigListener(dataId = "node1-server", groupId = "input")
public void inputListener(String input) {
// 这个监听触发的时候 实际NacosConfig中input的值已经是最新的值了 我们只是需要这个监听触发我们更新消费者的业务
String inputTopic = nacosConfig.getInput();
// 我使用nacosConfig中读取的原因是因为监听到内容是input=xxxx而不是xxxx,如果使用需要自己截取一下,nacosConfig中的内容框架会处理好,大家看一下第一张图的配置内容就明白了
// 先检查当前局部变量topic是否有值,有值代表是更新消费者,没有值只需要创建即可
if(topic != null) {
// 停止旧的消费者线程
executorService.shutdownNow();
executorService == null;
}
// 根据为新的topic创建消费者
topic = inputTopic;
ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(topic + "-pool-%d").build();
executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(2), threadFactory);
// 执行消费业务
executorService.execute(() -> consumer(topic));
}

/**
* 创建消费者
*/
public void consumer(String topic) {
Properties properties = new Properties();
properties.put("bootstrap.servers", servers);
properties.put("enable.auto.commit", enableAutoCommit);
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("group.id", groupId);
KafkaConsumer consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Arrays.asList(topic));
try {
while (!Thread.currentThread().isInterrupted()) {
Duration duration = Duration.ofSeconds(1L);
ConsumerRecords records = consumer.poll(duration);
for (ConsumerRecord record : records) {
String message = record.value();
// 执行数据处理业务 省略业务实现
String handleMessage = handle(message);
// 处理完成后发送到下一个节点
kafkaTemplate.send(nacosConfig.getSink(), handleMessage);
}
}
consumer.commitAsync();
}
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
} finally {
try {
consumer.commitSync();
} finally {
consumer.close();
}
}
}
}

总结

流编排的思路整体来说就是数据流方向可调,我们以此为需求,根据一些主流框架提供的 api 实现自己的动态调整方案,可以帮助自己更好的理解流编码思想及原理。 在实际业务中,还有许多业务问题需要去突破,我们这样处理更多是因为服务可插拔,便于流处理微服务在项目灵活搭配。 因为我现在工作是在传统公司,由于一些原因很难去推动新框架的使用,经常会用一些现有技术栈组合搞一些 sao 操作,供大家参考,希望大家多多指教。

作者:热黄油啤酒 https://juejin.cn/post/6997704312835047438

公众号“Java精选”所发表内容注明来源的,版权归原出处所有(无法查证版权的或者未注明出处的均来自网络,系转载,转载的目的在于传递更多信息,版权属于原作者。如有侵权,请联系,笔者会第一时间删除处理!

最近有很多人问,有没有读者交流群!加入方式很简单,公众号Java精选,回复“加群”,即可入群!

(微信小程序):3000+道面试题,包含Java基础、并发、JVM、线程、MQ系列、Redis、Spring系列、Elasticsearch、Docker、K8s、Flink、Spark、架构设计等,在线随时刷题!

------ 特别推荐 ------

特别推荐:专注分享最前沿的技术与资讯,为弯道超车做好准备及各种开源项目与高效率软件的公众号,「大咖笔记」,专注挖掘好东西,非常值得大家关注。点击下方公众号卡片关注

文章有帮助的话,点在看,转发吧!

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
不到24小时,立陶宛改口认错,向全世界承认,5年前的决定很愚蠢

不到24小时,立陶宛改口认错,向全世界承认,5年前的决定很愚蠢

boss外传
2026-02-10 18:00:07
眼镜妹为何是花探系列里面人气第一女主?敬业并把工作当爱好

眼镜妹为何是花探系列里面人气第一女主?敬业并把工作当爱好

挪威森林
2026-02-11 12:09:26
54岁刀郎正式上任,职务不一般,成都人民这下有福了

54岁刀郎正式上任,职务不一般,成都人民这下有福了

冷紫葉
2026-02-11 13:20:26
主食换一换,每年疾病少一半?4种主食,最好天天吃,效果惊人!

主食换一换,每年疾病少一半?4种主食,最好天天吃,效果惊人!

路医生健康科普
2026-02-02 22:44:59
离谱!中国生产和制造了几乎所有的东西,但美国经济仍比中国强大

离谱!中国生产和制造了几乎所有的东西,但美国经济仍比中国强大

小蒋爱唠嗑
2026-02-02 23:18:10
24岁摆摊,34岁25亿,44岁自杀:钱命有定数

24岁摆摊,34岁25亿,44岁自杀:钱命有定数

随梦而飞起
2026-02-09 20:23:09
西部排名又变了:马刺高歌猛进,火箭坐收好礼,2队排名互换

西部排名又变了:马刺高歌猛进,火箭坐收好礼,2队排名互换

篮球大视野
2026-02-11 15:55:03
不到24小时,巴拿马果然扛不住了!可以还港口,却有一个前提条件

不到24小时,巴拿马果然扛不住了!可以还港口,却有一个前提条件

科学发掘
2026-02-11 11:26:53
1951年,志愿军营长准备处决副连长,团政委出面阻拦,师长当即下令:就地正法!

1951年,志愿军营长准备处决副连长,团政委出面阻拦,师长当即下令:就地正法!

史海孤雁
2026-02-10 16:21:06
今晚19:30,U17国足再战亚洲杯二档强队,剑指双杀,直播平台如下

今晚19:30,U17国足再战亚洲杯二档强队,剑指双杀,直播平台如下

侃球熊弟
2026-02-11 06:30:03
乌克兰故地重游攻入库尔斯克的苏贾!突袭俄空降兵指挥部

乌克兰故地重游攻入库尔斯克的苏贾!突袭俄空降兵指挥部

项鹏飞
2026-02-10 17:47:13
现货白银突破82美元!世界白银协会:全球白银市场预计2026年将迎来连续第六年供应短缺

现货白银突破82美元!世界白银协会:全球白银市场预计2026年将迎来连续第六年供应短缺

每日经济新闻
2026-02-11 17:32:22
曝北控接近签下超级外援!曾在CBA场均29+7+6,张庆鹏终于出手了

曝北控接近签下超级外援!曾在CBA场均29+7+6,张庆鹏终于出手了

老叶评球
2026-02-11 16:59:36
毛主席纪念堂发布重要公告

毛主席纪念堂发布重要公告

环球时报国际
2026-02-10 17:09:37
场均25+4+7!哈登该入选全明星,加内特发出请求,他190俱乐部了

场均25+4+7!哈登该入选全明星,加内特发出请求,他190俱乐部了

巴叔GO聊体育
2026-02-11 16:30:25
白鹿活动现场惊现尴尬!牛仔裤裆部肿大,女生们会怎么反应?

白鹿活动现场惊现尴尬!牛仔裤裆部肿大,女生们会怎么反应?

白宸侃片
2026-02-11 18:01:11
天空:热刺降级可能性越来越大,赢下欧冠却英超降级还真不是笑话

天空:热刺降级可能性越来越大,赢下欧冠却英超降级还真不是笑话

砚底沉香
2026-02-11 13:33:42
朱元璋斩了十几万贪官,贪官却越来越多,雍正只用2招就药到病除

朱元璋斩了十几万贪官,贪官却越来越多,雍正只用2招就药到病除

铭记历史呀
2026-02-11 13:00:33
房贷好像没有几年前那么恐怖了?网友晒55万贷款,评论区炸锅了

房贷好像没有几年前那么恐怖了?网友晒55万贷款,评论区炸锅了

夜深爱杂谈
2026-02-10 23:04:40
为何高市早苗即将辞职?

为何高市早苗即将辞职?

寰宇大观察
2026-02-11 15:56:27
2026-02-11 19:11:00
Java精选
Java精选
一场永远也演不完的戏
1771文章数 3859关注度
往期回顾 全部

科技要闻

痛失两位华裔大佬!马斯克为何留不住人心

头条要闻

百万粉丝网红起诉用6年的助理 对方直播获超百万打赏

头条要闻

百万粉丝网红起诉用6年的助理 对方直播获超百万打赏

体育要闻

搞垮一个冬奥选手,只需要一首歌?

娱乐要闻

汪峰吃惊!章子怡年前6天高调官宣喜讯

财经要闻

习酒节前价格雪崩控量稳价变空谈

汽车要闻

比亚迪最美B级SUV? 宋Ultra这腰线美翻了

态度原创

艺术
游戏
教育
时尚
手机

艺术要闻

砸200亿,郎酒庄园建在800米悬崖上,实景震撼,真没吹牛!

《海虎》何时来的《RE》官媒展示里昂帅照闹笑话

教育要闻

校长说|孙先亮:AI时代的教育回归,让每个生命自主生长

冬季穿出高级感,全靠这3个招数简单好懂,中年女人赶紧照搬

手机要闻

iQOO年货节双机优选:旗舰巅峰与性价比王者任选

无障碍浏览 进入关怀版