Flume NG is a distributed, highly available, and reliable system that can collect massive amounts of data from different sources, move it, and store it in a data store. It is lightweight and simple to configure, fits all kinds of log-collection scenarios, supports failover and load balancing, and ships with a very rich set of components. Flume NG uses a three-tier architecture: an Agent tier, a Collector tier, and a Store tier, each of which can be scaled out horizontally. An Agent is built from three components, a Source, a Channel, and a Sink, whose responsibilities are as follows (a minimal example config follows the list):
•Source: consumes (collects) data from the data source into the Channel
•Channel: a temporary staging store that buffers everything handed over by the Source
•Sink: reads from the Channel, and deletes the events from the Channel once the read succeeds
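To make the wiring concrete, here is a minimal single-agent config, independent of the cluster below; the agent name demo and the netcat/logger components are illustrative only:

# demo.conf -- smallest possible agent: one source, one channel, one sink
demo.sources = s1
demo.channels = c1
demo.sinks = k1

# a netcat source listening on localhost:44444
demo.sources.s1.type = netcat
demo.sources.s1.bind = localhost
demo.sources.s1.port = 44444

# an in-memory channel buffering events between source and sink
demo.channels.c1.type = memory

# a logger sink that prints each event to the agent's log
demo.sinks.k1.type = logger

# wire them together: the source writes into the channel, the sink drains it
demo.sources.s1.channels = c1
demo.sinks.k1.channel = c1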
I. Server overview
10.0.40.175 testing
10.0.40.237 bigserver1
10.0.40.222 bigserver2
10.0.40.193 bigserver3
The goal: with one flume client machine and one flume server machine both down, the Kafka-to-Hadoop pipeline keeps running uninterrupted.
For how to install Flume, see the earlier post: a single Flume consuming Kafka data into HDFS.
II. Flume client configuration
# vim /bigdata/flume/conf/flume-client.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092
a1.sources.r1.kafka.consumer.group.id = flume_test
a1.sources.r1.kafka.topics = track_pc,track_app

# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigserver2
a1.sinks.k1.port = 52020

# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigserver3
a1.sinks.k2.port = 52020

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# set sink group
a1.sinkgroups.g1.sinks = k1 k2

# set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1
The two clients, testing and bigserver1, use the same configuration.
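Since the intro mentioned that Flume also supports load balancing: if you wanted both collectors active at the same time instead of a hot standby, the sink group processor could be swapped as follows (a sketch only; it replaces the failover semantics tested below):

# set load balancing instead of failover (alternative, not used in this walkthrough)
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin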
III. Flume server configuration
# vim /bigdata/flume/conf/flume-server.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigserver2
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = bigserver2

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This is the configuration on bigserver2; the static interceptor stamps every event with a Collector=bigserver2 header, so you can tell afterwards which collector handled it.
# vim /bigdata/flume/conf/flume-server.conf

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigserver3
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = bigserver3

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
This is the configuration on bigserver3; it differs from bigserver2's only in the bind address and the Collector header value.
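Both collectors buffer events in a memory channel, which is fast but drops in-flight events if the agent process dies. If durability matters more than throughput, the usual swap is a file channel; a sketch, with illustrative checkpoint/data paths:

# durable alternative to the memory channel (paths are illustrative)
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /bigdata/flume/data/checkpoint
a1.channels.c1.dataDirs = /bigdata/flume/data/data
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000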
IV. Start the Flume NG cluster and test Kafka to HDFS
1. Start the flume servers first
# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-server.conf --name a1 -Dflume.root.logger=INFO,console
2. Then start the flume clients
# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-client.conf --name a1 -Dflume.root.logger=INFO,console
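With -Dflume.root.logger=INFO,console the agent stays attached to the terminal, which is handy for this test. For an unattended run you could background it instead; a sketch, with an illustrative log path:

# nohup flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-client.conf --name a1 > /bigdata/flume/logs/client.out 2>&1 &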
Once everything is up, bigserver2 shows (in part):
19/10/22 17:38:48 INFO node.Application: Starting Sink k1
19/10/22 17:38:48 INFO node.Application: Starting Source r1
19/10/22 17:38:48 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver2, port: 52020 }...
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
19/10/22 17:38:48 INFO source.AvroSource: Avro source r1 started.
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] OPEN
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] CONNECTED: /10.0.40.237:49350
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] OPEN
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] CONNECTED: /10.0.40.175:39086
bigserver3 shows (in part):
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
19/10/22 05:38:57 INFO node.Application: Starting Sink k1
19/10/22 05:38:57 INFO node.Application: Starting Source r1
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
19/10/22 05:38:57 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver3, port: 52020 }...
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266
If you see the messages above, the cluster is configured correctly.
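As an extra sanity check (not part of the original run), you can confirm on each server that the Avro source is actually listening on its port:

# netstat -tnlp | grep 52020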
3. Test consuming data from Kafka into HDFS
[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>
Note: extra messages appear on bigserver2:
19/10/22 17:41:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 17:41:27 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp
[root@bigserver2 conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp
1
2
3
4
During the test, bigserver3 showed none of these messages and produced no file, which is expected: the failover processor sends everything to the higher-priority sink k1, i.e. bigserver2.
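To see which files have landed so far, list the target directory (a quick check, not part of the original transcript):

# hdfs dfs -ls /user/hive/warehouse/tanktest.db/tank_test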
V. Testing server-side high availability
1. Shut down the server with the higher priority in the sink group, i.e. bigserver2. After it goes down, the clients emit a WARN:
19/10/22 17:44:31 WARN sink.FailoverSinkProcessor: Sink k1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
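The failover processor puts k1 on its failed-sink list and retries it with a growing backoff capped by maxpenalty (10,000 ms in our config), so if bigserver2's collector were restarted, traffic would move back to it automatically; the command is the same as in Part IV (this walkthrough deliberately leaves it down):

# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-server.conf --name a1 -Dflume.root.logger=INFO,console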
2. Test Kafka to HDFS
[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>a    //after shutting down bigserver2
>b
>c
>d
>e
>f
>
At this point, bigserver3 starts producing the file:
19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
[root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
a
b
c
d
e
f
With one server down, the Kafka-to-HDFS pipeline is still flowing.
VI. Testing client-side high availability
The bigserver2 shut down above was not restarted, so only a single server, bigserver3, remains.
1. Shut down the testing client
The other client, bigserver1, re-joins the Kafka consumer group:
19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 35
19/10/22 17:51:57 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.
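The rebalance happens because both clients consume in the same group (flume_test), so Kafka hands the dead client's partitions to the survivor. You can verify the new assignment from any broker host (a check that is not part of the original run):

# bin/kafka-consumer-groups.sh --bootstrap-server bigserver1:9092 --describe --group flume_test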
The server bigserver3 logs:
19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266
19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] DISCONNECTED
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] UNBOUND
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] CLOSED
19/10/22 05:51:54 INFO ipc.NettyServer: Connection to /10.0.40.175:46266 disconnected.
2. Test Kafka to HDFS
[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>5
>6
>7
>8
>9
>a    //after shutting down bigserver2
>b
>c
>d
>e
>f
>g    //after shutting down testing
>h
>i
>j
>

[root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
a
b
c
d
e
f
g
j
h
i
Notice that h, i, j come back slightly out of order (g j h i): the track_pc topic has three partitions (see the assignment log above), and Kafka only guarantees ordering within a single partition. With that, the Flume NG cluster is installed and configured, and the high-availability tests are complete.
Please credit the source when reposting.
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2206.html