zookeeper kafka 集群 安装配置

张映 发表于 2019-01-28

分类目录: hadoop/spark/scala

标签:, , ,

kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。

1,zookeeper集群配置

关于zookeeper的集群安装配置,请参考:zookeeper 集群安装配置

2,kafka下载

http://mirrors.hust.edu.cn/apache/kafka/2.1.0/

3,配置kafka

# tar -zxvf kafka_2.12-2.1.0.tgz
# cp -r kafka_2.12-2.1.0 /bigdata/kafka
# mkdir /bigdata/kafka/data
# cd /bigdata/kafka/config
# vim server.properties

broker.id=0     //不同的机器,要不一样,不然启动时会报错
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://bigserver1:9092   //本机的hostname:9092
zookeeper.connect=bigserver1:2181,bigserver2:2181,testing:2181   //zookeeper集群

配置好后,scp到其他zookeeper的机器。

4,配置zookeeper的自动启动程序

# cat /usr/lib/systemd/system/zookeeper.service
[Unit]
Description=zookeeper
After=syslog.target network.target

[Service]
Type=forking
User=root
Group=root
ExecStart=/bigdata/zookeeper/bin/zkServer.sh start
ExecStop=/bigdata/zookeeper/bin/zkServer.sh stop
Restart=on-failure

[Install]
WantedBy=multi-user.target

# systemctl start zookeeper

5,kafka自启动配置

# cat /usr/lib/systemd/system/kafka.service
[Unit]
Description=kafka
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=root
Group=root
ExecStart=/bigdata/kafka/bin/kafka-server-start.sh /bigdata/kafka/config/server.properties
ExecStop=/bigdata/kafka/bin/kafka-server-stop.sh

[Install]
WantedBy=multi-user.target

# systemctl start kafka

6,检查是否启动成功

# jps
6199 Kafka
3370 QuorumPeerMain
6586 Jps

7,测试kafka集群

7.1,创建topic

# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 1 --partitions 1 --topic test

在什么节点创建都行

7.2,查看topic列表

# bin/kafka-topics.sh --list --zookeeper bigserver1:2181,bigserver2:2181,testing:2181
__consumer_offsets
test

7.2,查看topic

# bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
 Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0

7.3,删除topic

# bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper bigserver1:2181,bigserver2:2181,testing:2181

7.4,启动Producer生产消息

# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic test

7.5,启动Consumer接收消息

# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic test --from-beginning

注意:

kafka2.11后,没有--zookeeper,没有--zookeeper,没有--zookeeper

端口是9092,不是2181;端口是9092,不是2181;端口是9092,不是2181;看下图

kafka 接收消息
kafka 接收消息

可以在多个机器上面启动kafka-console-consumer,都是可以接收的。

[root@bigserver1 config]# jps   //又发又收
6199 Kafka
3370 QuorumPeerMain
6586 Jps
5916 ConsoleConsumer
5630 ConsoleProducer

root@testing:/bigdata/kafka/data# jps   //只收
15193 Kafka
13275 QuorumPeerMain
15661 ConsoleConsumer
16142 Jps


转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2047.html