网易首页 > 网易号 > 正文 申请入驻

大数据培训Flink 流怎么来处理 API

0
分享至

5.1 Environment

5.1.1 getExecutionEnvironment

创建一个执行环境,表示当前执行程序的上下文。 如果程序是独立调用的,则此方法返回本地执行环境;如果从命令行客户端调用程序以提交到集群,则此方法返回此集群的执行环境,也就是说,getExecutionEnvironment 会根据查询运行的方式决定返回什么样的运行环境,是最常用的一种创建执行环境的方式。

val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment

如果没有设置并行度,会以 flink-conf.yaml 中的配置为准,默认是 1。

5.1.2 createLocalEnvironment

返回本地执行环境,需要在调用时指定默认的并行度。

val env = StreamExecutionEnvironment.createLocalEnvironment(1)

5.1.3 createRemoteEnvironment

返回集群执行环境,将 Jar 提交到远程服务器。需要在调用时指定 JobManager的 IP 和端口号,并指定要在集群中运行的 Jar 包。

val env = ExecutionEnvironment.createRemoteEnvironment("jobmanage-hostname", 6123,"YOURPATH//wordcount.jar")

5.2 Source

5.2.1 从集合读取数据

// 定义样例类,传感器 id,时间戳,温度

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object Sensor {

def main(args: Array[String]): Unit = {

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream1 = env

.fromCollection(List(

SensorReading("sensor_1", 1547718199, 35.8),

SensorReading("sensor_6", 1547718201, 15.4),

SensorReading("sensor_7", 1547718202, 6.7),

SensorReading("sensor_10", 1547718205, 38.1)

stream1.print("stream1:").setParallelism(1)

env.execute()

5.2.2 从文件读取数据

val stream2 = env.readTextFile("YOUR_FILE_PATH")

5.2.3 以 kafka 消息队列的数据作为来源

需要引入 kafka 连接器的依赖:

pom.xml

org.apache.flink

flink-connector-kafka-0.11_2.12

1.10.1

更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网www.atguigu.com

具体代码如下:

val properties = new Properties()

properties.setProperty("bootstrap.servers", "localhost:9092")

properties.setProperty("group.id", "consumer-group")

properties.setProperty("key.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer")

properties.setProperty("value.deserializer",

"org.apache.kafka.common.serialization.StringDeserializer")

properties.setProperty("auto.offset.reset", "latest")

val stream3 = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new

SimpleStringSchema(), properties))

5.2.4 自定义 Source

除了以上的 source 数据来源,我们还可以自定义 source。需要做的,只是传入一个 SourceFunction 就可以。具体调用如下:

val stream4 = env.addSource( new MySensorSource() )

我们希望可以随机生成传感器数据,MySensorSource 具体的代码实现如下:

class MySensorSource extends SourceFunction[SensorReading]{

// flag: 表示数据源是否还在正常运行

var running: Boolean = true

override def cancel(): Unit = {

running = false

override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit

// 初始化一个随机数发生器

val rand = new Random()

var curTemp = 1.to(10).map(

i => ( "sensor_" + i, 65 + rand.nextGaussian() * 20 )

while(running){

// 更新温度值

curTemp = curTemp.map(

t => (t._1, t._2 + rand.nextGaussian() )

// 获取当前时间戳

val curTime = System.currentTimeMillis()

curTemp.foreach(

t => ctx.collect(SensorReading(t._1, curTime, t._2))

Thread.sleep(100)

5.3 Transform

转换算子

5.3.1 map

val streamMap = stream.map { x => x * 2 }

5.3.2 flatMap

flatMap 的函数签名:def flatMap[A,B](as: List[A])(f: A ⇒ List[B]): List[B]

例如: flatMap(List(1,2,3))(i ⇒ List(i,i))

结果是 List(1,1,2,2,3,3), 而 List("a b", "c d").flatMap(line ⇒ line.split(" "))

结果是 List(a, b, c, d)。

val streamFlatMap = stream.flatMap{

x => x.split(" ")

5.3.3 Filter

val streamFilter = stream.filter{

x => x == 1

5.3.4 KeyBy

DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素,在内部以 hash 的形式实现的。

5.3.5 滚动聚合算子(Rolling Aggregation)

这些算子可以针对 KeyedStream 的每一个支流做聚合。

 sum()

 min()

 max()

 minBy()

 maxBy()

5.3.6 Reduce

KeyedStream → DataStream:一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

val stream2 = env.readTextFile("YOUR_PATH\\sensor.txt")

.map( data => {

val dataArray = data.split(",")

SensorReading(dataArray(0).trim, dataArray(1).trim.toLong,

dataArray(2).trim.toDouble)

.keyBy("id")

.reduce( (x, y) => SensorReading(x.id, x.timestamp + 1, y.temperature) )

更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网www.atguigu.com

5.3.7 Split 和 Select

Split

DataStream → SplitStream:根据某些特征把一个 DataStream 拆分成两个或者多个 DataStream。

Select

SplitStream→DataStream:从一个 SplitStream 中获取一个或者多个DataStream。

需求:传感器数据按照温度高低(以 30 度为界),拆分成两个流。

val splitStream = stream2

.split( sensorData => {

if (sensorData.temperature > 30) Seq("high") else Seq("low")

val high = splitStream.select("high")

val low = splitStream.select("low")

val all = splitStream.select("high", "low")

5.3.8 Connect 和 CoMap

DataStream,DataStream → ConnectedStreams:连接两个保持他们类型的数据流,两个数据流被 Connect 之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

CoMap,CoFlatMap

ConnectedStreams → DataStream:作用于 ConnectedStreams 上,功能与 map和 flatMap 一样,对 ConnectedStreams 中的每一个 Stream 分别进行 map 和 flatMap处理。

val warning = high.map( sensorData => (sensorData.id,

sensorData.temperature) )

val connected = warning.connect(low)

val coMap = connected.map(

warningData => (warningData._1, warningData._2, "warning"),

lowData => (lowData.id, "healthy")

5.3.9 Union

DataStream → DataStream:对两个或者两个以上的 DataStream 进行 union 操作,产生一个包含所有 DataStream 元素的新 DataStream。

//合并以后打印

val unionStream: DataStream[StartUpLog] = appStoreStream.union(otherStream)

unionStream.print("union:::")

Connect 与 Union 区别

1. Union 之前两个流的类型必须是一样,Connect 可以不一样,在之后的 coMap中再去调整成为一样的。

2. Connect 只能操作两个流,Union 可以操作多个。

5.4 支持的数据类型

Flink 流应用程序处理的是以数据对象表示的事件流。所以在 Flink 内部,我们需要能够处理这些对象。它们需要被序列化和反序列化,以便通过网络传送它们;

或者从状态后端、检查点和保存点读取它们。为了有效地做到这一点,Flink 需要明确知道应用程序所处理的数据类型。Flink 使用类型信息的概念来表示数据类型,并为每个数据类型生成特定的序列化器、反序列化器和比较器。

Flink 还具有一个类型提取系统,该系统分析函数的输入和返回类型,以自动获取类型信息,从而获得序列化器和反序列化器。但是,在某些情况下,例如 lambda函数或泛型类型,需要显式地提供类型信息,才能使应用程序正常工作或提高其性能。

Flink 支持 Java 和 Scala 中所有常见数据类型。使用最广泛的类型有以下几种。

5.4.1 基础数据类型

Flink 支持所有的 Java 和 Scala 基础数据类型,Int, Double, Long, String, …

val numbers: DataStream[Long] = env.fromElements(1L, 2L, 3L, 4L)

numbers.map( n => n + 1 )

5.4.2 Java 和 Scala 元组(Tuples)

val persons: DataStream[(String, Integer)] = env.fromElements(

("Adam", 17),

("Sarah", 23) )

persons.filter(p => p._2 > 18)

5.4.3 Scala 样例类(case classes)

case class Person(name:String, age: Int)

val persons: DataStream[Person] = env.fromElements(

Person("Adam", 17),

Person("Sarah", 23) )

persons.filter(p => p.age > 18)

5.4.4 Java 简单对象(POJOs)

public class Person {

public String name;

public int age;

public Person() {}

public Person(String name, int age) {

this.name = name;

this.age = age;

DataStream

persons = env.fromElements(

new Person("Alex", 42),

new Person("Wendy", 23));

更多 Java –大数据 –前端 –python 人工智能资料下载,可百度访问:尚硅谷官网www.atguigu.com

5.4.5 其它(Arrays, Lists, Maps, Enums, 等等)

Flink 对 Java 和 Scala 中的一些特殊目的的类型也都是支持的,比如 Java 的ArrayList,HashMap,Enum 等等。

5.5 实现 UDF 函数——更细粒度的控制流

5.5.1 函数类(Function Classes)

Flink 暴露了所有 udf 函数的接口(实现方式为接口或者抽象类)。例如

MapFunction, FilterFunction, ProcessFunction 等等。

下面例子实现了 FilterFunction 接口:

class FilterFilter extends FilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains("flink")

val flinkTweets = tweets.filter(new FlinkFilter)

还可以将函数实现成匿名类

val flinkTweets = tweets.filter(

new RichFilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains("flink")

我们 filter 的字符串"flink"还可以当作参数传进去。

val tweets: DataStream[String] = ...

val flinkTweets = tweets.filter(new KeywordFilter("flink"))

class KeywordFilter(keyWord: String) extends FilterFunction[String] {

override def filter(value: String): Boolean = {

value.contains(keyWord)

5.5.2 匿名函数(Lambda Functions)

val tweets: DataStream[String] = ...

val flinkTweets = tweets.filter(_.contains("flink"))

5.5.3 富函数(Rich Functions)

“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。

 RichMapFunction

 RichFlatMapFunction

 RichFilterFunction

 …

Rich Function 有一个生命周期的概念。典型的生命周期方法有:

 open()方法是 rich function 的初始化方法,当一个算子例如 map 或者 filter被调用之前 open()会被调用。

 close()方法是生命周期中的最后一个调用的方法,做一些清理工作。

 getRuntimeContext()方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度,任务的名字,以及 state 状态

class MyFlatMap extends RichFlatMapFunction[Int, (Int, Int)] {

var subTaskIndex = 0

override def open(configuration: Configuration): Unit = {

subTaskIndex = getRuntimeContext.getIndexOfThisSubtask

// 以下可以做一些初始化工作,例如建立一个和 HDFS 的连接

override def flatMap(in: Int, out: Collector[(Int, Int)]): Unit = {

if (in % 2 == subTaskIndex) {

out.collect((subTaskIndex, in))

override def close(): Unit = {

// 以下做一些清理工作,例如断开和 HDFS 的连接。

5.6 Sink

Flink 没有类似于 spark 中 foreach 方法,让用户进行迭代的操作。虽有对外的输出操作都要利用 Sink 完成。最后通过类似如下方式完成整个任务最终输出操作。

stream.addSink(new MySink(xxxx))

官方提供了一部分的框架的 sink。除此以外,需要用户自定义实现 sink。

5.6.1 Kafka

pom.xml

org.apache.flink

flink-connector-kafka-0.11_2.12

1.10.1

主函数中添加 sink:

val union = high.union(low).map(_.temperature.toString)

union.addSink(new FlinkKafkaProducer011[String]("localhost:9092",

"test", new SimpleStringSchema()))

5.6.2 Redis

pom.xml

org.apache.bahir

flink-connector-redis_2.11

1.0

定义一个 redis 的 mapper 类,用于定义保存到 redis 时调用的命令:

class MyRedisMapper extends RedisMapper[SensorReading]{

override def getCommandDescription: RedisCommandDescription = {

new RedisCommandDescription(RedisCommand.HSET, "sensor_temperature")

override def getValueFromData(t: SensorReading): String =

t.temperature.toString

override def getKeyFromData(t: SensorReading): String = t.id

在主函数中调用:

val conf = new

FlinkJedisPoolConfig.Builder().setHost("localhost").setPort(6379).build()

dataStream.addSink( new RedisSink[SensorReading](conf, new MyRedisMapper) )

5.6.3 Elasticsearch

pom.xml

org.apache.flink

flink-connector-elasticsearch6_2.12

1.10.1

在主函数中调用:

val httpHosts = new util.ArrayList[HttpHost]()

httpHosts.add(new HttpHost("localhost", 9200))

val esSinkBuilder = new ElasticsearchSink.Builder[SensorReading]( httpHosts,

new ElasticsearchSinkFunction[SensorReading] {

override def process(t: SensorReading, runtimeContext: RuntimeContext,

requestIndexer: RequestIndexer): Unit = {

println("saving data: " + t)

val json = new util.HashMap[String, String]()

json.put("data", t.toString)

val indexRequest =

Requests.indexRequest().index("sensor").`type`("readingData").source(json)

requestIndexer.add(indexRequest)

println("saved successfully")

dataStream.addSink( esSinkBuilder.build() )

5.6.4 JDBC 自定义 sink

mysql

mysql-connector-java

5.1.44

添加 MyJdbcSink

class MyJdbcSink() extends RichSinkFunction[SensorReading]{

var conn: Connection = _

var insertStmt: PreparedStatement = _

var updateStmt: PreparedStatement = _

// open 主要是创建连接

override def open(parameters: Configuration): Unit = {

super.open(parameters)

conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test",

"root", "123456")

insertStmt = conn.prepareStatement("INSERT INTO temperatures (sensor,

temp) VALUES (?, ?)")

updateStmt = conn.prepareStatement("UPDATE temperatures SET temp = ? WHERE

sensor = ?")

// 调用连接,执行 sql

override def invoke(value: SensorReading, context:

SinkFunction.Context[_]): Unit = {

updateStmt.setDouble(1, value.temperature)

updateStmt.setString(2, value.id)

updateStmt.execute()

if (updateStmt.getUpdateCount == 0) {

insertStmt.setString(1, value.id)

insertStmt.setDouble(2, value.temperature)

insertStmt.execute()

override def close(): Unit = {

insertStmt.close()

updateStmt.close()

conn.close()

在 main 方法中增加,把明细保存到 mysql 中

dataStream.addSink(new MyJdbcSink() )

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
以军遭遇单日最大伤亡,指挥官丧生,内塔尼亚胡说“心都碎了”

以军遭遇单日最大伤亡,指挥官丧生,内塔尼亚胡说“心都碎了”

笔墨V
2024-06-16 17:38:32
归化专家:目前全球活跃在各级联赛U10队伍以上华裔球员近200位

归化专家:目前全球活跃在各级联赛U10队伍以上华裔球员近200位

直播吧
2024-06-16 16:41:13
36岁刘亦菲近况曝光,全网羡慕:不恋爱、不合群,养了60只猫!

36岁刘亦菲近况曝光,全网羡慕:不恋爱、不合群,养了60只猫!

谈娱新语
2024-06-14 22:11:23
特斯拉“刷单”风波愈演愈烈!朱晓彤全国门店巡查,严查假订单

特斯拉“刷单”风波愈演愈烈!朱晓彤全国门店巡查,严查假订单

小南看车
2024-06-14 16:13:34
妻子陪初恋男友法国游玩,下飞机才通知丈夫,返回后直接愣在机场

妻子陪初恋男友法国游玩,下飞机才通知丈夫,返回后直接愣在机场

局内人
2024-05-15 14:12:33
38岁已婚女与37岁情人,在石凳子上发生关系,温存后被残忍杀害

38岁已婚女与37岁情人,在石凳子上发生关系,温存后被残忍杀害

胖胖侃咖
2024-06-08 08:00:08
夏洛特公主真的应该好好培养,这气场真绝了!

夏洛特公主真的应该好好培养,这气场真绝了!

腊月燥火
2024-06-16 14:17:49
唉!又有一家大企业成功“结业”了!

唉!又有一家大企业成功“结业”了!

翻开历史和现实
2024-06-10 18:54:33
西方国家为何都讨厌中国?布热津斯基:中国150年的耻辱迟早洗刷

西方国家为何都讨厌中国?布热津斯基:中国150年的耻辱迟早洗刷

知鉴明史
2024-06-15 17:34:41
朱立伦号召蓝粉包围立院!王鸿薇:让支持蓝营的声音响彻云霄!

朱立伦号召蓝粉包围立院!王鸿薇:让支持蓝营的声音响彻云霄!

牛锅巴小钒
2024-06-16 22:30:55
哈特谈解说:我不像布伦森赚那么多钱 所以休赛期还得继续工作

哈特谈解说:我不像布伦森赚那么多钱 所以休赛期还得继续工作

直播吧
2024-06-16 17:34:09
“馒头脸”的风吹进央视:钱花了事业毁了,王珞丹素颜出镜赢麻了

“馒头脸”的风吹进央视:钱花了事业毁了,王珞丹素颜出镜赢麻了

谈娱新语
2024-06-15 22:07:57
美国前商务部长:拜登政府对中国的政策就像“精神分裂症”,一面派耶伦、布林肯来谈判,另一面又不断言语攻击中国

美国前商务部长:拜登政府对中国的政策就像“精神分裂症”,一面派耶伦、布林肯来谈判,另一面又不断言语攻击中国

国际在线
2024-06-16 17:33:11
降雨要来了!江苏入梅,有消息了!

降雨要来了!江苏入梅,有消息了!

东太湖七都在线
2024-06-16 20:31:57
新能源汽车全世界都不“玩”了,就中国在“玩”,事实是什么?

新能源汽车全世界都不“玩”了,就中国在“玩”,事实是什么?

创作者朱海平
2024-06-15 16:44:19
IOC提议设立电竞运动会,电竞或将纳入奥林匹克大家庭

IOC提议设立电竞运动会,电竞或将纳入奥林匹克大家庭

saralolita
2024-06-15 22:35:14
曹操墓考古发现:曹操身高1.56米,牙周炎严重,带有强烈口臭

曹操墓考古发现:曹操身高1.56米,牙周炎严重,带有强烈口臭

蜉蝣说
2024-06-03 22:03:43
从15万降到了7.66万,堪称合资价格屠夫,喝92油,一公里才4毛钱

从15万降到了7.66万,堪称合资价格屠夫,喝92油,一公里才4毛钱

看看娱乐与体育
2024-06-16 23:45:04
大家都在关注第十二名的姜萍,却忽略了第一名的徐啸宇

大家都在关注第十二名的姜萍,却忽略了第一名的徐啸宇

小李子体育
2024-06-16 17:53:21
她曾是安徽省省长,后出任全国政协副主席,如今70岁仍在奋斗!

她曾是安徽省省长,后出任全国政协副主席,如今70岁仍在奋斗!

历史龙元阁
2024-06-16 15:35:31
2024-06-17 00:42:44
IT爱好者小尚
IT爱好者小尚
分享IT教育类信息
630文章数 55关注度
往期回顾 全部

科技要闻

iPhone 16会杀死大模型APP吗?

头条要闻

欧洲猪肉业界:中国若限制进口将是梦魇

头条要闻

欧洲猪肉业界:中国若限制进口将是梦魇

体育要闻

没人永远年轻 但青春如此无敌还是离谱了些

娱乐要闻

上影节红毯:倪妮好松弛,娜扎吸睛

财经要闻

打断妻子多根肋骨 上市公司创始人被公诉

汽车要闻

售17.68万-21.68万元 极狐阿尔法S5正式上市

态度原创

房产
教育
时尚
本地
手机

房产要闻

万华对面!海口今年首宗超百亩宅地,重磅挂出!

教育要闻

高考志愿填报,别再踩这些“雷”!

伊姐周日热推:电影《沙漏》;动漫《眷思量2》......

本地新闻

粽情一夏|海河龙舟赛,竟然成了外国人的大party!

手机要闻

荣耀X60i入网:配置全面升级,能否满足你的所有期待?

无障碍浏览 进入关怀版