您现在的位置是:首页 >技术交流 >Spark写入数据到Hbase(hdfs bulkload方式)网站首页技术交流
Spark写入数据到Hbase(hdfs bulkload方式)
简介Spark写入数据到Hbase(hdfs bulkload方式)
- 运行系统变量配置kerberos
- sparksession配置
- spark对hbase的依赖配置
- spark sql读取源数据
- 将数据转换为HFile格式
- 使用HBase的bulkload功能将HFile加载到HBase表中
spakr-kerberos系统环境认证参数配置
System.setProperty("java.security.krb5.conf", "/etc/krb5.conf")
System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
System.setProperty("sun.security.krb5.debug", "true")
System.setProperty("dfs.data.transfer.protection", "integrity")
System.setProperty("hadoop.security.authentication", "Kerberos")
System.setProperty("hadoop.security.authorization", "true")
System.setProperty("dfs.namenode.kerberos.principal", Hdfs.NameNode.principal)
System.setProperty("dfs.datanode.kerberos.principal", Hdfs.DataNode.principal)
UserGroupInformation.loginUserFromKeytab(HBase.user, HBase.keytab)
Spark-Session上下文参数配置
val spark: (String, Int) => SparkSession = (appName: String, parallelism: Int) => {
val sparkConf = new SparkConf
//类名+hive表名
sparkConf.setAppName(appName)
sparkConf.set("spark.driver.maxResultSize", "0")
sparkConf.set("spark.default.parallelism", parallelism.toString)
sparkConf.set("spark.sql.shuffle.partitions", parallelism.toString)
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") //.registerKryoClasses(Array(classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[Result])) //告知哪些类型需要序列化
//sparkConf.set("spark.executor.instances", "250") 去掉即可,spark3.0 支持动态创建exectors个数
//sparkConf.set("spark.port.maxRetries", "100")
//sparkConf.set("spark.executor.memoryOverhead", "1024") // 6144 需要根据实际情况调节,去掉
sparkConf.set("spark.network.timeout", "100000000")
sparkConf.set("spark.rpc.askTimeout", "6000000")
sparkConf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
sparkConf.set("spark.yarn.max.executor.failures", "10")
sparkConf.set("spark.shuffle.file.buffer", "10M")
sparkConf.set("spark.reducer.maxSizeInFlight", "64M")
sparkConf.set("spark.eventLog.enabled", "false")
sparkConf.set("spark.sql.debug.maxToStringFields", "5000")
sparkConf.set("spark.ui.showConsoleProgress", "false")
sparkConf.set("spark.dynamicAllocation.executorIdleTimeout", "1800")
if (HBase.kerberos) {
//Frank
sparkConf.set("keytab", Hdfs.keytab)
sparkConf.set("principal", Hdfs.principal)
sparkConf.set("hadoop.security.authentication", "Kerberos")
sparkConf.set("hadoop.security.authorization", "true")
System.setProperty("sun.security.krb5.conf", "/ect/krb5.conf")
sparkConf.set("hive.metastore.uris",Hive.hiveMetastoreUris)
sparkConf.set("hive.metastore.warehouse.dir",Hive.hiveMetastoreWarehouseDir)
/调试kerberos///
sparkConf.set("spark.yarn.appMasterEnv.HADOOP_JAAS_DEBUG", "true")
sparkConf.set("spark.yarn.am.extraJavaOptions", "-Dsun.security.krb5.debug=true -Dsun.security.spnego.debug=true")
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(Hdfs.principal, Hdfs.keytab)
UserGroupInformation.setLoginUser(ugi)
}
val sparkSession = SparkSession.builder.enableHiveSupport().config(sparkConf).getOrCreate()
sparkSession.sparkContext.setLogLevel("WARN")
sparkSession
}
获取HBase 连接和系统参数配置 & Bulkdata数据
val hBaseConf = HBaseConfiguration.create()
hBaseConf.set("hbase.zookeeper.quorum", quorum)
hBaseConf.set("hbase.zookeeper.property.clientPort", port.toString)
hBaseConf.set("zookeeper.znode.parent", znode)
hBaseConf.addResource(getClass.getClassLoader.getResourceAsStream("hdfs-site.xml"))
hBaseConf.addResource(getClass.getClassLoader.getResourceAsStream("core-site.xml"))
hBaseConf.addResource(getClass.getClassLoader.getResourceAsStream("hbase-site.xml"))
configuration.set("hadoop.security.authentication", "kerberos")
configuration.set("hbase.security.authentication", "kerberos")
configuration.set("hbase.security.authorization", "true")
configuration.set("hbase.master.kerberos.principal","hbase/_HOST@{KDC_REALM}")
configuration.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@{KDC_REALM}")
getHBaseAuthentication(configuration)
UserGroupInformation.setConfiguration(configuration)
val keyTab = SparkFiles.get(new File(Hdfs.keytab).getName)
UserGroupInformation.loginUserFromKeytab(Hdfs.principal, keyTab)
ConnectionFactory.createConnection(hBaseConf)
hbase表空间和表关于hdfs配置准备
hbase表空间和表
- void createNamespace(NamespaceDescriptor var1) throws IOException;
- void createTable(TableDescriptor var1, byte[][] var2) throws IOException;
- Future createTableAsync(TableDescriptor var1) throws IOException;
hbase中hdfs配置准备
val localPath = if (flag) HBase.hfilePath +
HBase.hfileName(pushMetaInfo.tblStatic.hiveTB) else HBase.namenode +
HBase.hfilePath + HBase.hfileName(pushMetaInfo.tblStatic.hiveTB)
val bulkPath = HBase.namenode + HBase.hfilePath + HBase.hfileName(pushMetaInfo.tblStatic.hiveTB)
关于建立hbase链接处理的配置参数认证并通过接口写入数据
conf.set("hadoop.proxyuser."+ Hdfs.principal +".hosts", "*")
conf.set("hadoop.proxyuser."+ Hdfs.principal +".groups", "*")
SparkUtils.deleteHdfsPath(fs, path)
conf.set("hadoop.security.authentication", UserGroupInformation.AuthenticationMethod.KERBEROS.name)
conf.addResource(getClass.getClassLoader.getResourceAsStream("hdfs-site.xml"))
conf.addResource(getClass.getClassLoader.getResourceAsStream("core-site.xml"))
conf.addResource(getClass.getClassLoader.getResourceAsStream("hbase-site.xml"))
UserGroupInformation.setConfiguration(conf)
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(loginUser, keytab)
ugi.doAs(new PrivilegedAction[Unit] {
override def run(): Unit = {
val job = Job.getInstance(conf)
job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
job.setMapOutputValueClass(classOf[KeyValue])
HFileOutputFormat2.configureIncrementalLoadMap(job, TableDescriptorBuilder.newBuilder(TableName.valueOf(tblName)).build())
data.saveAsNewAPIHadoopFile(path, classOf[ImmutableBytesWritable], classOf[KeyValue], classOf[HFileOutputFormat2], conf)
}
})
加载hbase数据到HDFS
初始化HDFS文件系统连接
val conf = new Configuration()
conf.clear()
conf.set("zookeeper.recovery.retry", "3")
conf.set("hbase.client.retries.number", "3")
conf.set("hbase.zookeeper.quorum", HBase.quorum)
conf.set("hbase.zookeeper.property.clientPort", HBase.zkPort.toString)
conf.set("hbase.rpc.timeout", "1800000")
conf.set("hbase.client.operation.timeout", "1800000")
conf.set("hbase.client.scanner.timeout.period", "1800000")
conf.set(TableOutputFormat.OUTPUT_TABLE, tblName)
conf.addResource(getClass.getClassLoader.getResourceAsStream("hdfs-site.xml"))
conf.addResource(getClass.getClassLoader.getResourceAsStream("core-site.xml"))
conf.addResource(getClass.getClassLoader.getResourceAsStream("hbase-site.xml"))
conf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "9999")
val bulkLoader = new LoadIncrementalHFiles(conf)
获取HBase连接
val hAdmin = HBaseUtils.createHBaseConnection(HBase.quorum, HBase.zkPort, HBase.znode).getAdmin
val conn = hAdmin.getConnection
获取HBase表信息
val hTable = conn.getTable(TableName.valueOf(tblName))
加载数据
// 使用hbase API
public Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, Admin admin, Table table, RegionLocator regionLocator) throws TableNotFoundException, IOException {
return this.doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
}
// 调用方法 hdfs文件系统句柄,hbase连接句柄,hbase表信息,获取指定表的 Region
bulkLoader.doBulkLoad(toPath, hAdmin, hTable, conn.getRegionLocator(TableName.valueOf(tblName)))
参考思路实例
您可以使用Apache Spark的HBase Connector将数据写入HBase。以下是一些步骤:
-
在Spark应用程序中添加HBase Connector的依赖项。
-
使用Spark SQL从数据源中读取数据。
-
将数据转换为HFile格式。
-
使用HBase的bulkload功能将HFile加载到HBase表中。
这里是一个示例代码片段,可以将数据写入HBase表中:
import org.apache.spark.sql.SparkSession
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}
import org.apache.hadoop.hbase.spark.HBaseContext
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
val spark = SparkSession.builder()
.appName("WriteToHBase")
.getOrCreate()
val hbaseConf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(spark.sparkContext, hbaseConf)
val data = spark.sql("SELECT * FROM source_table")
val hfile = data.rdd.map(row => {
// 将数据转换为HFile格式
// 例如:(Bytes.toBytes(row.getString(0)), Seq((Bytes.toBytes("cf1"), Bytes.toBytes("cq1"), Bytes.toBytes(row.getString(1))), (Bytes.toBytes("cf2"), Bytes.toBytes("cq2"), Bytes.toBytes(row.getString(2)))))
})
val hfilePath = "/path/to/hfile"
hfile.saveAsNewAPIHadoopFile(
hfilePath,
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Put],
classOf[org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2],
hbaseConf
)
val tableName = "hbase_table"
val connection = ConnectionFactory.createConnection(hbaseConf)
val table = connection.getTable(TableName.valueOf(tableName))
val regionLocator = connection.getRegionLocator(TableName.valueOf(tableName))
val bulkLoader = new LoadIncrementalHFiles(hbaseConf)
bulkLoader.doBulkLoad(new Path(hfilePath), table.asInstanceOf[HTable], regionLocator)
HDFS Bulk Load
HDFS Bulk Load 是一种将数据快速导入 HDFS 的方式,它可以通过 MapReduce 或 HBase 的 Bulk Load API 来实现。下面分别介绍这两种方式:
- MapReduce 方式
MapReduce 方式的 HDFS Bulk Load 通常使用 Hadoop 自带的工具 hadoop distcp
来实现。具体步骤如下:
- 将数据文件上传到 HDFS 的一个临时目录中。
- 编写一个 MapReduce,将数据从临时目录中读取并写入到目标 HDFS 目录中。
- 运行 MapReduce 程序,将数据导入到 HDFS 中。
- HBase Bulk Load 方式
HBase Bulk Load 方式是一种将数据快速导入 HBase 的方式,它可以通过 HBase 的 Bulk Load API 来实现。具体步骤如下:
- 将数据文件上传到 HDFS 的一个临时目录中。
- 使用 HBase 的 Bulk Load API 将数据从临时目录中导入到 HBase 中。
HBase Bulk Load 方式相对于 MapReduce 方式来说,具有更高的性能和更低的延迟。这是因为 HBase Bulk Load 可以直接将数据写入 HBase 的 HFile 中,而不需要经过 MapReduce 中间过程。
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。