我想将 Kafka 主题中的所有数据存储到 Amazon S3 中。我有一个 Kafka 集群,它在一个主题中每秒接收 200.000 条消息,每个值消息有 50 个字段(字符串、时间戳、整数和浮点数)。
我的主要想法是使用 Kafka 连接器将数据存储在存储桶 s3 中,然后使用 Amazon Glue 转换数据并将其保存到另一个存储桶中。我有下一个问题:
1)怎么做?这种架构会很好用吗?我尝试使用 Amazon EMR (Spark Streaming),但有太多顾虑如何使用 Apache Spark 从 Apache Kafka 流式传输事件来减少处理时间和失败的任务?
2) 我尝试使用 Confluent 的 Kafka Connect,但我有几个问题:
我可以从其他 Kafka 实例连接到我的 Kafka 集群并以独立方式运行我的 Kafka 连接器 s3 吗?
这个错误“ERROR Task s3-sink-0 throw an unaught an
unrecoverable exception”是什么意思?
ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 在 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 在 java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:26,086] 错误任务正在被杀死,并且在手动重新启动之前无法恢复(org.apache.kafka.connect.runtime.WorkerTask:143)[2018-10-05 15:32:27,980] WARN 无法创建目录使用 url 文件中的目录:/targ。跳过。(org.reflections.Reflections:104) java.lang.NullPointerException at org.reflections.vfs.Vfs$DefaultUrlTypes$3.matches(Vfs.java:239) at org.reflections.vfs.Vfs.fromURL(Vfs.java:98) ) 在 org.reflections.vfs.Vfs.fromURL(Vfs.java:91) 在 org.reflections.Reflections.scan(Reflections.java:237) 在 org.reflections.Reflections.scan(Reflections.java:204) 在 org .reflections.Reflections。(Reflections.java:129) 在 org.apache.kafka.connect.runtime.AbstractHerder.connectorPlugins(AbstractHerder.java:268) 在 org.apache.kafka.connect.runtime.AbstractHerder$1.run(AbstractHerder.java:377)在 java.lang.Thread.run(Thread.java:745) [2018-10-05 15:32:27,981] 警告无法从 url 创建 Vfs.Dir。忽略异常并继续 (org.reflections.Reflections:208) org.reflections.ReflectionsException: 无法从 url 创建 Vfs.Dir,找不到匹配的 UrlType [file:/targ] 要么使用 fromURL(final URL url, final List urlTypes) 或使用静态 setDefaultURLTypes(final List urlTypes) 或 addDefaultURLTypes(UrlType urlType) 与您的专门 UrlType。在 org.reflections.vfs.Vfs.fromURL(Vfs.java:109) 在 org.reflections.vfs.Vfs.fromURL(Vfs.java:91) 在 org.reflections.Reflections。
- 如果您可以恢复连接到 Kafka 的步骤并从
另一个 Kafka 实例继续使用 s3,您会怎么做? - 什么是所有这些字段 key.converter, value.converter, key.converter.schemas.enable, value.converter.schemas.enable, internal.key.converter,internal.value.converter, internal.key.converter.schemas.enable , internal.value.converter.schemas.enable?
key.converter、value.converter 的可能值是多少?
3)一旦我的原始数据在存储桶中,我想使用 Amazon Glue 来获取这些数据,反序列化 Protobuffer,更改某些字段的格式,最后将其存储在 Parquet 中的另一个存储桶中。如何在 Amazon Glue 中使用我自己的 java protobuffer 库?
4) 如果我想用 Amazon Athena 查询,如何自动加载分区(年、月、日、小时)?使用 Amazon Glue 的爬虫和调度程序?