我正在使用 Kafka 0.8.2.1 SimpleConsumer 。有人可以澄清 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。(我尝试将这个问题发布到 kafka 用户组 - 但没有运气):
-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int ' soTimeout'参数——这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从任何 [或特定??] 对 Kafka 的请求(如 FetchRequest)获得响应时超时?还有什么?
kafka.javaapi.consumer.SimpleConsumer
(val host: String,
val port: Int,
val soTimeout: Int,
val bufferSize: Int,
val clientId: String)
-- Q2: 另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是每次从 Kafka 读取的最大字节数 - 如果有更多数据可用,会发生多次读取?
-- 通过 FetchRequestBuilder(见下文)构建 FetchRequest 时,我还需要指定“ fetchSize ”:
FetchRequest req= newFetchRequestBuilder ()
.clientId(kafkaGroupId)
.addFetch(topic, partition, offset, fetchSizeInBytes)
.build();
查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专业人士)这些调用转换为以下方法调用 - 并且传递给 FetchRequest 的最终参数称为“ minBytes ”,暗示这不是确切的获取大小,可能吗?. 这是否意味着它甚至不会获取任何东西,除非至少有“minBytes”的数据可用?
class FetchRequestBuilder():
def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)
def build() = {
val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)
FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
correlationId: Int = FetchRequest.DefaultCorrelationId,
clientId: String = ConsumerConfig.DefaultClientId,
replicaId: Int = Request.OrdinaryConsumerId,
maxWait: Int = FetchRequest.DefaultMaxWait,
**minBytes: Int = FetchRequest.DefaultMinBytes**,
...)
所以,我的最后一个问题是:
-- Q3:' bufferSize '和' fetchSize/minBytes '有什么关系?他们到底定义了什么?我必须确保一个比另一个更小或更大吗?
谢谢,
码头