每日分享最新,最流行的软件开发知识与最新行业趋势,希望大家能够一键三连,多多支持,跪求关注,点赞,留言。
本教程展示了如何在 Spring Boot 应用程序中发布和订阅 Kafka 消息,以及如何在浏览器中实时显示消息。
你将构建什么
您将构建一个通过 Kafka 发送和接收消息的全栈响应式 Web 应用程序。该应用程序在服务器上使用 Spring Boot 和 Java,在客户端使用 Lit 和 TypeScript,以及用于组件和通信的 Hilla 框架。
带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。
你需要什么
20分钟
Java 11 或更新版本
节点 16.14 或更高版本
一个同时支持 Java 和 TypeScript 的 IDE,例如VS Code。
技术概述
卡夫卡
Apache Kafka 是一个分布式事件流平台。您可以将其视为类固醇上的发布/订阅系统。Kafka 生产者可以向主题发送消息,然后消费者可以读取这些消息。但是,与大多数 pub/sub 系统不同,当您阅读这些消息时,它们不会从主题中删除。这允许您执行流处理以实时分析、聚合或转换来自不同事件的数据。
如果您想了解 Kafka 的基础知识,我强烈建议您观看 Tim Berglund 的视频:
Spring Boot 和 Spring Kafka
Spring Boot 是一种使用 Spring 的固执己见的方式。它通过依赖于配置的约定将配置代码的数量减少到最低限度。此外,Spring Kafka 增加了对配置 Kafka 生产者和消费者以及通过注解方法监听传入消息的支持。
Hilla
Hilla 是一个为 Java 构建的前端框架。它结合了 Spring Boot 后端和 Lit 内置的响应式 TypeScript 前端。Hilla 会根据您的服务器端点签名自动生成 TypeScript 类型,这有助于在您开发应用程序时保持前端和后端同步。
下载并运行 Kafka
本教程使用本地 Kafka 代理。按照以下步骤在您的计算机上下载并启动 Kafka:
进入Kafka下载页面,下载Kafka。
提取下载的存档tar -xzf kafka<,version>.tgz
打开目录cd kafka_
启动 Zookeeper 管理本地 Kafka 集群bin/zookeeper-server-start.sh config/zookeeper.properties
打开第二个终端并运行bin/kafka-server-start.sh config/server.properties以启动 Kafka 代理。
您现在已经运行了 Kafka,并准备开始构建您的应用程序。
创建一个新项目
首先创建一个新的 Hilla 项目。这将为您提供一个配置了 TypeScript-Lit 前端的 Spring Boot 项目。
使用 Vaadin CLI 初始化项目:npx @vaadin/cli init --hilla --empty hilla-kafka
在您选择的 IDE 中打开项目。
使用包含的 Maven 包装器启动应用程序。该命令将下载 Maven 和 npm 依赖项并启动开发服务器。注意:初始启动可能需要几分钟时间。然而,随后的启动几乎是瞬间的。./mvnw
添加 Kafka Spring 依赖项
pom.xml通过在文件中包含以下依赖项,向应用程序添加 Kafka 支持:
org.springframework.kafka
spring-kafka
com.fasterxml.jackson.datatype
jackson-datatype-jsr310
父 Spring Boot BOM 涵盖了依赖项,因此您无需显式添加版本号。
定义数据模型
首先创建一个新的 Java 包:com.example.application.model.
在这个新创建的包中,创建一个新的 Java 类 ,Message.java来表示您将通过 Kafka 发送的消息。然后,将以下内容添加到类中:
package com.example.application.model;
import java.time.Instant;
import dev.hilla.Nonnull;
public class Message {
private @Nonnull String text;
private Instant time;
private @Nonnull String userName;
public String getText() {
return text;
public void setText(String text) {
this.text = text;
public Instant getTime() {
return time;
public void setTime(Instant time) {
this.time = time;
public String getUserName() {
return userName;
public void setUserName(String userName) {
this.userName = userName;
Hilla 框架使用@Nonnull注解来指导 TypeScript 类型生成:它们对 Java 行为没有影响。
使用 Kafka 发送自定义对象
在本教程中,您将发送 Java 对象作为消息,而不是使用像字符串或数字这样的原始方法。为此,您需要创建自定义序列化器和反序列化器。
在同一个包中,创建以下两个新类,MessageSerializer.java并MessageDeserializer.java具有以下内容:
package com.example.application.model;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
public class MessageSerializer implements Serializer {
public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();
@Override
public byte[] serialize(String topic, Message message) {
try {
return mapper.writeValueAsBytes(message);
} catch (JsonProcessingException e) {
throw new SerializationException(e);
package com.example.application.model;
import java.io.IOException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
public class MessageDeSerializer implements Deserializer {
public static final ObjectMapper mapper = JsonMapper.builder()
.findAndAddModules()
.build();
@Override
public Message deserialize(String topic, byte[] data) {
try {
return mapper.readValue(data, Message.class);
} catch (IOException e) {
throw new SerializationException(e);
序列化器和反序列化器使用 Jackson 将对象与 JSON 进行转换。findAndAddModules()builder 方法允许 Jackson 通过您添加的依赖项支持 JSR310 数据类型。
配置Kafka
接下来,通过将以下内容添加到src/main/resources/application.properties文件中来配置 Kafka:
# A custom property to hold the name of our Kafka topic:
topic.name=chat
# Set up Kafka:
spring.kafka.bootstrap-servers=localhost:9092
# Configure the consumer:
spring.kafka.consumer.client-id=chat-consumer
spring.kafka.consumer.group-id=chat-group
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.application.model.MessageDeSerializer
# Configure the producer:
spring.kafka.producer.client-id=chat-producer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.application.model.MessageSerializer
更新Application.java以编程方式配置主题。
package com.example.application;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import com.vaadin.flow.component.page.AppShellConfigurator;
import com.vaadin.flow.server.PWA;
import com.vaadin.flow.theme.Theme;
* The entry point of the Spring Boot application.
* Use the @PWA annotation to make the application installable on phones, tablets, and some desktop
* browsers.
@SpringBootApplication
@Theme(value = "hilla-kafka")\
@PWA(name = "hilla-kafka", shortName = "hilla-kafka", offlineResources = {})
@Configuration
public class Application implements AppShellConfigurator {
@Value("${topic.name}")
private String topicName;
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
@Bean
NewTopic chatTopic() {
return TopicBuilder
.name(topicName)
.partitions(1)
.replicas(1)
.build();
以下是解释的基本部分:
通过 Spring 注入主题名称。
使用 TopicBuilder bean 配置来定义和配置新主题。在这个示例应用程序中,您只设置了一个分区和一个副本。在实际应用中,您会希望设置更多的分区和副本,以确保集群运行良好且可靠。
创建服务器端点
您现在已准备好开始使用 Kafka。接下来,创建将与 Kafka 代理和客户端 Web 应用程序通信的服务器端点。
MessageEndpoint.java,在包中创建一个新的 Java 文件com.example.application并将以下代码添加到其中:
package com.example.application;
import java.time.Instant;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import com.example.application.model.Message;
import com.vaadin.flow.server.auth.AnonymousAllowed;
import dev.hilla.Endpoint;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.EmitResult;
import reactor.core.publisher.Sinks.Many;
@Endpoint
@AnonymousAllowed
public class MessageEndpoint {
@Value("${topic.name}")
private String topicName;
private final Many chatSink;
private final Flux chat;
private final KafkaTemplate kafkaTemplate;
MessageEndpoint(KafkaTemplate kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
chatSink = Sinks.many().multicast().directBestEffort();
chat = chatSink.asFlux();
public Flux join() {
return chat;
public void send(Message message) {
message.setTime(Instant.now());
kafkaTemplate.send(topicName, message);
@KafkaListener(topics = "chat", groupId = "chat-group")
private void consumer(Message message) {
chatSink.emitNext(message,
(signalType, emitResult) -> emitResult == EmitResult.FAIL_NON_SERIALIZED);
以下是解释的基本部分:
@Endpoint 注释告诉 Hilla 将所有公共方法作为 TypeScript 方法提供给客户端。@AnonymousAllowed 关闭此端点的身份验证。
chatSink 是一种将数据传递给系统的编程方式。它发出消息,以便任何订阅了相关聊天 Flux 的客户端都会收到它们。
构造函数获取 Spring 注入的 KafkaTemplate 并将其保存到字段中。
join() 方法返回您将在客户端订阅的聊天 Flux。
send() 方法接收一条消息,用发送时间标记它,然后使用 kafkaTemplate 发送它。
consumer() 方法有一个 @KafkaListener 注释,它告诉 Spring Kafka 在传入消息上运行此方法。该方法将接收到的消息发送到 chatSink,它会通知所有订阅了聊天 Flux 的客户端。
启用反应式端点
在编写本教程 (1.2) 时,Hilla 的当前版本通过功能标志支持 Flux 端点方法。通过创建一个src/main/resources/vaadin-featureflags.properties,包含以下内容的新文件来启用该功能:
# Push support in Hilla
com.vaadin.experimental.hillaPush=true
创建用于发送和接收消息的视图
现在您已经配置了 Kafka 并设置了服务器以发送和接收消息,最后一步是创建一个可用于发送和接收消息的 Web 视图。
Hilla 包括 Vaadin 组件集,其中包含 40 多个组件。您可以使用和组件来构建主聊天 UI。您还可以使用该组件来捕获当前用户的姓名。
Hilla 使用 Lit 创建视图。Lit 在概念上类似于 React:组件由状态和模板组成。每当状态发生变化时,模板都会重新呈现。
首先重命名生成的占位符视图。frontend/views/empty/empty-view.ts将文件夹和文件重命名为frontend/views/messages/messages-view.ts. 用以下代码替换文件的内容:
import { View } from "Frontend/views/view";
import { customElement, state } from "lit/decorators.js";
import { html } from "lit";
import "@vaadin/message-list";
import "@vaadin/message-input";
import "@vaadin/text-field";
import { TextFieldChangeEvent } from "@vaadin/text-field";
import { MessageEndpoint } from "Frontend/generated/endpoints";
import Message from "Frontend/generated/com/example/application/model/Message";
@customElement("messages-view")
export class MessagesView extends View {
@state() messages: Message[] = [];
@state() userName = "";
render() {
return html`
Kafka message center
class="flex-grow"
.items=${this.messages}
placeholder="Your name"
@change=${this.userNameChange}
class="flex-grow"
@submit=${this.submit}
userNameChange(e: TextFieldChangeEvent) {
this.userName = e.target.value;
async submit(e: CustomEvent) {
MessageEndpoint.send({
text: e.detail.value,
userName: this.userName,
connectedCallback() {
super.connectedCallback();
this.classList.add("flex", "flex-col", "h-full", "box-border");
MessageEndpoint.join().onNext(
(message) => (this.messages = [...this.messages, message])
以下是解释的基本部分:
Lit 跟踪 @state() 修饰的属性,并且只要它们发生变化,模板就会重新渲染。
Message 数据类型由 Hilla 根据您在服务器上创建的 Java 对象生成。
消息列表通过 .items=${this.messages} 绑定到消息列表组件。items 前面的句点告诉 Lit 将数组作为属性而不是属性传递。
每当使用 @change=${this.userNameChange} 更改值时,文本字段都会调用 userNameChange 方法(@ 表示事件侦听器)。
消息输入组件在提交时调用 MessageEndpoint.send()。请注意,您正在调用 TypeScript 方法。Hilla 负责调用服务器上的底层 Java 方法。
最后,在 connectedCallback 中调用 MessageEndpoint.join() 开始接收传入的聊天消息。
除了 Vaadin 组件之外,您还使用Hilla CSS 实用程序类进行基本布局(flex、flex-grow、flex-col)。
最后,更新路由以匹配视图的新名称。将 的内容替换为routes.ts以下内容:
import { Route } from "@vaadin/router";
import "./views/messages/messages-view";
export const routes: Route[] = [{ path: "", component: "messages-view" }];
运行已完成的应用程序
如果您的应用程序仍在运行,请重新启动它。服务器启动后,您可以通过 http://localhost:8080 访问应用程序。尝试在多个浏览器中打开应用程序,以查看所有浏览器实时显示的消息。
./mvnw
带有显示一条消息“Hello Kafka”的应用程序的浏览器窗口。 在窗口的底部,有两个输入,一个用于名称,一个用于消息,以及一个用于发送消息的按钮。
特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。
Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.