网易首页 > 网易号 > 正文 申请入驻

如何将Pulsar数据快速且无缝接入Apache Doris

0
分享至

导读:Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢?本次分享将介绍利用 KoP 如何将 Pulsar 数据快速且无缝接入 Apache Doris。

KoP 架构介绍

KoP 是 Kafka on Pulsar 的简写,顾名思义就是如何在 Pulsar 上实现对 Kafka 数据的读写。KoP 将 Kafka 协议处理插件引入 Pulsar Broker 来实现 Apache Pulsar 对 Apache Kafka 协议的支持。将 KoP 协议处理插件添加到现有 Pulsar 集群后,用户不用修改代码就可以将现有的 Kafka 应用程序和服务迁移到 Pulsar。

Apache Pulsar 主要特点如下:

  • 利用企业级多租户特性简化运营。

  • 避免数据搬迁,简化操作。

  • 利用 Apache BookKeeper 和分层存储持久保留事件流。

  • 利用 Pulsar Functions 进行无服务器化事件处理。

KoP 架构如下图,通过图可以看到 KoP 引入一个新的协议处理插件,该协议处理插件利用 Pulsar 的现有组件(例如 Topic 发现、分布式日志库-ManagedLedger、cursor 等)来实现 Kafka 传输协议。

Routine Load 订阅 Pulsar 数据思路

Apache Doris Routine Load 支持了将 Kafka 数据接入 Apache Doris,并保障了数据接入过程中的事务性操作。Apache Pulsar 定位为一个云原生时代企业级的消息发布和订阅系统,已经在很多线上服务使用。那么 Apache Pulsar 用户如何将数据接入 Apache Doris 呢,答案是通过 KoP 实现。

由于 KoP 直接在 Pulsar 侧提供了对 Kafka 的兼容,那么对于 Apache Doris 来说可以像使用 Kafka 一样使用 Plusar。整个过程对于 Apache Doris 来说无需任务改变,就能将 Pulsar 数据接入 Apache Doris,并且可以获得 Routine Load 的事务性保障。



| Apache Doris |
| | Routine Load | |

|Kafka Protocol(librdkafka)
------------v--------------
| | KoP | |
| Apache Pulsar |

操作实践

Pulsar Standalone 安装环境准备:

  1. JDK 安装:略

  2. 下载 Pulsar 二进制包,并解压:


#下载
wget https://archive.apache.org/dist/pulsar/pulsar-2.10.0/apache-pulsar-2.10.0-bin.tar.gz
#解压并进入安装目录
tar xvfz apache-pulsar-2.10.0-bin.tar.gz
cd apache-pulsar-2.10.0

组件编译和安装

1. 下载 KoP 源码


git clone https://github.com/streamnative/kop.git
cd kop

2. 编译 KoP 项目


mvn clean install -DskipTests

3. protocols 配置:在解压后的 apache-pulsar 目录下创建 protocols文 件夹,并把编译好的 nar 包复制到 protocols 文件夹中。


mkdir apache-pulsar-2.10.0/protocols
# mv kop/kafka-impl/target/pulsar-protocol-handler-kafka-{{protocol:version}}.nar apache-pulsar-2.10.0/protocols
cp kop/kafka-impl/target/pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar apache-pulsar-2.10.0/protocols

4. 添加后的结果查看


[root@17a5da45700b apache-pulsar-2.10.0]# ls protocols/
pulsar-protocol-handler-kafka-2.11.0-SNAPSHOT.nar

KoP 配置添加

1. 在 standalone.conf 或者 broker.conf 添加如下配置


#kop适配的协议
messagingProtocols=kafka
#kop 的NAR文件路径
protocolHandlerDirectory=./protocols
#是否允许自动创建topic
allowAutoTopicCreationType=partitioned

2. 添加如下服务监听配置


# Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
kafkaListeners=PLAINTEXT://127.0.0.1:9092# This config is not required unless you want to expose another address to the Kafka client.
# If it’s not configured, it will be the same with `kafkaListeners` config by default
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
brokerDeleteInactiveTopicsEnabled=false

当出现如下错误:


java.lang.IllegalArgumentException: Broker has disabled transaction coordinator, please enable it before using transaction.

添加如下配置,开启 transactionCoordinatorEnabled


kafkaTransactionCoordinatorEnabled=true
transactionCoordinatorEnabled=true

Pulsar 启动


#前台启动
#bin/pulsar standalone
#后台启动
pulsar-daemon start standalone

创建 Doris 数据库和建表


#进入Doris
mysql -u root -h 127.0.0.1 -P 9030
# 创建数据库
create database pulsar_doris;
#切换数据库
use pulsar_doris;
#创建clicklog表
CREATE TABLE IF NOT EXISTS pulsar_doris.clicklog
`clickTime` DATETIME NOT NULL COMMENT "点击时间",
`type` String NOT NULL COMMENT "点击类型",
`id` VARCHAR(100) COMMENT "唯一id",
`user` VARCHAR(100) COMMENT "用户名称",
`city` VARCHAR(50) COMMENT "所在城市"
DUPLICATE KEY(`clickTime`, `type`)
DISTRIBUTED BY HASH(`type`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"

创建Routine Load 任务


CREATE ROUTINE LOAD pulsar_doris.load_from_pulsar_test ON clicklog
COLUMNS(clickTime,id,type,user)
PROPERTIES
"desired_concurrent_number"="3",
"max_batch_interval" = "20",
"max_batch_rows" = "300000",
"max_batch_size" = "209715200",
"strict_mode" = "false",
"format" = "json"
FROM KAFKA
"kafka_broker_list" = "127.0.0.1:9092",
"kafka_topic" = "test",
"property.group.id" = "doris"

上述命令中的参数解释如下:

  • pulsar_doris :Routine Load 任务所在的数据库

  • load_from_pulsar_test:Routine Load 任务名称

  • clicklog:Routine Load 任务的目标表,也就是配置 Routine Load 任务将数据导入到 Doris 哪个表中。

  • strict_mode:导入是否为严格模式,这里设置为 False。

  • format:导入数据的类型,这里配置为 Json。

  • kafka_broker_list:Kafka Broker 服务的地址

  • kafka_broker_list:Kafka Topic 名称,也就是同步哪个 Topic 上的数据。

  • property.group.id:消费组 ID

数据导入和测试

1. 数据导入

构造一个 ClickLog 的数据结构,并调用 Kafka 的 Producer 发送 5000 万条数据到 Pulsar。

ClickLog 数据结构如下:


public class ClickLog {
private String id;
private String user;
private String city;
private String clickTime;
private String type;
... //省略getter和setter

消息构造和发送的核心代码逻辑如下:


String strDateFormat = "yyyy-MM-dd HH:mm:ss";
@Autowired
private Producer producer;
try {
for(int j =0 ; j<50000;j++){
int batchSize = 1000;
for(int i = 0 ; i ClickLog clickLog = new ClickLog();
clickLog.setId(UUID.randomUUID().toString());
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(strDateFormat);
clickLog.setClickTime(simpleDateFormat.format(new Date()));
clickLog.setType("webset");
clickLog.setUser("user"+ new Random().nextInt(1000) +i);
producer.sendMessage(Constant.topicName, JSONObject.toJSONString(clickLog));

} catch (Exception e) {
e.printStackTrace();

2. ROUTINE LOAD 任务查看执行 SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;命令,查看导入任务的状态。


mysql> SHOW ALL ROUTINE LOAD FOR load_from_pulsar_test \G;
*************************** 1. row ***************************
Id: 87873
Name: load_from_pulsar_test
CreateTime: 2022-05-31 12:03:34
PauseTime: NULL
EndTime: NULL
DbName: default_cluster:pulsar_doris
TableName: clicklog1
State: RUNNING
DataSourceType: KAFKA
CurrentTaskNum: 1
JobProperties: {"partitions":"*","columnToColumnExpr":"clickTime,id,type,user","maxBatchIntervalS":"20","whereExpr":"*","dataFormat":"json","timezone":"Europe/London","send_batch_parallelism":"1","precedingFilter":"*","mergeType":"APPEND","format":"json","json_root":"","maxBatchSizeBytes":"209715200","exec_mem_limit":"2147483648","strict_mode":"false","jsonpaths":"","deleteCondition":"*","desireTaskConcurrentNum":"3","maxErrorNum":"0","strip_outer_array":"false","currentTaskConcurrentNum":"1","execMemLimit":"2147483648","num_as_string":"false","fuzzy_parse":"false","maxBatchRows":"300000"}
DataSourceProperties: {"topic":"test","currentKafkaPartitions":"0","brokerList":"127.0.0.1:9092"}
CustomProperties: {"group.id":"doris","kafka_default_offsets":"OFFSET_END","client.id":"doris.client"}
Statistic: {"receivedBytes":5739001913,"runningTxns":[],"errorRows":0,"committedTaskNum":168,"loadedRows":50000000,"loadRowsRate":23000,"abortedTaskNum":1,"errorRowsAfterResumed":0,"totalRows":50000000,"unselectedRows":0,"receivedBytesRate":2675000,"taskExecuteTimeMs":2144799}
Progress: {"0":"51139566"}
Lag: {"0":0}
ReasonOfStateChanged:
ErrorLogUrls:
OtherMsg:
1 row in set (0.00 sec)
ERROR:
No query specified

从上面结果可以看到 totalRows 为 50000000,errorRows 为 0。说明数据不丢不重的导入 Apache Doris 了。

3. 数据统计验证执行如下命令统计表中的数据,发现统计的结果也是 50000000,符合预期。


mysql> select count(*) from clicklog;
| count(*) |
| 50000000 |
1 row in set (3.73 sec)
mysql>

通过 KoP 我们实现了将 Apache Pulsar 数据无缝接入 Apache Doris ,无需对 Routine Load 任务进行任何修改,并保障了数据导入过程中的事务性。与此同时,Apache Doris 社区已经启动了 Apache Pulsar 原生导入支持的设计,相信在不久后就可以直接订阅 Pulsar 中的消息数据,并保证数据导入过程中的 Exactly-Once 语义。

SelectDB 官方网站:

https://selectdb.com

Apache Doris 官方网站:

http://doris.apache.org

Apache Doris Github:

https://github.com/apache/doris

Apache Doris 开发者邮件组:

dev@doris.apache.org

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
0-3惨败?被日本女排双杀?中国女排二队,已经退无可退

0-3惨败?被日本女排双杀?中国女排二队,已经退无可退

体育就你秀
2024-06-20 15:18:00
沪指险守3000点,悬着的心还在悬着

沪指险守3000点,悬着的心还在悬着

每经牛眼
2024-06-20 15:49:04
凌晨3点,商贩与执法城管起争执,西瓜被砸碎一地!当地回应

凌晨3点,商贩与执法城管起争执,西瓜被砸碎一地!当地回应

鲁中晨报
2024-06-20 08:39:07
有人质疑排名第十二的姜萍,为什么没有人质疑排名第一的韦东奕

有人质疑排名第十二的姜萍,为什么没有人质疑排名第一的韦东奕

王朝风云
2024-06-20 07:08:48
有上市公司因拖欠18万税款,被要求补缴滞纳金3500多万?

有上市公司因拖欠18万税款,被要求补缴滞纳金3500多万?

小萝卜丝
2024-06-20 17:34:45
某公益项目5月募捐共19万,给患者1万9,合唱团开销16万,0排练0演出

某公益项目5月募捐共19万,给患者1万9,合唱团开销16万,0排练0演出

可达鸭面面观
2024-06-20 17:36:38
走私 596 颗 CPU、每颗 1.8 万元、被拦截!

走私 596 颗 CPU、每颗 1.8 万元、被拦截!

云头条
2024-06-19 23:59:49
电动车因颜值高4天内被盗3次!警方:3人到手后疯狂驾驶2小时,仍意犹未尽

电动车因颜值高4天内被盗3次!警方:3人到手后疯狂驾驶2小时,仍意犹未尽

每日经济新闻
2024-06-20 10:50:27
中联部部长在海参崴出席“抵制新殖民主义”论坛,让人如鲠在喉

中联部部长在海参崴出席“抵制新殖民主义”论坛,让人如鲠在喉

顾礼先生
2024-06-20 14:53:15
照片作者,被枪毙了!

照片作者,被枪毙了!

人间颂
2024-06-20 13:17:27
下午6点中国女排决战日本,看到大名单球迷怒喷:她上场就关电视

下午6点中国女排决战日本,看到大名单球迷怒喷:她上场就关电视

我就是一个说球的
2024-06-20 12:44:08
网友:苏州昆山房价现在跌成啥样了!继续跌吧,我只看看…

网友:苏州昆山房价现在跌成啥样了!继续跌吧,我只看看…

火山诗话
2024-06-20 09:11:54
放弃社会主义制度的20多个国家,现在的状况都怎么样了

放弃社会主义制度的20多个国家,现在的状况都怎么样了

云舟史策
2024-06-17 19:30:33
热闻|柳州两任市委书记同日被通报,此前为“老搭档”,曾同受处分

热闻|柳州两任市委书记同日被通报,此前为“老搭档”,曾同受处分

齐鲁壹点
2024-06-20 14:13:31
“谈判桌上,不会有她的一席之地”

“谈判桌上,不会有她的一席之地”

观察者网
2024-06-20 15:38:11
多省设立“警税合成作战中心”,背后有何考量?如何打消疑虑

多省设立“警税合成作战中心”,背后有何考量?如何打消疑虑

南方都市报
2024-06-20 15:49:09
复旦毕业生打老师后续:同学曝打人原因,本人发声道歉,评论炸锅

复旦毕业生打老师后续:同学曝打人原因,本人发声道歉,评论炸锅

180°视角
2024-06-20 10:21:17
华为官宣自研AI芯片超越英伟达

华为官宣自研AI芯片超越英伟达

中关村在线
2024-06-19 15:25:16
还是斧头好用,一砸一个洞,菲律宾补给船全军覆没

还是斧头好用,一砸一个洞,菲律宾补给船全军覆没

三叔的装备空间
2024-06-20 11:05:19
网友们又看出了我们的一段痛史

网友们又看出了我们的一段痛史

清晖有墨
2024-06-20 11:29:04
2024-06-20 18:44:49
开源中国
开源中国
每天为开发者推送最新技术资讯
6335文章数 34226关注度
往期回顾 全部

科技要闻

小米SU7流量泼天,富贵却被蔚来接住了

头条要闻

女大学生称按摩时遭男技师扒内裤 警方初步判断是擦边

头条要闻

女大学生称按摩时遭男技师扒内裤 警方初步判断是擦边

体育要闻

绿军的真老大,开始备战下赛季了

娱乐要闻

叶舒华参加柯震东生日聚会,五毒俱全

财经要闻

深圳一网红学位房14万/平跌到4万/平

汽车要闻

售价11.79-14.39万元 新一代哈弗H6正式上市

态度原创

旅游
艺术
家居
健康
公开课

旅游要闻

铁路儿童票新规 已有超4900万小旅客免费出行

艺术要闻

穿越时空的艺术:《马可·波罗》AI沉浸影片探索人类文明

家居要闻

自然开放 实现灵动可变空间

晚餐不吃or吃七分饱,哪种更减肥?

公开课

近视只是视力差?小心并发症

无障碍浏览 进入关怀版