kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
1,zookeeper集群配置
关于zookeeper的集群安装配置,请参考:zookeeper 集群安装配置。
2,kafka下载
http://mirrors.hust.edu.cn/apache/kafka/2.1.0/
3,配置kafka
# tar -zxvf kafka_2.12-2.1.0.tgz # cp -r kafka_2.12-2.1.0 /bigdata/kafka # mkdir /bigdata/kafka/data # cd /bigdata/kafka/config # vim server.properties broker.id=0 //不同的机器,要不一样,不然启动时会报错 listeners=PLAINTEXT://:9092 advertised.listeners=PLAINTEXT://bigserver1:9092 //本机的hostname:9092 zookeeper.connect=bigserver1:2181,bigserver2:2181,testing:2181 //zookeeper集群
配置好后,scp到其他zookeeper的机器。
4,配置zookeeper的自动启动程序
# cat /usr/lib/systemd/system/zookeeper.service [Unit] Description=zookeeper After=syslog.target network.target [Service] Type=forking User=root Group=root ExecStart=/bigdata/zookeeper/bin/zkServer.sh start ExecStop=/bigdata/zookeeper/bin/zkServer.sh stop Restart=on-failure [Install] WantedBy=multi-user.target # systemctl start zookeeper
5,kafka自启动配置
# cat /usr/lib/systemd/system/kafka.service [Unit] Description=kafka Requires=network.target remote-fs.target After=network.target remote-fs.target zookeeper.service [Service] Type=simple User=root Group=root ExecStart=/bigdata/kafka/bin/kafka-server-start.sh /bigdata/kafka/config/server.properties ExecStop=/bigdata/kafka/bin/kafka-server-stop.sh [Install] WantedBy=multi-user.target # systemctl start kafka
6,检查是否启动成功
# jps 6199 Kafka 3370 QuorumPeerMain 6586 Jps
7,测试kafka集群
7.1,创建topic
# bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 1 --partitions 1 --topic test
在什么节点创建都行
7.2,查看topic列表
# bin/kafka-topics.sh --list --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 __consumer_offsets test
7.2,查看topic
# bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic test Topic:test PartitionCount:1 ReplicationFactor:1 Configs: Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
7.3,删除topic
# bin/kafka-run-class.sh kafka.admin.TopicCommand --delete --topic test --zookeeper bigserver1:2181,bigserver2:2181,testing:2181
7.4,启动Producer生产消息
# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic test
7.5,启动Consumer接收消息
# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic test --from-beginning
注意:
kafka2.11后,没有--zookeeper,没有--zookeeper,没有--zookeeper
端口是9092,不是2181;端口是9092,不是2181;端口是9092,不是2181;看下图
可以在多个机器上面启动kafka-console-consumer,都是可以接收的。
[root@bigserver1 config]# jps //又发又收 6199 Kafka 3370 QuorumPeerMain 6586 Jps 5916 ConsoleConsumer 5630 ConsoleProducer root@testing:/bigdata/kafka/data# jps //只收 15193 Kafka 13275 QuorumPeerMain 15661 ConsoleConsumer 16142 Jps
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2047.html