您现在的位置是:首页 >技术交流 >Spark 离线开发框架设计与实现网站首页技术交流
Spark 离线开发框架设计与实现
一、背景
随着 Spark 以及其社区的不断发展,Spark 本身技术也在不断成熟,Spark 在技术架构和性能上的优势越来越明显,目前大多数公司在大数据处理中都倾向使用 Spark。Spark 支持多种语言的开发,如 Scala、Java、Sql、Python 等。
Spark SQL 使用标准的数据连接,与 Hive 兼容,易与其它语言 API 整合,表达清晰、简单易上手、学习成本低,是开发者开发简单数据处理的首选语言,但对于复杂的数据处理、数据分析的开发,使用 SQL 开发显得力不从心,维护成本也非常高,使用高级语言处理会更高效。
在日常的数据仓库开发工作中,我们除了开发工作外,也涉及大量的数据回溯任务。对于创新型业务来说,口径变化频繁、业务迅速迭代,数据仓库的回溯非常常见,通过回溯几个月甚至一年是非常普遍的,但传统的回溯任务方式效率极低,而且需要人力密切关注各任务状态。
针对目前现状,我们开发了一套 Spark 离线开发框架,如下表所示,我们例举了目前存在的问题及解决方案。框架的实现不仅让开发变得简单高效,而且对于数据的回溯工作在不需要任何开发的情况下,快速高效地完成大量的回溯工作。
二、框架设计
框架旨在封装重复的工作,让开发变得简单。框架如图 2-1 所示,主要分为三个部分,基础框架、可扩展工具及应用程序,开发者只需关注应用程序即可简单快速实现代码开发。
2.1 基础框架
基础框架中,我们对于所有类型的应用实现代码与配置分离机制,资源配置统一以 XML 文件形式保存并由框架解析处理。框架会根据开发者配置的任务使用资源大小,完成了 SparkSession、SparkContext、SparkConf 的创建,同时加载了常用环境变量,开发了通用的 UDF 函数(如常用的 url 参数解析等)。其中 Application 为所有应用的父类,处理流程如图所示,开发者只需编写关注绿色部分即可。
目前,离线框架所支持的常用环境变量如下表所示。
2.2 可扩展工具
可扩展工具中包含了大量的工具类,服务于应用程序及基础框架,常用有,配置文件解析类,如解析任务资源参数等;数据库工具类,用于读写数据库;日期工具类,用于日期加减、转换、识别并解析环境变量等。服务于应用程序的通用工具模块可统称为可扩展工具,这里不再赘述。
2.3 应用程序
2.3.1 SQL 应用
对于 SQL 应用,只需要创建 SQL 代码及资源配置即可,应用类为唯一类(已实现),有且只有一个,供所有 SQL 应用使用,开发者无需关心。如下配置所示,class 为所有应用的唯一类名,开发者要关心的是 path 中的 sql 代码及 conf 中该 sql 所使用的资源大小。
<?xml version="1.0" encoding="UTF-8"?>
<project name="test">
<class>com.way.app.instance.SqlExecutor</class>
<path>sql文件路径</path>
<!-- sparksession conf -->
<conf>
<spark.executor.memory>1G</spark.executor.memory>
<spark.executor.cores>2</spark.executor.cores>
<spark.driver.memory>1G</spark.driver.memory>
<spark.executor.instances>20</spark.executor.instances>
</conf>
</project>
2.3.2 Java 应用
对于复杂的数据处理,SQL 代码不能满足需求时,我们也支持 Java 程序的编写,与 SQL 不同的是,开发者需要创建新的应用类,继承 Application 父类并实现 run 方法即可,run 方法中开发者只需要关注数据的处理逻辑,对于通用的 SparkSession、SparkContext 等创建及关闭无需关注,框架还帮助开发者封装了代码的输入、输出逻辑,对于输入类型,框架支持 HDFS 文件输入、SQL 输入等多种输入类型,开发者只需调用相关处理函数即可。
如下为一个简单的 Java 数据处理应用,从配置文件可以看出,仍需配置资源大小,但与 SQL 不同的是,开发者需要定制化编写对应的 Java 类(class 参数),以及应用的输入(input 参数)和输出参数(output 参数),此应用中输入为 SQL 代码,输出为 HDFS 文件。从 Test 类实现可以看出,开发者只需三步走:获取输入数据、逻辑处理、结果输出,即可完成代码编写。
<?xml version="1.0" encoding="UTF-8"?>
<project name="ecommerce_dwd_hanwuji_click_incr_day_domain">
<class>com.way.app.instance.ecommerce.Test</class>
<input>
<type>table</type>
<sql>select
clk_url,
clk_num
from test_table
where event_day='{DATE}'
and click_pv > 0
and is_ubs_spam=0
</sql>
</input>
<output>
<type>afs_kp</type>
<path>test/event_day={DATE}</path>
</output>
<conf>
<spark.executor.memory>2G</spark.executor.memory>
<spark.executor.cores>2</spark.executor.cores>
<spark.driver.memory>2G</spark.driver.memory>
<spark.executor.instances>10</spark.executor.instances>
</conf>
</project>
package com.way.app.instance.ecommerce;import com.way.app.Application;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.sql.Row;import java.util.Map;import org.apache.spark.api.java.function.FilterFunction;import org.apache.spark.sql.Dataset;public class Test extends Application { @Override public void run() { // 输入 Map<String, String> input = (Map<String, String>) property.get("input"); Dataset<Row> ds = sparkSession.sql(getInput(input)).toDF("url", "num"); // 逻辑处理(简单的筛选出url带有部分站点的日志) JavaRDD<String> outRdd = ds.filter((FilterFunction<Row>) row -> { String url = row.getAs("url").toString(); return url.contains(".jd.com") || url.contains(".suning.com") || url.contains("pin.suning.com") || url.contains(".taobao.com") || url.contains("detail.tmall.hk") || url.contains(".amazon.cn") || url.contains(".kongfz.com") || url.contains(".gome.com.cn") || url.contains(".kaola.com") || url.contains(".dangdang.com") || url.contains("aisite.wejianzhan.com") || url.contains("w.weipaitang.com"); }) .toJavaRDD() .map(row -> row.mkString("