问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
308 浏览

etl - Streamsets 数据收集器:将字段替换为其子值

我有这样的数据结构

我想使用 Streamsets Data Collector将updated_at字段替换为其字段值。unix据我所知,可以使用字段替换器来完成。但我仍然不明白如何制作映射表达式。我怎样才能做到这一点?

0 投票
1 回答
417 浏览

python-3.x - 有没有一种方法可以让每日 DAG 依赖于每周(周末)DAG?

我有这些 Dags DAG_A(每天运行)、DAG_B(周一至周五运行)和 DAG_C(周六和周日运行),其中 DAG_A 依赖于 DAG_B 和 DAG_C。我尝试使用外部任务传感器设置依赖关系,但每次我的调度程序停止运行并且任何 Dags 都没有成功

0 投票
1 回答
2695 浏览

docker - Google Cloud Composer 与 Docker 上的 Airflow

我找不到太多关于在 Google Cloud Composer 与 Docker 上运行 Airflow 的区别的信息。我正在尝试将我们目前在 Google Cloud Composer 上的数据管道切换到 Docker 上,以便仅在本地运行,但我正在尝试概念化其中的区别。

0 投票
1 回答
1031 浏览

python - 将图像从磁盘添加到 TensorFlow 数据集

我正在使用 Tensorflow Datasets 的tfds.load函数来加载我的数据:

现在,我在本地电脑上多了一些猫和狗的图片(例如Cat1.jpg)。我想将它们添加到这些数据中。我怎样才能做到这一点?

请注意,我不仅有一个文件,而且还有很多,而且这只是一个二进制分类示例;同样的问题也适用于多类分类,所以最好也有一个解决方案。

更新:我尝试了不同的方法,比如尝试使用 tf-nightly 和 tf.keras.preprocessing.image_dataset_from_directory 读取图像,但是,不幸的是,这并不容易。存在很多问题,例如生成的数据集处于不同的 dtype 中,无法与原始数据集合并。我没有解决这个问题的办法。我为此付出了很多,因为我真的需要详细的代码,一个可行的解决方案,而不仅仅是一些在理论上如何实现这一点的一般想法。我不需要 image_dataset_from_directory 的解决方案,如果有人有任何解决方案,详细的代码可以工作,我很好。

我不想发布任何代码,因为我认为有更好的方法来解决这个问题。但是,请找到我在这里尝试的方式(在 colab 中):

tmp 中有一个 Test 文件夹。一个子文件夹 cat 和另一只狗。包括一些来自搜索猫和狗的随机图片。

结果 train_ds 是一个<BatchDataset shapes: ((None, 224, 224, 3), (None,)), types: (tf.float32, tf.int32)>

例如 raw_train 是一个<DatasetV1Adapter shapes: ((None, None, 3), ()), types: (tf.uint8, tf.int64)>.

ds 现在是<DatasetV1Adapter shapes: ((224, 224, 3), ()), types: (tf.uint8, tf.int64)>

不能解决它,因为数据没有正确匹配/连接。此外,在多类情况下,我无法控制检查标签的匹配。

所以我不需要任何关于如何在理论上实现这一点的一般想法。我需要一个详细的工作解决方案,详细的代码。不仅如此示例中的二进制文件,我还需要它来处理多类问题,因为我也有这个问题。那么如何在多类情况下将“读入标签”与 tfds.load 产生的标签匹配。没有匹配错误,例如混合课程左右。例如,猫变成了马(在猫、狗和马的情况下)。

第二种方式:我还尝试将 ImageDataGenerator 直接指向 raw_train 数据集。如果这可行,我通常可以继续使用 ImageDataGenerator,尽管我实际上并不想要这个。所以我只想将图像添加到 raw_train 数据集。我试过这个:

然后匹配/连接这些数据生成器的结果。但不可能只在 raw_train 上指出这一点,它会给出错误。

0 投票
1 回答
597 浏览

python-3.x - 张量流中的 tf.data.Dataset 中的填充

代码:

给出以下错误:

但是,当我使用:

上面的代码工作正常。这使我得出结论,有些地方出了问题:13-tf.shape(x)[0]但无法理解是什么。我尝试将其转换tf.shape(x)[0]int(tf.shape(x)[0]),但仍然出现相同的错误。

我想要代码做什么:我有一个tf.data.Dataset对象具有可变长度的大小序列,(None,128)其中第一个维度(无)小于 13。我想填充序列,使每个集合的大小为 13 即(13,128)。有没有替代方法(如果上述问题无法解决)?

0 投票
1 回答
57 浏览

python - 搭建端到端的数据分析平台

