Flume high-availability cluster: consuming data from Kafka into HDFS

Posted by 张映 on 2019-10-22

Category: hadoop/spark/scala


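Flume NG is a distributed, highly available, and reliable system that collects large volumes of data from different sources, moves it, and stores it in a central data store. It is lightweight, simple to configure, suited to all kinds of log collection, and supports failover and load balancing. It also ships with a rich set of components. Flume NG uses a three-tier architecture: an Agent tier, a Collector tier, and a Store tier, each of which can scale horizontally. An Agent is made up of three components: Source, Channel, and Sink. Their responsibilities are as follows: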

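•Source: consumes (collects) data from the data source and puts it into the Channel
•Channel: temporary staging storage that buffers everything the Source delivers
•Sink: reads events from the Channel; once an event has been handed off successfully, it is removed from the Channel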
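The division of labour between the three components can be illustrated with a toy model. This is a minimal sketch, not Flume's implementation: the `Channel` class and the `drain` function are hypothetical names, and the only behaviour modelled is that an event leaves the channel only after the sink has delivered it.

```python
from collections import deque


class Channel:
    """Bounded in-memory buffer between a source and a sink (toy model)."""

    def __init__(self, capacity):
        self.capacity = capacity
        self._events = deque()

    def put(self, event):
        # A full channel rejects the event, so the source has to retry later.
        if len(self._events) >= self.capacity:
            raise OverflowError("channel is full")
        self._events.append(event)

    def peek(self):
        # Look at the oldest event without removing it.
        return self._events[0] if self._events else None

    def commit(self):
        # Called only after the sink has delivered the oldest event.
        self._events.popleft()

    def __len__(self):
        return len(self._events)


def drain(channel, deliver):
    """Deliver events in order; stop at the first failure and keep the rest."""
    delivered = 0
    while len(channel) > 0:
        event = channel.peek()
        try:
            deliver(event)
        except IOError:
            break  # the event stays in the channel for a later retry
        channel.commit()
        delivered += 1
    return delivered
```

If `deliver` raises for an event, that event and everything behind it remain buffered, which is what lets a later attempt (or another sink) pick them up.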

I. Server overview

10.0.40.175 testing
10.0.40.237 bigserver1
10.0.40.222 bigserver2
10.0.40.193 bigserver3

Figure: Flume NG cluster

The goal: even if one Flume client and one Flume server go down, the flow from Kafka to Hadoop is not interrupted.

For how to install Flume, see the post "Consuming Kafka data into HDFS with a single Flume agent".

II. Flume client configuration

# vim /bigdata/flume/conf/flume-client.conf 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1
a1.sinkgroups = g1

# Describe/configure the source
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers = bigserver1:9092,bigserver2:9092,testing:9092
a1.sources.r1.kafka.consumer.group.id = flume_test
a1.sources.r1.kafka.topics = track_pc,track_app

# set sink1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = bigserver2
a1.sinks.k1.port = 52020

# set sink2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = bigserver3
a1.sinks.k2.port = 52020

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

#set sink group
a1.sinkgroups.g1.sinks = k1 k2
#set failover
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 1
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k2.channel = c1
a1.sinks.k1.channel = c1

The two clients, testing and bigserver1, use the same configuration.
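The failover settings in this configuration (priority 10 for k1, priority 1 for k2, maxpenalty 10000) can be read as the following selection rule. This is a simplified sketch under stated assumptions, not Flume's FailoverSinkProcessor: the class name `FailoverSelector`, the 1000 ms base penalty and the doubling per consecutive failure are illustrative choices; only the priorities and the 10000 ms cap come from the configuration above.

```python
class FailoverSelector:
    """Pick the highest-priority sink that is not currently penalised."""

    def __init__(self, priorities, max_penalty_ms=10000, base_penalty_ms=1000):
        self.priorities = dict(priorities)      # sink name -> priority
        self.max_penalty_ms = max_penalty_ms    # cap, like processor.maxpenalty
        self.base_penalty_ms = base_penalty_ms  # assumption for this sketch
        self.failures = {}                      # sink name -> consecutive failures
        self.retry_at = {}                      # sink name -> earliest retry time (ms)

    def pick(self, now_ms):
        usable = [s for s in self.priorities
                  if self.retry_at.get(s, 0) <= now_ms]
        if not usable:
            return None
        return max(usable, key=lambda s: self.priorities[s])

    def report_failure(self, sink, now_ms):
        n = self.failures.get(sink, 0) + 1
        self.failures[sink] = n
        # The penalty grows with consecutive failures but never exceeds the cap.
        penalty = min(self.max_penalty_ms, self.base_penalty_ms * (2 ** n))
        self.retry_at[sink] = now_ms + penalty

    def report_success(self, sink):
        self.failures.pop(sink, None)
        self.retry_at.pop(sink, None)
```

With `FailoverSelector({"k1": 10, "k2": 1})`, all traffic goes to k1 while it is healthy; k2 is used only while k1 is serving a penalty, and k1 takes over again once the penalty expires.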

III. Server configuration

# vim /bigdata/flume/conf/flume-server.conf  

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigserver2
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = bigserver2

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The above is the configuration on bigserver2.
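The three roll settings in the HDFS sink decide when the current file is closed and a new one is started; in the Flume HDFS sink a value of 0 disables the corresponding trigger. The sketch below expresses that rule with the values from this configuration as defaults. The function name `should_roll` is hypothetical, and the real sink drives the time-based roll with a timer rather than a check like this.

```python
def should_roll(elapsed_s, bytes_written, events_written,
                roll_interval=3600, roll_size=12800000000, roll_count=0):
    """Return True when any enabled threshold is reached; 0 disables a threshold."""
    if roll_interval > 0 and elapsed_s >= roll_interval:
        return True   # file has been open long enough
    if roll_size > 0 and bytes_written >= roll_size:
        return True   # file has grown large enough
    if roll_count > 0 and events_written >= roll_count:
        return True   # enough events have been written
    return False
```

With these values, rollCount never triggers, and rollSize only triggers at 12,800,000,000 bytes (about 12.8 GB), so in a small test like this one the hourly rollInterval is what closes the file.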

# vim /bigdata/flume/conf/flume-server.conf 

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = bigserver3
a1.sources.r1.port = 52020
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = bigserver3

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test
a1.sinks.k1.hdfs.useLocalTimeStamp = false
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.filePrefix = test
a1.sinks.k1.hdfs.fileSuffix = .data
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 12800000000
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.threadsPoolSize = 10
a1.sinks.k1.hdfs.batchSize = 2000

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1500000
a1.channels.c1.transactionCapacity = 10000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

The above is the configuration on bigserver3.
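The two server configurations differ only in the Avro source's bind address and the value of the static interceptor. One way to keep them in sync is to render the source section from a single template. This is a sketch; `SOURCE_TEMPLATE` and `render_source_section` are hypothetical names, and the property keys are copied from the configuration above.

```python
SOURCE_TEMPLATE = """\
a1.sources.r1.type = avro
a1.sources.r1.bind = {host}
a1.sources.r1.port = {port}
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = Collector
a1.sources.r1.interceptors.i1.value = {host}
"""


def render_source_section(host, port=52020):
    """Fill in the host-specific parts of the Avro source section."""
    return SOURCE_TEMPLATE.format(host=host, port=port)
```

Calling `render_source_section("bigserver2")` and `render_source_section("bigserver3")` yields the two source sections shown above; the sink and channel sections are identical on both servers and can be appended unchanged.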

IV. Starting the Flume NG cluster and testing Kafka to HDFS

1. Start the Flume servers first

# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-server.conf --name a1 -Dflume.root.logger=INFO,console

2. Then start the Flume clients

# flume-ng agent --conf conf --conf-file /bigdata/flume/conf/flume-client.conf --name a1 -Dflume.root.logger=INFO,console

Once everything has started, bigserver2 shows (excerpt):

19/10/22 17:38:48 INFO node.Application: Starting Sink k1
19/10/22 17:38:48 INFO node.Application: Starting Source r1
19/10/22 17:38:48 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver2, port: 52020 }...
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
19/10/22 17:38:48 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
19/10/22 17:38:48 INFO source.AvroSource: Avro source r1 started.
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] OPEN
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020
19/10/22 17:39:34 INFO ipc.NettyServer: [id: 0x4599e6e4, /10.0.40.237:49350 => /10.0.40.222:52020] CONNECTED: /10.0.40.237:49350
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] OPEN
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] BOUND: /10.0.40.222:52020
19/10/22 17:39:37 INFO ipc.NettyServer: [id: 0xced7db3f, /10.0.40.175:39086 => /10.0.40.222:52020] CONNECTED: /10.0.40.175:39086

bigserver3 shows (excerpt):

19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: c1 started
19/10/22 05:38:57 INFO node.Application: Starting Sink k1
19/10/22 05:38:57 INFO node.Application: Starting Source r1
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: k1 started
19/10/22 05:38:57 INFO source.AvroSource: Starting Avro source r1: { bindAddress: bigserver3, port: 52020 }...
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
19/10/22 05:38:57 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: r1 started
19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266

If you see output like the above, the cluster is configured correctly.
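Besides reading the logs, a quick way to confirm that both collectors accept connections is to try opening a TCP connection to the Avro port. The helper below is a minimal sketch using only the standard library; `port_is_open` is a hypothetical name, and it only checks reachability, not that the listener is actually a Flume Avro source.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False
```

For this cluster the check would be run from each client as `port_is_open("bigserver2", 52020)` and `port_is_open("bigserver3", 52020)`.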

3. Test consuming data from Kafka into HDFS

[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>

Note: bigserver2 prints a few additional log lines:

19/10/22 17:41:26 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 17:41:27 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp

[root@bigserver2 conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737286623.data.tmp
1
2
3
4

During this test, bigserver3 printed no such messages and produced no file.
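The number in the file name test.1571737286623.data.tmp looks like a Unix timestamp in milliseconds, which makes it easy to match a file against the log line that created it. The sketch below decodes it; it assumes the HDFS sink seeds this counter from the wall clock and that the servers run at UTC+8, and `file_counter_to_time` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
CST = timezone(timedelta(hours=8))  # assumption: servers use UTC+8


def file_counter_to_time(file_name, tz=CST):
    """Decode the millisecond counter in '<prefix>.<counter>.<suffix>...'."""
    counter_ms = int(file_name.split(".")[1])
    return (EPOCH + timedelta(milliseconds=counter_ms)).astimezone(tz)


print(file_counter_to_time("test.1571737286623.data.tmp").isoformat())
# prints 2019-10-22T17:41:26.623000+08:00
```

The decoded time agrees with the 17:41:26 and 17:41:27 timestamps in the bigserver2 log lines above.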

V. Testing server-side high availability

1. Shut down the higher-priority server in the sink group, that is, bigserver2. Once it is down, the client logs a warning:

19/10/22 17:44:31 WARN sink.FailoverSinkProcessor: Sink k1 failed and has been sent to failover list
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:398)
at org.apache.flume.sink.FailoverSinkProcessor.process(FailoverSinkProcessor.java:185)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)

2. Test Kafka to HDFS

[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>a //after shutting down bigserver2
>b
>c
>d
>e
>f
>

This time bigserver3 creates the file:

19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp

[root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
a
b
c
d
e
f

With one server down, the flow from Kafka to HDFS still works.

VI. Testing client-side high availability

bigserver2, which was shut down above, has not been restarted, so bigserver3 is the only server left.

1. Shut down the testing client

The other client, bigserver1, rejoins the consumer group:

19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] (Re-)joining group
19/10/22 17:51:57 INFO internals.AbstractCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Successfully joined group with generation 35
19/10/22 17:51:57 INFO internals.ConsumerCoordinator: [Consumer clientId=consumer-1, groupId=flume_test] Setting newly assigned partitions [track_pc-0, track_app-0, track_pc-1, track_pc-2]
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_app - partition 0 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 1 assigned.
19/10/22 17:51:57 INFO kafka.SourceRebalanceListener: topic track_pc - partition 2 assigned.
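Both clients use the same consumer group (flume_test), so when one of them leaves, the group rebalances and the remaining member takes over every partition, as the log above shows. The sketch below illustrates the idea with a plain round-robin split. It is not Kafka's actual assignor: `assign_partitions` is a hypothetical function, and host names stand in for the member ids Kafka really uses.

```python
def assign_partitions(partitions, members):
    """Spread partitions over the sorted live members in round-robin order."""
    members = sorted(members)
    if not members:
        return {}
    assignment = {m: [] for m in members}
    for i, tp in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(tp)
    return assignment


# Partitions taken from the rebalance log above.
partitions = [("track_pc", 0), ("track_pc", 1), ("track_pc", 2), ("track_app", 0)]

before = assign_partitions(partitions, ["testing", "bigserver1"])
after = assign_partitions(partitions, ["bigserver1"])  # testing has left
```

In `before` each client owns two of the four partitions; in `after`, bigserver1 owns all four, which corresponds to the "Setting newly assigned partitions" line in the log.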

The server bigserver3 logs:

19/10/22 05:38:57 INFO source.AvroSource: Avro source r1 started.
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] OPEN
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:34 INFO ipc.NettyServer: [id: 0x57ed4ddf, /10.0.40.237:60570 => /10.0.40.193:52020] CONNECTED: /10.0.40.237:60570
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] OPEN
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] BOUND: /10.0.40.193:52020
19/10/22 05:39:37 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 => /10.0.40.193:52020] CONNECTED: /10.0.40.175:46266
19/10/22 05:44:34 INFO hdfs.HDFSDataStream: Serializer = TEXT, UseRawLocalFileSystem = false
19/10/22 05:44:34 INFO hdfs.BucketWriter: Creating hdfs://bigdata1/user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] DISCONNECTED
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] UNBOUND
19/10/22 05:51:54 INFO ipc.NettyServer: [id: 0x5c335501, /10.0.40.175:46266 :> /10.0.40.193:52020] CLOSED
19/10/22 05:51:54 INFO ipc.NettyServer: Connection to /10.0.40.175:46266 disconnected.

2. Test Kafka to HDFS

[root@testing kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic track_pc
>1
>2
>3
>4
>5
>6
>7
>8
>9
>a //after shutting down bigserver2
>b
>c
>d
>e
>f
>g //after shutting down testing
>h
>i
>j
>

[root@testing conf]# hdfs dfs -cat /user/hive/warehouse/tanktest.db/tank_test/test.1571737474202.data.tmp
a
b
c
d
e
f
g
j
h
i
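Note that j appears before h and i in the file. Kafka only guarantees ordering within a single partition, and track_pc has three partitions, so lines that went to different partitions can arrive in a different order than they were typed. For a failover test, what matters is that nothing is lost or duplicated, which can be checked while ignoring order. A minimal sketch, with `compare_delivery` as a hypothetical helper name:

```python
from collections import Counter


def compare_delivery(produced, landed):
    """Return (missing, extra) counters, ignoring the order of lines."""
    want, got = Counter(produced), Counter(landed)
    missing = want - got   # produced but never written to HDFS
    extra = got - want     # written more often than produced
    return missing, extra


produced = list("abcdefghij")  # typed after bigserver2 went down
landed = ["a", "b", "c", "d", "e", "f", "g", "j", "h", "i"]  # hdfs dfs -cat output
missing, extra = compare_delivery(produced, landed)
```

For the run above both counters are empty: every message sent after bigserver2 went down reached the file on bigserver3 exactly once.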

At this point, the Flume NG cluster is installed and configured, and the high-availability tests are complete.



Please credit the source when reposting
Author: 海底苍鹰
URL: http://blog.51yip.com/hadoop/2206.html
