问题标签 [data-ingestion]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
846 浏览

sql-server - 在 0.9 版中工作的 kafka 连接作业在 0.10.2 中不工作

当我运行我的 kafka 连接作业时,我收到以下错误

我的属性文件是

我尝试使用GET调用 kafka 连接(在它崩溃之前)curl http://localhost:8083 /JdbcSourceConnector/config/validate,我得到了响应{"version":"0.10.2.0-cp1","commit":"64c9b42f3319cdc9"}curl: (3) <url> malformed。任何想法我做错了什么,因为这项工作在 kafka 0.9 中工作。

0 投票
1 回答
419 浏览

validation - 使用 Kafka 和 Hadoop 进行数据摄取 - 如何避免因质量检查失败而导致的数据重复?

这是一个简化的场景:

N 个业务流需要来自同一来源的相同原始数据。数据使用 Kafka(普通 Kafka 管道)摄取并登陆 HDFS,在 HDFS 上,每个流程的原始数据都会触发质量检查的自动流程。所有 N 个流可能具有不同的数据质量标准。例如,他们可能要求在将原始数据转换为所需模式时,可以对原始数据应用不同格式的日期和时间。

处理未能满足业务流程质量测试 KPI 的最佳方法是什么?

选项包括:

  1. 全部失败 - 通知源数据提供者并等待固定数据。然后重新摄取并运行所有 N 组质量检查。
  2. 创建一个分支 - 意味着 N 个未通过质量检查的业务流中的 K 个将等待其固定数据集,而通过的 NK 将使用当前数据集。
  3. 标记某些业务流程未通过质量检查的条目,并将其放入特殊队列以手动处理/修复。对错误条目的数量应用一些规则和阈值(仅就需要通过此队列并分析和修复有问题的条目的团队能力的认识而言)

以上哪种方法(如果有)是最明智的?当许多具有不同质量标准的消费者使用相同的数据时,是否有任何模式或最佳实践来处理这种情况?理想情况下,我会避免重复相同的数据,这意味着为每个消费者重新摄取固定数据集(N - 并不是最坏的情况,因为对 N 中的一个进行修复可能会导致其他事先没问题的问题。所以,理论上,这个过程可能是无止境的)。

0 投票
1 回答
850 浏览

date - Elasticsearch 摄取管道 -epoch_millis 到日期格式

我在 ES 5.4.1 中使用 reindex API,我需要将一个长字段(代表一个日期)转换为一个日期字段。所以源索引看起来像

temp 必须转换为日期对象。

我想用处理器,

但是在尝试创建这个处理器时,我得到了一个错误

有任何想法吗?

0 投票
1 回答
64 浏览

data-ingestion - 使用目录扫描运算符处理具有特定扩展名的大文件

我有一个 1GB+ 大小的文件从 MQ 进入我的目录,这需要一些时间才能完全传输文件,但即使文件不完整,也会在该目录中生成一个文件。我担心我的 directoryScan 操作员会选择一个不完整的文件。此外,我无法添加初始延迟,因为我不确定传输文件需要多长时间。

PS:我在某处读到一些文件传输协议通过向文件添加不同的扩展名来解决这个问题,直到它完成。假设我的 directoryScan 操作员正在等待任何扩展名为 .txt 的文件,因此此文件传输协议将创建一个扩展名为 .abc 的文件,直到传输完成。

我该怎么做呢?

0 投票
1 回答
1491 浏览

elasticsearch - 在摄取节点中拆分处理器后访问数组元素

我试图在摄取节点管道中使用“拆分”处理器将字符串拆分为数组后访问数组元素?

我有一个用斜杠('/')分隔的长字符串。我只想将一个子字符串传递给索引,然后转储其余的。

例如,我有一个字符串“/aaa/bbb/ccc”。我只想索引“ccc”。

我目前的想法是使用split + set + remove,但是我不知道如何访问拆分后的数组元素。

PS:如果我使用 Logstash 有解决方案吗?

