11 消费者执行轮询消息poll方法
- 更新元数据
- 消费者协调器执行poll逻辑
-
消费者协调器查询最少连接节点
-
开始
-
调用KafkaConsumer的poll方法
-
获取轻量级锁然后检测消费者开启状态
-
消费者监控指标KafkaConsumerMetrics记录poll开始执行时间
-
如果 SubscriptionState 的subscriptionType类型为SubscriptionType.NONE则抛出异常
-
执行未超时则循环fetch处理逻辑
-
ConsumerNetworkClient唤醒状态判断
-
updateAssignmentMetadataIfNeeded尝试更新分配元数据,但不需要阻止加入组的计时器
-
Fetcher触发发送逻辑调用的ConsumerNetworkClient类型的sendFetches
-
prepareFetchRequests为在飞行队列中没有现有请求的分区分配的所有节点创建获取请求。 。
-
ConsumerNetworkClient的send方法开始发送当前topic分区数据
-
KafkaClient的.wakeup()唤醒客户端,以防它在轮询中阻塞,以便我们可以发送排队的请求
-
ConsumerNetworkClient的poll
-
调用KafkaClient的send将给定请求排队,以便在后续轮询(长)调用中发送
-
selector.poll执行IO事件
-
处理响应结果
-
结束