您现在的位置是:首页 >学无止境 >spark的使用网站首页学无止境

spark的使用

你很棒滴 2024-09-13 00:01:04
简介spark的使用

国内源下载

https://mirrors.cloud.tencent.com/apache/spark/

环境配置(三台机器都要配置)

修改/etc/profile

export JAVA_HOME=/export/server/jdk
export HADOOP_HOME=/export/server/hadoop

export SPARK_HOME=/export/server/spark
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
export HADOOP_CONF_DIR=$HADDOP_HOME/etc/hadoop

修改~/.bashrc

export JAVA_HOME=/export/server/jdk
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python

修改spark-env.sh

JAVA_HOME=/export/server/jdk

## HADOOP软件配置文件目录,读取HDFS上文件和运行YARN集群
HADOOP_CONF_DIR=/export/server/hadoop/etc/hadoop
YARN_CONF_DIR=/export/server/hadoop/etc/hadoop
#配置worker的python环境,否则他会用系统自带的
export PYSPARK_PYTHON=/pythonenv/pyspark/bin/python
## 指定spark老大Master的IP和提交任务的通信端口
# 告知Spark的master运行在哪个机器上
export SPARK_MASTER_HOST=node1
# 告知sparkmaster的通讯端口
export SPARK_MASTER_PORT=7077
# 告知spark master的 webui端口
SPARK_MASTER_WEBUI_PORT=8080

# worker cpu可用核数
SPARK_WORKER_CORES=1
# worker可用内存
SPARK_WORKER_MEMORY=1g
# worker的工作通讯地址
SPARK_WORKER_PORT=7078
# worker的 webui地址
SPARK_WORKER_WEBUI_PORT=8081

## 设置历史服务器
# 配置的意思是  将spark程序运行的历史日志 存到hdfs的/sparklog文件夹中
SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://node1:8020/sparklog/ -Dspark.history.fs.cleaner.enabled=true"

启动sparkonyarn

/export/server/spark/bin/pyspark --master yarn --deploy -mode client|culster

在这里插入图片描述

使用spark-submit提交py文件到yarn

#提交到yarn
/export/server/spark/bin/spark-submit --master yarn /sparkproject/00_example/helloword.py
#提交到本地运行
/export/server/spark/bin/spark-submit --master local[*] /sparkproject/00_example/helloword.py

在这里插入图片描述

RDD的五大特性

  • 分区性,rdd是可以增加缩减分区的
  • 通用性,每个rdd方法都会作用于每个分区
  • 血缘性,rdd1,rdd2…每个rdd是链式依赖
  • key,value数据的分区性
  • driver就近构建,driver的构建会尽量贴近数据,从而提高性能.

RDD的创建

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount')
    #2.生成sc对象
    sc=SparkContext(conf=conf)
    #读取一个文件
    word_file = sc.textFile('hdfs://node1:9001/input/words.txt')
    word_add = word_file.flatMap(lambda line:line.split(' '))
    word_with_one_rdd = word_add.map(lambda x:(x,1))
    result_rdd = word_with_one_rdd.reduceByKey(lambda a,b:a+b)
    print(result_rdd.collect())

wholeTextFiles 处理一个文件夹内包含多个小文件

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount')
    #2.生成sc对象
    sc=SparkContext(conf=conf)
    #读取一个文件夹,直接collect()会返回文件位置:文件内容的元祖形式,可通过map获取.
    rdd = sc.wholeTextFile('hdfs://node1:9001/input')
    print(rdd.map(lambda x:x[1]).collect())

rdd的算子

转换算子:只要返回结果是rdd的就是转换算子,是懒加载,只有执行执行算子的时候才会处理
执行算子: 返回的不是rdd就是执行算子

map算子

from pyspark import SparkConf,SparkContext

if __name__ == '__main__':
    # 1.通过sparkcof创建conf对象
    conf = SparkConf().setAppName('wordcount').setMaster('local[*]')
    #2.生成sc对象
    sc=SparkContext(conf=conf)

    rdd = sc.parallelize([1,2,3,4,5,6])
    def math_10(data):
        return data*10
    print(rdd.map(math_10).collect())
    print(rdd.map(lambda x:x*10).collect())

flatmap用法与map相同,限制性map算子,然后在接触数据嵌套
[(1,2,3),(4,5,6),(7,8,9)] ===>> [1,2,3,4,5,6,7,8,9]

reduceByKey

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.reduceByKey(lambda a,b:a+b).collect())
#[('a', 5), ('b', 3)]

mapValues

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.mapValues(lambda values:values*10).collect())
#[('a', 10), ('b', 10), ('b', 20), ('a', 20), ('a', 10), ('a', 10)]

groupBy

    rdd = sc.parallelize([('a',1),('b',1),('b',2),('a',2),('a',1),('a',1)])
    print(rdd.groupBy(lambda t:t[0]).collect())
#[('a', <pyspark.resultiterable.ResultIterable object at 0x7faa2b811370>), ('b', <pyspark.resultiterable.ResultIterable object at 0x7faa2b8113a0>)]

   result = rdd.groupBy(lambda x:x[0])
    print(result.map(lambda t:(t[0],list(t[1]))).collect())
    # [('a', [('a', 1), ('a', 2), ('a', 1), ('a', 1)]), ('b', [('b', 1), ('b', 2)])]

filter ====>rdd.filter(func) 传入参数返回值时bool类型,为true的留下,为false的过滤

    rdd = sc.parallelize([1,2,3,4,5,6,7,8])
    # print(rdd.groupBy(lambda t:t[0]).collect())
    print(rdd.filter(lambda x:x%2==1).collect())
# [1, 3, 5, 7]

groupByKey和reduceByKey的区别?
1.groupByKey只进行了分组后可以自定义聚合函数,reduceByKey内置聚合分组聚合.
2.是reduceByKey会在分组前在每个分区先进行聚合,被shuffle的数据可以极大地减少,然后在执行分组操作,然后在执行聚合.相较于groupByKey来讲:大量节省了磁盘的io操作,在数据量较大的情况下,优先使用reduceByKey.

mappartitions和foreachpartitions的区别?
1.相同点:他们两个都是对一整个分区的数据进行处理的
2.不同点,mappartitions是转换算子返回的是rdd,foreachpartitions是执行算子,由executor执行,返回值为none.

coalesce修改分区数量
两个参数第一个参数是要修改的数量值,第二个参数shuffle=true. 建议只减少分区,不增加分区,增加分区会产生shuffle.

rdd数据是过程数据:即每生成一个新的rdd,老的rdd就会被清理
如果数据再生成rdd3时还想使用rdd1,这时候就可以使用rdd的缓存机制,缓存机制是分散存储.
1.rdd.cache()
2.rdd.persist()
3.rdd.unpersist() 清除缓存
在这里插入图片描述

rdd缓存和CheckPoint的区别

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。