您现在的位置是:首页 >学无止境 >spark 数据的加载和保存(Parquet、JSON、CSV、MySql)网站首页学无止境
spark 数据的加载和保存(Parquet、JSON、CSV、MySql)
spark数据的加载和保存
SparkSQL 默认读取和保存的文件格式为 parquet
1.加载数据
spark.read.load 是加载数据的通用方法
scala> spark.read.
csv format jdbc json load option options orc parquet schema
table text textFile
如果读取不同格式的数据,可以对不同的数据格式进行设定
scala> spark.read.format("…")[.option("…")].load("…")
format("…"):指定加载的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和
"textFile"。
load("…"):在"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"格式下需要传入加载
数据的路径。
option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable
前面都是使用 read API 先把文件加载到 DataFrame 然后再查询,也可以直接在文件上进行查询: 文件格式.`文件路径`
scala>spark.sql("select * from json.`/opt/tmp/data/test.json`").show
2.保存数据
df.write.save 是保存数据的通用方法
scala>df.write.
csv jdbc json orc parquet textFile…
scala>df.write.format("…")[.option("…")].save("…")
format("…"):指定保存的数据类型,包括"csv"、"jdbc"、"json"、"orc"、"parquet"和"textFile"等
save ("…"):在"csv"、"orc"、"parquet"和"textFile"格式下需要传入保存数据的路径。
option("…"):在"jdbc"格式下需要传入 JDBC 相应参数,url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据,使用 mode()方法来设置。
SaveMode 是一个枚举类,其中的常量包括:
保存:
df.write.mode("append").json("/opt/tmp/data/output")
Parquet
Spark SQL 的默认数据源为 Parquet 格式。Parquet 是一种能够有效存储嵌套数据的列式
存储格式。
数据源为 Parquet 文件时,Spark SQL 可以方便的执行所有的操作,不需要使用 format。
修改配置项 spark.sql.sources.default,可修改默认数据源格式。
1.加载数据
scala> val df = spark.read.load("tmp/resources/test.parquet")
scala> df.show
2.保存数据
scala> var df = spark.read.json("/opt/tmp/data/input/test.json")
//保存为 parquet 格式
scala> df.write.mode("append").save("/opt/tmp/data/output")
JSON
Spark SQL 能够自动推测 JSON 数据集的结构,并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。
注意:Spark 读取的 JSON 文件不是传统的 JSON 文件,每一行都应该是一个 JSON 串。格式如下:
{"name":"zhangsan"}
{"name":"lisi", "age":21}
[{"name":"wangwu", "age":21},{"name":"xiaohua", "age":22}]
1.导入隐式转换
import spark.implicits.
2.加载 JSON 文件
val path = "/opt/tmp/spark-test/test.json"
val peopleDF = spark.read.json(path)
3.创建临时表
peopleDF.createOrReplaceTempView("people")
4.数据查询
val teenagerNamesDF = spark.sql("SELECT name FROM test WHERE age BETWEEN 20 AND 22")teenagerNamesDF.show()
CSV
Spark SQL 可以配置 CSV 文件的列表信息,读取 CSV 文件,CSV 文件的第一行设置为数据列
spark.read.format("csv").option("sep", ";").option("inferSchema", "true").option("header", "true").load("data/test.csv")
MySQL
Spark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame,通过对
DataFrame 一系列的计算后,还可以将数据再写回关系型数据库中。如果使用 spark-shell 操
作,可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类
路径下
bin/spark-shell
--jars mysql-connector-java-5.1.27-bin.jar
Idea 中通过 JDBC 对 Mysql 进行操作
1.导入依赖
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>
2.读取数据(mysql)
package SparkTest.sparkmysql
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
object SparkMysqlRead {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
//方式 1:通用的 load 方法读取
spark.read.format("jdbc")
.option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123")
.option("dbtable", "user_test")
.load().show()
//方式 2:通用的 load 方法读取 参数另一种形式
spark.read.format("jdbc")
.options(Map("url"->"jdbc:mysql://192.168.58.203:3306/testdb?user=root&password=123", "dbtable"->"user_test","driver"->"com.mysql.jdbc.Driver")).load().show()
//方式 3:使用 jdbc 方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user_test", props)
df.show()
//释放资源
spark.stop()
}
}
3.写入数据(mysql)
package SparkTest.sparkmysql
import java.util.Properties
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Dataset, SaveMode, SparkSession}
object SparkMysqlWrite {
case class User(name: String, age: Long)
def main(args: Array[String]): Unit = {
val conf: SparkConf = new
SparkConf().setMaster("local[*]").setAppName("SparkSQL")
//创建 SparkSession 对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._
val rdd: RDD[User] = spark.sparkContext.makeRDD(List(User("zhangsan", 20), User("lisi", 21)))
val ds: Dataset[User] = rdd.toDS()
//方式 1:通用的方式 format 指定写出类型
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://192.168.58.203:3306/testdb")
.option("user", "root")
.option("password", "123")
.option("dbtable", "user1")
.mode(SaveMode.Append)
.save()
//方式 2:通过 jdbc 方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://192.168.58.203:3306/testdb", "user2", props)
//释放资源
spark.stop()
}
}