flume高可用集群 从kafka消费数据到hdfs

张映 发表于 2019-10-22

分类目录: hadoop/spark/scala

标签:, , ,

Flume NG是一个分布式,高可用,可靠的系统,它能将不同的海量数据收集,移动并存储到一个数据存储系统中。轻量,配置简单,适用于各种日志收集,并支持 Failover和负载均衡。并且它拥有非常丰富的组件。Flume NG采用的是三层架构:Agent层,Collector层和Store层,每一层均可水平拓展。其中Agent包含Source,Channel和 Sink,三者组建了一个Agent。三者的职责如下所示:

•Source:用来消费(收集)数据源到Channel组件中
•Channel:中转临时存储,保存所有Source组件信息
•Sink:从Channel中读取,读取成功后会删除Channel中的信息

一,服务器说明

10.0.40.175 testing
10.0.40.237 bigserver1
10.0.40.222 bigserver2
10.0.40.193 bigserver3

flume ng集群

flume ng集群

想要达到的目的,flume client和flume server,各宕机了一台,从kafka到hadoop的流程不断

怎么安装flume,参考:单flume消费kafka数据到hdfs

二,flume client配置

  1. # vim /bigdata/flume/conf/flume-client.conf   
  2.   
  3. # Name the components on this agent  
  4. a1.sources = r1  
  5. a1.sinks = k1 k2  
  6. a1.channels = c1  
  7. a1.sinkgroups = g1  
  8.   
  9. # Describe/configure the source  
  10. a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource  
  11. a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092  
  12. a1.sources.r1.kafka.consumer.group.id = flume_test  
  13. a1.sources.r1.kafka.topics = track_pc,track_app  
  14.   
  15. # set sink1  
  16. a1.sinks.k1.type = avro  
  17. a1.sinks.k1.hostname = bigserver2  
  18. a1.sinks.k1.port = 52020  
  19.   
  20. # set sink2  
  21. a1.sinks.k2.type = avro  
  22. a1.sinks.k2.hostname = bigserver3  
  23. a1.sinks.k2.port = 52020  
  24.   
  25. # Use a channel which buffers events in memory  
  26. a1.channels.c1.type = memory  
  27. a1.channels.c1.capacity = 1500000  
  28. a1.channels.c1.transactionCapacity = 10000  
  29.   
  30. #set sink group  
  31. a1.sinkgroups.g1.sinks = k1 k2  
  32. #set failover  
  33. a1.sinkgroups.g1.processor.type = failover  
  34. a1.sinkgroups.g1.processor.priority.k1 = 10  
  35. a1.sinkgroups.g1.processor.priority.k2 = 1  
  36. a1.sinkgroups.g1.processor.maxpenalty = 10000  
  37.   
  38. # Bind the source and sink to the channel  
  39. a1.sources.r1.channels = c1  
  40. a1.sinks.k2.channel = c1  
  41. a1.sinks.k1.channel = c1  

二台客户端testing,bigserver1配置一样

三,服务端配置

  1. # vim /bigdata/flume/conf/flume-server.conf    
  2.   
  3. # Name the components on this agent  
  4. a1.sources = r1  
  5. a1.sinks = k1  
  6. a1.channels = c1  
  7.   
  8. # Describe/configure the source  
  9. a1.sources.r1.type = avro  
  10. a1.sources.r1.bind = bigserver2  
  11. a1.sources.r1.port = 52020  
  12. a1.sources.r1.interceptors = i1  
  13. a1.sources.r1.interceptors.i1.type = static  
  14. a1.sources.r1.interceptors.i1.key = Collector  
  15. a1.sources.r1.interceptors.i1.value = bigserver2  
  16.   
  17. # Describe the sink  
  18. a1.sinks.k1.type = hdfs  
  19. a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test  
  20. a1.sinks.k1.hdfs.useLocalTimeStamp = false  
  21. a1.sinks.k1.hdfs.writeFormat = Text  
  22. a1.sinks.k1.hdfs.filePrefix = test  
  23. a1.sinks.k1.hdfs.fileSuffix = .data  
  24. a1.sinks.k1.hdfs.fileType = DataStream  
  25. a1.sinks.k1.hdfs.rollInterval = 3600  
  26. a1.sinks.k1.hdfs.rollSize = 12800000000  
  27. a1.sinks.k1.hdfs.rollCount = 0  
  28. a1.sinks.k1.hdfs.threadsPoolSize = 10  
  29. a1.sinks.k1.hdfs.batchSize = 2000  
  30. a1.sinks.k1.hdfs.threadsPoolSize = 10  
  31.   
  32. # Use a channel which buffers events in memory  
  33. a1.channels.c1.type = memory  
  34. a1.channels.c1.capacity = 1500000  
  35. a1.channels.c1.transactionCapacity = 10000  
  36.   
  37. # Bind the source and sink to the channel  
  38. a1.sources.r1.channels = c1  
  39. a1.sinks.k1.channel = c1  

