您现在的位置是:首页 >其他 >Spark的dropDuplicates或distinct 对数据去重网站首页其他
Spark的dropDuplicates或distinct 对数据去重
消除重复的数据可以通过使用 distinct 和 dropDuplicates 两个方法。
distinct数据去重
distinct 是所有的列进行去重的操作,假如你的 DataFrame里面有10列,那么只有这10列完全相同才会去重。
使用distinct:返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。
dropDuplicates()y有四个重载方法
- 第一个def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
- 第二个def dropDuplicates(colNames: Seq[String])
传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))
if (cols.isEmpty) {
throw new AnalysisException(
s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
}
cols
}
Deduplicate(groupCols, planWithBarrier)
}
- 第三个def dropDuplicates(colNames: Array[String])
传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法。
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
- 第四个def dropDuplicates(col1: String, cols: String*)
传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。
@scala.annotation.varargs
def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
val colNames: Seq[String] = col1 +: cols
dropDuplicates(colNames)
}
第三和第四个本质上还是调用了第二个方法,所以我们在使用的时候如果需要根据指定的列进行数据去重,可以直接传入一个Seq。
第一个方法默认根据所有列去重,实际上也是调用了第二个方法,然后传入参数this.columns,即所有的列组成的Seq。
所以各位想深究dropDuplicate()去重的核心代码,只需要研究第二个去重方法即可。
特别的
使用dropDuplicate时,可以先排序,去重时,会保留排第一的,如:
val sourceByTime = df.select("url", "name","updatetime").repartition(col("url"))//这一步是为了解决下面注意事项里出现的问题
.orderBy(desc("updatetime")).dropDuplicates(Seq("url"))
上面代码,会保留每个url最新的数据。
注意事项
用dropDuplicates可能会出现重复数据,原因:
数据存在多个excuter中,因为spark是分布式计算的,数据在计算的时候会分布在不同的excutor上,使用dropDuplicate去重的时候,可能只是一个excutor内的数据进行了去重,别的excutor上可能还会有重复的数据。
数据是存放在不同分区的,因为spark是分布式计算的,数据在计算的时候会分散在不同的分区中,使用dropDuplicate去重的时候,不同的区分可能还会存在相同的数据。
参考
https://www.cnblogs.com/Jaryer/p/13558701.html
https://blog.csdn.net/qq_39900031/article/details/115797287
https://www.yangch.net/archives/114