1

我是新手。我正在尝试从 hbase 读取数据并使用大数据批处理对表达式生成器中的数据进行一些转换并将输出写入文件。 在此处输入图像描述

现在我想获取表格的行键并对其应用转换,如下所示,

(concat('-',cast(cus.key as string))) as id

这里的键是我从中提取数据的 hbase 表的行键。

我附上了映射选项卡的快照。

在此处输入图像描述

因此,当我基本上运行我的工作时,应该选择 hbase 表的键,以便将上述转换cast(cus.key as string) 应用于 rowkey 并存储为列 id。

我想知道我们是否有任何简单的方法可以从 hbase 表中获取行键?

提前致谢。

4

2 回答 2

1

首先,当您在 Hbase 中加载数据时,您需要创建一个自定义行键(在 hbaseoutput 选项中)。

您可以使用一些 ID 字段以使其独一无二,例如"key"+user_id.

遵循这个:这里

在您这样做的同时,将相同的值 ( "key"+user_id) 存储在您命名的列中row_key_technical(例如)

现在您可以像使用表格中的普通列一样使用行键。因此,使用 thbaseinput,您可以检索技术列中的行键存储并做任何您想做的事情。

你需要做两次。

我不确定这是唯一的解决方案,但它是一个。Mybe有人有更好的解决方案;)。

于 2017-01-25T16:44:28.137 回答
1

您可以强制 HbaseInput 组件获取 Hbase 表的行键。执行以下操作,转到存在 tHbaseInput 类的位置。

C:\Program Files (x86)\Talend-Studio\studio\plugins\org.talend.designer.components.mrprovider_6.2.1.20160704_1411\components\tHBaseInput

并且在 tHBaseInput_mrcode_main_only java jet 类中,会有一个方法 validateResult(),如下

    public boolean validateResult(org.apache.hadoop.hbase.client.Result result,
                    <%=recordStruct%> value) throws IOException {
                org.apache.hadoop.hbase.io.ImmutableBytesWritable rowKey = new org.apache.hadoop.hbase.io.ImmutableBytesWritable();
                rowKey.set(result.getRow());
                lastSuccessfulRow = rowKey.get();

                byte[] rowResult = null;
                String temp = null;

                <%
                for (int i = 0; i < mapping.size(); i++) {
                    Map<String, String> map = mapping.get(i);
                    String family_column= map.get("FAMILY_COLUMN");
                    IMetadataColumn column = mainColumns.get(i);
                    String columnName = column.getLabel();
                    String defaultValue = column.getDefault();
                    String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
                    JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
                    String patternValue = column.getPattern() == null || column.getPattern().trim().length() == 0 ? null : column.getPattern();
                    boolean isPrimitiveType = JavaTypesManager.isJavaPrimitiveType(javaType, column.isNullable());
                    String toAssign = "value." + columnName;

                    %>

                    rowResult = result.getValue(
                            org.apache.hadoop.hbase.util.Bytes.toBytes(<%=family_column%>),
                            org.apache.hadoop.hbase.util.Bytes.toBytes("<%=column.getOriginalDbColumnName()%>"));
                    temp = org.apache.hadoop.hbase.util.Bytes.toString(rowResult);

Modify the above method to below

public boolean validateResult(org.apache.hadoop.hbase.client.Result result,
            <%=recordStruct%> value) throws IOException {
        org.apache.hadoop.hbase.io.ImmutableBytesWritable rowKey = new org.apache.hadoop.hbase.io.ImmutableBytesWritable();
        rowKey.set(result.getRow());
        lastSuccessfulRow = rowKey.get();

        byte[] rowResult = null;
        String temp = null;
        value.key = org.apache.hadoop.hbase.util.Bytes.toString(lastSuccessfulRow);
        <%
        for (int i = 0; i < mapping.size(); i++) {
            Map<String, String> map = mapping.get(i);
            String family_column= map.get("FAMILY_COLUMN");
            IMetadataColumn column = mainColumns.get(i);
            String columnName = column.getLabel();
            String defaultValue = column.getDefault();
            String typeToGenerate = JavaTypesManager.getTypeToGenerate(column.getTalendType(), column.isNullable());
            JavaType javaType = JavaTypesManager.getJavaTypeFromId(column.getTalendType());
            String patternValue = column.getPattern() == null || column.getPattern().trim().length() == 0 ? null : column.getPattern();
            boolean isPrimitiveType = JavaTypesManager.isJavaPrimitiveType(javaType, column.isNullable());
            String toAssign = "value." + columnName;

            %>
            if(!"key".equalsIgnoreCase("<%=column.getOriginalDbColumnName()%>"))

完成后,删除 C:\Program Files (x86)\Talend-Studio\studio\configuration 中的文件“ComponentsCache.javacache”。并重启talend open studio。现在您的 tHbaseInput 组件将从 Hbase 表中获取行键。这可能不适用于每种情况,但如果您使用 talend open studio 生成作业并将 jar 部署到其他地方,这可能会有所帮助。

感谢我的项目经理。

于 2017-01-31T13:37:16.373 回答