在bigserver2上面的配置

  1. # vim /bigdata/flume/conf/flume-server.conf   
  2.   
  3. # Name the components on this agent  
  4. a1.sources = r1  
  5. a1.sinks = k1  
  6. a1.channels = c1  
  7.   
  8. # Describe/configure the source  
  9. a1.sources.r1.type = avro  
  10. a1.sources.r1.bind = bigserver3  
  11. a1.sources.r1.port = 52020  
  12. a1.sources.r1.interceptors = i1  
  13. a1.sources.r1.interceptors.i1.type = static  
  14. a1.sources.r1.interceptors.i1.key = Collector  
  15. a1.sources.r1.interceptors.i1.value = bigserver3  
  16.   
  17. # Describe the sink  
  18. a1.sinks.k1.type = hdfs  
  19. a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test  
  20. a1.sinks.k1.hdfs.useLocalTimeStamp = false  
  21. a1.sinks.k1.hdfs.writeFormat = Text  
  22. a1.sinks.k1.hdfs.filePrefix = test  
  23. a1.sinks.k1.hdfs.fileSuffix = .data  
  24. a1.sinks.k1.hdfs.fileType = DataStream  
  25. a1.sinks.k1.hdfs.rollInterval = 3600  
  26. a1.sinks.k1.hdfs.rollSize = 12800000000  
  27. a1.sinks.k1.hdfs.rollCount = 0  
  28. a1.sinks.k1.hdfs.threadsPoolSize = 10  
  29. a1.sinks.k1.hdfs.batchSize = 2000  
  30. a1.sinks.k1.hdfs.threadsPoolSize = 10  
  31.   
  32. # Use a channel which buffers events in memory  
  33. a1.channels.c1.type = memory  
  34. a1.channels.c1.capacity = 1500000  
  35. a1.channels.c1.transactionCapacity = 10000  
  36.   
  37. # Bind the source and sink to the channel  
  38. a1.sources.r1.channels = c1  
  39. a1.sinks.k1.channel = c1  

在bigserver3上面的配置

四,启动flume ng集群,测试kafka到hdfs

1,先启动flume server

  1. # flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-server.conf --name a1 -Dflume.root.logger=INFO,console  

2,再启动flume client

  1. # flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-client.conf --name a1 -Dflume.root.logger=INFO,console  

都启动好后,

bigserver2部分显示

  1. 19/10/22 17:38:48 INFO node.Application: Starting Sink k1  
  2. 19/10/22 17:38:48 INFO node.Application: Starting Source r1  
  3. 19/10/22 17:38:48 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver2, port: 52020 }...  
  4. 19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.  
  5. 19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started  
  6. 19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.  
  7. 19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started  
  8. 19/10/22 17:38:48 INFO source.AvroSource: Avro source r1 started.  
  9. 19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] OPEN  
  10. 19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020  
  11. 19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] CONNECTED: /10.0.40.237:49350  
  12. 19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] OPEN  
  13. 19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020  
  14. 19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] CONNECTED: /10.0.40.175:39086  

bigserver3部分显示

  1. 19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started  
  2. 19/10/22 05:38:57 INFO node.Application: Starting Sink k1  
  3. 19/10/22 05:38:57 INFO node.Application: Starting Source r1  
  4. 19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.  
  5. 19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started  
  6. 19/10/22 05:38:57 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver3, port: 52020 }...  
  7. 19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.  
  8. 19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started  
  9. 19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.  
  10. 19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN  
  11. 19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020  
  12. 19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570  
  13. 19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN  
  14. 19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020  
  15. 19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266  

如果出现上面的提示,说明集群配置成功了。

3,测试从kafka消费数据到hdfs

  1. [root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc  
  2. >1  
  3. >2  
  4. >3  
  5. >4  
  6. >  

注意:在bigserver2上面会多出些示,

19/10/22 17:41:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 17:41:27 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp

  1. [root@bigserver2 conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp  
  2. 1  
  3. 2  
  4. 3  
  5. 4  

测试过程中,bigserver3没有这些提示,也没有产生文件。

五,测试服务端的高可用

1,关闭sinkgroups中,权重高的服务端,也就是bigserver2。关闭后,客户端有warn产生

19/10/22 17:44:31 WARN sink.FailoverSinkProcessor: Sink k1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)

2,测试kafka到hdfs

  1. [root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc  
  2. >1  
  3. >2  
  4. >3  
  5. >4  
  6. >a //关闭bigserver2后  
  7. >b  
  8. >c  
  9. >d  
  10. >e  
  11. >f  
  12. >  

这个时候,bigserver3会产生文件,

19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp

  1. [root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp  
  2. a  
  3. b  
  4. c  
  5. d  
  6. e  
  7. f  

服务端宕机一个,从kafka到hdfs,流程还是通的。

六,测试客户的高可用

刚才上面关掉的bigserver2,没有在启动了,也就是服务端就一个bigserver3.

1,关掉testing客户端后

另外一个客户端bigserver1会re join

19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 35
19/10/22 17:51:57 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.

服务端bigserver3,提示

19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266
19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] DISCONNECTED
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] UNBOUND
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] CLOSED
19/10/22 05:51:54 INFO ipc.NettyServer: Connection to /10.0.40.175:46266 disconnected.

2,测试kafka到hdfs

  1. [root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc  
  2. >1  
  3. >2  
  4. >3  
  5. >4  
  6. >5  
  7. >6  
  8. >7  
  9. >8  
  10. >9  
  11. >a //关闭bigserver2后  
  12. >b  
  13. >c  
  14. >d  
  15. >e  
  16. >f  
  17. >g //关闭testing后  
  18. >h  
  19. >i  
  20. >j  
  21. >  
  22.   
  23. [root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp  
  24. a  
  25. b  
  26. c  
  27. d  
  28. e  
  29. f  
  30. g  
  31. j  
  32. h  
  33. i  

到这儿,flume ng集群安装配置完成,高可用测试完成。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2206.html