问题标签 [windowing]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
pyspark - 过去 30 天关于数据子集的 Pyspark 窗口
我有一个有效的 Pyspark Windowing 函数 (Spark 2.0),它需要过去 30 天 (86400*30) 秒,并计算每个 ID 列“a”中每个操作发生的次数。我应用此函数的数据集在“2018-01-01”和“2018-04-01”之间每天都有多条记录。因为这是一个 30 天的回顾,我不想将此函数应用于没有完整 30 天回顾的数据。为方便起见,我想从 2 月 1 日开始计算。我不能过滤掉一月份,因为二月份的计数需要它。我知道我可以在新数据框上添加一个过滤器并在 2 月份之前过滤掉数据,但是有没有办法在没有额外步骤的情况下做到这一点?不必进行可以节省时间的计算会很好。
这是代码:
当前数据集的模型。我不想手动转换col ts,所以我写了一个替代品。
用我编造的样本数据。我想返回以下行
相反,我得到了这个:
hive - 使用 Hive 中的值计算连续的日期范围
我想知道是否可以计算一组 Id 的特定值的连续范围并返回每个 Id 的计算值。给定以下数据:
我想要以下输出:
在这种情况下,范围是信用小于 1 的连续天数。如果 date_key 列之间存在间隙,则范围不必采用下一个值,例如 ID 1 中介于 8096 和 8098 之间的日期键。是否可以使用 Hive 中的窗口功能来做到这一点?
提前致谢!
javascript - react-virtualized - 带有 Masonry 的 InfiniteLoader 不起作用
我在 Masonry 上使用 react-virtualized 的例子(这里)
它有效。但是现在我正在尝试将 InfiniteLoader 添加到示例中,但无法使其正常工作。
我尝试过的事情:
- 不使用 WindowScroller 并定义我自己的高度
最初我认为它可以通过删除 WindowScroller 来工作,但从未调用loadMoreRows回调。
也永远不会调用isRowLoaded回调。
一段代码演示了它是如何实现的:
从_renderAutoSizer就像示例一样。没有变化。Masonry 按预期工作,但无法实现无限滚动。
我知道我需要以某种方式使用 onRowsRendered 。但是这些示例显示了列表和网格的用法。Masonry 似乎无法与 InfiniteLoader 连接。
apache-flink - 获取处理迟到事件的前一个窗口值
我正在寻找一种方法来设置窗口以允许延迟,并让我根据为会话计算的先前值计算值。
我的会话值总体上是一个唯一标识符,并且永远不会发生冲突,但从技术上讲,会话可以随时进入。在大多数会话中,大多数事件的处理时间超过 5 分钟,允许迟到 1 天应该满足任何迟到的事件。
对于每个会话,我都会找到一个字段的最大值,并检查一些事件是否没有发生(如果它们确实发生了,它们将使最大值字段为零)。我决定创建一个ProcessWindowFunction
来做到这一点。
在允许延迟事件之前,这可以正常工作。当一个迟到的事件发生时,maxValue
重新计算并HttpSink
再次输出。我正在寻找一种方法来计算 previousmaxValue
和 late的 delta maxValue
。
我正在寻找一种方法来确定:
- 如果对函数的调用来自迟到的事件(我不想重复计算会话总数)
- 新数据是什么,或者如果有办法存储之前的计算值。
对此的任何帮助将不胜感激。
编辑:用于 ValueState 的新代码
KafkaConsumer.scala
会话处理器.scala
apache-flink - 扩展窗口
我有一个始终是一个应用程序,监听 Kafka 流并处理事件。事件是会话的一部分。我需要根据会话数据进行计算。由于我的会话时间长,我在尝试正确运行我的计算时遇到了问题。我 90% 的课程在 5 分钟后完成。99% 在 1 小时后完成。会话可能持续一天以上,由于这是一个实时系统,没有确定的结束。会话是独一无二的,并且显示永远不会发生冲突。
我正在寻找一种可以多次处理窗口的方法,或者使用初始等待期并在此之后处理任何以后的事件,或者每个事件类型结构的纯进程。我需要保留所有以前的事件(ListState),以及以前处理的值(ValueState)。
我以前认为allowedLateness
可以让我这样做,但似乎迟到只在应该处理事件时才考虑,它不会扩展实际的窗口。 GlobalWindows
也可以工作,但我不确定是否有办法多次处理一个窗口。我相信我可以在一段时间不活动后使用evictor
withGlobalWindows
来清除 Windows(尽管我承认,我还没有研究过这个,因为我不确定如何GlobalWindow
多次触发。
任何关于如何实现我想要做的事情的建议将不胜感激,我也很乐意澄清任何需要的点。
google-cloud-dataflow - Apache Beam:自定义窗口(windowfn)
大师 - 我是 Apache Beam 的新手,正在尝试实施,这似乎是一个非常简单的用例。我有股票数据,我需要找到过去 10 次交易中股票的滚动平均价格。
现在由于没有固定的持续时间可以发生 10 个事务(有时可能是几毫秒,有时可能是几秒),我认为我不能使用基于时间的窗口。我有两个问题:
- 这是 Beam 的有效用例还是我在这里遗漏了一点?
- 是否有一种相当简单/合法/非hack的方式来编写一个可以根据记录数窗口数据的窗口函数/类(在python sdk中)?
我已经看到了在记录上伪造时间戳数据的建议,这样每个到达的记录看起来就像是相隔一秒创建的,但我发现这有两个问题:
一个。这确实是一个 hack 解决方案,它似乎不适合像梁这样的东西,它应该是如此强大和优雅的架构
湾。如果您首先要通过使用程序顺序添加假时间戳来扼杀性能,那么使用高性能 Beam 管道(无服务器)有什么意义?
想知道 Beam 中的窗口是否可能是一个更优雅的解决方案
pyspark - Spark:指望一个窗口不工作毫秒
您可以创建一个窗口来计算过去 7 天内记录发生的次数。但是,如果您尝试查看记录在毫秒级别上出现的次数,它就会崩溃。
简而言之,下面的函数df.timestamp.astype('Timestamp').cast("long")
只将时间戳转换为一秒到一个长。它忽略毫秒。您如何将整个时间戳(包括毫秒)转换为 long。您需要将值设置为 long 以便它可以与窗口一起使用。
计数应该是 0,1,2,3,4,5... 而不是 0,0,2,2,4,4,...
scala - 在标签列上使用窗口函数后 XGBoost 失败
我已经成功训练了一个 XGBoost 模型,其中trainDF
一个数据框包含两列:features
并且label
我们有 11k 1 和 57M 0(不平衡数据集)。一切正常。
然后,我想通过一些窗口来更改 y 标签,以便在每个组中,我可以更早地预测 y 标签。
结果有 57M 的 0 和 214k 的 1(尽管行数大致相同)。的列中没有NA
s并且类型仍然是。然后 xgboost 失败:"label"
trainDF
double (nullable=true)
我可以根据需要包含日志。我的困惑是使用窗口功能并且实际上不更改任何其他设置会导致 XGB 失败。我将不胜感激对此的任何想法。
apache-flink - 如何在 Flink Stream Processing Windowing 中收集迟到的数据
考虑我有一个包含事件时间数据的数据流。我想在 8 毫秒的窗口时间内收集输入数据流并减少每个窗口数据。我使用以下代码执行此操作:
Point:数据流的key是处理时间的时间戳,映射到处理毫秒的时间戳的最后8个约数,例如1531569851297
将映射到1531569851296
。
但是有可能数据流到达较晚并进入了错误的窗口时间。例如,假设我将窗口时间设置为 8 毫秒。如果数据按顺序进入 Flink 引擎,或者至少延迟小于窗口时间(8 毫秒),那将是最好的情况。但是假设数据流事件时间(也是数据流中的一个字段)以 30 毫秒的延迟到达。所以它会进入错误的窗口,我想如果我检查每个数据流的事件时间,因为它想进入窗口,我可以过滤这么晚的数据。所以我有两个问题:
- 我如何过滤要进入窗口的数据流并检查是否在窗口的正确时间戳处创建的数据?
- 如何在变量中收集如此晚的数据以对它们进行一些处理?
java - 使用 Kafka Streams 对数据进行窗口化并同时处理每个窗口
我想要实现的目的是按用户对我从 Kafka 主题收到的一些消息进行分组并将它们窗口化,以便汇总我在(5 分钟)窗口中收到的消息。然后我想收集每个窗口中的所有聚合,以便立即处理它们,并将它们添加到我在 5 分钟间隔内收到的所有消息的报告中。
最后一点似乎是困难的部分,因为 Kafka Streams 似乎没有提供(至少我找不到它!)任何可以在“有限”流中收集所有与窗口相关的东西以在一个地方处理的东西.
这是我实现的代码
结果是这样的
每个窗口都有许多日志行,它们与其他窗口混合在一起。
我想要的是这样的:
其中方法过程类似于:
结果将像这样分组(每个报告都是对我的回调的调用:void process(...)),并且每个窗口的提交将在处理整个窗口时提交: