您现在的位置是:首页 >技术杂谈 >Spark写入Hive报错Mkdir failed on :com.alibaba.jfs.JindoRequestPath网站首页技术杂谈

Spark写入Hive报错Mkdir failed on :com.alibaba.jfs.JindoRequestPath

我是坏人哦 2024-07-13 12:01:02
简介Spark写入Hive报错Mkdir failed on :com.alibaba.jfs.JindoRequestPath

1. 报错内容

23/05/31 14:32:13 INFO [Driver] FsStats: cmd=mkdirs, src=oss://sync-to-bi.[马赛克].aliyuncs.com/tmp/hive, dst=null, size=0, parameter=FsPermission:rwx-wx-wx, time-in-ms=32, version=3.5.0
23/05/31 14:32:13 ERROR [Driver] ApplicationMaster: User class threw exception: org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.io.IOException: Mkdir failed on :com.alibaba.jfs.JindoRequestPath@7b61ed9f;
org.apache.spark.sql.AnalysisException: java.lang.RuntimeException: java.io.IOException: Mkdir failed on :com.alibaba.jfs.JindoRequestPath@7b61ed9f;
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:111)
        at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
        at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:138)
        at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:122)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:165)
        at org.apache.spark.sql.internal.SharedState.globalTempViewManager(SharedState.scala:160)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$2.apply(HiveSessionStateBuilder.scala:55)
        at org.apache.spark.sql.hive.HiveSessionStateBuilder$$anonfun$2.apply(HiveSessionStateBuilder.scala:55)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager$lzycompute(SessionCatalog.scala:91)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.globalTempViewManager(SessionCatalog.scala:91)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.isTemporaryTable(SessionCatalog.scala:782)
        at org.apache.spark.sql.internal.CatalogImpl.tableExists(CatalogImpl.scala:260)
        at com.tcl.task.terminalmanage.TerminalManageUtils$.saveDataFrame2Hive(TerminalManageUtils.scala:148)
        at com.tcl.task.terminalmanage.warehouse.ods.Ods_Nps_Stability_Crash_Dropbox$.execute(Ods_Nps_Stability_Crash_Dropbox.scala:47)
        at com.tcl.task.terminalmanage.CommonMain.main(CommonMain.scala:28)
        at com.tcl.task.terminalmanage.warehouse.ods.Ods_Nps_Stability_Crash_Dropbox.main(Ods_Nps_Stability_Crash_Dropbox.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Caused by: java.lang.RuntimeException: java.io.IOException: Mkdir failed on :com.alibaba.jfs.JindoRequestPath@7b61ed9f
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:606)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:544)
        at org.apache.spark.sql.hive.client.HiveClientImpl.newState(HiveClientImpl.scala:199)
        at org.apache.spark.sql.hive.client.HiveClientImpl.<init>(HiveClientImpl.scala:129)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
        at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:284)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:386)
        at org.apache.spark.sql.hive.HiveUtils$.newClientForMetadata(HiveUtils.scala:288)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client$lzycompute(HiveExternalCatalog.scala:67)
        at org.apache.spark.sql.hive.HiveExternalCatalog.client(HiveExternalCatalog.scala:66)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply$mcZ$sp(HiveExternalCatalog.scala:224)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:224)
        at org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$databaseExists$1.apply(HiveExternalCatalog.scala:224)
        at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:102)
        ... 20 more
Caused by: java.io.IOException: Mkdir failed on :com.alibaba.jfs.JindoRequestPath@7b61ed9f
        at com.alibaba.jfs.OssFileletSystem.mkdir(OssFileletSystem.java:184)
        at com.aliyun.emr.fs.internal.ossnative.OssNativeStore.mkdirs(OssNativeStore.java:521)
        at com.aliyun.emr.fs.oss.JindoOssFileSystem.mkdirsCore(JindoOssFileSystem.java:194)
        at com.aliyun.emr.fs.common.AbstractJindoShimsFileSystem.mkdirs(AbstractJindoShimsFileSystem.java:389)
        at org.apache.hadoop.hive.ql.exec.Utilities.createDirsWithPermission(Utilities.java:3385)
        at org.apache.hadoop.hive.ql.session.SessionState.createRootHDFSDir(SessionState.java:705)
        at org.apache.hadoop.hive.ql.session.SessionState.createSessionDirs(SessionState.java:650)
        at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:582)
        ... 36 more
Caused by: java.io.IOException: ErrorCode : 403 , ErrorMsg: HTTP/1.1 403 Forbidden: <?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>AccessDenied</Code>
  <Message>You have no right to access this object because of bucket acl.</Message>
  <RequestId>6[马赛克]5</RequestId>
  <HostId>sync-to-bi.[马赛克].aliyuncs.com</HostId>
  <EC>0003-00000001</EC>
</Error>
 ERROR_CODE : 1010
        at com.alibaba.jboot.JbootFuture.get(JbootFuture.java:145)
        at com.alibaba.jfs.OssFileletSystem.mkdir(OssFileletSystem.java:178)
        ... 43 more

2. 报错程序

package com.tcl.task.terminalmanage.warehouse.ods

