2

我正在 Cloudera Distribution 上使用 StreamSets,试图从该网站http://files.data.gouv.fr/sirene/摄取一些数据

我在选择 HTTP 客户端和 Hadoop FS 目标的参数时遇到了一些问题。

https://image.noelshack.com/fichiers/2017/44/2/1509457504-streamsets-f.jpg

我收到此错误:HTTP_00 - 无法解析记录:java.io.IOException: org.apache.commons.compress.archivers.ArchiveException: No Archiver found for the stream signature

我会告诉你我的配置。

HTTP客户端:

一般的

名称:HTTP 客户端 INSEE

描述:客户端 HTTP SIRENE

记录错误:发送到错误

HTTP

资源网址: http: //files.data.gouv.fr/sirene/

标题:sirene_:sirene_

模式:流媒体

按状态操作

HTTP 统计代码:500 | 状态操作:使用指数退避重试 |

基本退避间隔(毫秒):1000 | 最大重试次数:10

HTTP 方法:GET

身体时区 : UTC (UTC)

请求传输编码:缓冲

HTTP 压缩:无

连接超时:0

读取超时:0

身份验证类型:无

使用 OAuth 2

使用代理服务器

最大批量(记录):1000

批处理等待时间(毫秒):2000

分页

分页模式:无

TLS

使用TLS

超时处理

超时操作:立即重试

最大重试次数:10

数据格式

日期格式:分隔

压缩格式:存档

压缩目录中的文件名模式:*.csv

分隔符格式类型:自定义

标题行:带标题行

最大记录长度(字符):1024

允许额外的列

分隔符:分号

转义字符:其他\

引用字符:其他“

根字段类型:列表映射

要跳过的行:0

解析 NULL

字符集:UTF-8

忽略控制字符

Hadoop FS 目标:

一般的

名称:Hadoop FS 1

描述:写入HDFS

舞台图书馆:CDH 5.7.6

制作活动

必填字段

前提条件

记录错误:发送到错误

输出文件

文件类型:整个文件

文件前缀

标题中的目录

目录模板:/user/pap/StreamSets/sirene/

数据时区:UTC (UTC)

时间基准:${time:now()}

使用滚动属性

验证 HDFS 权限:开

跳过文件恢复:开

迟到的记录

延迟记录时间限制(秒):${1 * HOURS}

后期记录处理:发送到错误

数据格式

数据格式:整个文件

文件名表达式:${record:value('/fileInfo/filename')}

权限表达式:777

文件存在:覆盖

在事件中包含校验和

...那我做错了什么?:(

4

2 回答 2

2

看起来http://files.data.gouv.fr/sirene/正在返回文件列表,而不是压缩存档。这是一个棘手的问题,因为没有标准的方法来遍历这样的列表。您可能可以将http://files.data.gouv.fr/sirene/读取为文本,然后使用 Jython 评估器解析出 zip 文件 URL,检索、解压缩和解析它们,将解析的记录添加到批处理中. 不过,我认为您会遇到这种方法的问题,因为所有记录最终都会出现在同一批次中,从而耗尽内存。

另一个想法可能是使用两个管道 - 第一个将使用 HTTP 客户端源和脚本评估器来下载压缩文件并将它们写入本地目录。然后,第二个管道将照常通过目录源读取压缩的 CSV。

如果您决定尝试一下,请通过我们的渠道之一与 StreamSets 社区互动 - 请参阅https://streamsets.com/community

于 2017-11-01T15:04:49.810 回答
0

I'm writing the Jython evaluator. I'm not familiar with the available constants/objects/records as presented in comments. I tried to adapt this python script into the Jython evaluator :

import re
import itertools
import urllib2
data = [re.findall(r'(sirene\w+.zip)', line) for line in open('/home/user/Desktop/filesdatatest.txt')]
data_list = filter(None, data)
data_brackets = list(itertools.chain(*data_list))
data_clean = ["http://files.data.gouv.fr/sirene/" + url for url in data_brackets]
for url in data_clean:
    urllib2.urlopen(url)

records = [re.findall(r'(sirene\w+.zip)', record) for record in records] gave me this error message SCRIPTING_05 - Script error while processing record: javax.script.ScriptException: TypeError: expected string or buffer, but got in at line number 50

filesdatatest.txt contains things like :

Listing of /v1/AUTH_6032cb4c2159474684c8df1da2e2b642/storage/sirene/  
Name    Size    Date  
../            
README.txt  2Ki     2017-10-11 03:31:57  
sirene_201612_L_M.zip   1Gi     2017-01-05 00:12:08  
sirene_2017002_E_Q.zip  444Ki   2017-01-05 00:44:58  
sirene_2017003_E_Q.zip  6Mi     2017-01-05 00:45:01  
sirene_2017004_E_Q.zip  2Mi     2017-01-05 03:37:42  
sirene_2017005_E_Q.zip  2Mi     2017-01-06 03:40:47  
sirene_2017006_E_Q.zip  2Mi     2017-01-07 05:04:04  

so I know how to parse records.

于 2017-11-02T17:15:26.250 回答