38

我正在评估 AWS Managed Service Kafka (MSK),我知道它目前处于预览模式,因此可能没有所有功能或适当的文档。我尝试设置 msk 集群并验证 msk 是否可以满足我们公司的所有用例/要求,但目前它缺乏文档和示例。

https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html

我有以下查询:

i) 如何使用在我的本地系统上运行的 Kafka 客户端访问 AWS MSK?

ii) MSK 是否支持模式演化和仅一次语义?

iii) MSK 是否会提供一些方法来更新某些集群或调整配置?就像 aws 胶水在其托管环境中为 spark executr 和驱动程序内存提供参数更改一样。

iv) 是否可以将 MSK 与其他 AWS 服务(例如 Redshift、EMR 等)集成?

v) 我可以通过 ksql 将流式 sql 与 MSK 一起使用吗?如何使用 MSK 设置 KSQL?

vi) 如何对流经 MSK 的数据进行实时预测分析?

vii) 与 Azure/confluent 中的其他基于云的 kafka 集群相比,MSK 的可靠性如何,以及与 vanilla kafka 相比的任何性能基准?集群中可以启动的最大代理数量是多少?

4

1 回答 1

93

MSK 基本上是由 aws 定制和管理的 vanilla apache kafka 集群(具有基于集群实例类型、代理数量等的预定义配置设置),针对云环境进行了调整。

理想情况下,它应该能够执行开源 Kafka 支持的所有/大多数事情。此外,如果您有未记录的特定用例或要求,我建议您联系 AWS 支持以进一步说明有关 kafka 集群的托管部分(允许的最大代理数量、可靠性、成本)。

我将尝试根据我的个人经验回答您的问题:

i) 如何使用在我的本地系统上运行的 kafka 客户端访问 AWS MSK?

您不能使用 kafka 客户端或 kafka 流直接从本地或本地计算机访问 MSK。因为代理 url、zookeeper 连接字符串是 msk 集群 vpc/subnet 的私有 ip。要通过kafka客户端访问,需要在MsK的同一个vpc中启动ec2实例并执行kafka客户端(生产者/消费者)访问msk集群。

要从本地机器或本地系统访问 MSK 集群,您可以设置Confluent 开源的kafka Rest Proxy框架,以通过 rest api 从外部访问 MSK 集群。该框架不是成熟的 kafka 客户端,不允许对 kafka 客户端进行所有操作,但您可以在集群上进行大部分操作,从获取集群元数据、主题信息、生产和消费消息等开始。

首先设置 confluent repo 和 ec2 实例安全组(请参阅 -第 1 节:预安装或设置 - 附加 kafka 组件),然后安装/设置 kafka rest 代理。

sudo yum install confluent-kafka-rest 

创建文件名 kafka-rest.properties 并添加以下内容-

bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
zookeeper.connect=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
schema.registry.url=http://localhost:8081

** 修改 bootstrapserver 和 zookeeper url/ips。

启动休息服务器

kafka-rest-start kafka-rest.properties &

使用 curl 或 rest 客户端/浏览器通过 rest API 访问 MSK。

获取主题列表

curl "http://localhost:8082/topics"

curl "http://<ec2 instance public ip>:8082/topics"

为了从本地或本地机器进行访问,请确保运行其余服务器的 ec2 实例附加了公共 ip 或弹性 ip。

更多 Rest API 操作 https://github.com/confluentinc/kafka-rest

ii) MSK 是否支持模式演化和仅一次语义?

您可以将 avro 消息与“架构注册表”一起使用来实现架构演变和架构维护。

安装和设置模式注册表类似于 confluent kafka-rest 代理。

sudo yum install confluent-schema-registry

创建文件名 schema-registry.propertie 并添加以下内容-

listeners=http://0.0.0.0:8081
kafkastore.connection.url=10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181
kafkastore.bootstrap.servers=PLAINTEXT://10.0.10.106:9092,PLAINTEXT://10.0.20.27:9092,PLAINTEXT://10.0.0.119:9092
kafkastore.topic=_schemas
debug=false

