问题标签 [stream-processing]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
223 浏览

c++ - 从 bash 拆分的流是否可以转置为其他语言?

通过分流,我的意思是能够:

  1. 通过第一个函数动态过滤流内容
  2. 流的一部分由第二个函数处理
  3. 流的其余部分由第三个函数处理
  4. 流永远不会存储(在运行中)

一个例子有时比一个冗长的解释更好。此命令行使用tee处理替换来拆分流:

在这个例子中,流被分成两部分:包含的行"AB"和其余的:

但我不喜欢这种流拆分技术,因为流首先被复制(by tee)然后被过滤两次(bygrepgrep -v)。

因此,我想知道在​​ 、等其他语言中是否有类似流拆分的东西......

我在下面提供了一个更复杂的示例。


复杂bash的分流

counter.sh将流分成三个部分(开始、中间和结束)。对于每个部分,流再次被拆分以计算符号的出现次数<|>

此脚本用于计算开始/中间/结束部分中添加/更改/删除的行数。这个脚本的输入是一个流:

counter.sh如何在不将数据存储在临时缓冲区中的情况下在其他编程语言中有效地实现此类?


回答

正如Lennart Regebro所指出的,我在想这个问题。当然,所有这些语言都能够按照ysth的回答拆分输入流。在伪代码中:

结论:流拆分在 /// 中pythonperl使用+进程替换更好awkC++tee

0 投票
1 回答
166 浏览

webgl - WebGL 一对多数据处理

是否有任何使用 WebGL 的方案允许将一条数据记录处理为以前未知数量的记录?

以 OpenGL 为例,几何程序可用于根据属性对顶点进行乘法运算,从而输出未知长度的数据。

以同样的方式使用 WebGL 是否有任何技巧,或者这只能在 JavaScript 端实现?

0 投票
2 回答
189 浏览

java - 如何使用 JSR 353(用于 JSON 处理的 Java API)将对象附加到 json 文件

