单flume消费kafka数据到hbase

张映 发表于 2019-10-21

分类目录: hadoop/spark/scala

标签:, ,

从kafka消费数据到hdfs,或者hbase,是最常见的二种方式。

1,配置文件

  1. # vim /bigdata/flume/conf/kafka2hbase.conf  
  2.   
  3. #各组件命名  
  4. a1.sources = r1  
  5. a1.sinks = k1  
  6. a1.channels = c1  
  7.   
  8. # 指定数据源  
  9. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource  
  10. # kafka地址  
  11. a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092  
  12. # 组ID  
  13. a1.sources.r1.kafka.consumer.group.id = flume_test  
  14. #topic多个逗号隔开  
  15. a1.sources.r1.kafka.topics = track_pc,track_app  
  16.   
  17. # 指定hbase为数据存储  
  18. a1.sinks.k1.type = asynchbase  
  19. #空间名:表名  
  20. a1.sinks.k1.table = test_ns:flume  
  21. #列族名  
  22. a1.sinks.k1.columnFamily = data  
  23. a1.sinks.k1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer  
  24.   
  25. # channel类型  
  26. a1.channels.c1.type = memory  
  27. # channel存储的事件容量  
  28. a1.channels.c1.capacity = 1500000  
  29. # 事务容量  
  30. a1.channels.c1.transactionCapacity = 10000  
  31.   
  32. # 绑定source和sink到channel  
  33. a1.sources.r1.channels = c1  
  34. a1.sinks.k1.channel = c1  

2,创建hbase表

  1. hbase(main):001:0> create 'test_ns:flume','data'  
  2. 0 row(s) in 2.5900 seconds  
  3.   
  4. => Hbase::Table - test_ns:flume  

3,启动flume

  1. # flume-ng agent --conf conf --conf-file /bigdata/flume/conf/kafka2hbase.conf --name a1 -Dflume.root.logger=INFO,console  
  2.   
  3. 。。。。。。。。。。。。。。。。。。。。。。。。。。省略。。。。。。。。。。。。。。。。。。。。。。。。。。。。  
  4.   
  5. 19/10/21 13:35:10 INFO async.HBaseClient: Connecting to .META. region @ 10.0.40.193:16020  
  6. 19/10/21 13:35:10 INFO zookeeper.ZooKeeper: Session: 0x102ff2108b00002 closed  
  7. 19/10/21 13:35:10 INFO zookeeper.ClientCnxn: EventThread shut down  
  8. 19/10/21 13:35:10 INFO async.HBaseClient: Added client for region RegionInfo(table="flume", region_name="test_ns:flume,,1571635406437.ea5aca55a43e98e022016497794c25a5.", stop_key=""), which was added to the regions cache. Now we know that RegionClient@350743955(chan=[id: 0x67edf410, /10.0.40.175:58864 => /10.0.40.175:16020], #pending_rpcs=0, #batched=0, #rpcs_inflight=0) is hosting 1 region.  
  9. 19/10/21 13:35:10 INFO clients.Metadata: Cluster ID: J68hgrgcQ9-s58FUEIpDLQ  
  10. 19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Discovered group coordinator bigserver1:9092 (id: 2147483645 rack: null)  
  11. 19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Revoking previously assigned partitions []  
  12. 19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group  
  13. 19/10/21 13:35:10 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 15  
  14. 19/10/21 13:35:10 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]  
  15. 19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.  
  16. 19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.  
  17. 19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: table found  
  18. 19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.  
  19. 19/10/21 13:35:10 INFO hbase.AsyncHBaseSink: callback received  
  20. 19/10/21 13:35:10 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.  

4,启动kafka生产消息

  1. [root@bigserver1 conf]# cd /bigdata/kafka  
  2. [root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc  
  3. >22222222222  
  4. 55555555555  
  5. 4444444444444  
  6. 33333333333333  

5,通过hbase shell查看

  1. hbase(main):004:0> scan 'test_ns:flume'  
  2. ROW COLUMN+CELL  
  3.  default04e56672-6dbb-498a-89de-a6ec4e65feae column=data:pCol, timestamp=1571636111694, value=22222222222  
  4.  default295f9649-6f38-4f3f-b079-9e05d8553017 column=data:pCol, timestamp=1571636111694, value=55555555555  
  5.  default302f0b8f-5d72-4eb7-a332-01e5e0f65226 column=data:pCol, timestamp=1571636111694, value=4444444444444  
  6.  default832eb01c-7700-4074-b4c9-db65df9096ba column=data:pCol, timestamp=1571636111694, value=33333333333333  
  7.  incRow column=data:iCol, timestamp=1571636111714, value=\x00\x00\x00\x00\x00\x00\x00\x04  
  8. 5 row(s) in 0.0120 seconds  


转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2205.html