单flume消费kafka数据到hdfs

张映 发表于 2019-10-18

分类目录: hadoop/spark/scala

标签:, ,

从kafka消费数据到hdfs,目前主要是通过程序的方式来实现的,在这在这个过程中做了简单的数据处理。

其实通过flume就可以实现数据从kafka到hdfs

 一,下载flume

# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz
# tar zxvf apache-flume-1.9.0-bin.tar.gz
# mv apache-flume-1.9.0-bin /bigdata/flume

二,设置环境变量

# vim ~/.bashrc //添加以下内容

export FLUME_HOME=/bigdata/flume
export PATH=$FLUME_HOME/bin:$PATH

# source ~/.bashrc

三,flume读取kafka配置

1,配置文件说明

[root@testing ~]# ll /bigdata/flume/conf/
总用量 16
-rw-r--r-- 1 tank_1 tank_1 1661 11月 16 2017 flume-conf.properties.template  //启动时的配置文件 
-rw-r--r-- 1 tank_1 tank_1 1455 11月 16 2017 flume-env.ps1.template  //启动时的环境配置文件
-rw-r--r-- 1 tank_1 tank_1 1568 8月 30 2018 flume-env.sh.template  //启动时的环境配置文件
-rw-rw-r-- 1 tank_1 tank_1 3107 12月 10 2018 log4j.properties  //日志配置文件

2,启动时的环境变量文件

# cp flume-env.sh.template flume-env.sh && vim flume-env.sh

#内存配置
export JAVA_OPTS="-Xms1024m -Xmx1024m -Xss256m -Xmn512m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit"
#日志配置
export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "

#flume的目录
FLUME_CLASSPATH="/bigdata/flume"

java home的环境变量,配置过就没有在这里配置了。

flume 启动时的最大堆内存比较小的,为了避免java.lang.OutOfMemoryError: GC overhead limit exceeded 或者:java.lang.OutOfMemoryError: Java heap space,要在flume-env.sh加大

3,启动时的环境变量

# vim /bigdata/flume/conf/kafka2hdfs.conf

#各组件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 指定数据源
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# kafka地址
a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092
# 组ID
a1.sources.r1.kafka.consumer.group.id = flume_test
#topic多个逗号隔开
a1.sources.r1.kafka.topics = track_pc,track_app

# 参数过多,下面单独说明
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000
a1.sinks.k1.hdfs.threadsPoolSize = 10

# channel类型
a1.channels.c1.type = memory
# channel存储的事件容量
a1.channels.c1.capacity = 1500000
# 事务容量
a1.channels.c1.transactionCapacity = 10000

# 绑定source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

channel的type为memory,有以下问题

Flume的HDFS sink在数据写入/读出Channel时,都有Transcation的保证。当Transaction失败时,会回滚,然后重试。但由于HDFS不可修改文件的内容,假设有1万行数据要写入HDFS,而在写入5000行时,网络出现问题导致写入失败,Transaction回滚,然后重写这10000条记录成功,就会导致第一次写入的5000行重复。这些问题是 HDFS 文件系统设计上的特性缺陷,并不能通过简单的Bugfix来解决。我们只能关闭批量写入,单条事务保证,或者启用监控策略,两端对数。

Memory和exec的方式可能会有数据丢失,file 是 end to end 的可靠性保证的,但是性能较前两者要差。要根据实际情况来选择类型。

end to end、store on failure 方式 ACK 确认时间设置过短(特别是高峰时间)也有可能引发数据的重复写入。

Flume中的HDFS Sink参数如下:

channel

type

hdfs

path

写入hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/

可以使用flume提供的日期及%{host}表达式。

filePrefix

默认值:FlumeData

写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。

fileSuffix

写入hdfs的文件名后缀,比如:.lzo .log等。

inUsePrefix

临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;

inUseSuffix

默认值:.tmp

临时文件的文件名后缀。

rollInterval

默认值:30

hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;

如果设置成0,则表示不根据时间来滚动文件;

注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;

rollSize

默认值:1024

当临时文件达到该大小(单位:bytes)时,滚动成目标文件;

如果设置成0,则表示不根据临时文件大小来滚动文件;

rollCount

默认值:10

当events数据达到该数量时候,将临时文件滚动成目标文件;

如果设置成0,则表示不根据events数据来滚动文件;

idleTimeout

默认值:0
当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;

batchSize

默认值:100

每个批次刷新到HDFS上的events数量;

codeC

文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy

fileType

默认值:SequenceFile

文件格式,包括:SequenceFile, DataStream,CompressedStream

当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;

当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;

maxOpenFiles

默认值:5000

最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;

minBlockReplicas

默认值:HDFS副本数

写入HDFS文件块的最小副本数。

该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。

writeFormat

写sequence文件的格式。包含:Text, Writable(默认)

callTimeout

默认值:10000

执行HDFS操作的超时时间(单位:毫秒);

threadsPoolSize

默认值:10

hdfs sink启动的操作HDFS的线程数。

rollTimerPoolSize

默认值:1

hdfs sink启动的根据时间滚动文件的线程数。

kerberosPrincipal

HDFS安全认证kerberos配置;

kerberosKeytab

HDFS安全认证kerberos配置;

proxyUser

代理用户

round

默认值:false

是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式;

roundValue

默认值:1

时间上进行“舍弃”的值;

roundUnit

默认值:seconds

时间上进行”舍弃”的单位,包含:second,minute,hour

timeZone

默认值:Local Time

时区。

useLocalTimeStamp

默认值:flase

是否使用当地时间。

closeTries

默认值:0

hdfs sink关闭文件的尝试次数;

如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。

设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。

retryInterval

默认值:180(秒)

hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.

serializer

默认值:TEXT

序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。

4,启动flume消费kafka

# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/kafka2hdfs.conf --name a1 -Dflume.root.logger=INFO,console
。。。。。。。。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。。。。。。。。。。。
19/10/18 14:26:29 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Revoking previously assigned partitions []
19/10/18 14:26:29 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/18 14:26:29 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 1
19/10/18 14:26:29 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.
19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-0 to offset 23.
19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-2 to offset 0.
19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_app-0 to offset 58417.
19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-1 to offset 22.
19/10/18 14:38:18 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/18 14:38:19 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp //消费数据才会产生

5,启动Producer生产消息,flume就会消费

[root@bigserver1 conf]# cd /bigdata/kafka
[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_app
>22222222222
33333333333333
4444444444444
55555555555

# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp
22222222222
33333333333333
4444444444444
55555555555

只有消费产生时,hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp,这个文件才会生成



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2204.html

1 条评论

  1. lcx 留言

    我试了,为什么hdfs就是没有消息呢?我也尝试先使用logger这种sink接收,但是在agent-ng的窗口也没有消息。。会是什么原因呢?