问题标签 [apache-pulsar]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
892 浏览

java - Apache Beam:Kafka 消费者一遍又一遍地重启

我有这个非常简单的 Beam 管道,它从 Kafka 主题读取记录并将它们写入 Pulsar 主题:

据我了解,这应该恰好创建一个 Kafka 消费者,将其价值推向管道。现在由于某种原因,管道似乎一遍又一遍地重新启动,创建了多个 Kafka 消费者和多个 Pulsar 生产者。

以下是显示正在创建多个 Kafka 消费者的日志的摘录:

为什么 Kafka Consumers 会一遍遍重启?这是预期的行为吗?

0 投票
1 回答
1369 浏览

apache-pulsar - ApachePulsar:如何获取命名空间“sample/standalone/ns1”的策略?

我正在使用 Pulsar Admin Rest API 并希望从文档中使用 API 获取命名空间的策略: /admin/v2/namespaces/{tenant}/{namespace}

Pulsar 有 2 个默认租户:public、sample。由此,我得到:

  • public: public/default, public/functions.
  • sample: sample/standalone/ns1.

我们可以理解为public/default:namespace name 是default,tenant 是public

问题在于sample/standalone/ns1,租户是sample,命名空间是standalone/ns1名称,不是吗?如果是这样,我如何从中获得政策?

我尝试了上面的 API/admin/v2/namespaces/sample/standalone/ns1但得到了405: Method not Allowed

我希望有人可以向我解释这个问题。谢谢!

0 投票
1 回答
939 浏览

java - Test apache pulsar functions in an embedded standalone environment

For testing I have managed to run an embedded standalone pulsar server and client. I also can send and receive messages. However I actually want to (integration-)test functions (implementing org.apache.pulsar.functions.api.Function). How can I register functions in the embedded setup?

0 投票
1 回答
2204 浏览

apache-kafka - 如何设计实时股价的发布/订阅架构

我有一个发布实时财务数据的外部系统(例如来自世界各地交易所的股票报价和价格)。

这个外部系统对每个帐户连接的股票数量有一些限制,因为我们有许多应用程序需要使用这些实时流数据,所以我们不希望每个应用程序都连接到那个外部系统并自己管理容量,因此我们想设计一个单一的系统,对所有股票进行消费,然后发布到某个消息队列(例如 kafka 或 pulsar),然后下游应用程序可以从 kafka 主题中消费。

问题是我们如何设计主题,股票的数量在 1000 万左右,但每个应用程序只对其中的子集感兴趣,子集大小可以大也可以小,不同的子集可以共享相同的股票。

我能想到的是动态创建一些流式作业(例如 kafka 流式传输或单独的 flink 作业来进行预聚合以从所有主题中为每个消费者收集感兴趣的股票,然后为每个消费者发布到另一个主题),在这个这样每个消费者都会有自己的话题,只有自己感兴趣的股票,但肯定会带来消息传输时间、重复消息和延迟的开销,除此之外,如果消费者越来越多,容量也可能会出现问题。

不知道有没有更好的方法可以达到我的要求,请指教,谢谢。

0 投票
0 回答
154 浏览

function - 如何在 Apache Pulsar 中为 docker 创建函数

步骤1:

在容器 pulsar 中复制 Jar:

第2步:

在容器 pulsar 中创建函数:

问候

0 投票
2 回答
628 浏览

apache-pulsar - Apache Pulsar - ioThreads/listenerThreads 和消息排序

我们正在开发一个应用程序,该应用程序需要严格按顺序处理具有相同密钥的消息。另外,出于性能/吞吐量的原因,我们需要引入并行处理。

并行化很容易——我们可以让单个线程接收消息,计算密钥的哈希值,并使用哈希 % 的工作人员将消息分发到特定的阻塞队列,另一侧有工作人员。这保证了具有相同密钥的消息被分派给同一个工作人员,因此保证了排序 - 只要接收者按顺序获取消息。

问题是:

  1. 增加 ioThreads 和 listenerThreads(默认 = 1)是否会对性能产生影响,即我们是否应该期望看到更多消息流过,或者 I/O 是否总是限制因素?

  2. 如果我们增加它们,我们仍然保证订购吗?

Pulsar 文档不清楚...

0 投票
2 回答
1214 浏览

python - 脉冲星客户端无法安装

我根本无法安装pulsar-client,因为它在文档中提到:

收集 pulsar-client 错误:找不到满足 pulsar-client 要求的版本(来自版本:无) 错误:没有找到 pulsar-client 的匹配分布

收集 pulsar-client==2.4.0 错误:找不到满足 pulsar-client==2.4.0 要求的版本(来自版本:无) 错误:没有找到 pulsar-client==2.4.0 的匹配分布

我在 Windows 10python 3版本上。

尝试了python2 和 3 版本。

0 投票
1 回答
2598 浏览

database - Debezium 错误,此连接器不知道架构

我有一个使用 Debezium 的项目,主要基于此示例,然后将其连接到 Apache Pulsar。

我改变了一些配置。该文件现在如下所示:

正如您可能理解的那样,我正在尝试做的是监视数据库history_tableproject_table修改,然后将有效负载写入 Apache Pulsar。

我的问题如下。在我使用的任何快照模式下,当写入偏移量时,我无法重新启动 Debezium 而不会在下一次数据库更新时出现错误。

它只发生在现有offset.dat文件中。我认为这是因为offset.dat文件中的架构为空。以这个为例:

我首先怀疑用于使 JSON 更简洁的schemas.enable=falseinclude.schema.changes=false参数,但它们的值不会改变offset.dat文件中的任何内容。

0 投票
1 回答
293 浏览

java - Apache pulsar java.lang.IllegalArgumentException

我正在开发一个 apache pulsar 集群,它运行良好,但由于某种原因,当我尝试发送消息时,我开始收到以下消息:

ERROR [id: 0xc5de4911, L:/127.0.0.1:55672 - R:localhost/127.0.0.1:6650] Close connection becaues received internal-server error java.lang.IllegalArgumentException: bound must be positive

我也收到以下错误:

我知道该主题已创建,因为我使用 pulsar-admin cli 对其进行了双重检查。我不知道从哪里开始寻找任何人都可以指出我的地方。

0 投票
2 回答
1246 浏览

message-queue - 如何避免在 Apache Pulsar 中自动删除非活动主题

我有一个应用程序在特定主题下向 Pulsar 生成消息,并在完成后关闭应用程序;同时,不存在阅读该主题的消费者。

过了一段时间,当我创建一个消费者,想把写入的数据读出来的时候,我发现我写的主题被Pulsar删除了,所有的数据都丢失了。

如何禁用 Pulsar 中非活动主题的自动删除?