0

我想将 Kafka 主题中的所有数据存储到 Amazon S3 中。我有一个 Kafka 集群,它在一个主题中每秒接收 200.000 条消息,每个值消息有 50 个字段(字符串、时间戳、整数和浮点数)。

我的主要想法是使用 Kafka 连接器将数据存储在存储桶 s3 中,然后使用 Amazon Glue 转换数据并将其保存到另一个存储桶中。我有下一个问题:

1)怎么做?这种架构会很好用吗?我尝试使用 Amazon EMR (Spark Streaming),但有太多顾虑如何使用 Apache Spark 从 Apache Kafka 流式传输事件来减少处理时间和失败的任务?

2) 我尝试使用 Confluent 的 Kafka Connect,但我有几个问题:

  • 我可以从其他 Kafka 实例连接到我的 Kafka 集群并以独立方式运行我的 Kafka 连接器 s3 吗?

  • 这个错误“ERROR Task s3-sink-0 throw an unaught an
    unrecoverable exception”是什么意思?

ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:26,086] 错误任务正在被杀死,并且在手动重新启动之前无法恢复(org.apache.kafka.connect.runtime.WorkerTask:143)[2018-10-05 15:32:27,980] WARN 无法创建目录使用 url 文件中的目录:/targ。跳过。(org.reflections.Reflections:104) java.lang.NullPointerException at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239) at org.reflections.vfs.Vfs.fromURL(Vfs.java:98) ) 在 org.reflections.vfs.Vfs.fromURL(Vfs.java:91) 在 org.reflections.Reflections.scan(Reflections.java:237) 在 org.reflections.Reflections.scan(Reflections.java:204) 在 org .reflections.Reflections。(Reflections.java:129) 在 org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) 在 org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:377)在 java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:27,981] 警告无法从 url 创建 Vfs.Dir。忽略异常并继续 (org.reflections.Reflections:208) org.reflections.ReflectionsException: 无法从 url 创建 Vfs.Dir,找不到匹配的 UrlType [file:/targ] 要么使用 fromURL(final URL url, final List urlTypes) 或使用静态 setDefaultURLTypes(final List urlTypes) 或 addDefaultURLTypes(UrlType urlType) 与您的专门 UrlType。在 org.reflections.vfs.Vfs.fromURL(Vfs.java:109) 在 org.reflections.vfs.Vfs.fromURL(Vfs.java:91) 在 org.reflections.Reflections。

  • 如果您可以恢复连接到 Kafka 的步骤并从
    另一个 Kafka 实例继续使用 s3,您会怎么做?
  • 什么是所有这些字段 key.converter, value.converter, key.converter.schemas.enable, value.converter.schemas.enable, internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable , internal.value.converter.schemas.enable?

key.converter、value.converter 的可能值是多少?

3)一旦我的原始数据在存储桶中,我想使用 Amazon Glue 来获取这些数据,反序列化 Protobuffer,更改某些字段的格式,最后将其存储在 Parquet 中的另一个存储桶中。如何在 Amazon Glue 中使用我自己的 java protobuffer 库?

4) 如果我想用 Amazon Athena 查询,如何自动加载分区(年、月、日、小时)?使用 Amazon Glue 的爬虫和调度程序?

4

2 回答 2

1

我们使用 S3 Connect 处理数百个主题,并使用 Hive、Athena、Spark、Presto 等处理数据。似乎工作正常,但我觉得实际数据库可能会更快地返回结果。

无论如何,回答关于连接

我可以从其他 Kafka 实例连接到我的 Kafka 集群并以独立方式运行我的 Kafka 连接器 s3 吗?

我不确定我是否理解这个问题,但是 Kafka Connect 需要连接到一个集群,您不需要两个 Kafka 集群来使用它。您通常会将 Kafka Connect 进程作为其自己集群的一部分运行,而不是在代理上运行。

这个错误“ERROR Task s3-sink-0 throw an unaught an unrecoverable exception”是什么意思?

这意味着您需要查看日志以找出引发了什么异常并阻止连接器读取数据。

