Consuming data from Kafka into HDFS or into HBase are the two most common delivery patterns; this post walks through the Kafka-to-HBase path with Flume.
1. Configuration file
# vim /bigdata/flume/conf/kafka2hbase.conf

# Name the agent's components
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source: Kafka
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
# Kafka broker addresses
a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092
# Consumer group ID
a1.sources.r1.kafka.consumer.group.id = flume_test
# Topics, comma-separated
a1.sources.r1.kafka.topics = track_pc,track_app

# Sink: HBase
a1.sinks.k1.type = asynchbase
# namespace:table
a1.sinks.k1.table = test_ns:flume
# Column family
a1.sinks.k1.columnFamily = data
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer

# Channel type
a1.channels.c1.type = memory
# Maximum number of events held in the channel
a1.channels.c1.capacity = 1500000
# Maximum number of events per transaction
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
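The intro mentions HDFS as the other common target. Below is a minimal sketch of the same agent with the sink swapped to Flume's HDFS sink; the NameNode address, path, and roll settings are assumptions for illustration, not part of the original setup.

# Sink: HDFS instead of HBase (hypothetical NameNode address, path, and roll settings)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigserver1:9000/flume/%Y-%m-%d
# Use the agent's local time to resolve the %Y-%m-%d escapes
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# Write plain text instead of SequenceFiles
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
# Roll a new file every 10 minutes or 128 MB, never by event count
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.channel = c1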
2. Create the HBase table
hbase(main):001:0> create 'test_ns:flume','data'
0 row(s) in 2.5900 seconds

=> Hbase::Table - test_ns:flume
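The table lives in the test_ns namespace. If that namespace does not exist on your cluster yet, create it first (the original assumes it already does):

hbase(main):001:0> create_namespace 'test_ns'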
3. Start Flume
# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/kafka2hbase.conf --name a1 -Dflume.root.logger=INFO,console
...... (output omitted) ......
19/10/21 13:35:10 INFO async.HBaseClient: Connecting to .META. region @ 10.0.40.193:16020
19/10/21 13:35:10 INFO zookeeper.ZooKeeper: Session: 0x102ff2108b00002 closed
19/10/21 13:35:10 INFO zookeeper.ClientCnxn: EventThread shut down
19/10/21 13:35:10 INFO async.HBaseClient: Added client for region RegionInfo(table="flume", region_name="test_ns:flume,,1571635406437.ea5aca55a43e98e022016497794c25a5.", stop_key=""), which was added to the regions cache. Now we know that RegionClient@350743955(chan=[id: 0x67edf410, /10.0.40.175:58864 => /10.0.40.175:16020], #pending_rpcs=0, #batched=0, #rpcs_inflight=0) is hosting 1 region.
19/10/21 13:35:10 INFO clients.Metadata: Cluster ID: J68hgrgcQ9-s58FUEIpDLQ
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Discovered group coordinator bigserver1:9092 (id: 2147483645 rack: null)
19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Revoking previously assigned partitions []
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 15
19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: table found
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: callback received
19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.
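The log shows the Flume source joining the flume_test consumer group and receiving partition assignments for both topics, and the sink finding the HBase table. To confirm the assignment from Kafka's side, the consumer-groups tool can describe the group (a hedged example, run from the Kafka install directory):

[root@bigserver1 kafka]# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092 --describe --group flume_test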
4. Start a Kafka producer and send messages
[root@bigserver1 conf]# cd /bigdata/kafka
[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>22222222222
55555555555
4444444444444
33333333333333
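Each line typed at the producer prompt is sent as one Kafka message. Before looking at HBase, you can verify the messages landed on the topic with the console consumer (a hedged example; any broker from the list works as the bootstrap server):

[root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092 --topic track_pc --from-beginning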
5. Check the result in the HBase shell
hbase(main):004:0> scan 'test_ns:flume'
ROW                                           COLUMN+CELL
 default04e56672-6dbb-498a-89de-a6ec4e65feae  column=data:pCol, timestamp=1571636111694, value=22222222222
 default295f9649-6f38-4f3f-b079-9e05d8553017  column=data:pCol, timestamp=1571636111694, value=55555555555
 default302f0b8f-5d72-4eb7-a332-01e5e0f65226  column=data:pCol, timestamp=1571636111694, value=4444444444444
 default832eb01c-7700-4074-b4c9-db65df9096ba  column=data:pCol, timestamp=1571636111694, value=33333333333333
 incRow                                       column=data:iCol, timestamp=1571636111714, value=\x00\x00\x00\x00\x00\x00\x00\x04
5 row(s) in 0.0120 seconds
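Each message shows up as its own row: SimpleAsyncHbaseEventSerializer keys rows with its default "default" prefix plus a random UUID and writes the event body to the pCol qualifier, while the incRow/iCol cell is an increment counter, here \x04 = 4 events received. To check just the row count without dumping every cell (a hedged example):

hbase(main):005:0> count 'test_ns:flume'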
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2205.html