0 投票
1 回答
4814 浏览

python - Pandas:合并两个数据框并保留来自单个数据框的不相交数据

欲望:

我想要一种方法来合并两个数据框并保留指定数据框中的非相交数据。

问题:

我有重复的数据,我希望这一行可以删除重复的数据:

示例数据和数据测试:

背景:

我正在加载 xml 数据并将 xml 文件转换为几种不同的记录类型作为命名元组。我将每种记录类型拆分为自己的数据框。然后,我通过如下构造将 xml 文件中的当前数据集与已加载到数据库中的数据进行比较previous_df

列是根据命名元组中的字段动态创建的。数据库架构是使用 sqlalchemy 生成的,UniqueConstraint当我认为数据库中有重复项时,我添加了进行管理。

提前感谢您提供的任何帮助。

0 投票
0 回答
346 浏览

elasticsearch - 使用分隔符拆分字符串并分配给新键-logstash

我正在尝试使用logstash将csv导入elasticsearch。我在 csv 中有一个字段是 IP:Port 对。我想拆分 IP 和端口,并通过 logstash 配置文件将它们分配给两个新密钥。

我查看了 kv ,但 kv 似乎需要字符串中的键和值对。我的只有价值。

样本 csv 数据:

hostname1,172.20.28.11:8080,proxy_web,false hostname2,172.20.28.20:443,web_https,false

在上述情况下,我已经将第二个字段映射为“IP_Port”(使用 csv=> 列)现在,我想将 172.20.28.11:8080 拆分为 IP=172.20.28.11 和 port=8080(key=IP,value= 172.20.28.11) 等端口。然后将 IP 的数据类型更改为“ip 数据类型”。

之后,如果我想转换提取的 IP 并使用它进行地理映射,我可以使用以下代码段包含 IP 到 geoip 的转换。

对此有何建议?

0 投票
1 回答
466 浏览

mysql - sqoop 在 RDBMS MySQL 中导出更新表记录

所以我试图在 MySQL 的 RDBMS 表中执行更新。问题是这个更新来自我的 HDFS 中的一个文件,虽然在 MySQL 中,当我更新记录时,表计数带有一个主键,但结果集在描述中出现了重复的值,而不是用新的更新先前的记录钥匙。在下面的示例中,记录 Financial 应该只出现一次,值为 9,但相反,我有两条记录,第一条的键为 2,第二条的键为 9。

在此处输入图像描述

我的 Sqoop 代码:

非常感谢你们。

0 投票
2 回答
8985 浏览

spring - 您如何将 Spring Boot 日志直接摄取到 Elastic 中

我正在研究将 Spring Boot 应用程序日志直接发送到弹性搜索的可行性。不使用 filebeats 或 logstash。我相信 Ingest 插件可能会对此有所帮助。

我最初的想法是通过 TCP 使用 logback 来做到这一点。

https://github.com/logstash/logstash-logback-encoder

所以看上面你可以直接将日志发送到logstash。我只是想知道是否可以使用更新的摄取功能并跳过使用logstash?通过使用摄取方法通过网络将 json 编码的日志直接发送到弹性?

https://www.elastic.co/blog/new-way-to-ingest-part-1

我的问题

我想知道这是否可能?如果是这样,你能解释一下你会怎么做。还有什么可能的陷阱等等。

0 投票
1 回答
272 浏览

mysql - 使用 mysql 表中的 sqoop 更新 hive 表

我已经有一个名为角色的配置单元表。我需要使用来自 mysql 的信息更新此表。所以,我使用这个脚本认为它会在我的蜂巢表上添加和更新新数据:`

不幸的是,这只会插入新数据,但我无法更新已经存在的记录。在你问几个陈述之前:

  • 该表没有PK

  • 如果我不指定 --last-value 作为参数,我将获得那些已经存在的重复记录。

如果不应用截断表或使用 PK 重新创建表,我怎么能弄清楚呢?存在方式?

多谢你们。