0

我正在尝试运行 kafka -storm-cassandra ,在我的情况下,tail2kafka 本身就是一个生产者,当我开始消费主题时,它会抛出下面提到的错误。请帮帮我。

谢谢

[2015-05-13 15:28:51,784] 由于错误 (kafka.network.Processor) java.lang.OutOfMemoryError: Java heap space at kafka.api.ProducerRequest$$anonfun$1$ 错误关闭 /127.0.0.1 的套接字$anonfun$apply$1.apply(ProducerRequest.scala:45) at kafka.api.ProducerRequest$$anonfun$1$$anonfun$apply$1.apply(ProducerRequest.scala:42) at scala.collection.TraversableLike$$anonfun$map $1.apply(TraversableLike.scala:206) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:206) at scala.collection.immutable.Range$ByOne$class.foreach(Range.scala: 282) 在 scala.collection.immutable.Range$$anon$1.foreach(Range.scala:27​​4) 在 scala.collection.TraversableLike$class.map(TraversableLike.scala:206) 在 scala.collection.immutable.Range.map (Range.scala:39) 在 kafka.api。ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:42) at kafka.api.ProducerRequest$$anonfun$1.apply(ProducerRequest.scala:38) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike. scala:227) 在 scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:227) 在 scala.collection.immutable.Range$ByOne$class.foreach(Range.scala:282) 在 scala.collection .immutable.Range$$anon$1.foreach(Range.scala:27​​4) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:227) at scala.collection.immutable.Range.flatMap(Range.scala:39 ) 在 kafka.api.ProducerRequest$.readFrom(ProducerRequest.scala:38) 在 kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys.scala:36) 在 kafka.api.RequestKeys$$anonfun$1.apply(RequestKeys .scala:36) 在 kafka.network.RequestChannel$Request.(RequestChannel.scala:53) 在 kafka.network.Processor.read(SocketServer.scala:353) 在 kafka.network.Processor.run(SocketServer.scala:245) 在 java .lang.Thread.run(Thread.java:745)

我的消费者代码是

导入结构导入时间

导入 kafka.io 导入 kafka.request_type

类消费者(kafka.io.IO):

CONSUME_REQUEST_TYPE = kafka.request_type.FETCH

MAX_SIZE = 1024 * 1024

# 秒。DEFAULT_POLLING_INTERVAL = 2

def init (self, topic, partition=0, host='localhost', port=9092): kafka.io.IO. 初始化(自身,主机,端口)

#: The topic queue to consume.
self.topic        = topic

#: The partition the topic queue is on.
self.partition    = partition

#: Offset in the Kafka queue in bytes?
self.offset       = 1

#: Maximum message size to consume.
self.max_size     = self.MAX_SIZE
self.request_type = self.CONSUME_REQUEST_TYPE
self.polling      = self.DEFAULT_POLLING_INTERVAL

self.connect()

def consume(self): """ 从主题队列中消费数据。"""

self.send_consume_request()

return self.parse_message_set_from(self.read_data_response())

def loop(self): """ 以阻塞方式循环来自队列的传入消息。设置polling检查间隔以秒为单位。"""

while True:
  messages = self.consume()

  if messages and isinstance(messages, list) and len(messages) > 0:
    for message in messages:
      yield message

  time.sleep(self.polling)

# 请求类型 ID + 主题长度 + 主题 + 分区 + 偏移量 + 最大尺寸 def request_size(self): return 2 + 2 + len(self.topic) + 4 + 8 + 4

def encode_request_size(self): return struct.pack('>i', self.request_size())

def encode_request(self): 长度 = len(self.topic)

return struct.pack('>HH%dsiQi' % length, self.request_type, length, self.topic, self.partition, self.offset, self.max_size)

def send_consume_request(self): self.write(self.encode_request_size()) self.write(self.encode_request())

def read_data_response(self): buf_length = struct.unpack('>i', self.read(4))[0]

# Start with a 2 byte offset
return self.read(buf_length)[2:]

def parse_message_set_from(self, data): 消息 = [] 已处理 = 0 长度 = len(data) - 4

while (processed <= length):
  message_size = struct.unpack('>i', data[processed:processed+4])[0]
  messages.append(kafka.message.parse_from(data[processed:processed + message_size + 4]))
  processed += 4 + message_size

self.offset += processed

return messages
4

0 回答 0