Flume Real-Time Log Collection System
Case 1: NetCat Source: listens on a specified network port; as soon as an application writes data to that port, the source component picks it up.
We then use the telnet protocol to send data through the port.
NetCat Source, as described in the official Flume documentation:
Property Name   Default   Description
channels        –
type            –         The component type; must be netcat
bind            –         Hostname or IP address that logs are sent to; a netcat source on that host is listening
port            –         Port that logs are sent to; a netcat source must be listening on that port
Configuration file: netcat.conf
# Name the components on this agent (a single agent process named a)
a.sources=r1
a.channels=c1
a.sinks=k1
# The source type is netcat, bound to host master on port 8888,
# and connected to channel c1
a.sources.r1.type=netcat
a.sources.r1.bind=master
a.sources.r1.port=8888
a.sources.r1.channels=c1
# The channel buffers events: type memory keeps them in RAM, type file keeps them on disk
# capacity / transactionCapacity control how many events the channel can hold
a.channels.c1.type=memory
a.channels.c1.capacity=1000
a.channels.c1.transactionCapacity=1000
# Connect the sink to the channel and set the sink type (logger prints events to the console)
a.sinks.k1.channel=c1
a.sinks.k1.type=logger
Start Flume agent a (the server side): -n gives the agent name (the a defined above), -c points to the configuration directory, -f is the path to the agent's property file, and -Dflume.root.logger=DEBUG,console prints the log output to the console:
flume-ng agent -n a -c $FLUME_HOME/conf -f /root/flume/netcat.conf -Dflume.root.logger=DEBUG,console
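To test this agent, open a telnet session to the configured host and port (assuming a telnet client is installed); each line typed should be printed by the logger sink in the agent's console:
telnet master 8888
# type a test line, e.g.
hello flume
# the agent console should print something like: Event: { headers:{} body: ... hello flume }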
Case 2: NetCat Source: listens on a specified port; as soon as an application writes data to that port, the source component picks it up.
The events are then written to HDFS.
# Name the components on this agent
a.sources = r1
a.sinks = k1
a.channels = c1
# Describe/configure the source
a.sources.r1.type = netcat
a.sources.r1.bind = node1
a.sources.r1.port = 8888
# Describe the sink
a.sinks.k1.type = hdfs
# HDFS output directory; write format and file type; the file name prefix is a timestamp
a.sinks.k1.hdfs.path = hdfs://master:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in files on disk
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/flume/checkpoint
a.channels.c1.dataDirs = /usr/flume/data
# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
flume-ng agent -n a -c $FLUME_HOME/conf -f /test/flume/flumetest/flume-hdfs.properties -Dflume.root.logger=DEBUG,console
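A quick end-to-end check, assuming the agent above is running on node1 and an HDFS client is available on the test machine:
telnet node1 8888         # type a few test lines, then close the session
hdfs dfs -ls /output      # timestamp-prefixed files should appear after the 10-second roll interval
hdfs dfs -cat /output/*   # the lines sent over telnet should show up here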
Case 3: Spooling Directory Source: watches a specified directory; whenever an application adds a new file to that directory, the source picks it up, parses the file's contents, and writes them into the channel. Once a file has been fully consumed it is marked as completed or deleted.
Here the sink is hdfs and the channel is memory; the monitored records are written to HDFS.
a.sources = r1
a.sinks = k1
a.channels = c1
#Describe/configure the source
a.sources.r1.type = spooldir
# Directory on this node to monitor
a.sources.r1.spoolDir = /usr/flume/flumejk
a.sources.r1.fileHeader = true
a.sources.r1.interceptors = i1
a.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
a.sinks.k1.type = hdfs
# HDFS path to write to
a.sinks.k1.hdfs.path = hdfs://master:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
# File names start with a timestamp prefix
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true
# Use a channel which buffers events in memory
a.channels.c1.type = memory
a.channels.c1.capacity = 1000
a.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a.sources.r1.channels = c1
a.sinks.k1.channel = c1
flume-ng agent -n a -c $FLUME_HOME/conf -f /test/flume/flumetest/flume-nect.properties -Dflume.root.logger=DEBUG,console
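To exercise the spooling directory source, drop a file into the watched directory (paths as configured above; .COMPLETED is Flume's default suffix for fully consumed files):
echo "spooldir test line" > /tmp/test.log
cp /tmp/test.log /usr/flume/flumejk/
ls /usr/flume/flumejk/    # the file should be renamed to test.log.COMPLETED once it has been consumed
hdfs dfs -ls /output      # a new timestamp-prefixed file should appear on HDFS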
Case 5: receiving JSON-formatted data (HTTP Source)
Configuration file: flume-http.properties
a.sources=r1
a.channels=c1
a.sinks=k1
#Describe/configure the source
a.sources.r1.type = http
a.sources.r1.port = 8888
a.sources.r1.bind = node1
#Describe the sink
a.sinks.k1.type= hdfs
# HDFS output directory
a.sinks.k1.hdfs.path = hdfs://master:9000/output
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.rollInterval = 10
a.sinks.k1.hdfs.rollSize = 0
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.filePrefix = %Y-%m-%d-%H-%M-%S
a.sinks.k1.hdfs.useLocalTimeStamp = true
#Use a channel which buffers events in file
a.channels.c1.type = file
a.channels.c1.checkpointDir = /usr/flume/checkpoint
a.channels.c1.dataDirs = /usr/flume/data
#Bind the source and sink to the channel
a.sources.r1.channels=c1
a.sinks.k1.channel=c1
flume-ng agent -n a -c $FLUME_HOME/conf -f /test/flume/flumetest/flume-http.properties -Dflume.root.logger=DEBUG,console
# Test by sending data to the port over HTTP; each JSON event carries headers and a body
curl -X POST -d '[{ "headers" : {"a" : "a1", "b" : "b1"}, "body" : "hello~flume"}]' http://node1:8888
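Once the 10-second roll interval has passed, the posted body should be visible on HDFS (an hdfs client is assumed to be available):
hdfs dfs -ls /output
hdfs dfs -cat /output/*   # should contain hello~flume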
Collect the log files from every node onto one central node: install a Flume agent on each node and chain them together; start the collector (central) node's agent first.
# Collector-side (aggregation) configuration
a.sources = s1
a.channels = c1
a.sinks = k1
# Define the source
a.sources.s1.type = avro
a.sources.s1.bind=node1
a.sources.s1.port=8888
#define the channel
a.channels.c1.type = memory
a.channels.c1.capacity=1000
a.channels.c1.transactionCapacity=1000
# Define the sink
a.sinks.k1.type = hdfs
# HDFS output directory
a.sinks.k1.hdfs.path = hdfs://master:9000/students/%Y-%m-%d-%H
a.sinks.k1.hdfs.writeFormat = Text
a.sinks.k1.hdfs.fileType = DataStream
a.sinks.k1.hdfs.filePrefix = flumeHdfs
a.sinks.k1.hdfs.batchSize = 100000
a.sinks.k1.hdfs.rollSize = 10240
a.sinks.k1.hdfs.rollCount = 0
a.sinks.k1.hdfs.rollInterval = 1
a.sinks.k1.hdfs.useLocalTimeStamp = true
# Bind the source and sink to the channel
a.sources.s1.channels = c1
a.sinks.k1.channel = c1
# Log-sending side configuration (runs on every node that produces logs)
a1.sources = s1
a1.channels = c1
a1.sinks = k1
# Define the source
a1.sources.s1.type = exec
# tail -F follows the monitored log file and emits each newly appended line as an event
a1.sources.s1.command = tail -F /test/hbase/hbase-0.98.12.1-hadoop2/logs/hbase-root-regionserver-node1.log
a1.sources.s1.shell = /bin/sh -c
#define the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=1000
a1.sinks.k1.type = avro
a1.sinks.k1.hostname =node1
a1.sinks.k1.port = 8888
# Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1
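A sketch of how the two agents might be started (the property file paths below are illustrative, not taken from the original text). The collector agent on node1 must be running before the senders, because their avro sinks connect to it:
# on the collector node (node1)
flume-ng agent -n a -c $FLUME_HOME/conf -f /path/to/collector.properties -Dflume.root.logger=DEBUG,console
# on each log-producing node
flume-ng agent -n a1 -c $FLUME_HOME/conf -f /path/to/exec-avro.properties -Dflume.root.logger=DEBUG,console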
————————————————————————————————————————————
Sqoop is a tool for moving data between relational databases (Oracle, MySQL, etc.) and Hadoop.
MySQL to Hive
import
--connect
jdbc:mysql://master:3306/sqoop_test
--username
root
--password
123456
--table
employee
--fields-terminated-by
"\t"
--lines-terminated-by
"\n"
--hive-import
--target-dir
/user/hive/warehouse/employ
--hive-table
employ
create table employ(id string, name string);
Create the table above in Hive first, but do not load any data into it. Then save the options above into a file and run sqoop --options-file xxx.opt; the -m option controls how many map tasks are launched.
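A minimal run sketch, assuming the options above are saved as mysql-to-hive.opt (an illustrative file name) and the hive CLI is on the PATH:
sqoop --options-file mysql-to-hive.opt -m 1
hive -e "select * from employ limit 10;"   # verify the imported rows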
MySQL to HBase
import
--connect
jdbc:mysql://master/sqoop_test
--username
root
--password
123456
--table
employee
--hbase-table
employee
--hbase-create-table
--hbase-row-key
id
--split-by
date
-m
1
--column-family
cf1
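A possible check after the HBase import, assuming the hbase shell is available on the node:
echo "scan 'employee'" | hbase shell     # rows keyed by id, values stored under column family cf1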
MySQL to HDFS
import
--connect
jdbc:mysql://node1:3306/sqoop_test
--username
root
--password
123456
--table
employee
-m
1
--target-dir
/sqoop/employee
--fields-terminated-by
' '
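To inspect the HDFS import result (part-m-* is Sqoop's standard naming for map-task output files):
hdfs dfs -ls /sqoop/employee
hdfs dfs -cat /sqoop/employee/part-m-*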
HDFS to MySQL
export
--connect
jdbc:mysql://node1/sqoop_test
--username
root
--password
123456
-m
1
--columns
id,name
--export-dir
/user/hive/warehouse/employee
--fields-terminated-by
' '
--table
employee
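To confirm the export landed in MySQL (credentials as in the options file above; a mysql client is assumed to be installed):
mysql -h node1 -u root -p123456 -e "select * from employee;" sqoop_test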