** 修改 bootstrapserver 和 zookeeper(connection) url/ips。

启动模式注册服务

schema-registry-start schema-registry.properties &

更多信息请参考: https ://github.com/confluentinc/schema-registry

https://docs.confluent.io/current/schema-registry/docs/schema_registry_tutorial.html

恰好语义是 apache kafka 的特性,虽然我没有在 msk 上测试过它,但我相信它应该支持这个特性,因为它只是开源 apache kafka 的一部分。

iii) MSK 是否会提供一些方法来更新某些集群或调整配置?就像 aws 胶水在其托管环境中为 spark 执行器和驱动程序内存提供参数更改一样。

是的,可以在运行时更改配置参数。我已经通过使用 kafka 配置工具更改了retention.ms 参数进行了测试,并且该更改立即应用于该主题。所以我认为你也可以更新其他参数,但是 MSK 可能不允许所有配置更改,就像 AWS 胶水只允许很少的 spark 配置参数更改一样,因为允许用户更改所有参数可能容易受到托管环境的影响。

通过 kafka 配置工具进行更改

kafka-configs.sh --zookeeper 10.0.10.83:2181,10.0.20.22:2181,10.0.0.218:2181  --entity-type topics --entity-name jsontest --alter --add-config retention.ms=128000

使用休息验证更改

curl "http://localhost:8082/topics/jsontest"

现在,Amazon MSK 使您能够创建自定义 MSK 配置。

请参阅下面的文档以获取可以更新的配置/参数:

https://docs.aws.amazon.com/msk/latest/developerguide/msk-configuration-properties.html

也是 MSK Kafka 的默认配置:

https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html

iv) 是否可以将 MSK 与其他 AWS 服务(例如 Redshift、EMR 等)集成?

是的,您可以使用 MSK 连接/集成到其他 aws 服务。例如,您可以运行 Kafka 客户端(消费者)从 kafka 读取数据并写入 redshift、rds、s3 或 dynamodb。确保 kafka 客户端在具有适当 iam 角色来访问这些服务的 ec2 实例(在 msk vpc 内)上运行,并且 ec2 实例位于公共子网或私有子网中(具有 s3 的 NAT 或 vpc 端点)。

您也可以在 MSK 集群 vpc/subnet 中启动 EMR,然后通过 EMR(spark) 连接到其他服务。

使用 AWS Managed Service Kafka 进行 Spark 结构流式传输

在 MSK 集群的 vpc 中启动 EMR 集群 在端口 9092 的 MSK 集群安全组的入站规则中允许 EMR 主从安全组

启动 Spark 外壳

spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0

从 spark 结构流连接到 MSK 集群

val kafka = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092").option("subscribe", "jsontest") .load()

开始在控制台上阅读/打印消息

val df=kafka.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").writeStream.format("console").start()

或者

val df=kafka.selectExpr("CAST(value AS STRING)").writeStream.format("console").start()

在此处输入图像描述

在此处输入图像描述

v) 我可以通过 ksql 将流式 sql 与 MSK 一起使用吗?如何使用 MSK 设置 KSQL?

是的,您可以使用 MSK 集群设置KSQL 。基本上,您需要在 MSK 集群的同一 vpc/子网中启动一个 ec2 实例。然后在ec2实例中安装ksql server+客户端并使用。

首先设置 confluent repo 和 ec2 实例安全组(请参阅 -第 1 节:预安装或设置 - 附加 kafka 组件),然后安装/设置 Ksql 服务器/客户端。

之后安装ksql服务器

sudo yum install confluent-ksql 

创建文件名 ksql-server.properties 并添加以下内容-

bootstrap.servers=10.0.10.106:9092,10.0.20.27:9092,10.0.0.119:9092
listeners=http://localhost:8088

** 修改引导服务器 ips/url。

启动ksql服务器

ksql-server-start ksql-server.properties &

之后启动 ksql cli

ksql http://localhost:8088

最后运行命令以获取主题列表

ksql> SHOW TOPICS;

 Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups 
