您现在的位置是:首页 >学无止境 >Flink+hadoop部署及Demo网站首页学无止境

Flink+hadoop部署及Demo

Persistence___ 2024-06-26 14:23:25
简介Flink+hadoop部署及Demo

Hadoop集群高可用部署

下载hadoop包地址

https://dlcdn.apache.org/hadoop/common/hadoop-3.2.4/hadoop-3.2.4.tar.gz
上传并解压到3台服务器
配置3台主机的hosts和免密登录
在这里插入图片描述

1.修改.bash_profile

vi .bash_profile
# HADOOP_HOME
export HADOOP_HOME=/apps/svr/hadoop-3.2.4
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

source .bash_profile
hadoop version查看hadoop下载版本
在这里插入图片描述

架构如下:

hadoop01hadoop02hadoop03
HDFSNameNode,DataNodeDataNodeNameNode,DataNode
YARNResouceManager,NodeManagerNodeManagerResouceManager,NodeManager

NameNode 通过rpc高可用
ResouceManager通过zk高可用
主备目录

在集群各节点创建目录

mkdir -p /apps/svr/hadoop-3.2.4/tmp
mkdir -p /apps/svr/hadoop-3.2.4/dfs/name
mkdir -p /apps/svr/hadoop-3.2.4/dfs/data
mkdir -p /apps/svr/hadoop-3.2.4/journaldata

2.配置core-site.xml

        <property>
         <name>fs.defaultFS</name>
         <value>hdfs://mycluster/</value>
        </property>
        
        <!-- 指定hadoop工作目录 -->
        <property>
          <name>hadoop.tmp.dir</name>
          <value>/apps/svr/hadoop-3.2.4/tmp</value>
        </property>
      
        <!-- 指定zookeeper集群访问地址 -->
        <property>
          <name>ha.zookeeper.quorum</name>
          <value>10.251.75.112:2181,10.251.75.113:2181,10.251.75.114:2181</value>
        </property>
      
        <!-- 配置为了解决以后其他组件连接HDFS集群  -->
        <property>
          <name>hadoop.proxyuser.bigdata.hosts</name>
          <value>*</value>
        </property>
      
        <property>
          <name>hadoop.proxyuser.bigdata.groups</name>
          <value>*</value>
        </property>

3.修改配置文件hdfs-site.xml

<!-- NameNode 存放的位置 -->
  <property>
        <name>dfs.namenode.name.dir</name>
        <value>/apps/svr/hadoop-3.2.4/dfs/name</value>
    </property>
  <!-- DataNode 存放的位置 -->
    <property>
        <name>dfs.datanode.data.dir</name>
        <value>/apps/svr/hadoop-3.2.4/dfs/data</value>
    </property>
<!-- 指定HDFS的nameservices为mycluster,需要跟core-site.xml中保持一致 -->
  <property>

    <name>dfs.replication</name>

    <value>2</value>

  </property>
<!-- 指定HDFS的nameservices为mycluster,需要跟core-site.xml中保持一致 -->
  <property>
          <name>dfs.nameservices</name>
          <value>mycluster</value>
  </property>
      
        <!-- 设置mycluster集群有两个namenode, 分别为nn1,nn2 -->
  <property>
          <name>dfs.ha.namenodes.mycluster</name>
          <value>nn1,nn2</value>
  </property>
      
        <!-- 配置nn1 的RPC通信地址 -->
   <property>
          <name>dfs.namenode.rpc-address.mycluster.nn1</name>
          <value>10.251.75.112:9000</value>
   </property>
        
        <!-- 配置nn1的http通信地址 -->
   <property>
          <name>dfs.namenode.http-address.mycluster.nn1</name>
          <value>10.251.75.112:50070</value>
   </property>
 
 <!-- 配置nn2 的RPC通信地址 -->
   <property>
          <name>dfs.namenode.rpc-address.mycluster.nn2</name>
          <value>10.251.75.114:9000</value>
   </property>
      
        <!-- 配置nn2的http通信地址 -->
   <property>
          <name>dfs.namenode.http-address.mycluster.nn2</name>
          <value>10.251.75.114:50070</value>
   </property>
      
        <!-- 指定JournalNode 在本地磁盘存放数据的位置 -->
   <property>
          <name>dfs.journalnode.edits.dir</name>
          <value>/apps/svr/hadoop-3.2.4/journaldata</value>
   </property>
      
        <!-- 指定NameNode的edits元数据在journalNode上的服务器 -->
   <property>
          <name>dfs.namenode.shared.edits.dir</name>
          <value>qjournal://10.251.75.112:8485;10.251.75.113:8485;10.251.75.114:8485/mycluster</value>
   </property>
 
 <!-- 开启NameNode 自动切换 -->
   <property>
          <name>dfs.ha.automatic-failover.enabled</name>
          <value>true</value>
   </property>
      
        <!-- 配置nameNode失败自动切换的实现方式 -->
   <property>
          <name>dfs.client.failover.proxy.provider.mycluster</name>
          <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
   </property>
      
        <!-- 配置隔离机制方法 -->
   <property>
          <name>dfs.ha.fencing.methods</name>
          <value>
            sshfence
            shell(/bin/true)
          </value>
   </property>
      
        <!-- 使用sshfence隔离机制时需要ssh免密登陆 -->
   <property>
          <name>dfs.ha.fencing.ssh.private-key-files</name>
          <value>/apps/.ssh/id_rsa</value>
   </property>
 
  <!-- 配置sshfence隔离机制超时时间 -->
   <property>
          <name>dfs.ha.fencing.ssh.connect-timeout</name>
          <value>30000</value>
   </property>
     
   <property>
         <name>dfs.webhdfs.enabled</name>
         <value>true</value>
   </property>

