问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
166 浏览

amazon-web-services - 使用 S3 作为原始数据的数据管道将如何工作?

我目前使用 AWS S3 作为数据湖来存储原始数据,每分钟向指定存储桶添加大约 100 个项目。我知道数据管道和数据 ETL 概念的基础知识,但我仍然不熟悉基础知识,例如 Apache Spark 是什么,或者 AWS Glue 的具体工作原理。

我愿意尝试所有教程并为自己学习,但我不知道从哪里开始。如果可以的话,请指导我从哪里开始执行以下任务。

  1. 每当将新对象添加到 S3 存储桶时,转换它们并将它们存储在另一个数据存储中。
  2. 如果要以大型 CSV 格式管理生成的转换项目(我猜是 DynamoDB,因为它是表数据?),那么在哪里存储转换后的项目。
  3. 低级解决方案和高级解决方案将如何处理这些任务?(例如,使用 Spark 与 Glue)

谢谢!

0 投票
1 回答
45 浏览

python - 使用 CSV 和文本文件中的 python 插入 SQL Server 表

我正在尝试将 CSV 文件和文本文件中的数据插入 SQL SERVER SSMS 18.7 版。下面是我的代码。

当我运行查询时,我收到以下错误。

如果我使用文本文件,有人可以帮助我们解决这个问题吗?

0 投票
0 回答
169 浏览

google-cloud-dataflow - 会话窗口在 Cloud Dataflow 上的 Apache Beam 中无法正常工作

需求如下:我们要跟踪用户事件并根据以下逻辑创建会话

  • 30 分钟不活动
  • UTC 结束时间

为此,我们将所有用户事件发布到 pubsub。在 apache Beam 管道中,我们读取 pubsub 消息,按 visitor_id 和 event_date 分组,并将它们分组到用户会话窗口中,条件如下:


问题是——我们在 bigquery 中看到 2 个连续事件相隔超过 30 分钟的记录。例如:

在此处输入图像描述

正在发生的事情是——beam 比较最后两个事件,如果它们相隔超过 30 分钟,它会将所有事件转储到 bigquery 中,包括最后一个事件。而它应该将相隔超过 30 分钟的最后一个事件添加到一个单独的会话中。

0 投票
1 回答
48 浏览

amazon-dynamodb - 估计通过 Data Pipeline 导出 DynamoDB 数据的持续时间

我的 DynamoDB 表有大约 1 亿 (30GB) 个项目,我为它预置了 10k RCU。我正在使用数据管道作业来导出数据。

DataPipeline 读取吞吐量比设置为 0.9。

如何计算完成导出的时间(管道需要 4 多个小时才能完成导出)

我该如何优化它,以便在更短的时间内完成导出。

读取吞吐率与 DynamoDB 导出有何关系?

0 投票
0 回答
110 浏览

airflow - 最佳数据管道框架

符合以下要求的最佳数据管道框架是什么?:

  • 开源/免费使用
  • 需要使用 Python 创建数据管道(应该支持 Geopandas、Pandas、Numpy,...)
  • 支持手动和时间触发的管道
  • 非技术用户可以在其中启动管道(订单数据)的 Web 界面。必须可以使用需要在运行时定义的变量。
  • 支持在单个 Docker 容器中运行管道的能力
  • 与源代码控制 (GIT) 集成。即从 GIT 下载最新的日期管道

我已经调查过 Apache Airflow,但想知道市场上是否有更好的替代方案,它支持上面定义的要求 :)

0 投票
1 回答
85 浏览

python - 使用 SQl 和 Python 的数据管道

我需要使用 Python 创建一个数据管道。我想在 Python 中连接 MySql 并读取数据框中的表,执行预处理,然后将数据加载回 Mysql Db。我能够使用 mysql 连接器连接到 MySql Db,然后预处理数据帧。但是,我无法将这些数据帧从 Python 加载回 Mysql。错误:ValueError:未知类型 str96 python。请帮助我完成此任务的方法。

我是编程新手。任何帮助将不胜感激。谢谢!

0 投票
0 回答
222 浏览

google-cloud-platform - GCP 数据融合 Azure blob 存储配置 传输多个文件

我正在尝试通过数据融合管道将多个 csv 文件从 Azure 存储容器传输到 GCP 存储桶。

我可以通过提及 Azure blob 存储配置 wasbs://containername@storageaccountname.blob.core.windows.net/ CSVFile的“路径”字段的以下路径(特定 CSV 文件的完整路径)来成功传输单个文件.csv

但是,当我尝试从“contrainername”容器传输多个文件时,“路径”下方不起作用(这里我没有在“/”之后提及任何文件名,因为我需要传输此容器下的所有文件)

'wasbs://containername@storageaccountname.blob.core.windows.net/'

它抛出异常,因为' Spark program'phase-1' failed with error: / is not found。请检查系统日志以获取更多详细信息'

在这里,我使用“SAS 令牌”进行身份验证,该身份验证在容器级别生成,非常适合完整路径文件。

GCP管道中是否有任何Forloop选项来遍历文件?提前致谢


编辑:在容器级别生成的 SAS 令牌不适用于路径“/”。在存储帐户级别生成的 SAS 令牌可以选择该目录下的所有文件。但它将所有文件数据合并到一个文件中(在 GC 存储中生成)

任何人都可以帮助如何单独传输文件?谢谢

0 投票
1 回答
19 浏览

bigdata - 为什么 DolphinScheduler 中的任务状态总是处于成功提交状态?

当我点击Start按钮运行工作流时,遇到如下情况:任务状态一直处于成功提交状态,请问如何解决这个问题?

0 投票
1 回答
98 浏览

google-cloud-platform - GCP 数据融合将多个数据从 Azure 存储传输到 Google 存储

我正在尝试通过数据融合将目录下的多个(.csv)文件从 Azure 存储容器传输到 Google 存储(作为 .txt 文件)。通过数据融合,我可以成功传输单个文件并将其转换为 .txt 文件作为 GCS Sink 的一部分。

但是,当我尝试将 azure 容器下的所有 .csv 文件传输到 GCS 时,它会合并所有 .csv 文件数据并在 GCS 生成单个 .txt 文件。

有人可以帮助如何分别传输每个文件并在接收端将其转换为 txt 吗?

0 投票
0 回答
17 浏览

python - 了解使用生成器进行数据管道的程序的程序流程和代码

我创建了一个虚拟数据管道,它从日志文件中读取数据并将其应用来过滤器(虚拟过滤器)。我很难理解这个程序的输出(流程)。

问题 在 'process' 函数的 for 循环中,我们遍历每个过滤器。现在在循环中,当我们在 filter1 时,我希望程序流将被转移到 filter1 函数,并且它将在移动到循环内的下一个过滤器之前执行。因此,根据我在 filter1 之后的循环中的说法,程序流程应该进入 filter1 函数,但输出显示程序只是生成了一个 filter1 生成器对象,然后直接循环到下一个过滤器,而无需先执行 filter1。同样,在进程函数中退出 for 循环后,程序进入 filter2,然后进入 filter1 并执行它。我不明白这里到底发生了什么。如果您能解释此代码的工作原理及其与输出相关的控制流程,那就太好了。

代码:日志类有两个过滤功能,可以做一些过滤工作。首先,我正在初始化一个虚拟文件,这里它只是两个字符串的列表,然后我添加了两个虚拟过滤器,它们在进程函数内的循环中应用于文件

这是程序的输出

在此处输入图像描述