问题标签 [data-ingestion]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
32 浏览

json - 将 Elastic 输出提取到 Elastic 中

我有一个客户在 JSON 文件中向我提供了如下所示的 Elastic 查询输出:

可以看到,有将近 170.000 条记录。我想在我的测试 Elastic 中摄取这些数据。我需要摄取的数据是 _source 中的数据。我怎样才能最好地从文件中提取它?如果输入在 JSON 文件中,logstash 是否是执行此操作的正确工具?

感谢您的建议。C

0 投票
1 回答
141 浏览

apache-nifi - Nifi在HDFS路径上为前一天合并的json文件移动文件

我需要将前一天处理并合并的 json 文件移动到新的 hdfs 路径中。要求是递归搜索未处理的文件并移动待处理的未处理文件。

路径 1 -> /data/nifi/working/2019/10/source_2019_10_15.json --- 每日处理的文件合并在此路径下,并每天添加。路径 2 -> /data/nifi/incoming/ - 代码应该搜索文件夹是否不存在,然后创建和移动文件,如果文件夹已经存在则移动文件。

目前,我正在使用 nifi flow -- ListHDFS->MoveHDFS 但无法实现。

需要帮助如何实现这一点。

感谢您的帮助。

0 投票
1 回答
51 浏览

postgresql - 有没有办法在 Sqoop 中设置 PostgreSQL 优化器?

我正在尝试运行一个 sqoop 作业以将数据从 postgresql 摄取到 hdfs,但我被困在某个点上。

Sqoop在我的“ WHERE ”语句的末尾添加“ AND (1=0) ”,以便在摄取之前获取元数据。

添加上述命令后,查询永远不会完成。(在 Sqoop 和 DBeaver 中)

但是,此查询仅在我设置 SET OPTIMIZER = ON 后才有效(在 DBeaver 中)

我正在寻找在我的 sqoop 会话中设置优化器参数的解决方案。

有没有办法做到这一点?

0 投票
2 回答
282 浏览

apache-kafka - 是否有任何 api 或工具可以获取 kafka 主题中的摄取率

如何找到摄取比率?即,每秒/分钟间隔摄取的事件/字节数

0 投票
1 回答
147 浏览

amazon-s3 - Sqoop 无法从 Postgres 导入到 S3

我在日常操作中将数据从 Postgresql 导入到 hdfs 和 hdfs 到 S3。(sqoop 导入 [postgres 到 hdfs] & distcp [从 hdfs 到 s3])

我想删除中间步骤(hdfs)并使用 sqoop 直接将数据导入 S3 存储桶。

但是,相同的 sqoop 字符串在导入操作结束时会失败。

我也试过--target-dir s3a://path/to/destination而不是 ....... LOCATION s3a://path/to/destination

在“映射:%100 完成”之后,它会抛出以下错误消息:

0 投票
2 回答
959 浏览

elasticsearch - 在摄取管道中使用搜索模板

ElasticSearch 摄取管道可以使用搜索模板作为其脚本吗?

具体来说,我想配置一个摄取管道,以便每当进入特定类型的数据时,我们会在 ElasticSearch 中查询一些相关数据,并根据结果在原始数据上填充一个(或多个字段),然后再加载它在。

我看到摄取管道可以使用脚本(https://www.elastic.co/guide/en/elasticsearch/reference/master/script-processor.html),并且脚本可以包含搜索模板(https://www. elastic.co/guide/en/elasticsearch/reference/master/modules-scripting-using.html),但我无法找到有关可能将两者结合起来的任何信息。

0 投票
2 回答
1937 浏览

google-cloud-platform - Google 数据融合执行错误“INVALID_ARGUMENT:‘DISKS_TOTAL_GB’配额不足。请求的 3000.0,可用的 2048.0。”

我正在尝试使用 Google Data Fusion Free 版本将一个简单的 CSV 文件从 GCS 加载到 BQ。管道因错误而失败。它读到

Mapreduce 和 Spark 执行管道重复相同的错误。感谢解决此问题的任何帮助。谢谢

问候 KA

0 投票
1 回答
751 浏览

druid - Apache Druid:更新数据源中的数据时出现问题

我目前使用的是 druid-Incubating-0.16.0 版本。如https://druid.apache.org/docs/latest/tutorials/tutorial-update-data.html教程链接中所述,我们可以使用结合 firehose 来更新和合并数据源的数据。

步骤:1 我使用与初始结构相同的样本数据

第 2 步:我用 appendToExisting = false 和 rollUp = true 更新了 Tiger 的数据 {"timestamp":"2018-01-01T01:01:35Z","animal":"tiger", "number":30} 和找到了结果

第 3 步:现在我用 appendToExisting = false 和 rollUp = true 更新长颈鹿 {"timestamp":"2018-01-01T03:01:35Z","animal":"giraffe", "number":30} 并得到以下结果

我的疑问是,在第 3 步中,老虎的数量减少了 1,但我认为它不应该改变,因为老虎的第 3 步没有变化,也没有数量变化

仅供参考,count 和 number 是 metricSpec,它们分别是 count 和 longSum。请说清楚。


当使用带有初始数据的 ingestSegment firehose 时

在添加带有 appendToExisting = true 的新数据 {"timestamp":"2018-01-01T03:01:35Z","animal":"giraffe", "number":30} 时,我得到了

它是正确的和预期的输出吗?为什么汇总没有发生?

0 投票
2 回答
163 浏览

json - 如何从 TXT 文件中提取 JSON?

问题不在于获取 TXT,因为它是

返回的不是 JSON 对象,而是一个字符串......即使replace(txt,E'\n',' ')它不起作用。如何规范化并将字符串真正转换为 JSON?

PS:我正在使用 JSONb,它必须是 JSON 用于摄取。


笔记

json文件/tmp/test.json

我正在使用 UBUNTU 18 LTS、PostgreSQL v12 和 linux 标准 TXT 中的文件。终端命令file -i /tmp/test.json说这一切​​都很好,“text/plain; charset=utf-8”。

仅将全文加载到一个字段中(使用 COPY FROM 似乎不可能),PostgreSQL 很丑,但是这个功能已经过测试并且是可靠的:

0 投票
1 回答
142 浏览

csv - Nifi:检查 CSV 文件的行更新,然后摄取

通常在我们的环境中,当有人要摄取新数据时,他们会生成一个新的 CSV 文件,Nifi 将从 SFTP 中获取该文件。但是,我有一个新案例,其中源的 CSV 文件位于共享驱动器上,并且当有更新时,他们的应用程序会向其中添加新行。

Nifi 有没有办法在添加新行时监控这个 CSV 文件?并且它随后可以只摄取新数据吗?我认为关系数据库有类似的东西,但我还没有看到文件有类似的东西。提前致谢。