4.配置mapred-site.xml

    <!-- 指定mapreduce运算时资源调度为 yarn 模式 -->
       <property>
          <name>mapreduce.framework.name</name>
         <value>yarn</value>
        </property>
      
        <!-- 配置mapreduce历史服务器地址 端口号 -->
        <property>
          <name>mapreduce.jobhistory.address</name>
          <value>10.251.75.112:10020</value>
        </property>
      
        <!-- 配置mapreduce历史服务器WEB访问地址 -->
        <property>
          <name>mapreduce.jobhistory.webapp.address</name>
          <value>10.251.75.112:19888</value>
        </property>

5.配置yarn-site.xml

<!-- Site specific YARN configuration properties -->
 
 <!-- 开启高可用 -->
  <property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
  </property>
 
  <!-- 指定ResourceManager的标识:yrc -->
  <property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>yrc</value>
  </property>
 
  <!-- 指定RM的名字-->
  <property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
  </property>
 
 <!-- 指定rm1服务器 -->
  <property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>10.251.75.112</value>
  </property>
 
  <!-- 指定rm2服务器 -->
  <property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>10.251.75.114</value>
  </property>
 
  <!-- 指定rm 被管理的zk 地址 -->
  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>10.251.75.112:2181,10.251.75.113:2181,10.251.75.114:2181</value>
  </property>
 
<!-- 运行mapreduce任务需要使用的服务 -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>
 
  <!-- 开启yarn集群的日志聚合功能 -->
  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>
 
  <!-- 设置日志保存时间 -->
  <property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>86400</value>
  </property>
 
  <!-- 启动rm自动恢复功能 -->
  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
  </property>
 
 <!-- 制定rm 状态信息存储在zookeeper集群上 -->
  <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
  </property>
  <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <property>
          <name>yarn.nodemanager.pmem-check-enabled</name>
          <value>false</value>
  </property>
 <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
  <property>
          <name>yarn.nodemanager.vmem-check-enabled</name>
          <value>false</value>
  </property>

6.修改文件workers文件

10.251.75.112
10.251.75.113
10.251.75.114

7.修改hadoop-env.sh

配置JAVA_HOME和主机上的jdk环境变量一样

6.将core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml传到另外2台主机

scp core-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hdfs-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp mapred-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp yarn-site.xml apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp workers apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hadoop-env.sh apps@10.251.75.113:/apps/svr/hadoop-3.2.4/etc/hadoop

scp core-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hdfs-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp mapred-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp yarn-site.xml apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp workers apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop
scp hadoop-env.sh apps@10.251.75.114:/apps/svr/hadoop-3.2.4/etc/hadoop

启动集群各个节点监控NameNode的管理日志的JournalNode
在各节点执行
hdfs --daemon start journalnode
在这里插入图片描述
在node01上格式化namenode
hdfs namenode -format
在node1上启动namenode
hdfs --daemon start namenode
在这里插入图片描述
同步nn2 及node03上同步nn1及node01的源数据
hdfs namenode -bootstrapStandby
在node1节点上格式化ZKFC
hdfs zkfc -formatZK

