问题标签 [data-pipeline]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
2 回答
1879 浏览

google-cloud-platform - 带有 python flex 模板的数据流 - 启动器超时

我正在尝试使用 flex 模板运行我的 python 数据流作业。当我使用直接运行器(没有弹性模板)运行时,作业在本地运行良好,但是当我尝试使用弹性模板运行它时,作业停留在“排队”状态一段时间,然后因超时而失败。

这是我在 GCE 控制台中找到的一些日志:

对于 1,我看不到有用的 lo。对于 2,服务帐户是默认服务帐户,因此它应该具有所有权限。

我该如何进一步调试呢?

这是我的 Docker 文件:

我正在关注本指南 - https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates

0 投票
2 回答
158 浏览

google-cloud-platform - 使用 CloudSQL 到 BigQuery 的正确数据管道

在我将这个问题作为我的论文项目时,我在整个数据工程中真的是新手,所以请耐心等待。

我目前正在为已经拥有 CloudSQL 服务的电池存储系统开发一个大数据平台,该服务每 15 秒收集一次数据(因此它已经在 GCP 环境中)。我的工作是复制它们并在每次数据进入时传输到 BIGQUERY(包括使用 Dataprep 准备数据),然后将其用于机器学习。

我已经挖掘了几种方法,其中一种使用 Dataflow,尝试过一次,但它是手动完成的。使用jdbc to bigquery作业。为了满足我的需求(定期运行作业),我被推荐使用 Cloud Composer。

另一方面,我得到了另一个使用 PubSub 的源,它触发 Dataflow 的作业。后一种方法似乎更有希望,但是,最好同时了解这两个世界。任何建议肯定会有所帮助...

0 投票
1 回答
82 浏览

google-cloud-functions - Google Dataflow pipeline for varying schema

I have a product to define and configure business workflows. A part of this product is a form-builder which enables users to setup different forms.

This entire forms data is backed on MongoDB in the following structure

As seen above the form can be for various different businesses. However, I am looking at data pipeline to transport the data to Google Bigquery at periodic intervals for analysis.

On BQ side, I am maintaining separate tables for each workflows

I have a current working solution which is completely written on Google Cloud Functions. I have a Google Scheduler Job run at periodic intervals invoking the different cloud functions. The cloud functions is doing the following things at high level

  • Iterate for each schema
  • Read the data mongodb for every schema since the last run (as cursor)
  • For each row of data, run the custom transformation logic (this includes transforming various nested data types like grids/lookup etc)
  • Write each row of transformed data directly as stream as ndjson on Google Cloud Storage

I above solution provides me with,

  • Complete control on transformation
  • Simple deployment

However since its all on CF, I am bound by limitation of 9 minutes per run. This essentially puts a lot of pagination requirements especially if there is a need to regenerate the complete data from beginning of time

While the above solution works fine for now, I was looking at other serverless options like Google data-flow. Since I am just starting on data-flow/apache beam, I was wondering

If I were to write a pipeline on beam, should I go with same approach of

  1. Extract(Row by Row) -> Transform -> Load (GCS) -> Load (BQ)

or

  1. Extract (entire data as JSON) -> Load to GCS -> Transform (Beam) -> Load to GCS -> Load to BQ

Let me know if there is any better option for entire data processing.

0 投票
2 回答
144 浏览

python-3.x - 转换文件后加载 DynamoDB 数据。任何 AWS 服务,例如 GCP Dataflow/Apache Beam?

AWS 新手。我需要创建一个每日批处理管道

  1. 阅读 6-10 个 1GB+ CSV 文件。(每个文件都是从 SQL 数据库中提取的表。)
  2. 使用一些逻辑转换每个文件并连接所有文件以为每个 id 创建一个项目。
  3. 使用 upsert 逻辑将此连接数据加载到单个 DynamoDB 表中。

我开始使用的当前方法是:我们有一个可用于此类任务的 EC2。所以我正在编写一个python代码来(1)读取所有CSV,(2)转换为非规范化的JSON文件,(3)使用boto3导入Dynamodb

我的问题是我担心我的数据是否是“大数据”。使用单个 Python 脚本处理 10GB 数据可以吗?如果文件大小变成 10 倍,我会面临缩放问题吗?我过去只使用过 GCP,在这种情况下,我会使用 DataFlow 来完成任务。那么在 AWS 术语中是否存在等价物?如果有人可以提供一些想法,那就太好了。谢谢你的时间。

0 投票
1 回答
545 浏览

google-cloud-platform - 使用 pip install 更新后 Airflow 1.10.13、2020-11-24 问题

