您现在的位置是:首页 >技术交流 >spark 读写数据网站首页技术交流
spark 读写数据
SparkSQL 数据源的加载与保存
JOEL-T99
于 2021-12-22 17:57:31 发布
2191
收藏 3
分类专栏: BigData 文章标签: spark scala sparksql
版权
BigData
专栏收录该内容
58 篇文章3 订阅
订阅专栏
Spark SQL 支持通过 DataFrame 接口对多种数据源进行操作。可以使用关系转换对 DataFrame 进行操作,也可以用于创建临时视图。将 DataFrame 注册为临时视图允许对其数据运行 SQL 查询。
- 通用的加载/保存功能
数据源由它们的完全限定名称(即org.apache.spark.sql.parquet)指定,但对于内置源,可以使用它们的短名称(json、parquet、jdbc、orc、libsvm、csv、text)。从任何数据源类型加载的 DataFrame 都可以使用此语法转换为其他类型。
🌟 默认的数据加载/保存格式为 parquet
1.1 加载数据
spark.read.load 是加载数据的通用方法!对不同的数据,可以使用不同的数据格式进行设定。
语法格式:
spark.read.format(“…”)[.option(“…”)].load(“…”)
1
参数描述:
format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
load:加载数据的路径
option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable)
使用 SQL 方式:文件格式.文件路径
spark.sql(select * from json.path
).show
1
1.2 保存数据
df.write.save 是保存数据的通用方法!
语法格式:
df.write.format(“…”)[.option(“…”)].save(“…”)
1
参数描述:
format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
save:保存数据的路径
option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable);也可以通过 mode 方法使用 SaveMode 指明如何处理数据
SaveMode 是一个枚举类型,都是没有加锁的,也不是原子操作!
Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) “error” or “errorifexists” (default) 如果数据已经存在,则抛出异常
SaveMode.Append “append” 如果数据/表已经存在,则追加
SaveMode.Overwrite “overwrite” 如果数据/表已经存在,则覆盖
SaveMode.Ignore “ignore” 如果数据已经存在,则不操
1.3 持久化到表中
DataFrames 也可以使用 saveAsTable 命令将其作为持久表保存到 Hive Metastore 中。需要注意的是,使用此功能不需要现有的 Hive 部署。Spark 将会创建一个默认的本地 Hive 元存储(使用 Derby)。
与 createOrReplaceTempView 命令不同, saveAsTable 将实现 DataFrame 的内容,并创建一个指向Hive metastore 中的数据的指针。只要持有 metastore 的连接,即使 Spark 程序重新启动,表也仍然存在。持久化表的 DataFrame 可以通过调用 SparkSession 上的 table 方法来创建。
对于基本文件的数据源,例如 text、parquet、json 等,您可以通过 path 选项指定自定义表路径 ,例如 df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表时,不会删除自定义表路径,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入到仓库目录下的默认表路径中。当表被删除时,默认的表路径也将被删除。
从 Spark 2.1 开始,持久数据源表将每个分区的元数据存储在 Hive 元存储中。这带来了几个好处:
因为metastore只能为查询返回必要的分区,所以不再需要在第一个查询中查所有的分区。
Hive DDL 操作比如ALTER TABLE PARTITION … SET LOCATION,现在可用数据源 API 创建的表。
需注意,在创建外部数据源表(带有path选项的表)时,默认情况下不会收集分区信息。如果要同步 Metastore 中的分区信息,可以调用 MSCK REPAIR TABLE。
1.4 分桶、排序、分区
基于文件的数据源,可以对输出进行分桶或分区并排序。
分桶和排序仅适用于持久化到表中:
peopleDF.write.bucketBy(42, “name”).sortBy(“age”).saveAsTable(“people_bucketed”)
1
当使用 Dataset API 时,使用save和saveAsTable 之前可使用分区。
usersDF.write.partitionBy(“favorite_color”).format(“parquet”).save(“namesPartByColor.parquet”)
1
对单个表同时使用分区和分桶:
usersDF
.write
.partitionBy(“favorite_color”)
.bucketBy(42, “name”)
.saveAsTable(“users_partitioned_bucketed”)
1
2
3
4
5
partitionBy 会创建一个目录结构,因此,它对具有高基数的列的适用性有限。相反, bucketBy将数据分布在固定数量的桶中,并且可以在唯一值的数量不受限制时使用。
- 文件格式
2.1 Parquet
Spark SQL 的默认数据源为 Parquet 格式,Parquet 是一种流行的列式存储格式,可以高效的存储具有嵌套字段的记录。
存储方式的示意图如下:
Parquet 加载/保存:
加载数据:
val df = spark.read.load(“path”)
1
保存数据:
df.write.mode(“…”).save(“path”)
1
2.2 JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将其加载为 DataSet[Row]。
JSON 加载/保存:
加载数据:
val df = spark.read.format(“json”).load(“path”)
1
val df = spark.read.json(“path”)
1
保存数据:
df.write.format(“json”)[.mode(“…”)].save(“path”)
1
2.3 CSV
Spark 可以配置 CSV 文件的列信息,读取 CSV 文件时,第一行可作为表头。
spark.read.format(“csv”).option(“sep”, “;”).option(“inferSchema”, “true”).option(“header”, “true”).load(“path”)
1
————————————————
版权声明:本文为CSDN博主「JOEL-T99」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_47243236/article/details/122091740