node1节点上启动HDFS和Yarn
start-dfs.sh
在这里插入图片描述
查看zk,namenode在node03上
在这里插入图片描述

查看hdfs管理页面

http://10.251.75.112:50070/
http://10.251.75.114:50070/
在这里插入图片描述
在这里插入图片描述

start-yarn.sh
进程在node01和node03上
在这里插入图片描述

查看HA zk节点数据 resourceMaster在rm2上
在这里插入图片描述
启动后查看yarn页面 node03页面
http://10.251.75.114:8088/cluster
在这里插入图片描述

Flink部署

下载包地址

flink下载地址
https://www.apache.org/dyn/closer.lua/flink/flink-1.16.1/flink-1.16.1-bin-scala_2.12.tgz

节点服务器hadoop01hadoop02hadoop03
角色JobManager

解压后
配置source

export FLINK_HOME=/apps/svr/flink-1.16.1
export PATH=export PATH=$FLINK_HOME/bin:$PATH

source .bash_profile

Flink Per-Job模式

Per-Job 模式是指每个Flink Job都是一组独立集群,即有自己的JobManager和TaskManager。提交任务后,YARN首先会为该任务分派一个AM容器,该容器内会运行一个JobManager进程,之后JobManager会向Yarn申请运行TaskManager所需要的container,container的数量和container的配置(CPU、内存)会基于客户端的需求来确定,当JobManager和TaskManager进程都拉起来之后,则会执行相应的Flink Job。这样,与Standalone以及yarn-session不同的是,我们不需要准备一个常驻的Flink 集群进程,只需要保证本地有一个Flink环境即可,Flink集群是在我们提交Job时动态创建出来的。
这种方式的优势在于每个任务独立运行,相互不会收到干扰,这个不干扰还包括了运行日志也是隔离的。另外,借助YARN集群的能力,提供了对Flink Job的全方位保障,包括JobManager的高可用,TaskManager的恢复,然后再结合Flink自身提供的健壮性,包括检查点、保存点机制,从而能够很好的保障Flink Job的高可用和健壮性。劣势的话,就是保证了资源隔离的同时也占用了更多的资源,因为每个Job都需要一个JobManager,每个JobManager都会消耗一个AM进程资源。

-yn,--container <arg> 表示分配容器的数量,也就是 TaskManager 的数量。
-d,--detached:设置在后台运行。
-yjm,--jobManagerMemory<arg>:设置 JobManager 的内存,单位是 MB。
-ytm,--taskManagerMemory<arg>:设置每个 TaskManager 的内存,单位是 MB。
-ynm,--name:给当前 Flink application 在 Yarn 上指定名称。
-yq,--query:显示 yarn 中可用的资源(内存、cpu 核数)
-yqu,--queue<arg> :指定 yarn 资源队列
-ys,--slots<arg> :每个 TaskManager 使用的 Slot 数量。
-yz,--zookeeperNamespace<arg>:针对 HA 模式在 Zookeeper 上创建 NameSpace
-yid,--applicationID<yarnAppId> : 指定 Yarn 集群上的任务 ID,附着到一个后台独 立运行的 Yarn Session 中。

使用flink run -m yarn-cluster --help 可查看可用命令

在这里插入图片描述

Demo01

在flink上执行命令跑demo
flink run -m yarn-cluster -t yarn-per-job -yjm 1024 -ytm 1024 /apps/svr/flink-1.16.1/examples/streaming/WordCount.jar
任务执行完成并且
在这里插入图片描述
通过yarn页面查看到任务已完成,并且hdfs上有记录
在这里插入图片描述
在这里插入图片描述

Demo02

登录10.251.75.112 nc -lk 9999开启服务端端口监听
nc -kl 9999
在这里插入图片描述
flink run -m yarn-cluster -t yarn-per-job -yjm 1024 -ytm 1024 /apps/svr/flink-1.16.1/examples/streaming/SocketWindowWordCount.jar --hostname 10.251.75.112 --port 9999
在这里插入图片描述
打开flink集群管理页面
在这里插入图片描述
在服务端nc窗口输入hello word即可看到flink管理页面有对应输出
在这里插入图片描述

在flink taskmanager即可看到对应输出
在这里插入图片描述

将任务关闭

查看管理页面
在这里插入图片描述
在这里插入图片描述
yarn application -kill application_1684908112422_0008
在这里插入图片描述
查看管理页面任务已被关闭
在这里插入图片描述

风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。