5

我在 util 类中有这些方法,它们将特定的 PCollection 转换为特定的 PTable。

public static PTable<IdDetails, CASegmentsForModification> getPTableForCASegments(PCollection<CASegmentsForModification> aggregatedPCollectionForCASegments) {
    return aggregatedPCollectionForCASegments.parallelDo(new CASegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(CASegmentsForModification.class)));
}

public static PTable<IdDetails, UserPrimaryIdMapping> getPTableForPrimaryIdMapping(PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping) {
    return pCollectionOfUserPrimaryIdMapping.parallelDo(new UserPrimaryIdMappingPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping.class)));
}

public static PTable<IdDetails, UserGroupSegments> getPTableForUserGroupSegments(PCollection<UserGroupSegments> pCollectionOfUserGroupSegments) {
    return pCollectionOfUserGroupSegments.parallelDo(new UserGroupSegmentsPTableConverter(),
            Avros.tableOf(Avros.records(IdDetails.class), Avros.records(UserGroupSegments.class)));
}

如何实现上述方法的一种通用方法?

4

2 回答 2

2

有一种更好的方法是使用 PTables util 类中的静态asPtable方法。您的 PCollection 必须是 Pair 类型,并且 PTable 结果将是 PTable 类型

    public static <K,V> PTable<K,V> asPTable(PCollection<Pair<K,V>> pcollect)

根据您的示例,您只需要创建 DoFn(或扩展类)即可返回 Avros.pairs(Avros.records(yourClass.class), Avros.records(yourOtherClass.class))。

另一种方法是使用预定义的 MapFn,即ExtractKEYFn并将其应用于您的集合。您将需要实现 map 方法来提取键并生成键、值输出。本质上是一样的思路,之后就可以将 PCollection> 转换成 PTable

它应该为您节省大量样板代码。

以防万一,还有其他可能有用的功能,例如FilterFn,但是当您使用MemPipeline进行单元测试时,我们发现了一些错误。我建议的第一种方法应该是最安全的。

编辑:

保存一些代码的一个很好的平衡是使用字段名称根据字段名称获取您的密钥,并为每个 PCollection 调用此 MapFn。

//we are assuming the key will be in the first level of your record
public class GenericRecordToPair <V extends GenericRecord, K extends GenericRecord> extends MapFn<V, Pair<K, V>> {
    String key;

    public GenericRecordToPair(String key){
        this.key = key;
    }

    @Override
    public Pair<T, TupleN> map(S input) {
        return new Pair<K,V> (input.get(key), input);
    }

}

从您的示例中,您可以执行类似的操作

PCollection<UserGroupSegments> pCollectionOfUserGroupSegments = ...//comming from somewhere
PCollection<UserPrimaryIdMapping> pCollectionOfUserPrimaryIdMapping = ...//comming from somewhere
PTable<IdDetails, UserGroupSegments> pTable1 = PTables.asPTable(pCollectionOfUserGroupSegments.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserGroupSegments))));
PTable<IdDetails, UserPrimaryIdMapping> pTable2 = PTables.asPTable(pCollectionOfUserPrimaryIdMapping.parallelDo(new GenericRecordToPair("idDetails"), Avros.pairs(Avros.records(IdDetails.class), Avros.records(UserPrimaryIdMapping))));
于 2017-08-15T01:53:02.813 回答
1

这正是该PCollection.by方法的目的,它接受一个 MapFn 来生成密钥并返回一个 PTable,其中每个记录由该 MapFn 的结果作为密钥。

因此,您可以执行以下操作:

PTable<IdDetails, CASegmentsForModification> pTableForCASegments = aggregatedPCollectionForCASegments.by(
    new CASegmentsKeyMapFn(),
    Avros.records(IdDetails.class)
)
于 2017-09-05T01:20:12.820 回答