以下是工作到 12 月 1 日的配置

  1. 作曲家-1.11.2-气流-1.10.6
  2. 蟒蛇 - 3.6
  3. 'dbt==0.17.0',
  4. '谷歌云存储',
  5. 'google-cloud-secret-manager==1.0.0',
  6. 'protobuf==3.12.2'

通过上述配置,我们观察到以下错误。错误:无法安装 dbt 和 protobuf==3.12.2,因为这些软件包版本具有冲突的依赖关系。\n\n冲突是由以下原因引起的:\n 用户请求的 protobuf==3.12.2\n dbt-bigquery 0.17.0 依赖于protobuf<3.12 和 >=3.6.0\n\n要解决此问题,您可以尝试:\n1. 放宽您指定的软件包版本范围\n2. 删除软件包版本以允许 pip 尝试解决依赖冲突\n\n错误:ResolutionImpossible

收到此错误消息后,我们尝试了许多版本更改来解决上述问题。下面列出了一些已应用的修复以及我们针对这些修复观察到的错误。

  1. 已应用修复:根据上述错误将 protobuf 版本从 3.12.2 降级到 3.11.3 错误:从https://files.pythonhosted.org/packages/d4/12/6c3fd74a590c7327c98cae008c11d536029fa9cd7924de ​​4767e8cb88041 请求 docutils<0.16,>=0.10 0.15.1-post1.tar.gz#sha256=f33ddb723332c6d6b6d99731ee1fc0c35eb4044a2df5cca1c64c8aa78eaf22cb (from botocore<1.15,>=1.5.0->dbt-redshift==0.17.0->dbt==0.17.0) 有不同的版本0.15.1'

  2. 已应用修复:将 docutils 版本指定为 0.15.2 以修复上述错误错误:SyntaxError:调用“打印”时缺少括号。您的意思是 print("Generating %s..." % output)?\n ------------------------------- ---------\n错误:命令出错,退出状态为 1:python setup.py egg_info 检查日志以获取完整的命令输出。

  3. 已应用修复:尝试将 python 版本从 3.6 升级到 3.8。错误:路径 python3.8(来自 --python=python3.8)不存在。

  4. 已应用修复:删除了上述库的包版本控制,以允许 pip 尝试解决依赖冲突。错误:命令出错,退出状态为 1:\n 命令:/tmp/venv__vuo821/bin/python3.6 -c 'import sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-ghz6thkq/protobuf_a5303ec5c2df4986bb9b05948ae30052/setup.py'"'"'; file ='"'"'/tmp/pip-install-ghz6thkq/protobuf_a5303ec5c2df4986bb9b05948ae30052/setup.py'"'"';f=getattr(tokenize, '"'"'open'"'"', open)( file ) ;code=f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(代码,, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-gi38587q\n cwd: /tmp/pip-install-ghz6thkq/protobuf_a5303ec5c2df4986bb9b05948ae30052/\n完整输出(6 行):\n Traceback(最近一次调用最后一次):\n 文件“”,第 1 行,\n 文件“/tmp/pip-install-ghz6thkq/protobuf_a5303ec5c2df4986bb9b05948ae30052/setup.py”,第 52 行\ n print "Generating %s..." % output\n ^\n SyntaxError: 调用 'print' 时缺少括号。您的意思是 print("Generating %s..." % output)?\n ------------------------------- ---------\n错误:命令出错,退出状态为 1:python setup.py egg_info 检查日志以获取完整的命令输出

  5. 已应用修复:已将 dbt 升级到 0.17.2 错误:收到 SIGTERM。终止子进程。

  6. 已应用修复:尝试将 setuptools 版本降级到 49.6.0 错误:SyntaxError:调用“打印”时缺少括号。您的意思是 print("Generating %s..." % output)?\n ------------------------------- ---------\n错误:命令出错,退出状态为 1:python setup.py egg_info 检查日志以获取完整的命令输出。

  7. 已应用修复:将 Composer 和 Airflow 版本升级到 composer-1.13.2-airflow-1.10.9 错误:上述配置组合出现类似错误

0 投票
1 回答
63 浏览

sql - 插入新记录时生成累进编号(某些记录需要具有相同的编号)

标题可能有点混乱。让我解释一下这个问题。我有一个每天加载新记录的管道。此记录包含销售额。关键是<日期、位置、票证、线路>。这些数据被加载到红移表中,然后通过系统读取的视图公开。这个系统有一个限制,票的列是 varchar(10) 但票是 30 个字符的字符串。如果系统只取前10个字符会产生重复。票号可以是“假”号。如果它不等于实数也没关系。所以我正在考虑在 redshift 表上添加一个包含渐进数字的新列。问题是我不能使用标识列,因为属于同一张票的记录必须具有相同的“累进编号”。