WARN could not create Dir using directory from url file:/targ...如果您使用的是 HDFS 连接器,我认为您不应该使用默认的 file:// URI

如果您可以恢复连接到 Kafka 的步骤并从另一个 Kafka 实例继续使用 s3,您会怎么做?

您不能“从另一个 Kafka 实例恢复”。如前所述,Connect 只能从单个 Kafka 集群消费,任何消费的偏移量和消费者组都存储在其中。

所有这些字段是什么意思

这些字段已从最新的 Kafka 版本中删除,您可以忽略它们。你绝对不应该改变它们

internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable, internal.value.converter.schemas.enable

这些是您的序列化器和反序列化器,就像常规的生产者消费者 API 一样

key.converter, value.converter

我相信这些只对 JSON 转换器很重要。见https://rmoff.net/2017/09/06/kafka-connect-jsondeserializer-with-schemas-enable-requires-schema-and-payload-fields

key.converter.schemas.enable, value.converter.schemas.enable

反序列化 Protobuf,改变一些字段的格式,最后存储在 Parquet 的另一个桶中

Kafka Connect 需要加载一个 Protobuf 转换器,我不知道有没有(我认为 Blue Apron 写了一些东西......搜索 github)。

一般来说,Avro 会更容易转换为 Parquet,因为已经有本地库可以做到这一点。Confluent 的 S3 Connect 目前不编写 Parquet 格式,但在一个开放的 PR 中。另一种方法是使用Pinterest Secor库。

我不知道 Glue,但如果它像 Hive,你会ADD JAR在查询期间使用来加载外部代码插件和函数

我对 Athena 的经验很少,但 Glue 将所有分区维护为 Hive 元存储。自动部分是爬虫,您可以在查询上放置一个过滤器来进行分区修剪

于 2018-10-05T14:35:33.730 回答
1

补充@cricket_007的答案

我可以从其他 Kafka 实例连接到我的 Kafka 集群并以独立方式运行我的 Kafka 连接器 s3 吗?

Kafka S3 连接器是 Confluent 发行版的一部分,该发行版还包括 Kafka 以及其他相关服务,但它并不意味着直接在您的代理上运行,而是:

  • 作为运行服务启动时给定的连接器配置的独立工作者
  • 或者作为在 Kafka Brokers 集群一侧运行的额外工作集群。在这种情况下,通过 Kafka Connect REST API 更好地交互/运行连接器(搜索“管理 Kafka 连接器”以获取带有示例的文档)

如果您可以恢复连接到 Kafka 的步骤并从另一个 Kafka 实例继续使用 s3,您会怎么做?

您是在谈论另一个 Kafka Connect实例吗?

  • 如果是这样,您可以简单地以分布式模式执行 Kafka Connect 服务,这旨在提供您似乎正在寻找的可靠性......

还是您的意思是另一个 Kafka(经纪人)集群

  • 在这种情况下,您可以尝试(但这将是实验性的,我自己还没有尝试过......)以独立模式运行 Kafka Connect,并简单地更新bootstrap.servers连接器配置的参数以指向新集群。为什么这可能有效:在独立模式下,接收器连接器的偏移量本地存储在您的工作程序上(与偏移量直接存储在 Kafka 集群上的分布式模式相反......)。为什么这可能不起作用:它根本不适合这种用途,我猜你可能需要你的主题和分区完全相同......?

key.converter、value.converter 的可能值是多少?

检查Confluent 的 kafka-connect-s3 文档;)

如何在 Amazon Glue 中使用我自己的 java protobuffer 库?

不确定实际的方法,但是 Glue 作业在幕后产生了一个 EMR 集群,所以我不明白为什么它不应该是可能的......

如果我想使用 Amazon Athena 进行查询,如何自动加载分区(年、月、日、小时)?使用 Amazon Glue 的爬虫和调度程序?

是的。

假设每天进行分区,您实际上可以安排在早上第一件事运行爬虫,只要您可以预期新数据已在 S3 上创建当天的文件夹(因此当天至少有一个对象存在于 S3 )...爬虫将添加当天的分区,然后可用于查询任何新添加的对象。

于 2019-01-02T14:27:02.360 回答