问题标签 [streamparse]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
0 回答
363 浏览

nimbus - 带有 streamparse 的 Storm UI

我正在一个streamparse安装了 ZooKeeper 和 Nimbus 的 AWS 实例上开发一个项目。我想使用 Storm UI。我sparse submit使用以下 config.json 文件运行:

它给出了以下错误跟踪:

0 投票
0 回答
1297 浏览

clojure - 尝试调用未绑定的 fn

就我而言,streamparse api用于在本地运行并将代码提交到STORM集群,当我在本地运行它时没问题,但是当它提交到STORM集群时,我得到了

java.lang.RuntimeException: java.lang.IllegalStateException: 试图调用未绑定的 fn

clojure 版本是 1.5.1,storm 版本是 0.9.5,sparse 2.1.3

该错误是由于在 spout 执行中调用“call-to-outside-function”引起的,该函数也可以是 3rd 方库,代码如下:

以前有人有这个问题吗?

0 投票
0 回答
253 浏览

python - Streamparse/Python - 自定义 fail() 方法不适用于错误元组

我正在使用 Storm 实时处理来自 Kafka 的消息,并使用 streamparse 来构建我的拓扑。对于这个用例,我们必须 100% 保证进入 Storm 的任何消息都得到处理和确认。我已经使用 try/catch 在我的螺栓上实现了逻辑(见下文),除了将其写入 Kafka 中的另一个“错误”主题之外,我还想让 Storm 重播这些消息。

在我的 KafkaSpout 中,我将 tup_id 分配为等于我的消费者从中提供数据的 Kafka 主题的偏移量 id。但是,当我使用错误的变量引用在 Bolt 中强制出错时,我没有看到消息被重播。我确实看到有人写到“错误”Kafka 主题,但只有一次——这意味着元组永远不会被重新提交到我的螺栓中。我对 TOPOLOGY_MESSAGE_TIMEOUT_SEC=60 的设置,我希望 Storm 每 60 秒继续重播一次失败的消息,并让我的错误捕获继续写入错误主题,永远。

KafkaSpout.py

进程螺栓.py

对于如何正确实现这种情况下的自定义失败逻辑,我将不胜感激。提前致谢。

0 投票
0 回答
685 浏览

celery - Python Celery 和 Apache Storm 比较

要求是 Python 中的分布式任务处理和编程任务,以实现高消息率。Celery 和 Storm(使用 streamparse)如何比较以下原则:

  1. 可扩展性——不仅在工人方面,而且在经纪人的背景下。Celery 可以与水平可扩展的代理(分片 rabbitmq 集群)一起使用吗?Storm有这样的限制吗?

  2. 任务的灵活性——如果任务必须针对系统中的特定条件,那么当它下一次被工作人员吸收时,它很可能会被重新排队和处理。Storm 是否提供这样的功能?

  3. 工作流程——工作流程往往会变得复杂——一些是顺序的,一些是并行的,然后是它们的组合。这里哪个更好?

  4. 监控 - 对工作人员、他们的状态、他们的消息率的实时监控支持。出现错误时的警报功能。

  5. 易于部署

0 投票
1 回答
1019 浏览

python - 如何在streamparse spout中使用tweepy接受twitter流并将推文传递给bolt?

最近,我开始研究storm并且对python更加熟悉,我决定使用streamparse来处理storm。我打算在 spout 中接受一个 twitter 流并在 bolt 中执行一些计算。但我无法弄清楚如何在 spout 中编码。我已经阅读了各种流解析教程,但它们都显示了从静态列表中发出的 spout 元组,并且没有像 twitter 流 api 提供的流。这是我的风暴代码:

这是我的 tweepy 代码:

我应该如何整合这两个代码?

0 投票
0 回答
140 浏览

python - 风暴流解析喷口“完全延迟”始终为 0

我现在使用 streamparse 有一段时间了,但我被困在一个主题上。我们使用storm-0.10.0streamparse==2.1.4。我们让所有的默认值(no auto_anchor = False或类似的东西)。

我们没有在 spout 中实现 ack 或 fail 方法,也没有在 bolts 中调用任何 ack 或 fail 方法。

在螺栓方面,进程延迟和执行延迟看起来是正确的。进程延迟是>>执行延迟。

我担心的是spout Complete Latency我们可以在storm UI 中找到的。它总是在 0.00 。我读到完整的延迟是消息在拓扑中花费的总时间。因此,这是一个非常好的 KPI,因为它可以帮助检测大量工作人员、任务甚至节点。

在一个螺栓中,我做了一个 self.log(tup) 并且我确实有一个带有这种输出的 tuple.id :

name:engine-bolt Tuple(id=u'2213630928741732943',
component=u'request-spout-1',
stream=u'default',
task=24,
values= etc
...

那么我怎样才能获得一些关于 spouts 的“完全延迟”的值呢?

如果您还需要什么,请询问:)

0 投票
2 回答
201 浏览

apache-storm - StreamParse:IOError:本地端口:6627 已在使用中,无法打开到 nimbus.server.local:6627 的 ssh 隧道

设置:

  • 风暴 0.10.0
  • 流解析 2.1.4
  • 森托斯 6.5
  • Python 2.7(Streamparse 需要它)

(是的,我知道它们已经过时了,但是我无法使用 Storm 1.0,它只是被 streamparse 3 破坏了)

