您现在的位置是:首页 >技术交流 >spark 读写数据网站首页技术交流

spark 读写数据

行走荷尔蒙 2023-05-14 20:00:02
简介spark 读写数据

SparkSQL 数据源的加载与保存

JOEL-T99

于 2021-12-22 17:57:31 发布

2191
收藏 3
分类专栏: BigData 文章标签: spark scala sparksql
版权

BigData
专栏收录该内容
58 篇文章3 订阅
订阅专栏
Spark SQL 支持通过 DataFrame 接口对多种数据源进行操作。可以使用关系转换对 DataFrame 进行操作,也可以用于创建临时视图。将 DataFrame 注册为临时视图允许对其数据运行 SQL 查询。

  1. 通用的加载/保存功能
    数据源由它们的完全限定名称(即org.apache.spark.sql.parquet)指定,但对于内置源,可以使用它们的短名称(json、parquet、jdbc、orc、libsvm、csv、text)。从任何数据源类型加载的 DataFrame 都可以使用此语法转换为其他类型。

🌟 默认的数据加载/保存格式为 parquet

1.1 加载数据
spark.read.load 是加载数据的通用方法!对不同的数据,可以使用不同的数据格式进行设定。

语法格式:

spark.read.format(“…”)[.option(“…”)].load(“…”)
1
参数描述:

format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
load:加载数据的路径
option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable)
使用 SQL 方式:文件格式.文件路径

spark.sql(select * from json.path).show
1
1.2 保存数据
df.write.save 是保存数据的通用方法!

语法格式:

df.write.format(“…”)[.option(“…”)].save(“…”)
1
参数描述:

format:指定加载的数据类型,包括 csv、jdbc、json、orc、parquet、textFile
save:保存数据的路径
option:在 jdbc 格式下,需要传入 JDBC 相应的参数(url、user、password、dbtable);也可以通过 mode 方法使用 SaveMode 指明如何处理数据
SaveMode 是一个枚举类型,都是没有加锁的,也不是原子操作!

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists (default) “error” or “errorifexists” (default) 如果数据已经存在,则抛出异常
SaveMode.Append “append” 如果数据/表已经存在,则追加
SaveMode.Overwrite “overwrite” 如果数据/表已经存在,则覆盖
SaveMode.Ignore “ignore” 如果数据已经存在,则不操
1.3 持久化到表中
DataFrames 也可以使用 saveAsTable 命令将其作为持久表保存到 Hive Metastore 中。需要注意的是,使用此功能不需要现有的 Hive 部署。Spark 将会创建一个默认的本地 Hive 元存储(使用 Derby)。

与 createOrReplaceTempView 命令不同, saveAsTable 将实现 DataFrame 的内容,并创建一个指向Hive metastore 中的数据的指针。只要持有 metastore 的连接,即使 Spark 程序重新启动,表也仍然存在。持久化表的 DataFrame 可以通过调用 SparkSession 上的 table 方法来创建。

对于基本文件的数据源,例如 text、parquet、json 等,您可以通过 path 选项指定自定义表路径 ,例如 df.write.option(“path”, “/some/path”).saveAsTable(“t”)。删除表时,不会删除自定义表路径,表数据仍然存在。如果未指定自定义表路径,Spark 会将数据写入到仓库目录下的默认表路径中。当表被删除时,默认的表路径也将被删除。

从 Spark 2.1 开始,持久数据源表将每个分区的元数据存储在 Hive 元存储中。这带来了几个好处:

因为metastore只能为查询返回必要的分区,所以不再需要在第一个查询中查所有的分区。
Hive DDL 操作比如ALTER TABLE PARTITION … SET LOCATION,现在可用数据源 API 创建的表。
需注意,在创建外部数据源表(带有path选项的表)时,默认情况下不会收集分区信息。如果要同步 Metastore 中的分区信息,可以调用 MSCK REPAIR TABLE。

1.4 分桶、排序、分区
基于文件的数据源,可以对输出进行分桶或分区并排序。

分桶和排序仅适用于持久化到表中:

peopleDF.write.bucketBy(42, “name”).sortBy(“age”).saveAsTable(“people_bucketed”)
1
当使用 Dataset API 时,使用save和saveAsTable 之前可使用分区。

usersDF.write.partitionBy(“favorite_color”).format(“parquet”).save(“namesPartByColor.parquet”)
1
对单个表同时使用分区和分桶:

usersDF
.write
.partitionBy(“favorite_color”)
.bucketBy(42, “name”)
.saveAsTable(“users_partitioned_bucketed”)
1
2
3
4
5
partitionBy 会创建一个目录结构,因此,它对具有高基数的列的适用性有限。相反, bucketBy将数据分布在固定数量的桶中,并且可以在唯一值的数量不受限制时使用。

  1. 文件格式
    2.1 Parquet
    Spark SQL 的默认数据源为 Parquet 格式,Parquet 是一种流行的列式存储格式,可以高效的存储具有嵌套字段的记录。

存储方式的示意图如下:

Parquet 加载/保存:

加载数据:

val df = spark.read.load(“path”)
1
保存数据:

df.write.mode(“…”).save(“path”)
1
2.2 JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将其加载为 DataSet[Row]。

JSON 加载/保存:

加载数据:

val df = spark.read.format(“json”).load(“path”)
1
val df = spark.read.json(“path”)
1
保存数据:

df.write.format(“json”)[.mode(“…”)].save(“path”)
1
2.3 CSV
Spark 可以配置 CSV 文件的列信息,读取 CSV 文件时,第一行可作为表头。

spark.read.format(“csv”).option(“sep”, “;”).option(“inferSchema”, “true”).option(“header”, “true”).load(“path”)
1

————————————————
版权声明:本文为CSDN博主「JOEL-T99」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/weixin_47243236/article/details/122091740

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。