网易首页 > 网易号 > 正文 申请入驻

大数据开发技术之Spark SQL的多种使用方法

0
分享至

Spark SQL支持多种数据源,如JDBC、HDFS、HBase。它的内部组件,如SQL的语法解析器、分析器等支持重定义进行扩展,能更好满足不同的业务场景。与Spark Core无缝集成,提供了DataSet/DataFrame的可编程抽象数据模型,并且可被视为一个分布式的SQL查询引擎。

DataSet/DataFrame

DataSet/DataFrame都是Spark SQL提供的分布式数据集,相对于RDD而言,除了记录数据以外,还记录表的schema信息。

DataFrame是DataSet以命名列方式组织的分布式数据集,类似于RDBMS中的表,或者R和Python中的 data frame。DataFrame API支持Scala、Java、Python、R。在Scala API中,DataFrame变成类型为Row的Dataset:

type DataFrame = Dataset[Row]。

DataFrame在编译期不进行数据中字段的类型检查,在运行期进行检查。但DataSet则与之相反,因为它是强类型的。此外,二者都是使用catalyst进行sql的解析和优化。为了方便,以下统一使用DataSet统称。

DataSet创建

DataSet通常通过加载外部数据或通过RDD转化创建。

1.加载外部数据

以加载json和mysql为例:

val ds = sparkSession.read.json("/路径/people.json")
val ds = sparkSession.read.format("jdbc")
.options(Map("url" -> "jdbc:mysql://ip:port/db",
"driver" -> "com.mysql.jdbc.Driver",
"dbtable" -> "tableName", "user" -> "root", "root" -> "123")).load()

2.RDD转换为DataSet

通过RDD转化创建DataSet,关键在于为RDD指定schema,通常有两种方式(伪代码):

1.定义一个case class,利用反射机制来推断

1) 从HDFS中加载文件为普通RDD
val lineRDD = sparkContext.textFile("hdfs://ip:port/person.txt").map(_.split(" "))
2) 定义case class(相当于表的schema)
case class Person(id:Int, name:String, age:Int)
3) 将RDD和case class关联
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
4) 将RDD转换成DataFrame
val ds= personRDD.toDF
2.手动定义一个schema StructType,直接指定在RDD上
val schemaString ="name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRdd = peopleRdd.map(p=>Row(p(0),p(1)))
val ds = sparkSession.createDataFrame(rowRdd,schema)

操作DataSet的两种风格语法

DSL语法

1.查询DataSet部分列中的内容

personDS.select(col("name"))

personDS.select(col("name"), col("age"))

2.查询所有的name和age和salary,并将salary加1000

personDS.select(col("name"), col("age"), col("salary") + 1000)

personDS.select(personDS("name"), personDS("age"), personDS("salary") + 1000)

3.过滤age大于18的

personDS.filter(col("age") > 18)

4.按年龄进行分组并统计相同年龄的人数

personDS.groupBy("age").count()

注意:直接使用col方法需要import org.apache.spark.sql.functions._

SQL语法

如果想使用SQL风格的语法,需要将DataSet注册成表

personDS.registerTempTable("person")

//查询年龄最大的前两名

val result = sparkSession.sql("select * from person order by age desc limit 2")

//保存结果为json文件。注意:如果不指定存储格式,则默认存储为parquet

result.write.format("json").save("hdfs://ip:port/res2")

Spark SQL的几种使用方式

1.sparksql-shell交互式查询

就是利用Spark提供的shell命令行执行SQL

2.编程

首先要获取Spark SQL编程"入口":SparkSession(当然在早期版本中大家可能更熟悉的是SQLContext,如果是操作hive则为HiveContext)。这里以读取parquet为例:

val spark = SparkSession.builder()
.appName("example").master("local[*]").getOrCreate();

val df = sparkSession.read.format("parquet").load("/路径/parquet文件")

然后就可以针对df进行业务处理了。

3.Thriftserver

beeline客户端连接操作

启动spark-sql的thrift服务,sbin/start-thriftserver.sh,启动脚本中配置好Spark集群服务资源、地址等信息。然后通过beeline连接thrift服务进行数据处理。

hive-jdbc驱动包来访问spark-sql的thrift服务

在项目pom文件中引入相关驱动包,跟访问mysql等jdbc数据源类似。示例:

Class.forName("org.apache.hive.jdbc.HiveDriver")
val conn = DriverManager.getConnection("jdbc:hive2://ip:port", "root", "123");
try {
val stat = conn.createStatement()
val res = stat.executeQuery("select * from people limit 1")
while (res.next()) {
println(res.getString("name"))
}
} catch {
case e: Exception => e.printStackTrace()
} finally{
if(conn!=null) conn.close()
}

Spark SQL 获取Hive数据

