从kafka消费数据到hdfs,目前主要是通过程序的方式来实现的,在这在这个过程中做了简单的数据处理。
其实通过flume就可以实现数据从kafka到hdfs
一,下载flume
# wget https://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz # tar zxvf apache-flume-1.9.0-bin.tar.gz # mv apache-flume-1.9.0-bin /bigdata/flume
二,设置环境变量
# vim ~/.bashrc //添加以下内容 export FLUME_HOME=/bigdata/flume export PATH=$FLUME_HOME/bin:$PATH # source ~/.bashrc
三,flume读取kafka配置
1,配置文件说明
[root@testing ~]# ll /bigdata/flume/conf/ 总用量 16 -rw-r--r-- 1 tank_1 tank_1 1661 11月 16 2017 flume-conf.properties.template //启动时的配置文件 -rw-r--r-- 1 tank_1 tank_1 1455 11月 16 2017 flume-env.ps1.template //启动时的环境配置文件 -rw-r--r-- 1 tank_1 tank_1 1568 8月 30 2018 flume-env.sh.template //启动时的环境配置文件 -rw-rw-r-- 1 tank_1 tank_1 3107 12月 10 2018 log4j.properties //日志配置文件
2,启动时的环境变量文件
# cp flume-env.sh.template flume-env.sh && vim flume-env.sh #内存配置 export JAVA_OPTS="-Xms1024m -Xmx1024m -Xss256m -Xmn512m -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-UseGCOverheadLimit" #日志配置 export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true " #flume的目录 FLUME_CLASSPATH="/bigdata/flume"
java home的环境变量,配置过就没有在这里配置了。
flume 启动时的最大堆内存比较小的,为了避免java.lang.OutOfMemoryError: GC overhead limit exceeded 或者:java.lang.OutOfMemoryError: Java heap space,要在flume-env.sh加大
3,启动时的环境变量
# vim /bigdata/flume/conf/kafka2hdfs.conf #各组件命名 a1.sources = r1 a1.sinks = k1 a1.channels = c1 # 指定数据源 a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource # kafka地址 a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092 # 组ID a1.sources.r1.kafka.consumer.group.id = flume_test #topic多个逗号隔开 a1.sources.r1.kafka.topics = track_pc,track_app # 参数过多,下面单独说明 a1.sinks.k1.type = hdfs a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test a1.sinks.k1.hdfs.useLocalTimeStamp = false a1.sinks.k1.hdfs.writeFormat = Text a1.sinks.k1.hdfs.filePrefix = test a1.sinks.k1.hdfs.fileSuffix = .data a1.sinks.k1.hdfs.fileType = DataStream a1.sinks.k1.hdfs.rollInterval = 3600 a1.sinks.k1.hdfs.rollSize = 12800000000 a1.sinks.k1.hdfs.rollCount = 0 a1.sinks.k1.hdfs.threadsPoolSize = 10 a1.sinks.k1.hdfs.batchSize = 2000 a1.sinks.k1.hdfs.threadsPoolSize = 10 # channel类型 a1.channels.c1.type = memory # channel存储的事件容量 a1.channels.c1.capacity = 1500000 # 事务容量 a1.channels.c1.transactionCapacity = 10000 # 绑定source和sink到channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1
channel的type为memory,有以下问题
Flume的HDFS sink在数据写入/读出Channel时,都有Transcation的保证。当Transaction失败时,会回滚,然后重试。但由于HDFS不可修改文件的内容,假设有1万行数据要写入HDFS,而在写入5000行时,网络出现问题导致写入失败,Transaction回滚,然后重写这10000条记录成功,就会导致第一次写入的5000行重复。这些问题是 HDFS 文件系统设计上的特性缺陷,并不能通过简单的Bugfix来解决。我们只能关闭批量写入,单条事务保证,或者启用监控策略,两端对数。
Memory和exec的方式可能会有数据丢失,file 是 end to end 的可靠性保证的,但是性能较前两者要差。要根据实际情况来选择类型。
end to end、store on failure 方式 ACK 确认时间设置过短(特别是高峰时间)也有可能引发数据的重复写入。
Flume中的HDFS Sink参数如下:
channel
type
hdfs
path
写入hdfs的路径,需要包含文件系统标识,比如:hdfs://namenode/flume/webdata/
可以使用flume提供的日期及%{host}表达式。
filePrefix
默认值:FlumeData
写入hdfs的文件名前缀,可以使用flume提供的日期及%{host}表达式。
fileSuffix
写入hdfs的文件名后缀,比如:.lzo .log等。
inUsePrefix
临时文件的文件名前缀,hdfs sink会先往目标目录中写临时文件,再根据相关规则重命名成最终目标文件;
inUseSuffix
默认值:.tmp
临时文件的文件名后缀。
rollInterval
默认值:30
hdfs sink间隔多长将临时文件滚动成最终目标文件,单位:秒;
如果设置成0,则表示不根据时间来滚动文件;
注:滚动(roll)指的是,hdfs sink将临时文件重命名成最终目标文件,并新打开一个临时文件来写入数据;
rollSize
默认值:1024
当临时文件达到该大小(单位:bytes)时,滚动成目标文件;
如果设置成0,则表示不根据临时文件大小来滚动文件;
rollCount
默认值:10
当events数据达到该数量时候,将临时文件滚动成目标文件;
如果设置成0,则表示不根据events数据来滚动文件;
idleTimeout
默认值:0
当目前被打开的临时文件在该参数指定的时间(秒)内,没有任何数据写入,则将该临时文件关闭并重命名成目标文件;
batchSize
默认值:100
每个批次刷新到HDFS上的events数量;
codeC
文件压缩格式,包括:gzip, bzip2, lzo, lzop, snappy
fileType
默认值:SequenceFile
文件格式,包括:SequenceFile, DataStream,CompressedStream
当使用DataStream时候,文件不会被压缩,不需要设置hdfs.codeC;
当使用CompressedStream时候,必须设置一个正确的hdfs.codeC值;
maxOpenFiles
默认值:5000
最大允许打开的HDFS文件数,当打开的文件数达到该值,最早打开的文件将会被关闭;
minBlockReplicas
默认值:HDFS副本数
写入HDFS文件块的最小副本数。
该参数会影响文件的滚动配置,一般将该参数配置成1,才可以按照配置正确滚动文件。
writeFormat
写sequence文件的格式。包含:Text, Writable(默认)
callTimeout
默认值:10000
执行HDFS操作的超时时间(单位:毫秒);
threadsPoolSize
默认值:10
hdfs sink启动的操作HDFS的线程数。
rollTimerPoolSize
默认值:1
hdfs sink启动的根据时间滚动文件的线程数。
kerberosPrincipal
HDFS安全认证kerberos配置;
kerberosKeytab
HDFS安全认证kerberos配置;
proxyUser
代理用户
round
默认值:false
是否启用时间上的”舍弃”,这里的”舍弃”,类似于”四舍五入”,后面再介绍。如果启用,则会影响除了%t的其他所有时间表达式;
roundValue
默认值:1
时间上进行“舍弃”的值;
roundUnit
默认值:seconds
时间上进行”舍弃”的单位,包含:second,minute,hour
timeZone
默认值:Local Time
时区。
useLocalTimeStamp
默认值:flase
是否使用当地时间。
closeTries
默认值:0
hdfs sink关闭文件的尝试次数;
如果设置为1,当一次关闭文件失败后,hdfs sink将不会再次尝试关闭文件,这个未关闭的文件将会一直留在那,并且是打开状态。
设置为0,当一次关闭失败后,hdfs sink会继续尝试下一次关闭,直到成功。
retryInterval
默认值:180(秒)
hdfs sink尝试关闭文件的时间间隔,如果设置为0,表示不尝试,相当于于将hdfs.closeTries设置成1.
serializer
默认值:TEXT
序列化类型。其他还有:avro_event或者是实现了EventSerializer.Builder的类名。
4,启动flume消费kafka
# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/kafka2hdfs.conf --name a1 -Dflume.root.logger=INFO,console 。。。。。。。。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。。。。。。。。。。。 19/10/18 14:26:29 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Revoking previously assigned partitions [] 19/10/18 14:26:29 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group 19/10/18 14:26:29 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 1 19/10/18 14:26:29 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2] 19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned. 19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned. 19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned. 19/10/18 14:26:29 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned. 19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-0 to offset 23. 19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-2 to offset 0. 19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_app-0 to offset 58417. 19/10/18 14:26:29 INFO internals.Fetcher: [Consumer clientId=consumer-1, groupId=flume_test] Resetting offset for partition track_pc-1 to offset 22. 19/10/18 14:38:18 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false 19/10/18 14:38:19 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp //消费数据才会产生
5,启动Producer生产消息,flume就会消费
[root@bigserver1 conf]# cd /bigdata/kafka [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_app >22222222222 33333333333333 4444444444444 55555555555 # hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp 22222222222 33333333333333 4444444444444 55555555555
只有消费产生时,hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571380698990.data.tmp,这个文件才会生成
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2204.html
我试了,为什么hdfs就是没有消息呢?我也尝试先使用logger这种sink接收,但是在agent-ng的窗口也没有消息。。会是什么原因呢?