当我尝试从我的 nimbus 服务器或拓扑中的另一台服务器启动“streamparse submit”时,我收到以下错误:

“IOError:本地端口:6627 已在使用中,无法打开到 nimbus.server.local:6627 的 ssh 隧道。”

但是当然 6627 正在我的 nimbus 服务器上使用?它的 Thrify 端口。所以我尝试将 Thrifty 端口移动到 6637 并重新启动 Nimbus。但是我从提交它的客户那里得到了同样的错误:

IOError:本地端口:6627 已在使用中,无法打开到 nimbus.server.local:6627 的 ssh 隧道。

即使是 netstat tuanp 也显示 6627 表明在 nimbus 上的该端口或执行提交的盒子上没有任何东西在监听。

我感觉与 SSHD 配置和允许隧道有关,但 Nimbus 没有正确处理,并且在尝试建立隧道时给出了不正确的错误。

有没有其他人经历过这个?

0 投票
2 回答
33690 浏览

apache-kafka - Streamparse wordcount 示例

我一直想使用 Apache Storm 从 Kafka 流式传输。我对 Python 更熟悉,所以我决定使用 streamparse ( https://github.com/Parsely/streamparse )。字数统计示例是介绍性示例。我一直试图让它在我的本地机器上工作。我安装了以下版本的 JDK、lein 和storm:

在遵循 streamparse 之后,我运行以下步骤:

我收到以下错误:

我的 project.clj 文件如下所示:

因此,我的 lein 和storm core 版本设置正确。我不确定我哪里出错了。有人可以帮帮我吗?

-谢谢

0 投票
2 回答
103 浏览

python - 如何从源代码安装 streamparse?

我需要在没有 Internet 访问权限的 CentOS 机器上使用 streamparse,这意味着我不能使用 pip。我可以使用的唯一启用网络的服务是 scp 和 ssh。我的计划是在我的本地机器(Ubuntu)上获取流解析,然后将流解析文件 scp 到 CentOS 机器并从那里手动安装。

关于如何做到这一点的任何想法?

编辑:由于这是“搁置为题外话”,我将通过解决社区帮助页面(https://stackoverflow. com/help/on-topic)。

  1. 一个具体的编程问题:安装是一种编程问题,尤其是当你必须编写(程序,动词)shell脚本(程序,名词)来完成软件的安装时,会导致更多的编程。

  2. 软件算法:我正在寻找一系列步骤(也称为算法)来在指定的技术限制内安装某些东西。

  3. 程序员常用的软件工具:我要安装的东西是软件工具。它被称为流解析。它被程序员使用。

  4. 软件开发特有的一个实用的、可回答的问题:我问这个问题并不是出于理论上的原因——因此它是实用的,而且我相信通过绕过防火墙来安装东西是软件开发所特有的。我承认这不能被视为“软件开发”,而是“devops”,但是这两件事正在合并,所以在这里给我一个骨头。

0 投票
1 回答
226 浏览

apache-kafka - 风暴批处理后向kafka提交偏移量

当批处理螺栓完成处理批处理时,仅提交每个分区的最高偏移量的正确方法是什么?我主要担心的是机器在处理批次时会死机,因为整个 shebang 将在 AWS 现场实例中运行。

我是 Storm 开发的新手,我似乎找不到 IMO 的答案是非常直接地使用 kafka 和storm。

设想:

基于保证消息处理指南,假设我有一个元组的蒸汽(kafka 主题),("word",count)处理 X 元组的批处理螺栓,进行一些聚合并创建 CSV 文件,将文件上传到 hdfs/db 和 acks。

在非 strom“天真”实现中,我会读取 X msgs(或读取 Y 秒),聚合,写入 hdfs,一旦上传完成,将每个分区的最新(最高)偏移量提交给 kafka。如果机器或进程在 db 提交之前死亡 - 下一次迭代将从前一个位置开始。

在暴风雨中,我可以创建批处理螺栓,它将锚定所有批处理元组并立即确认它们,但是我找不到将每个分区的最高偏移量提交给 kafka 的方法,因为喷口不知道批处理,所以一旦批处理螺栓确认元组,每个喷口实例都会一个接一个地确认他的元组,所以我可以按照我的看法:

  1. 在 spout 的每个 ack 上提交 acked 消息的偏移量。这将导致许多提交(每批可能是几 K 的元组),可能是乱序的,如果在提交偏移量时 spout 工作死了,我最终将部分替换一些事件。
  2. 与 1 相同。但我可以在提交的最高偏移量中添加一些本地偏移量管理(修复无序偏移量提交)并提交每隔几秒看到的 highets 偏移量(减少大量提交)但我仍然可以部分结束如果 spout 死亡,提交的偏移量
  3. 将偏移量提交逻辑移动到螺栓 - 我可以将每条消息的分区和偏移量添加到发送到批处理螺栓的数据中,并将每个分区的最高已处理偏移量作为批处理的一部分提交(发送到“偏移提交者”螺栓处批次结束)。这将解决偏移跟踪、多次提交和局部重播问题,但这会为螺栓添加特定于 kafka 的逻辑,从而将螺栓代码与 kafka 相结合,一般来说,在我看来,这似乎是在重新发明轮子。
  4. 更进一步的轮子改造和手动管理 ZK 中最高处理的 patition-offset 组合,并在我初始化 spout 时读取这个值。