您现在的位置是:首页 >其他 >Spark的dropDuplicates或distinct 对数据去重网站首页其他

Spark的dropDuplicates或distinct 对数据去重

Code_LT 2023-06-14 04:00:02
简介Spark的dropDuplicates或distinct 对数据去重

消除重复的数据可以通过使用 distinct 和 dropDuplicates 两个方法。

distinct数据去重

distinct 是所有的列进行去重的操作,假如你的 DataFrame里面有10列,那么只有这10列完全相同才会去重。
使用distinct:返回当前DataFrame中不重复的Row记录。该方法和接下来的dropDuplicates()方法不传入指定字段时的结果相同。

dropDuplicates()y有四个重载方法

  1. 第一个def dropDuplicates(): Dataset[T] = dropDuplicates(this.columns)
    这个方法,不需要传入任何的参数,默认根据所有列进行去重,然后按数据行的顺序保留每行数据出现的第一条。
 def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
  1. 第二个def dropDuplicates(colNames: Seq[String])
    传入的参数是一个序列。你可以在序列中指定你要根据哪些列的重复元素对数据表进行去重,然后也是返回每一行数据出现的第一条
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
    val resolver = sparkSession.sessionState.analyzer.resolver
    val allColumns = queryExecution.analyzed.output
    val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
      // It is possibly there are more than one columns with the same name,
      // so we call filter instead of find.
      val cols = allColumns.filter(col => resolver(col.name, colName))
      if (cols.isEmpty) {
        throw new AnalysisException(
          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
      }
      cols
    }
    Deduplicate(groupCols, planWithBarrier)
  }
  1. 第三个def dropDuplicates(colNames: Array[String])
    传入的参数是一个数组,然后方法会把数组转换为序列然后再调用第二个方法。
def dropDuplicates(colNames: Array[String]): Dataset[T] = dropDuplicates(colNames.toSeq)
  1. 第四个def dropDuplicates(col1: String, cols: String*)
    传入的参数为字符串,在方法体内会把你传入的字符串组合成一个序列再调用第二个方法。
  @scala.annotation.varargs
  def dropDuplicates(col1: String, cols: String*): Dataset[T] = {
    val colNames: Seq[String] = col1 +: cols
    dropDuplicates(colNames)
  }

第三和第四个本质上还是调用了第二个方法,所以我们在使用的时候如果需要根据指定的列进行数据去重,可以直接传入一个Seq。

第一个方法默认根据所有列去重,实际上也是调用了第二个方法,然后传入参数this.columns,即所有的列组成的Seq。

所以各位想深究dropDuplicate()去重的核心代码,只需要研究第二个去重方法即可。

特别的

使用dropDuplicate时,可以先排序,去重时,会保留排第一的,如:

val sourceByTime = df.select("url", "name","updatetime").repartition(col("url"))//这一步是为了解决下面注意事项里出现的问题
            .orderBy(desc("updatetime")).dropDuplicates(Seq("url")) 

上面代码,会保留每个url最新的数据。

注意事项

用dropDuplicates可能会出现重复数据,原因:

数据存在多个excuter中,因为spark是分布式计算的,数据在计算的时候会分布在不同的excutor上,使用dropDuplicate去重的时候,可能只是一个excutor内的数据进行了去重,别的excutor上可能还会有重复的数据。

数据是存放在不同分区的,因为spark是分布式计算的,数据在计算的时候会分散在不同的分区中,使用dropDuplicate去重的时候,不同的区分可能还会存在相同的数据。

参考

https://www.cnblogs.com/Jaryer/p/13558701.html
https://blog.csdn.net/qq_39900031/article/details/115797287
https://www.yangch.net/archives/114

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。