使用 JSR-353 ( https://jsonp.java.net/index.html ) 我想打开一个 json 文件并在根数组中附加一些对象,例如:

我想要这样的代码:

并最终获得:

注意:我不想在内存中加载完整的 json 来附加我的数据。

0 投票
1 回答
316 浏览

amazon-web-services - 在云端生成计时器事件

我正在尝试解决在亚马逊云上为我的应用程序生成分布式计时事件的问题:

服务器收到一条消息。因此,系统必须在 X 分钟内完成某些操作。我的问题是系统需要在高峰时间每秒处理数百万条消息。此外,在该时间间隔内,收到消息的服务器可能会崩溃。所以我正在寻找一种分布式解决方案,它可以接收一条消息,然后在几分钟后发出另一条有保证的消息。

我可以自己设计一个分片系统,但我希望一些分布式流框架可以轻松地做到这一点。但到目前为止我发现的是那些立即完成交易的。

0 投票
0 回答
374 浏览

algorithm - count-min 草图数据结构的重要用法

我有一个值不断增加的大数组 - 像这样:

我想在上面使用插值搜索算法。数组的大小是可变的,新元素被添加到数组的末尾。

我需要找到某个元素的索引,我们称它为 X。

Y 必须是数组中元素的索引,这样 array[Y] >= X find可以使用二进制搜索来实现,但由于某些复杂的原因,我想使用插值搜索来实现它。插值搜索试图通过查看数组的边界来猜测 X 的正确位置。如果第一个数组值是 0,最后一个是 100,我想找到值 25 的位置,如果数组长度是 1000,我需要先查看索引 250 处的值。如果数组的值是均匀分布的,这很有吸引力。但如果它们分布不均匀,插值搜索的工作速度可能比二分搜索慢(可能有一些优化)。

在这种情况下,我正在尝试使用Count-Min Sketch数据结构来加快搜索速度。当我将新元素附加到数组时,我只是将一些数据添加到 count-min 草图数据结构中。

使用这种方法,我可以大致猜测搜索到的元素 X 的位置。如果猜测正确,这可能会导致搜索速度加快,但我遇到了一些问题。

  1. 我不知道 X 是否在数组中并且我count_min_sketch已经看到了这个值。如果是这种情况,我可以从数据结构中获得正确的值count_min_sketch。如果不是 - 我将得到 0 或其他值(最坏的情况)。

  2. 碰撞。如果我的对象已经看到了值 X,那么count_min_sketch我会得到正确的值或更大的值。如果 count min sketch 用于计算文档中的单词出现次数 - 这不是问题,因为碰撞很少见并且错误小于或等于碰撞次数(它通常像这样使用:count_min_sketch.add(Z, 1))。就我而言,每次碰撞都可能导致大错误,因为我通常为每个键添加大量数字。

是否可以以这种方式使用 count-min 草图(每次添加大量条目)?

0 投票
2 回答
901 浏览

c++ - 如何在 C++ 中编写自定义流转换?

在使用 Haskell 和函数式语言进行了大量工作之后,我正在学习 C++,我发现我一直在尝试解决同样的问题:

  • 从输入流中读取一些数据
  • 根据特定算法对它们进行标记
  • 处理令牌

如果这是 Haskell,我可以简单地利用一切都是惰性的这一事实,并根据我的想法编写我的转换,然后在下游被消耗时应用它。甚至有一些库可以执行这种精确的模式(管道管道)。

假设我想获取序列1 2 3 4 5 6 ...和输出12 34 56 ...。我可以看到如何编写对流进行操作并就地处理数据的临时代码。但我想知道是否有一种抽象机制允许我通过转换来自另一个流的数据(以任何可能的方式)来构建一个新流。这种抽象应该允许我在处理数据时缓冲数据,而不仅仅是将单个元素简单映射到新值。

以下是限制:

  • 除了stdlib之外,我不能使用任何其他库。
  • 它必须在 C++03 上工作(意味着没有 C++11 功能。)

如果你在想,这是作业吗?好吧,我得到了很多课堂作业,这些作业需要我处理数据流(这是没有库和 C++03 限制的原因)。并不是我不知道如何使用while循环来做到这一点,而是我想知道stl中是否存在现有的流抽象,只是等待被发现和使用。

但如果这样做的唯一方法是使用 C++11,那么我想知道。

0 投票
2 回答
210 浏览

distributed-computing - 流处理引擎的并行行为

我一直在学习 Storm 和 Samza,以了解流处理引擎的工作原理,并意识到它们都是独立的应用程序,为了处理事件,我需要将其添加到也连接到流处理引擎的队列中。这意味着我需要将事件添加到队列(这也是一个独立的应用程序,比如说 Kafka),Storm 将从队列中选择事件并在工作进程中处理它。如果我有多个螺栓,每个螺栓将由不同的工作进程处理。(这是我不太了解的事情之一,我看到一家公司在生产中使用了 20 多个螺栓,并且每个事件在某个路径的螺栓之间转移)

但是我真的不明白为什么我需要如此复杂的系统。这些进程涉及太多的 IO 操作(我的程序 -> 队列 -> 风暴 ->> 螺栓),这使得控制和调试它们变得更加困难。

相反,如果我从 Web 服务器收集数据,为什么不直接使用同一个节点进行事件处理呢?这些操作已经通过我用于 Web 服务器的负载均衡器分布在节点上。我可以在相同的 JVM 实例上创建执行器,并将事件从 Web 服务器异步发送到执行器,而不涉及任何额外的 IO 请求。我还可以观察 Web 服务器中的执行程序,并确保执行程序处理了事件(至少一次或完全一次处理保证)。这样,管理我的应用程序会容易得多,并且由于不需要太多的 IO 操作,因此与通过网络将数据发送到另一个节点的其他方式相比,它会更快(这也是不可靠的)并在该节点中处理它。

很可能我在这里遗漏了一些东西,因为我知道许多公司都在积极使用 Storm,而且我认识的许多人推荐使用 Storm 或其他流处理引擎进行实时事件处理,但我就是不明白。

0 投票
1 回答
272 浏览

jms - spring xd jms 源中的 Activemq 通配符支持?

我',使用带有spring xd 1.1的activemq 5.9。我想订阅 activemq 主题。所以,我正在使用 jms 源。Activemq 支持 * , > 等通配符。> 用于递归匹配从此名称开始的任何目标我在订阅 jms 源时尝试使用此通配符,如下所示:
stream create --name streamname --definition "jms --clientId=1 --destination=springin.> --durableSubscription=true --pubSub=true --subscriptionName=streamName | null"

错误:命令失败 org.springframework.xd.rest.client.impl.SpringXDException: XD133E:(pos 42): 'queue' 或 'topic' 的预期频道前缀,但发现 '--' jms --clientId=1 --目的地=springin.> --durableSubscription=true --pubSub=true --subscriptionName=streamName | 无效的

但是当我尝试这样的事情时:
stream create --name streamname --definition "jms --clientId=3 --destination=springin.*.tp1 --durableSubscription=true --pubSub=true --subscriptionName=streamName | null"
正在创建流。

所以支持'*'但'>'不起作用。我想使用这个 '>' 通配符。有什么解决方法吗?

0 投票
1 回答
251 浏览

java - 处理java管道流中的异常

我有一个 xmlStream,我正在使用 .xml 将其转换为 jsonStream org.apache.wink.json4j.utils.XML。这是代码

当 XML.toJson 抛出异常时,我看到主线程没有退出。我该如何处理?你们认为这是将 XML 流转换为 Json 流以进行进一步处理的好方法吗?我真的很感激任何建议。非常感谢!

0 投票
1 回答
643 浏览

python - Streamparse 不断调用 next_tuple

我正在尝试使用 Streamparse 在 Python 中编写一个简单的 Storm 拓扑。除了我写的简单的 Kafka spout 之外,一切都对我有用——它似乎只是不断地调用“next_tuple”。我的螺栓相当慢,所以系统似乎很快在内存中爆炸。

启动拓扑时,我尝试将 topology.max.spout.pending 设置为 1,以防止它向拓扑中添加太多消息。

然而,结果仍然是这样,尽管螺栓要慢得多:

我简单的卡夫卡喷口:

我的螺栓只有默认配置,但需要大量时间才能完成 process() 方法。我无法弄清楚它们是如何成为问题的,但如果它们相关,我可以发布。