这就是我想要的:

地点 线 数量 票号
2020 年 12 月 12 日 67 123...生长激素 1 10 1
2020 年 12 月 12 日 67 123...生长激素 2 5 1
2020 年 12 月 12 日 67 123...生长激素 3 23 1
2020 年 12 月 12 日 23 123...国标 1 13 2
2020 年 12 月 12 日 23 123...国标 2 45 2
... ... ... ... ... ...
2020 年 12 月 12 日 78 123...AG 5 100 153

第二天加载新数据时,我想从 ticket_id 154 开始,依此类推。

每行都有一列,指定插入的时刻。同一天插入的行具有相同的 insert_time。

我的解决方案是:

  1. 插入带有ticket_id 作为dense_rank 的记录。但是每次(我加载新记录,所以每天)ticket_id 都从一个开始,所以......
  2. ...将刚刚插入的行更新为 ticket_id = ticket_id + 我在 ticket_id 列下找到的最大数量 insert_time != max(insert_time)

你认为有更好的解决方案吗?如果存在一个以 <day, location, ticket> 作为输入并返回最多 10 个字符的哈希函数,那就太好了。

0 投票
1 回答
657 浏览

azure - 使用 Azure 数据工厂将压缩的 XML 文件从 HTTP 链接源复制并提取到 Azure Blob 存储

我正在尝试建立 Azure 数据工厂复制数据管道。源是一个开放的 HTTP 链接源(网址参考:https ://clinicaltrials.gov/AllPublicXML.zip )。所以基本上源包含一个包含许多 XML 文件的压缩文件夹。我想使用 Azure 数据工厂将提取的 XML 文件解压缩并保存在 Azure Blob 存储中。我试图遵循此处提到的配置:How to decompress a zip file in Azure Data Factory v2 但我收到以下错误:

不完全确定出了什么问题,但是如果有人可以指导我进行该程序,那将非常有帮助。

0 投票
1 回答
36 浏览

firebase - 为什么在 ga/firebase 选项没有任何变化的情况下,来自 bigquery 的数据量显着减少?

我使用 Bigquery 从 ga 和 firebase 获取原始数据。我可以从 Bigquery 获得大约 100000 ~ 200000 行的日志数据。

但从上周开始,我从 Bigquery 获得了大约 1000 行数据。

在此处输入图像描述

我没有更改 ga、firebase、bigquery 的任何选项。而且我发现 ga 中显示的数据没有变化。

正如我认为的那样,将 ga 与 bigquery 和 firebase 与 bigquery 连接存在问题。

如何找到相关的问题和原因?我需要很多关于这个问题的意见

谢谢你!

0 投票
1 回答
350 浏览

python - 如何使用 tf.data 在 tensorflow 管道中加载 .npy 文件

我正在尝试np.load()使用tf.data管道从 .npy 文件中读取我的 X 和 y 数据。但是如果我打电话会得到以下错误model.fit()。有人解决这个问题吗?我想我必须将 X_data 和 y_data 的形状赋予tf.py_funciton. 我正在使用 TensorFlow 2.4

错误:

Input 0 of layer sequential_13 is incompatible with the layer: expected ndim=3, found ndim=2. Full shape received: (None, None)

描述:

train_filenames,train_label_filenames 是包含每个 .npy 文件的文件路径的列表。

所以那个print(train_filenames[0])节目是'E:\UserData\Mustermann\02_Data\X_Data\Sample0.npy' _np.load(train_filenames[0]).shape(12, 8002)
np.load(label_filenames[0]).reshape(-1,1).shape(1, 1)

所以一个样本有 12 个时间步长和 8002 个特征。

代码:

0 投票
1 回答
43 浏览

amazon-web-services - AWS Datapipeline RDS 到 S3 活动错误:无法建立与 jdbc://mysql 的连接:

我目前正在使用 RDStoRedshift 模板设置 AWS Data Pipeline。在第一个 RDStoS3Copy 活动期间,我收到以下错误:

“[错误](TaskRunnerService-resource:df-04186821HX5MK8S5WVBU_@Ec2Instance_2021-02-09T18:09:17-0)df-04186821HX5MK8S5WVBU amazonaws.datapipeline.database.ConnectionFactory:无法建立与 jdbc://mysql:/myhostname 的连接: 3306/mydb 没有找到适合 jdbc://mysql:/myhostname:3306/mydb 的驱动程序"

我对 AWS 服务比较陌生,但复制活动似乎为复制活动启动了一个 EC2 实例。该错误清楚地表明没有可用的驱动器。我是否需要为 AWSDataPipeline 建立一个 EC2 实例才能在其中使用和安装驱动程序?