网易首页 > 网易号 > 正文 申请入驻

使用 Kafka 和 Spring Boot 开发全栈 Java 应用程序

0
分享至

每日分享最新,最流行的软件开发知识与最新行业趋势,希望大家能够一键三连,多多支持,跪求关注,点赞,留言。
本教程展示了如何在 Spring Boot 应用程序中发布和订阅 Kafka 消息,以及如何在浏览器中实时显示消息。

你将构建什么
您将构建一个通过 Kafka 发送和接收消息的全栈响应式 Web 应用程序。该应用程序在服务器上使用 Spring Boot 和 Java,在客户端使用 Lit 和 TypeScript,以及用于组件和通信的 Hilla 框架。


带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。
你需要什么
20分钟
Java 11 或更新版本
节点 16.14 或更高版本
一个同时支持 Java 和 TypeScript 的 IDE,例如VS Code。
技术概述
卡夫卡
Apache Kafka 是一个分布式事件流平台。您可以将其视为类固醇上的发布/订阅系统。Kafka 生产者可以向主题发送消息,然后消费者可以读取这些消息。但是,与大多数 pub/sub 系统不同,当您阅读这些消息时,它们不会从主题中删除。这允许您执行流处理以实时分析、聚合或转换来自不同事件的数据。
如果您想了解 Kafka 的基础知识,我强烈建议您观看 Tim Berglund 的视频:
Spring Boot 和 Spring Kafka
Spring Boot 是一种使用 Spring 的固执己见的方式。它通过依赖于配置的约定将配置代码的数量减少到最低限度。此外,Spring Kafka 增加了对配置 Kafka 生产者和消费者以及通过注解方法监听传入消息的支持。
Hilla
Hilla 是一个为 Java 构建的前端框架。它结合了 Spring Boot 后端和 Lit 内置的响应式 TypeScript 前端。Hilla 会根据您的服务器端点签名自动生成 TypeScript 类型,这有助于在您开发应用程序时保持前端和后端同步。
下载并运行 Kafka
本教程使用本地 Kafka 代理。按照以下步骤在您的计算机上下载并启动 Kafka:
进入Kafka下载页面,下载Kafka。
提取下载的存档tar -xzf kafka<,version>.tgz
打开目录cd kafka_
启动 Zookeeper 管理本地 Kafka 集群bin/zookeeper-server-start.sh config/zookeeper.properties
打开第二个终端并运行bin/kafka-server-start.sh config/server.properties以启动 Kafka 代理。
您现在已经运行了 Kafka,并准备开始构建您的应用程序。
创建一个新项目
首先创建一个新的 Hilla 项目。这将为您提供一个配置了 TypeScript-Lit 前端的 Spring Boot 项目。
使用 Vaadin CLI 初始化项目:npx @vaadin/cli init --hilla --empty hilla-kafka
在您选择的 IDE 中打开项目。
使用包含的 Maven 包装器启动应用程序。该命令将下载 Maven 和 npm 依赖项并启动开发服务器。注意:初始启动可能需要几分钟时间。然而,随后的启动几乎是瞬间的。./mvnw
添加 Kafka Spring 依赖项
pom.xml通过在文件中包含以下依赖项,向应用程序添加 Kafka 支持:
org.springframework.kafka
spring-kafka
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
父 Spring Boot BOM 涵盖了依赖项,因此您无需显式添加版本号。
定义数据模型
首先创建一个新的 Java 包:com.example.application.model.
在这个新创建的包中,创建一个新的 Java 类 ,Message.java来表示您将通过 Kafka 发送的消息。然后,将以下内容添加到类中:
package com.example.application.model;
import java.time.Instant;
import dev.hilla.Nonnull;
public class Message {
private @Nonnull String text;
private Instant time;
private @Nonnull String userName;
public String getText() {
return text;
public void setText(String text) {
this.text = text;
public Instant getTime() {
return time;
public void setTime(Instant time) {
this.time = time;
public String getUserName() {
return userName;
public void setUserName(String userName) {
this.userName = userName;

Hilla 框架使用@Nonnull注解来指导 TypeScript 类型生成:它们对 Java 行为没有影响。
使用 Kafka 发送自定义对象
在本教程中,您将发送 Java 对象作为消息,而不是使用像字符串或数字这样的原始方法。为此,您需要创建自定义序列化器和反序列化器。
在同一个包中,创建以下两个新类,MessageSerializer.java并MessageDeserializer.java具有以下内容:
package com.example.application.model;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
public class MessageSerializer implements Serializer {
public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();
@Override
public byte[] serialize(String topic, Message message) {
try {
return mapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SerializationException(e);

package com.example.application.model;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
public class MessageDeSerializer implements Deserializer {
public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();
@Override
public Message deserialize(String topic, byte[] data) {
try {
return mapper.readValue(data, Message.class);
} catch (IOException e) {
throw new SerializationException(e);

序列化器和反序列化器使用 Jackson 将对象与 JSON 进行转换。findAndAddModules()builder 方法允许 Jackson 通过您添加的依赖项支持 JSR310 数据类型。
配置Kafka
接下来,通过将以下内容添加到src/main/resources/application.properties文件中来配置 Kafka:
# A custom property to hold the name of our Kafka topic:
topic.name=chat
# Set up Kafka:
spring.kafka.bootstrap-servers=localhost:9092
# Configure the consumer:
spring.kafka.consumer.client-id=chat-consumer
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.application.model.MessageDeSerializer
# Configure the producer:
spring.kafka.producer.client-id=chat-producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.application.model.MessageSerializer
更新Application.java以编程方式配置主题。
package com.example.application;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.vaadin.flow.component.page.AppShellConfigurator;
import com.vaadin.flow.server.PWA;
import com.vaadin.flow.theme.Theme;
* The entry point of the Spring Boot application.
* Use the @PWA annotation to make the application installable on phones, tablets, and some desktop
* browsers.

@SpringBootApplication
@Theme(value = "hilla-kafka")\
@PWA(name = "hilla-kafka", shortName = "hilla-kafka", offlineResources = {})
@Configuration
public class Application implements AppShellConfigurator {
@Value("${topic.name}")
private String topicName;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
@Bean
NewTopic chatTopic() {
return TopicBuilder
.name(topicName)
.partitions(1)
.replicas(1)
.build();

以下是解释的基本部分:
通过 Spring 注入主题名称。
使用 TopicBuilder bean 配置来定义和配置新主题。在这个示例应用程序中,您只设置了一个分区和一个副本。在实际应用中,您会希望设置更多的分区和副本,以确保集群运行良好且可靠。
创建服务器端点
您现在已准备好开始使用 Kafka。接下来,创建将与 Kafka 代理和客户端 Web 应用程序通信的服务器端点。
MessageEndpoint.java,在包中创建一个新的 Java 文件com.example.application并将以下代码添加到其中:
package com.example.application;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import com.example.application.model.Message;
import com.vaadin.flow.server.auth.AnonymousAllowed;
import dev.hilla.Endpoint;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
@Endpoint
@AnonymousAllowed
public class MessageEndpoint {
@Value("${topic.name}")
private String topicName;
private final Many chatSink;
private final Flux chat;
private final KafkaTemplate kafkaTemplate;
MessageEndpoint(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
chatSink = Sinks.many().multicast().directBestEffort();
chat = chatSink.asFlux();
public Flux join() {
return chat;
public void send(Message message) {
message.setTime(Instant.now());
kafkaTemplate.send(topicName, message);
@KafkaListener(topics = "chat", groupId = "chat-group")
private void consumer(Message message) {
chatSink.emitNext(message,
(signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED);

以下是解释的基本部分:
@Endpoint 注释告诉 Hilla 将所有公共方法作为 TypeScript 方法提供给客户端。@AnonymousAllowed 关闭此端点的身份验证。
chatSink 是一种将数据传递给系统的编程方式。它发出消息,以便任何订阅了相关聊天 Flux 的客户端都会收到它们。
构造函数获取 Spring 注入的 KafkaTemplate 并将其保存到字段中。
join() 方法返回您将在客户端订阅的聊天 Flux。
send() 方法接收一条消息,用发送时间标记它,然后使用 kafkaTemplate 发送它。
consumer() 方法有一个 @KafkaListener 注释,它告诉 Spring Kafka 在传入消息上运行此方法。该方法将接收到的消息发送到 chatSink,它会通知所有订阅了聊天 Flux 的客户端。
启用反应式端点
在编写本教程 (1.2) 时,Hilla 的当前版本通过功能标志支持 Flux 端点方法。通过创建一个src/main/resources/vaadin-featureflags.properties,包含以下内容的新文件来启用该功能:
# Push support in Hilla
com.vaadin.experimental.hillaPush=true
创建用于发送和接收消息的视图
现在您已经配置了 Kafka 并设置了服务器以发送和接收消息,最后一步是创建一个可用于发送和接收消息的 Web 视图。
Hilla 包括 Vaadin 组件集,其中包含 40 多个组件。您可以使用和组件来构建主聊天 UI。您还可以使用该组件来捕获当前用户的姓名。
Hilla 使用 Lit 创建视图。Lit 在概念上类似于 React:组件由状态和模板组成。每当状态发生变化时,模板都会重新呈现。
首先重命名生成的占位符视图。frontend/views/empty/empty-view.ts将文件夹和文件重命名为frontend/views/messages/messages-view.ts. 用以下代码替换文件的内容:
import { View } from "Frontend/views/view";
import { customElement, state } from "lit/decorators.js";
import { html } from "lit";
import "@vaadin/message-list";
import "@vaadin/message-input";
import "@vaadin/text-field";
import { TextFieldChangeEvent } from "@vaadin/text-field";
import { MessageEndpoint } from "Frontend/generated/endpoints";
import Message from "Frontend/generated/com/example/application/model/Message";
@customElement("messages-view")
export class MessagesView extends View {
@state() messages: Message[] = [];
@state() userName = "";
render() {
return html`
Kafka message center
class="flex-grow"
.items=${this.messages}


placeholder="Your name"
@change=${this.userNameChange}
class="flex-grow"
@submit=${this.submit}



userNameChange(e: TextFieldChangeEvent) {
this.userName = e.target.value;
async submit(e: CustomEvent) {
MessageEndpoint.send({
text: e.detail.value,
userName: this.userName,

connectedCallback() {
super.connectedCallback();
this.classList.add("flex", "flex-col", "h-full", "box-border");
MessageEndpoint.join().onNext(
(message) => (this.messages = [...this.messages, message])

以下是解释的基本部分:
Lit 跟踪 @state() 修饰的属性,并且只要它们发生变化,模板就会重新渲染。
Message 数据类型由 Hilla 根据您在服务器上创建的 Java 对象生成。
消息列表通过 .items=${this.messages} 绑定到消息列表组件。items 前面的句点告诉 Lit 将数组作为属性而不是属性传递。
每当使用 @change=${this.userNameChange} 更改值时,文本字段都会调用 userNameChange 方法(@ 表示事件侦听器)。
消息输入组件在提交时调用 MessageEndpoint.send()。请注意,您正在调用 TypeScript 方法。Hilla 负责调用服务器上的底层 Java 方法。
最后,在 connectedCallback 中调用 MessageEndpoint.join() 开始接收传入的聊天消息。
除了 Vaadin 组件之外,您还使用Hilla CSS 实用程序类进行基本布局(flex、flex-grow、flex-col)。
最后,更新路由以匹配视图的新名称。将 的内容替换为routes.ts以下内容:
import { Route } from "@vaadin/router";
import "./views/messages/messages-view";
export const routes: Route[] = [{ path: "", component: "messages-view" }];
运行已完成的应用程序
如果您的应用程序仍在运行,请重新启动它。服务器启动后,您可以通过 http://localhost:8080 访问应用程序。尝试在多个浏览器中打开应用程序,以查看所有浏览器实时显示的消息。
./mvnw


带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
广西多名钓友钓到“有奖回收”海鱼,水产研究所回应

广西多名钓友钓到“有奖回收”海鱼,水产研究所回应

齐鲁壹点
2024-06-05 21:58:17
投资者签证要落地了!日本正式宣布设立“四大经济特区”,有钱就能拿身份的一天到来了吗

投资者签证要落地了!日本正式宣布设立“四大经济特区”,有钱就能拿身份的一天到来了吗

掘金日本房产
2024-06-05 20:14:22
“新王登基”,魏哲家将带领台积电走向何方?

“新王登基”,魏哲家将带领台积电走向何方?

观察者网
2024-06-06 08:36:16
沁县公职人员遇刺:熟人作案;凶手籍籍无名,死者风云人物

沁县公职人员遇刺:熟人作案;凶手籍籍无名,死者风云人物

大师兄爱写作
2024-06-05 19:53:00
高岗自杀,毛主席不敢相信,对周总理大发脾气:问题搞得太复杂了

高岗自杀,毛主席不敢相信,对周总理大发脾气:问题搞得太复杂了

今人说古
2024-06-05 23:40:59
最高检原机关党委副书记、纪委书记张秀杰:建议采取五大举措打击“反催收”黑产

最高检原机关党委副书记、纪委书记张秀杰:建议采取五大举措打击“反催收”黑产

中国经营报
2024-06-06 08:16:44
马斯克毁了原始部落!星链计划接入9个月,村民沉迷色情片行为大变

马斯克毁了原始部落!星链计划接入9个月,村民沉迷色情片行为大变

小星球探索
2024-06-05 22:18:18
允许乌使用美武器攻击俄本土,美国不信会有“致命后果”?

允许乌使用美武器攻击俄本土,美国不信会有“致命后果”?

新民晚报
2024-06-05 18:23:41
7月3日起,法国所有瓶盖和瓶身必须锁死!

7月3日起,法国所有瓶盖和瓶身必须锁死!

新欧洲
2024-06-05 20:00:15
25岁姆巴佩女友曝光:29岁嫩模,带着2孩子,与荷兰国脚合住8年!

25岁姆巴佩女友曝光:29岁嫩模,带着2孩子,与荷兰国脚合住8年!

风过乡
2024-06-06 08:04:47
“OK哥”潜规则丑闻劲爆网络!爆料人称已有两名女同事被他潜了…

“OK哥”潜规则丑闻劲爆网络!爆料人称已有两名女同事被他潜了…

火山诗话
2024-06-06 06:12:20
哈兰德帽子戏法,一战横扫质疑!3-0欧洲鱼腩,挪威主场大捷

哈兰德帽子戏法,一战横扫质疑!3-0欧洲鱼腩,挪威主场大捷

我的护球最独特
2024-06-06 02:59:01
17岁小伙看色情电影,竟对46岁阿姨做下流动作,结果出大事了

17岁小伙看色情电影,竟对46岁阿姨做下流动作,结果出大事了

胖胖侃咖
2024-06-06 08:00:06
2023年日本总和生育率降至1.2 创历史最低

2023年日本总和生育率降至1.2 创历史最低

财联社
2024-06-05 13:28:04
河南中小学教师工资竟然这么低!教龄30年收入不到6000,别的省份月入过万

河南中小学教师工资竟然这么低!教龄30年收入不到6000,别的省份月入过万

互联网大聪明
2024-06-06 10:23:46
菲总统“怒”给谁看?

菲总统“怒”给谁看?

直新闻
2024-06-05 17:16:54
本周重大事件:香港楼价跌回“撤辣”前,中国跨境电商或遭美国重挫

本周重大事件:香港楼价跌回“撤辣”前,中国跨境电商或遭美国重挫

一周财经观察
2024-06-05 17:57:38
领导潜规则女下属后续:公司曝光,女主真容被扒,离职员工曝猛料

领导潜规则女下属后续:公司曝光,女主真容被扒,离职员工曝猛料

360度评说
2024-06-05 20:30:22
太难了!网传广汽启动全员销售模式,汽车圈的内卷“苦不堪言”…

太难了!网传广汽启动全员销售模式,汽车圈的内卷“苦不堪言”…

火山诗话
2024-06-06 06:51:38
又一办公室不雅行为!摄像头忘关视频被曝光,女子身份引热议

又一办公室不雅行为!摄像头忘关视频被曝光,女子身份引热议

影孖看世界
2024-06-04 23:13:52
2024-06-06 10:52:49
墨谈科技
墨谈科技
业务数码玩家.无聊的博主
2941文章数 563关注度
往期回顾 全部

科技要闻

对话戴文渊:大模型价格战不解决核心问题

头条要闻

媒体:欧盟高官称对华贸易要"公平平衡" 其实话中有话

头条要闻

媒体:欧盟高官称对华贸易要"公平平衡" 其实话中有话

体育要闻

赴美试训的崔永熙,表现究竟怎么样?

娱乐要闻

看这场笑话,经过王一博同意了吗!

财经要闻

特步和七匹狼世纪联姻 家族身价超600亿?

汽车要闻

2025年将推10款新车 长安启源7个月累销破10万

态度原创

家居
旅游
手机
公开课
军事航空

家居要闻

温室余闲 平仄之间雅趣浓

旅游要闻

墨西哥一游客与蒸汽火车自拍 被火车撞击当场身亡

手机要闻

苹果Vision Pro开发者应用包含WWDC视频的沉浸式环境

公开课

近视只是视力差?小心并发症

军事要闻

乌方:俄乌冲突升级 所有“红线”不复存在

无障碍浏览 进入关怀版