-----------------------------------------------------------------------------------------
 _schemas    | false      | 1          | 3                  | 0         | 0              
 jsontest    | false      | 1          | 3                  | 1         | 1              
----------------------------- --------------------------------------------------

有关更多信息,请参阅 - https://github.com/confluentinc/ksql

vi) 如何对流经 MSK 斯隆的数据进行实时预测分析?

做预测分析或实时机器学习的东西真的不是 MSK 特有的。您使用 kafka 集群(或任何流式管道)的方式同样适用于 MSK。根据您的确切要求,有多种方法可以实现,但我将描述整个行业中最常见或最广泛使用的一种:

  • 将 Spark 与 MSK (kafka) 结合使用,并通过结构流和 MLIB(拥有您的预测模型)进行分析。

  • 您可以在H20.ai框架中训练您的预测模型,然后将模型导出为 java pojo。然后将 java pojo 模型与 kafka 消费者代码集成,该代码将处理来自 msk(kafka) 主题的消息并进行实时分析。

  • 您可以在 sagemaker 中训练模型和部署,然后从 kafka 客户端消费者代码中调用,通过调用基于 kafka 数据/消息的 sagemaker 模型推理端点来获得实时预测。

vii) 与 Azure/confluent 中的其他基于云的 kafka 集群相比,MSK 的可靠性如何,以及与 vanilla kafka 相比的任何性能基准?集群中可以启动的最大代理数量是多少?

如您所知,MSK 处于预览阶段,因此现在谈论其可靠性还为时过早。但总的来说,与所有其他 AWS 服务一样,随着时间的推移,它应该会变得更加可靠,并有望提供新功能和更好的文档。

我不认为 AWS 或任何云供应商 azure,谷歌云提供他们服务的性能基准,所以你必须从你身边尝试性能测试。并且 kafka 客户端/工具(kafka-producer-perf-test.sh、kafka-consumer-perf-test.sh)提供了一个性能基准脚本,可以执行该脚本以了解集群的性能。同样,在实际生产场景中对服务的性能测试会因各种因素而有很大差异,例如(消息大小、传入 kafka 的数据量、同步或异步生产者、消费者数量等),性能将归结为具体用例而不是通用基准。

关于集群中支持的最大代理数量,最好通过他们的支持系统询问 AWS 人员。


第 1 节:预安装或设置 - 其他 kafka 组件:

在 MSK 集群的 vpc/子网中启动 Ec2 实例。

登录 ec2 实例

设置yum repo通过yum下载confluent kafka组件包

sudo yum install curl which
  sudo rpm --import https://packages.confluent.io/rpm/5.1/archive.key

导航到 /etc/yum.repos.d/ 并创建一个名为 confluent.repo 的文件并添加以下内容

[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/5.1/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/5.1
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/5.1/archive.key
enabled=1 

下一个干净的 yum 回购

sudo yum clean all 

允许 ec2 实例的安全组在 MSK 集群安全组的入站规则中为端口 9092(连接代理)和 2081(连接 Zookeeper)。

第 2 节:获取 MSK 集群代理和 zookeeper url/ip 信息的命令

Zookeeper 连接 url 端口

aws kafka describe-cluster --region us-east-1 --cluster-arn <cluster arn>

代理连接 url 端口

aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <cluster arn>

-------------------------------------------------- --------------------

笔记:

MSK 概述和组件设置:

在此处输入图像描述

  • 请参考 MSK 高级架构以及如何设置各种组件(rest、schema registry、sticky load balancer 等)。以及它将如何与其他 aws 服务连接。它只是一个简单的参考架构而已。

  • 此外,您还可以在容器内进行 dockerize,而不是在 ec2 实例上设置 rest、模式注册表和 ksql。

  • 如果您要设置多个休息代理,那么您需要将该休息代理服务放在粘性负载均衡器(如使用 ip 哈希的 nginx)后面,以确保相同的客户端消费者映射到相同的消费者组,以避免数据获取不匹配/不一致跨数据读取。

希望以上信息对您有用!!

于 2019-03-28T23:44:24.063 回答