我需要创建一个端到端的平台:

  1. 输入数据收集和存储 - 数据将通过 FTP 定期收集并存储在云端。
  2. 数据分析 - 将分析数据(使用 Tableau/任何其他分析软件)
  3. 报告 - 根据不断变化的输入数据生成每日和每月报告。

我需要决定我可以为这个解决方案使用哪种工具/技术,以及如何在线托管脚本/进程以使它们 24*7 运行。

我的一个想法是使用 AWS 和 Tableau:

  1. 对于数据收集 - 在 Amazon ECS 控制台中安排 cron 作业
  2. 存储 - 存储在 AWS 上 - 可能是 Amazon S3 存储)但是我可以选择在加载到 Amazon S3 之前格式化/更改数据吗?
  3. 使用 Tableau 创建仪表板。但这需要多个 Tableau 会话,因为不同报告的数据源会有所不同。此外,我需要定期将这些报告/仪表板以电子邮件格式发送给用户。可以在 AWS 中完成吗?

但是,我相信 AWS 解决方案对公司来说成本很高。还有哪些其他可能的解决方法?

抱歉,我是这个领域的新手,很难找到一个具有成本效益的解决方案。

0 投票
0 回答
93 浏览

apache-flink - 可行的流媒体建议 | 是否可以将 Apache Nifi + Apache Beam(在 Flink 集群上)与实时流数据一起使用

所以,我对我尝试使用的所有 Apache 框架都非常陌生。我希望您对 IoT 流应用程序的几个工作流程设计提出建议:

  1. 由于我们有可用于 Flink 的 NiFi 连接器,因此我们可以轻松地在 Flink 上使用 Beam 抽象。我可以使用 NiFi 作为数据流工具,将数据从 MiNiFi 驱动到 Flink Cluster(这里将其存储在内存中或其他东西中),然后使用 Beam Pipeline 进一步处理数据。
  2. 是否有任何用于光束的 NiFi 连接器?如果不能,我们可以这样做吗?因此,我们直接将数据从 NiFi 流式传输到 Beam 作业(在 Flink 集群上运行)

我仍处于早期设计阶段,如果我们可以讨论可能的解决方法,那就太好了。如果您需要任何其他详细信息,请告诉我。

0 投票
2 回答
397 浏览

python - 作为 Kedro 节点的 Jupyter 笔记本

如何将 Jupyter Notebook 用作 Kedro 管道中的节点?这与将 Jupyter Notebooks 中的函数转换为 Kedro 节点不同。我想要做的是使用完整的笔记本作为节点。

0 投票
1 回答
142 浏览

python - 使用拆分 API 拆分数据集时出现“AssertionError:无法识别的指令格式” - Tensorflow2.x

请阅读给定的问题。

您需要使用原始cats_vs_dogs 数据的子集,这些数据完全在“train”拆分中。IE 'train' 包含 25000 条记录和 1738 张损坏的图像,总共有 23262 张图像。

你会把它分开得到

  • 前 10% 作为“新”训练集
  • 最后 10% 作为新的验证集和测试集,从中间分开(即最后 10% 的前半部分是验证集(前 5%),后半部分是测试集(后 5%))

这 3 个记录集应分别称为 train_examples、validation_examples 和 test_examples。

注意:记得使用cats_vs_dogs:4。. 作为数据集,因为 4.0 支持新的拆分 API。


我为相应的代码编写了代码如下:

我运行上面的代码并得到以下错误。

请帮助我进行适当的拆分。

0 投票
1 回答
1223 浏览

keras - 找不到相关的张量 remote_handle:操作 ID:14738,输出编号:0

我正在使用 colab pro TPU 实例来进行补丁图像分类。我正在使用 tensorflow 2.3.0 版。

调用 model.fit 时出现以下错误: InvalidArgumentError: Unable to find the relevant tensor remote_handle: Op ID: 14738, Output num: 0带有以下跟踪:

H 有两个数据集 zip 文件,其中包含 300,000> 和 100,000< 训练和验证示例,我使用 !gdown 从我的 Google Drive 下载并在 Colab VM 上解压缩。对于数据管道,我使用 tf.data.Dataset API 并为 API 提供文件路径列表,然后使用 .map 方法从内存中获取图像,请记住,我的训练数据集不适合内存

以下是创建数据集的代码:

这是用于创建和编译我的模型并拟合数据集的代码,我使用带有 VGG16 后端的 keras 自定义模型:

对于 TPU 初始化和策略,我使用strategy = tf.distribute.TPUStrategy(resolver) 如下所示的初始化代码:

可以在以下位置获得带有输出的整个笔记本的副本:Colab Ipython Notebook