Consuming Kafka data into HBase with a single Flume agent

Posted by 张映 on 2019-10-21

Category: hadoop/spark/scala

Writing data consumed from Kafka to HDFS or to HBase are the two most common approaches. This post covers the HBase case.

1. Configuration file

# vim /bigdata/flume/conf/kafka2hbase.conf

# Name the components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Data source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Kafka broker addresses
a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092
# Consumer group ID
a1.sources.r1.kafka.consumer.group.id = flume_test
# Topics; separate multiple topics with commas
a1.sources.r1.kafka.topics = track_pc,track_app

# Use HBase as the data store
a1.sinks.k1.type = asynchbase
# namespace:table
a1.sinks.k1.table = test_ns:flume
# Column family
a1.sinks.k1.columnFamily = data
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

# Channel type
a1.channels.c1.type = memory
# Maximum number of events held in the channel
a1.channels.c1.capacity = 1500000
# Maximum number of events per transaction
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
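
Two wiring rules in this file are easy to get wrong: every channel referenced by a source or sink has to be listed in a1.channels, and the memory channel's transactionCapacity must not be larger than its capacity. Below is a minimal sketch of a pre-flight check, assuming the file consists of plain `key = value` lines as above. The helper names `parse_props` and `check_agent` are hypothetical and not part of Flume, and the fallback value of 100 for both capacities is taken from the Flume user guide as an assumption.

```python
# Trimmed copy of the configuration above, reduced to the lines the check needs.
CONFIG = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""


def parse_props(text):
    """Parse 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_agent(props, agent="a1"):
    """Return a list of wiring problems for one agent (empty if none found)."""
    problems = []
    channels = props.get(f"{agent}.channels", "").split()

    # A source may feed several channels (space-separated list).
    for src in props.get(f"{agent}.sources", "").split():
        for ch in props.get(f"{agent}.sources.{src}.channels", "").split():
            if ch not in channels:
                problems.append(f"source {src} uses undeclared channel {ch}")

    # A sink reads from exactly one channel.
    for sink in props.get(f"{agent}.sinks", "").split():
        ch = props.get(f"{agent}.sinks.{sink}.channel")
        if ch not in channels:
            problems.append(f"sink {sink} uses undeclared channel {ch}")

    # Memory channel: a transaction must fit inside the channel.
    for ch in channels:
        prefix = f"{agent}.channels.{ch}."
        if props.get(prefix + "type") == "memory":
            capacity = int(props.get(prefix + "capacity", 100))        # assumed default
            tx = int(props.get(prefix + "transactionCapacity", 100))   # assumed default
            if tx > capacity:
                problems.append(f"channel {ch}: transactionCapacity {tx} > capacity {capacity}")
    return problems


if __name__ == "__main__":
    for problem in check_agent(parse_props(CONFIG)):
        print(problem)
```

For the configuration in this post the check reports nothing. In practice you would read /bigdata/flume/conf/kafka2hbase.conf from disk and pass its contents to `parse_props`.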

2. Create the HBase table

hbase(main):001:0> create 'test_ns:flume','data'
0 row(s) in 2.5900 seconds

=> Hbase::Table - test_ns:flume

3. Start Flume

# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/kafka2hbase.conf --name a1 -Dflume.root.logger=INFO,console

... (output omitted) ...

19/10/21 13:35:10 INFO async.HBaseClient: Connecting to .META. region @ 10.0.40.193:16020
19/10/21 13:35:10 INFO zookeeper.ZooKeeper: Session: 0x102ff2108b00002 closed
19/10/21 13:35:10 INFO zookeeper.ClientCnxn: EventThread shut down
19/10/21 13:35:10 INFO async.HBaseClient: Added client for region RegionInfo(table="flume", region_name="test_ns:flume,,1571635406437.ea5aca55a43e98e022016497794c25a5.", stop_key=""), which was added to the regions cache. Now we know that RegionClient@350743955(chan=[id: 0x67edf410, /10.0.40.175:58864 => /10.0.40.175:16020], #pending_rpcs=0, #batched=0, #rpcs_inflight=0) is hosting 1 region.
19/10/21 13:35:10 INFO clients.Metadata: Cluster ID: J68hgrgcQ9-s58FUEIpDLQ
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Discovered group coordinator bigserver1:9092 (id: 2147483645 rack: null)
19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Revoking previously assigned partitions []
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 15
19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: table found
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: callback received
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.

4. Start a Kafka producer and send messages

[root@bigserver1 conf]# cd /bigdata/kafka
[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>22222222222
55555555555
4444444444444
33333333333333

5. Check the result in the HBase shell

hbase(main):004:0> scan 'test_ns:flume'
ROW COLUMN+CELL
 default04e56672-6dbb-498a-89de-a6ec4e65feae column=data:pCol, timestamp=1571636111694, value=22222222222
 default295f9649-6f38-4f3f-b079-9e05d8553017 column=data:pCol, timestamp=1571636111694, value=55555555555
 default302f0b8f-5d72-4eb7-a332-01e5e0f65226 column=data:pCol, timestamp=1571636111694, value=4444444444444
 default832eb01c-7700-4074-b4c9-db65df9096ba column=data:pCol, timestamp=1571636111694, value=33333333333333
 incRow column=data:iCol, timestamp=1571636111714, value=\x00\x00\x00\x00\x00\x00\x00\x04
5 row(s) in 0.0120 seconds
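
The scan returns five rows for four messages. With its default settings, SimpleAsyncHbaseEventSerializer writes each event body to column pCol under a row key made of the prefix `default` plus a random UUID, and also increments a counter in column iCol of row incRow. The counter is stored as an 8-byte big-endian integer, which is why the shell prints it as escaped bytes. Here is a minimal sketch of decoding those values, assuming the shell's `\xNN` escaping as shown above. The helper names are hypothetical.

```python
import struct
import uuid


def shell_bytes(escaped):
    """Turn HBase-shell text such as '\\x00\\x04' back into raw bytes."""
    return escaped.encode("latin-1").decode("unicode_escape").encode("latin-1")


def decode_counter(escaped):
    """Decode an 8-byte big-endian signed counter value."""
    (value,) = struct.unpack(">q", shell_bytes(escaped))
    return value


def split_row_key(row_key, prefix="default"):
    """Split a row key into (prefix, UUID); raises ValueError if it does not match."""
    if not row_key.startswith(prefix):
        raise ValueError(f"row key does not start with {prefix!r}: {row_key}")
    return prefix, uuid.UUID(row_key[len(prefix):])


if __name__ == "__main__":
    # Value of incRow / data:iCol from the scan above.
    print(decode_counter(r"\x00\x00\x00\x00\x00\x00\x00\x04"))  # 4

    # First event row key from the scan above.
    prefix, event_id = split_row_key("default04e56672-6dbb-498a-89de-a6ec4e65feae")
    print(prefix, event_id)
```

The decoded counter is 4, which matches the four messages typed into the console producer.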


Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2205.html
