producter生产消息,基本上没有出现过问题,但是consumer遇到了不少问题。用是kafka2系列。
1,一个topic,多个consumer
2019-04-23 22:23:55 INFO AbstractCoordinator:654 - [Consumer clientId=consumer-9, groupId=track_pc] Discovered group coordinator bigserver1:9092 (id: 2147483647 rack: null)
2019-04-23 22:23:55 INFO ConsumerCoordinator:458 - [Consumer clientId=consumer-9, groupId=track_pc] Revoking previously assigned partitions []
2019-04-23 22:23:55 INFO AbstractCoordinator:486 - [Consumer clientId=consumer-9, groupId=track_pc] (Re-)joining group
2019-04-23 22:23:57 INFO AbstractCoordinator:831 - [Consumer clientId=consumer-8, groupId=track_pc] Attempt to heartbeat failed since group is rebalancing
2019-04-23 22:24:01 INFO AbstractCoordinator:831 - [Consumer clientId=consumer-8, groupId=track_pc] Attempt to heartbeat failed since group is rebalancing
代码端启动了二个consumer,导致二个consumer之间一直在rebalancing。解决办法,要么起一个consumer。如果不好控制consumer的启动个数,消费完了后就close掉。
2,consumer poll一次数据,处理时间过长,第二次consumer poll又来了
2019-04-24 16:47:32 WARN AbstractCoordinator:1011 - [Consumer clientId=consumer-1, groupId=track_pc] This member will leave the group because consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
2019-04-24 16:47:32 INFO AbstractCoordinator:782 - [Consumer clientId=consumer-1, groupId=track_pc] Sending LeaveGroup request to coordinator bigserver1:9092 (id: 2147483647 rack: null)
上面的报错已经说的很清楚了,二次poll的时间间隔超过了max.poll.interval.ms设置,解决办法:1,加大max.poll.interval.ms的时间,2,减少max.poll.records的请求的条数,或者二个同时间调整。
在这里要注意:怎么调整参数,要根据topic里面单条数据的大小,以及插入到topic里面数据的频率,cpu,内存来决定。这块要亲自的实验一下,才知道。
如果consumer单条数据很小,max.poll.records设置的又比较少,这样就会导致,入hdfs时,小文件问题,很浪费系统资源。
3,建议使用手动提交
当 enable.auto.commit 属性被设为 true,那么每过 5s,消费者会自动把从 poll()方法接收到的最大偏移量提交上去。这是因为提交时间间隔由 auto.commit.interval.ms 控制,默认值是 5s。与消费者里的其他东西一样,自动提交也是在轮询里进行的。消费者每次在进行轮询时会检查是否该提交偏移量了,如果是,那么就会提交从上一次轮询返回的偏移量。
但是使用这种方式,容易出现提交的偏移量小于客户端处理的最后一个消息的偏移量这种情况的问题。假设我们仍然使用默认的 5s 提交时间间隔,在最近一次提交之后的 3s 发生了再均衡,再均衡之后,消费者从最后一次提交的偏移量位置开始读取消息。这个时候偏移量已经落后了 3s(因为没有达到5s的时限,并没有提交偏移量),所以在这 3s 的数据将会被重复处理。一般情况下不会有什么问题,不过在处理异常或提前退出轮询时要格外小心。
pros.put("enable.auto.commit", "false") //关闭自动提交 。。。。。。。。。。省略。。。。。。。。。。。。 consumer.commitSync() //数据入hdfs后,在提交同步
4,kafka 消费者默认配置如下
auto.commit.interval.ms = 1000 auto.offset.reset = latest bootstrap.servers = [bigserver1:9092, bigserver2:9092, testing:9092] check.crcs = true client.dns.lookup = default client.id = connections.max.idle.ms = 540000 default.api.timeout.ms = 60000 enable.auto.commit = true exclude.internal.topics = true fetch.max.bytes = 52428800 fetch.max.wait.ms = 500 fetch.min.bytes = 1 group.id = track_pc heartbeat.interval.ms = 3000 interceptor.classes = [] internal.leave.group.on.close = true isolation.level = read_uncommitted key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer max.partition.fetch.bytes = 1048576 max.poll.interval.ms = 300000 max.poll.records = 500 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor] receive.buffer.bytes = 65536 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 10000 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 session.timeout.ms = 10000 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
主要调优参数如下:
session.timeout.ms
这个值是会话超时时间,什么意思了,就是说如果发送心跳时间超过这个时间,broker就会认为消费者死亡了,默认值是10000ms,也就是10s(这个值一般默认没问题)
heartbeat.interval.ms
这个值是心跳时间,表示多长时间想broker报告一次,这个默认值3000ms,这个值官方推荐不要高于session.timeout.ms 的1/3(这个值默认没问题)
enable.auto.commit
是否启用自动提交。
auto.commit.interval.ms
自动提交间隔
max.poll.interval.ms
每俩次poll拉取数据时间间隔最大超时时间,超过这个值,
auto.offset.reset
earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
heartbeat.interval.ms
heartbeat心跳主要用于沟通交流,及时返回请求响应。这个时间间隔真是越快越好。因为一旦出现reblance,那么就会将新的分配方案或者通知重新加入group的命令放进心跳响应中
request.timeout.ms
这个配置控制一次请求响应的最长等待时间。如果在超时时间内未得到响应,kafka要么重发这条消息,要么超过重试次数的情况下直接置为失败。
转载请注明
作者:海底苍鹰
地址:http://blog.51yip.com/hadoop/2128.html