5

我正在使用 Kafka 0.8.2.1 SimpleConsumer 。有人可以澄清 SimpleConsumer 和 FetchRequestBuilder 的一些配置参数的含义吗?由于没有阅读 KAfka 的源代码,我当时找不到任何文档。(我尝试将这个问题发布到 kafka 用户组 - 但没有运气):

-- Q1:在 SimpleConsumer 构造函数的签名中,我看到了 Int ' soTimeout'参数——这个超时是什么意思?这是连接到 Kafka 代理的超时吗?从任何 [或特定??] 对 Kafka 的请求(如 FetchRequest)获得响应时超时?还有什么?

kafka.javaapi.consumer.SimpleConsumer
    (val host: String,
     val port: Int,
     val soTimeout: Int,
     val bufferSize: Int,
     val clientId: String)

-- Q2: 另外,SimpleConsumer 构造函数采用 Int 'bufferSize' 参数。它的意义是什么?这是发出 fetchRequest 时 SimpleConsumer 将读取多少字节?或者它是每次从 Kafka 读取的最大字节数 - 如果有更多数据可用,会发生多次读取?

-- 通过 FetchRequestBuilder(见下文)构建 FetchRequest 时,我还需要指定“ fetchSize ”:

FetchRequest req= newFetchRequestBuilder ()
  .clientId(kafkaGroupId)
  .addFetch(topic, partition, offset, fetchSizeInBytes)
  .build();

查看 FetchRequestBuilder 的源代码,我认为(我不是 Scala 专业人士)这些调用转换为以下方法调用 - 并且传递给 FetchRequest 的最终参数称为“ minBytes ”,暗示这不是确切的获取大小,可能吗?. 这是否意味着它甚至不会获取任何东西,除非至少有“minBytes”的数据可用?

class FetchRequestBuilder():
    def addFetch(topic: String, partition: Int, offset: Long, fetchSize: Int)

    def build() = {
      val fetchRequest= FetchRequest(versionId, correlationId.getAndIncrement, clientId, replicaId, maxWait, minBytes, requestMap.toMap)

FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
    correlationId: Int = FetchRequest.DefaultCorrelationId,
    clientId: String = ConsumerConfig.DefaultClientId,
    replicaId: Int = Request.OrdinaryConsumerId,
    maxWait: Int = FetchRequest.DefaultMaxWait,
    **minBytes: Int = FetchRequest.DefaultMinBytes**,
...)

所以,我的最后一个问题是:

-- Q3:' bufferSize '和' fetchSize/minBytes '有什么关系?他们到底定义了什么?我必须确保一个比另一个更小或更大吗?

谢谢,

码头

4

1 回答 1

2

soTimeout 是等待连接到给定代理的时间(以毫秒为单位)。我不知道连接会发生什么特别的事情,除了你得到验证,那里有一个准备好执行一些后续操作的代理。

我相信构造函数中使用的bufferSize是客户端socket用来接收broker发送的数据的缓冲区的大小。

对于您的最后一个问题,如果 fetch 请求出于某种原因返回的总字节数大于请求的套接字缓冲区大小,那么即使存在一个单个更高级别的 fetch 调用。

于 2015-06-09T14:57:48.360 回答