import com.tcl.task.terminalmanage.{CommonMain, TerminalManageUtils}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object Ods_Nps_Stability_Crash_Dropbox extends CommonMain {
  val HiveDatabase = "te[马赛克]"
  val HiveTableName = "ods_[马赛克]_di"
  val ck_Table = "ods_[马赛克]_cluster"

  val colNames = Array("[马赛克]", "[马赛克]","[反正就是一些字段名]")

  override def execute(spark: SparkSession, calcDate: String): Unit = {
      spark.sql("set spark.sql.caseSensitive=true")
      val sc = spark.sparkContext
      val logPath = "oss://[马赛克]@sync-to-bi.[马赛克]/" + dateConverYYmm(calcDate) + "*"

      if (!Mutils.isPathExistTest(logPath, sc)) {
        return
      }
      var df = spark.read.json(logPath)

      for (col <- colNames) {
        if (!df.columns.contains(col)) {
          df = df.withColumn(col, lit(""))
        }
      }

    val result = df.withColumn("recordDate",lit(calcDate))
      .select("[马赛克]", "[马赛克]","[反正就是一些字段名]","recordDate")

    TerminalManageUtils.saveDataFrame2Hive(spark,result,HiveDatabase,HiveTableName,calcDate,0)
  }

  //2022-10-15
  def dateConverYYmm(date: String) = {
    val str1 = date.substring(0, 4)
    val str2 = date.substring(5, 7)
    val str3 = date.substring(8, 10)
    str1 + str2 + str3
  }

}

        程序很简单,就是数仓ODS层计算逻辑,直接从阿里云OSS读取数据,补充上一些必要的列,最后数据落盘到hive表。

3. 问题分析

3.1 分析报错内容

        根据下面两段报错提示可以得出:Spark Driver在写入Hive时,试图在oss://sync-to-bi.[马赛克].aliyuncs.com/tmp/hive这个路径下创建目录。但是sync-to-bi这个是数据源桶,只有读权限,没有写权限,自然会AccessDenied。

      

        问题的关键在于:为什么Spark Driver要在写入Hive时,往数据源的/tmp/hive创建目录?

/tmp/hive目录存放的是Hive的临时操作目录比如插入数据,insert into插入Hive表数据的操作,Hive的操作产生的操作临时文件都会存储在这里,或者比如在${HIVE_HOME}/bin下执行,sh hive,进入Hive的命令行模式,都会在这里/tmp/hive目录下产生一个Hive当前用户名字命名的临时文件夹,这个文件夹权限是700,默认是hadoop的启动用户,我的hadoop用户是hadoopadmin,所以名字是hadoopadmin的文件夹  

-- Hive的/tmp/hive以及/user/hive/warehouse目录对Hive的影响 | 码农家园

        如果像上面说的,insert into操作会在tmp/hive产生临时文件。那为什么不是在目标OSS创建临时文件,而是在源数据的OSS创建?我能在代码中指定产生临时文件的位置吗?

3.2 根据猜想进行尝试

        尝试修改默认fs,指向目标OSS,即hive表location所在的OSS

         再次运行代码,竟然真的成功了!但是进一步思考,在父类CommonMain中本就是有默认fs的配置

         那么,为什么父类中的配置没有生效?

3.3 添加日志分析父类fs配置不生效的原因

package com.tcl.task.terminalmanage.warehouse.ods

import com.tcl.task.terminalmanage.{CommonMain, TerminalManageUtils}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._


object Ods_Nps_Stability_Crash_Dropbox extends CommonMain {
  private val logger = LoggerFactory.getLogger("Luo")
  val HiveDatabase = "te[马赛克]"
  val HiveTableName = "ods_[马赛克]_di"
  val ck_Table = "ods_[马赛克]_cluster"

  val colNames = Array("[马赛克]", "[马赛克]","[反正就是一些字段名]")

  override def execute(spark: SparkSession, calcDate: String): Unit = {
      logger.info("-------1--------" + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
      spark.sql("set spark.sql.caseSensitive=true")
      val sc = spark.sparkContext
      val logPath = "oss://[马赛克]@sync-to-bi.[马赛克]/" + dateConverYYmm(calcDate) + "*"

      logger.info("-------2--------" + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
      if (!Mutils.isPathExistTest(logPath, sc)) {
        return
      }

      logger.info("-------3--------" + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
      var df = spark.read.json(logPath)
      logger.info("-------4--------" + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))

      for (col <- colNames) {
        if (!df.columns.contains(col)) {
          df = df.withColumn(col, lit(""))
        }
      }

    val result = df.withColumn("recordDate",lit(calcDate))
      .select("[马赛克]", "[马赛克]","[反正就是一些字段名]","recordDate")

  logger.info("-------5--------" + spark.sparkContext.hadoopConfiguration.get("fs.defaultFS"))
   TerminalManageUtils.saveDataFrame2Hive(spark,result,HiveDatabase,HiveTableName,calcDate,0)
  }

  //2022-10-15
  def dateConverYYmm(date: String) = {
    val str1 = date.substring(0, 4)
    val str2 = date.substring(5, 7)
    val str3 = date.substring(8, 10)
    str1 + str2 + str3
  }

}

        日志结果:

Luo: -------1--------oss://data[马赛克]
Luo: -------2--------oss://data[马赛克]
Luo: -------3--------oss://[马赛克]@sync-to-bi.[马赛克].aliyuncs.com
Luo: -------4--------oss://[马赛克]@sync-to-bi.[马赛克].aliyuncs.com
Luo: -------5--------oss://[马赛克]@sync-to-bi.[马赛克].aliyuncs.com

        谁承想,问题竟然出现在了一个路径是否存在的分支判断。

        由于很明显程序运行不会进入这个if分支,所以它自动被忽略了。分支不会执行,但判断条件一定是会执行的。 而越容易出问题的,往往就是在这种非常容易被忽略的地方。

4. 总结

        如果程序出现了一些“灵异”现象,很有可能,问题出现在你一开始就忽略的地方。

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。