kafka consumer 参数调优

张映 发表于 2019-04-26

分类目录: hadoop/spark/scala

标签:,

producter生产消息,基本上没有出现过问题,但是consumer遇到了不少问题。用是kafka2系列。

1,一个topic,多个consumer

2019-04-23 22:23:55 INFO AbstractCoordinator:654 - [Consumer clientId=consumer-9, groupId=track_pc] Discovered group coordinator bigserver1:9092 (id: 2147483647 rack: null)
2019-04-23 22:23:55 INFO ConsumerCoordinator:458 - [Consumer clientId=consumer-9, groupId=track_pc] Revoking previously assigned partitions []
2019-04-23 22:23:55 INFO AbstractCoordinator:486 - [Consumer clientId=consumer-9, groupId=track_pc] (Re-)joining group
2019-04-23 22:23:57 INFO AbstractCoordinator:831 - [Consumer clientId=consumer-8, groupId=track_pc] Attempt to heartbeat failed since group is rebalancing
2019-04-23 22:24:01 INFO AbstractCoordinator:831 - [Consumer clientId=consumer-8, groupId=track_pc] Attempt to heartbeat failed since group is rebalancing

代码端启动了二个consumer,导致二个consumer之间一直在rebalancing。解决办法,要么起一个consumer。如果不好控制consumer的启动个数,消费完了后就close掉。

2,consumer poll一次数据,处理时间过长,第二次consumer poll又来了

2019-04-24 16:47:32 WARN AbstractCoordinator:1011 - [Consumer clientId=consumer-1, groupId=track_pc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-04-24 16:47:32 INFO AbstractCoordinator:782 - [Consumer clientId=consumer-1, groupId=track_pc] Sending LeaveGroup request to coordinator bigserver1:9092 (id: 2147483647 rack: null)

上面的报错已经说的很清楚了,二次poll的时间间隔超过了max.poll.interval.ms设置,解决办法:1,加大max.poll.interval.ms的时间,2,减少max.poll.records的请求的条数,或者二个同时间调整。

在这里要注意:怎么调整参数,要根据topic里面单条数据的大小,以及插入到topic里面数据的频率,cpu,内存来决定。这块要亲自的实验一下,才知道。

如果consumer单条数据很小,max.poll.records设置的又比较少,这样就会导致,入hdfs时,小文件问题,很浪费系统资源。

3,建议使用手动提交

当 enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。这是因为提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。

但是使用这种方式,容易出现提交的偏移量小于客户端处理的最后一个消息的偏移量这种情况的问题。假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到5s的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。

pros.put("enable.auto.commit", "false")  //关闭自动提交
。。。。。。。。。。省略。。。。。。。。。。。。
consumer.commitSync()  //数据入hdfs后,在提交同步

4,kafka 消费者默认配置如下

 auto.commit.interval.ms = 1000
 auto.offset.reset = latest
 bootstrap.servers = [bigserver1:9092, bigserver2:9092, testing:9092]
 check.crcs = true
 client.dns.lookup = default
 client.id =
 connections.max.idle.ms = 540000
 default.api.timeout.ms = 60000
 enable.auto.commit = true
 exclude.internal.topics = true
 fetch.max.bytes = 52428800
 fetch.max.wait.ms = 500
 fetch.min.bytes = 1
 group.id = track_pc
 heartbeat.interval.ms = 3000
 interceptor.classes = []
 internal.leave.group.on.close = true
 isolation.level = read_uncommitted
 key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
 max.partition.fetch.bytes = 1048576
 max.poll.interval.ms = 300000
 max.poll.records = 500
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
 receive.buffer.bytes = 65536
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 request.timeout.ms = 10000
 retry.backoff.ms = 100
 sasl.client.callback.handler.class = null
 sasl.jaas.config = null
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 sasl.kerberos.min.time.before.relogin = 60000
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 sasl.kerberos.ticket.renew.window.factor = 0.8
 sasl.login.callback.handler.class = null
 sasl.login.class = null
 sasl.login.refresh.buffer.seconds = 300
 sasl.login.refresh.min.period.seconds = 60
 sasl.login.refresh.window.factor = 0.8
 sasl.login.refresh.window.jitter = 0.05
 sasl.mechanism = GSSAPI
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 session.timeout.ms = 10000
 ssl.cipher.suites = null
 ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
 ssl.endpoint.identification.algorithm = https
 ssl.key.password = null
 ssl.keymanager.algorithm = SunX509
 ssl.keystore.location = null
 ssl.keystore.password = null
 ssl.keystore.type = JKS
 ssl.protocol = TLS
 ssl.provider = null
 ssl.secure.random.implementation = null
 ssl.trustmanager.algorithm = PKIX
 ssl.truststore.location = null
 ssl.truststore.password = null
 ssl.truststore.type = JKS
 value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

主要调优参数如下:

session.timeout.ms
这个值是会话超时时间,什么意思了,就是说如果发送心跳时间超过这个时间,broker就会认为消费者死亡了,默认值是10000ms,也就是10s(这个值一般默认没问题)

heartbeat.interval.ms
这个值是心跳时间,表示多长时间想broker报告一次,这个默认值3000ms,这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)

enable.auto.commit
是否启用自动提交。

auto.commit.interval.ms
自动提交间隔

max.poll.interval.ms
每俩次poll拉取数据时间间隔最大超时时间,超过这个值,

auto.offset.reset
earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

heartbeat.interval.ms
heartbeat心跳主要用于沟通交流,及时返回请求响应。这个时间间隔真是越快越好。因为一旦出现reblance,那么就会将新的分配方案或者通知重新加入group的命令放进心跳响应中

request.timeout.ms
这个配置控制一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka要么重发这条消息,要么超过重试次数的情况下直接置为失败。



转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2128.html