
We have been running Camus successfully for about a year, pulling Avro payloads from Kafka (version 0.8.2) and storing .avro files in HDFS, using only a handful of Kafka topics. Recently, a new team within our company registered roughly 60 new topics in our pre-production environment and started sending data to them. The team made some mistakes routing their data to Kafka topics, which caused errors when Camus deserialized the payloads to Avro for those topics. The Camus job failed because it exceeded the "failed other" error threshold. The behavior in Camus after the failure was surprising, and I wanted to check with other developers whether the behavior we observed is expected, or whether we have a problem in our implementation.

When the Camus job failed because the "failed other" threshold was exceeded, we noticed this behavior:

1. All of the mapper tasks succeeded, so the TaskAttempts were allowed to commit, meaning all of the data written by Camus was copied to the final HDFS location.
2. CamusJob threw an exception while computing the % error rate (this happens after the mappers commit), which failed the job.
3. Because the job failed (I believe), the Kafka offsets were not advanced.

The problem we have with this behavior is that our Camus job is scheduled to run every 5 minutes. So every 5 minutes we saw data committed to HDFS, the job fail, and the Kafka offsets go unchanged, which means we were writing duplicate data until we noticed that our disks were full.
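To make the ordering concrete, here is a minimal sketch of the control flow we believe we are seeing. This is not the actual Camus source; every name other than CamusJob is illustrative, and the point is only the sequence: commit first, threshold check second, offset bookkeeping never.

// Hypothetical sketch of the failure ordering we observed. The real logic
// lives in CamusJob and the MapReduce OutputCommitter; runMappersAndCommitOutput,
// readCounterPct and persistOffsets are stand-ins, not real Camus methods.
public class CamusRunSketch {

    public void runEtlJob() throws Exception {
        // 1. Mappers finish and each TaskAttempt is allowed to commit:
        //    files move from the work path to the final HDFS destination.
        boolean mappersSucceeded = runMappersAndCommitOutput();

        // 2. Only AFTER the data is in its final location does the job compute
        //    the skipped-message percentage from the counters.
        double skippedOtherPct = readCounterPct("SKIPPED_OTHER");
        if (mappersSucceeded && skippedOtherPct > maxAllowedPct()) {
            // 3. The exception fails the job, so the step that records the new
            //    Kafka offsets (the execution history) never runs...
            throw new RuntimeException(String.format(
                "job failed: %.1f%% messages skipped due to other, maximum allowed is %.1f%%",
                skippedOtherPct, maxAllowedPct()));
        }

        // ...so offsets only advance on fully successful runs, and every rerun
        // re-reads (and re-writes) the same messages.
        persistOffsets();
    }

    // Stubs so the sketch compiles; they stand in for real Camus/MapReduce work.
    private boolean runMappersAndCommitOutput() { return true; }
    private double readCounterPct(String counterName) { return 50.0; }
    private double maxAllowedPct() { return 0.1; }
    private void persistOffsets() { }
}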

I wrote an integration test to confirm the outcome: it submits 10 good records to a topic plus 10 records that use an unexpected schema to the same topic, then runs the Camus job with only that topic whitelisted. We can see that 10 records are written to HDFS and the Kafka offsets are not advanced. Below is a snippet of the logs from that test, along with the properties we used when running the job.
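For reference, the test is shaped roughly like the sketch below. It is simplified, not the real test: the topic name and record counts mirror the log further down, but the broker address, the Avro encoding helpers, and the way Camus is invoked are assumptions.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class CamusDuplicateDataTest {

    private static final String TOPIC = "advertising.edmunds.admax";

    public static void main(String[] args) throws Exception {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092"); // assumption: local test broker
        p.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        p.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(p)) {
            for (int i = 0; i < 10; i++) {
                // 10 records encoded with the schema registered for the topic...
                producer.send(new ProducerRecord<>(TOPIC, encodeWithExpectedSchema(i)));
                // ...and 10 records encoded with a schema the decoder does not
                // expect, which get counted as SKIPPED_OTHER.
                producer.send(new ProducerRecord<>(TOPIC, encodeWithUnexpectedSchema(i)));
            }
        }

        // Then run Camus with only this topic whitelisted, using the properties
        // listed below, e.g.:
        //   hadoop jar camus.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties
        // and assert that .avro files landed under etl.destination.path while
        // no new execution history (offsets) was written.
    }

    // Stubs standing in for the real Avro serialization helpers.
    private static byte[] encodeWithExpectedSchema(int i) { return new byte[0]; }
    private static byte[] encodeWithUnexpectedSchema(int i) { return new byte[0]; }
}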

Appreciate any help. I am not sure whether this is expected behavior in Camus or whether there is a problem with our implementation, and what the best way is to prevent this behavior (duplicating data).

Thanks, Matt

CamusJob properties for the test:

etl.destination.path=/user/camus/kafka/data
etl.execution.base.path=/user/camus/kafka/workspace
etl.execution.history.path=/user/camus/kafka/history
dfs.default.classpath.dir=/user/camus/kafka/libs

etl.record.writer.provider.class=com.linkedin.camus.etl.kafka.common.AvroRecordWriterProvider
camus.message.decoder.class=com.linkedin.camus.etl.kafka.coders.KafkaAvroMessageDecoder

camus.message.timestamp.format=yyyy-MM-dd HH:mm:ss Z
mapreduce.output.fileoutputformat.compress=false

mapred.map.tasks=15
kafka.max.pull.hrs=1
kafka.max.historical.days=3

kafka.whitelist.topics=advertising.edmunds.admax
log4j.configuration=true

kafka.client.name=camus
kafka.brokers=<kafka brokers>
max.decoder.exceptions.to.print=5
post.tracking.counts.to.kafka=true
monitoring.event.class=class.that.generates.record.to.submit.counts.to.kafka
kafka.message.coder.schema.registry.class=com.linkedin.camus.schemaregistry.AvroRestSchemaRegistry
etl.schema.registry.url=<schema repo url>
etl.run.tracking.post=false
kafka.monitor.time.granularity=10

etl.daily=daily
etl.ignore.schema.errors=false

etl.output.codec=deflate
etl.deflate.level=6
etl.default.timezone=America/Los_Angeles
mapred.output.compress=false
mapred.map.max.attempts=2
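One note on these properties: the threshold that tripped the failure is not set explicitly above, so the job is running with the default (0.1%, matching the error message in the log below). If I am reading the Camus source correctly, the relevant knobs are the max.percent.skipped.* properties; treat the exact names below as an assumption to verify against your Camus version:

# Assumption from our reading of CamusJob: these appear to control the
# failure thresholds for skipped messages, both defaulting to 0.1 (percent).
max.percent.skipped.schemanotfound=0.1
max.percent.skipped.other=0.1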

Log snippet from the test, showing the commit behavior after the mappers succeed, and the subsequent job failure due to exceeding the "other" threshold:

LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task:attempt_local866350146_0001_m_000000_0 is done. And is in the process of committing

[LocalJobRunner] - advertising.edmunds.admax:2:6; advertising.edmunds.admax:3:7 begin read at 2016-07-08T05:50:26.215-07:00; advertising.edmunds.admax:1:5; advertising.edmunds.admax:2:2; advertising.edmunds.admax:3:3 begin read at 2016-07-08T05:50:30.517-07:00; advertising.edmunds.admax:0:4 > map

[Task] - Task attempt_local866350146_0001_m_000000_0 is allowed to commit now

[EtlMultiOutputFormat] - work path: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0

[EtlMultiOutputFormat] - Destination base path: /user/camus/kafka/data

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.3.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.3.2.2.1467979200000.avro

[EtlMultiOutputFormat] - work file: data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro

[EtlMultiOutputFormat] - Moved file from: file:/user/camus/kafka/workspace/2016-07-08-12-50-20/_temporary/0/_temporary/attempt_local866350146_0001_m_000000_0/data.advertising-edmunds-admax.3.7.1467979200000-m-00000.avro to: /user/camus/kafka/data/advertising-edmunds-admax/advertising-edmunds-admax.3.7.8.8.1467979200000.avro

[Task] - Task 'attempt_local866350146_0001_m_000000_0' done.
[LocalJobRunner] - Finishing task: attempt_local866350146_0001_m_000000_0
[LocalJobRunner] - map task executor complete.
[Job] -  map 100% reduce 0%
[Job] - Job job_local866350146_0001 completed successfully
[Job] - Counters: 23
File System Counters
FILE: Number of bytes read=117251
FILE: Number of bytes written=350942
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
Map-Reduce Framework
Map input records=10
Map output records=15
Input split bytes=793
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=13
Total committed heap usage (bytes)=251658240
com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
DECODE_SUCCESSFUL=10
SKIPPED_OTHER=10
File Input Format Counters 
Bytes Read=0
File Output Format Counters 
Bytes Written=5907
total
data-read=840
decode-time(ms)=123
event-count=20
mapper-time(ms)=58
request-time(ms)=12114
skip-old=0
[CamusJob] - Group: File System Counters
[CamusJob] - FILE: Number of bytes read:    117251
[CamusJob] - FILE: Number of bytes written: 350942
[CamusJob] - FILE: Number of read operations:   0
[CamusJob] - FILE: Number of large read operations: 0
[CamusJob] - FILE: Number of write operations:  0
[CamusJob] - Group: Map-Reduce Framework
[CamusJob] - Map input records: 10
[CamusJob] - Map output records:    15
[CamusJob] - Input split bytes: 793
[CamusJob] - Spilled Records:   0
[CamusJob] - Failed Shuffles:   0
[CamusJob] - Merged Map outputs:    0
[CamusJob] - GC time elapsed (ms):  13
[CamusJob] - Total committed heap usage (bytes):    251658240
[CamusJob] - Group: com.linkedin.camus.etl.kafka.mapred.EtlRecordReader$KAFKA_MSG
[CamusJob] - DECODE_SUCCESSFUL: 10
[CamusJob] - SKIPPED_OTHER: 10
[CamusJob] - job failed: 50.0% messages skipped due to other, maximum allowed is 0.1%

1 Answer


I am facing a very similar problem: my Kafka/Camus pipeline has been working well for about a year, but recently I have been struggling with duplicates while integrating ingestion from remote brokers with very unstable connections and frequent job failures.

Today, while reviewing the Gobblin documentation, I realized that the Camus sweeper may be the tool we are looking for. Try integrating it into your pipeline.

I also think it is a good idea to migrate to Gobblin (Camus's successor) in the near future.

Answered 2016-07-27T06:08:20.967