Spark SQL读取hive数据的关键在于将hive的元数据作为服务暴露给Spark。除了通过上面thriftserver jdbc连接hive的方式,也可以通过下面这种方式:

首先,配置 $HIVE_HOME/conf/hive-site.xml,增加如下内容:


hive.metastore.uris
thrift://ip:port

然后,启动hive metastore

最后,将hive-site.xml复制或者软链到$SPARK_HOME/conf/。如果hive的元数据存储在mysql中,那么需要将mysql的连接驱动jar包如mysql-connector-java-5.1.12.jar放到$SPARK_HOME/lib/下,启动spark-sql即可操作hive中的库和表。而此时使用hive元数据获取SparkSession的方式为:

val spark = SparkSession.builder()

.config(sparkConf).enableHiveSupport().getOrCreate()

UDF、UDAF、Aggregator

UDF

UDF是最基础的用户自定义函数,以自定义一个求字符串长度的udf为例:

val udf_str_length = udf{(str:String) => str.length}
spark.udf.register("str_length",udf_str_length)
val ds =sparkSession.read.json("路径/people.json")
ds.createOrReplaceTempView("people")
sparkSession.sql("select str_length(address) from people")

UDAF

定义UDAF,需要继承抽象类UserDefinedAggregateFunction,它是弱类型的,下面的aggregator是强类型的。以求平均数为例:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.types._
object MyAverage extends UserDefinedAggregateFunction {
// Data types of input arguments of this aggregate function
def inputSchema: StructType = StructType(StructField("inputColumn", LongType) :: Nil)
// Data types of values in the aggregation buffer
def bufferSchema: StructType = {
StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)
}
// The data type of the returned value
def dataType: DataType = DoubleType
// Whether this function always returns the same output on the identical input
def deterministic: Boolean = true
// Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
// standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
// the opportunity to update its values. Note that arrays and maps inside the buffer are still
// immutable.
def initialize(buffer: MutableAggregationBuffer): Unit = {
buffer(0) = 0L
buffer(1) = 0L
}
// Updates the given aggregation buffer `buffer` with new input data from `input`
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
if (!input.isNullAt(0)) {
buffer(0) = buffer.getLong(0) + input.getLong(0)
buffer(1) = buffer.getLong(1) + 1
}
}
// Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}
// Calculates the final result
def evaluate(buffer: Row): Double = buffer.getLong(0).toDouble / buffer.getLong(1)
}
// Register the function to access it
spark.udf.register("myAverage", MyAverage)
val df = spark.read.json("examples/src/main/resources/employees.json")
df.createOrReplaceTempView("employees")
df.show()
val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")
result.show()

Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
import org.apache.spark.sql.expressions.Aggregator
case class Employee(name: String, salary: Long)
case class Average(var sum: Long, var count: Long)
object MyAverage extends Aggregator[Employee, Average, Double] {
// A zero value for this aggregation. Should satisfy the property that any b + zero = b
def zero: Average = Average(0L, 0L)
// Combine two values to produce a new value. For performance, the function may modify `buffer`
// and return it instead of constructing a new object
def reduce(buffer: Average, employee: Employee): Average = {
buffer.sum += employee.salary
buffer.count += 1
buffer
}
// Merge two intermediate values
def merge(b1: Average, b2: Average): Average = {
b1.sum += b2.sum
b1.count += b2.count
b1
}
// Transform the output of the reduction
def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count
// Specifies the Encoder for the intermediate value type
def bufferEncoder: Encoder[Average] = Encoders.product
// Specifies the Encoder for the final output value type
def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
val ds = spark.read.json("examples/src/main/resources/employees.json").as[Employee]
ds.show()
// Convert the function to a `TypedColumn` and give it a name
val averageSalary = MyAverage.toColumn.name("average_salary")
val result = ds.select(averageSalary)
result.show()

大数据开发之SparkCore中RDD特点

大数据开发技术之Partition分区的分析

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
国足开练,伊万科维奇有决心!

国足开练,伊万科维奇有决心!

新民晚报
2024-05-28 09:50:41
塔图姆为何丢东决MVP?13项数据已证一切 布朗三战全为绿凯救世主

塔图姆为何丢东决MVP?13项数据已证一切 布朗三战全为绿凯救世主

小马哥谈体育
2024-05-28 22:23:10
我市联合调查云南齐星建材观音洞采石场违法占用耕地、林地问题

我市联合调查云南齐星建材观音洞采石场违法占用耕地、林地问题

科学发掘
2024-05-28 14:07:00
重庆冠军赛抽签有猫腻?球迷质疑:针对王曼昱,保送陈梦4强

重庆冠军赛抽签有猫腻?球迷质疑:针对王曼昱,保送陈梦4强

足球慢镜头
2024-05-28 17:03:46
法比奥穿中国男篮球衣在越南度假:同志们,我们很快回来

法比奥穿中国男篮球衣在越南度假:同志们,我们很快回来

直播吧
2024-05-28 16:16:58
别了曼联,换帅,50岁名帅重返,拉爵发力,瓜迪奥拉有对手了

别了曼联,换帅,50岁名帅重返,拉爵发力,瓜迪奥拉有对手了

球文速递
2024-05-29 00:14:36
震惊!小米智能门锁凌晨3点自己开门!!

震惊!小米智能门锁凌晨3点自己开门!!

悠闲葡萄
2024-05-28 14:49:08
斯波:别忘了约维奇现在才20岁 我们会继续推动他去取得进步

斯波:别忘了约维奇现在才20岁 我们会继续推动他去取得进步

直播吧
2024-05-28 21:25:11
女孩相亲,全程叼着烟,女孩直言:我曾是大哥的女人,你养得起吗

女孩相亲,全程叼着烟,女孩直言:我曾是大哥的女人,你养得起吗

世态言凉
2024-05-28 10:44:59
第一次穿丝袜

第一次穿丝袜

娱乐八卦木木子
2024-05-28 15:58:40
韩德君宴请队友携家属吃饭!美食丰盛,继伟最帅,小丛地位太高

韩德君宴请队友携家属吃饭!美食丰盛,继伟最帅,小丛地位太高

小马哥谈体育
2024-05-28 21:41:05
南京人的身份证上为啥没有省名?解答来了

南京人的身份证上为啥没有省名?解答来了

中国日报
2024-05-28 17:38:19
辽篮三连冠后签下新外援!阿丘尔加盟细节曝光,张镇麟也起到作用

辽篮三连冠后签下新外援!阿丘尔加盟细节曝光,张镇麟也起到作用

元爸体育
2024-05-28 22:04:54
邪门!浙江男子钓鱼时撞见诡异一幕,拍照发网上引发大量关注

邪门!浙江男子钓鱼时撞见诡异一幕,拍照发网上引发大量关注

小胡渔记
2024-05-28 08:20:02
美国防部分析师:中国高超音速武器已经超越俄罗斯,位居全球第一

美国防部分析师:中国高超音速武器已经超越俄罗斯,位居全球第一

笑语娱乐
2024-05-26 00:20:17
又有5人被抓!巨雷彻底炸响!比恒大还多1.14万亿?15万名投资者欲哭无泪…

又有5人被抓!巨雷彻底炸响!比恒大还多1.14万亿?15万名投资者欲哭无泪…

保险课堂
2024-05-28 23:28:07
雷吉-米勒:东欧是史上最强进攻后场组合 追梦提水花被我们质疑

雷吉-米勒:东欧是史上最强进攻后场组合 追梦提水花被我们质疑

直播吧
2024-05-29 00:26:13
美专家:若中国不买美国疫苗,将会导致40万人死亡!

美专家:若中国不买美国疫苗,将会导致40万人死亡!

梦儿在北大荒呀
2024-05-28 17:24:48
大陆演习后,在南海占了28个岛的越南,给了一个不同寻常的表态

大陆演习后,在南海占了28个岛的越南,给了一个不同寻常的表态

星辰故事屋
2024-05-28 20:18:55
中甲第12轮前瞻:冲超集团集体客场出战,重庆铜梁龙迎来良机

中甲第12轮前瞻:冲超集团集体客场出战,重庆铜梁龙迎来良机

小鬼头体育
2024-05-29 02:50:32
2024-05-29 03:50:44
IT爱好者小尚
IT爱好者小尚
分享IT教育类信息
630文章数 55关注度
往期回顾 全部

科技要闻

4月中国手机需求回升 iPhone出货量增长52%

头条要闻

安徽坍塌居民楼结构脆弱 专家称其"像积木搭在墙上"

头条要闻

安徽坍塌居民楼结构脆弱 专家称其"像积木搭在墙上"

体育要闻

官方:曼城当选环足奖欧洲年度最佳俱乐部,击败皇马、药厂等队

娱乐要闻

昆凌晒三胎正面照,2岁妹妹超像周杰伦

财经要闻

东方通收购藏雷 花6亿买来"业绩变脸"

汽车要闻

三联屏/纯电续航318km 岚图FREE 318官图发布

态度原创

本地
游戏
教育
健康
军事航空

本地新闻

食味印象|歙县限定!枇杷味儿的清甜初夏

支持试玩 像素剧情《Until Then》6月25日发售

教育要闻

【中招咨询】一文看懂北京八中教育集团招生政策!

晚餐不吃or吃七分饱,哪种更减肥?

军事要闻

以军装甲部队进入加沙地带南部城市拉法市中心

无障碍浏览 进入关怀版