2

Apache Beam 根据上一行的值更新值

我已经对 CSV 文件中的值进行了分组。在分组的行中,我们发现一些缺失值需要根据前一行的值进行更新。如果该行的第一列是空的,那么我们需要将其更新为 0。

我可以对记录进行分组,但无法找出更新值的逻辑,我该如何实现?

记录

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89482 2012 年 1 月 1 日
BS:89483 2012 年 1 月 1 日 300
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 2 日

分组记录

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 1 日
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 1 日 300
BS:89483 2012 年 1 月 2 日

更新缺失值

客户ID 日期 数量
BS:89481 2012 年 1 月 1 日 100
BS:89481 2012 年 1 月 2 日 900
BS:89482 2012 年 1 月 1 日 000
BS:89482 2012 年 1 月 2 日 200
BS:89483 2012 年 1 月 1 日 300
BS:89483 2012 年 1 月 2 日 300

到目前为止的代码:

public class GroupByTest {
    public static void main(String[] args) throws IOException {
        System.out.println("We are about to start!!");

        final File schemaFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\schema_transform2.avsc");

        File csvFile = new File(
                "C:\\AI\\Workspace\\office\\lombok\\artifact\\src\\main\\resources\\CustomerRequest-case2.csv");
        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Reading schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Reading csv input
                .apply("1", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Reading files that matches conditions 
                .apply("2", FileIO.readMatches())

                // Reading schema and validating with schema and converts to row and returns
                // valid and invalid list
                .apply("3", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(invalidTag())));

        // Fetching only valid rows
        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));

        // Transformation
        //Convert row to KV
        final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
            .aggregateField("balance", Sum.ofDoubles(), "balances");

        final PCollection<Row> aggregagte = rows.apply(combine);

        PCollection<String> pOutput=aggregagte.apply(Select.flattenedSchema()).apply(ParDo.of(new RowToString()));
        
                        
        
        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }

    private static String getColumnValue(String columnName, Row row, Schema sourceSchema) {
        String type = sourceSchema.getField(columnName).schema().getType().toString().toLowerCase();
        LogicalType logicalType = sourceSchema.getField(columnName).schema().getLogicalType();
        if (logicalType != null) {
            type = logicalType.getName();
        }

        switch (type) {
        case "string":
            return row.getString(columnName);
        case "int":
            return Objects.requireNonNull(row.getInt32(columnName)).toString();
        case "bigint":
            return Objects.requireNonNull(row.getInt64(columnName)).toString();
        case "double":
            return Objects.requireNonNull(row.getDouble(columnName)).toString();
        case "timestamp-millis":
            return Instant.ofEpochMilli(Objects.requireNonNull(row.getDateTime("eventTime")).getMillis()).toString();

        default:
            return row.getString(columnName);

        }
    }



}

修改代码: 原始代码

final Group.CombineFieldsByFields<Row> combine = Group.<Row>byFieldNames("customerId", "date")
        .aggregateField("amount", Sum.ofDoubles(), "balances");

按客户 ID 分组

class ToKV extends DoFn<Row, KV<String, Row>> {

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        org.apache.beam.sdk.schemas.Schema schema = row.getSchema();
        context.output(KV.of(row.getValue(columnName1).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }


}

按客户 ID 分组:

ToKV toKV = new ToKV();
toKV.setColumnName1("ID");
PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
    
    
PCollection<KV<String,Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String,Row>create());

// 尝试按日期分组

    PCollection<Row> outputRow = 
            groupedKVRows
            .apply(ParDo.of(new GroupByDate()))
            .setCoder(RowCoder.of(AvroUtils.toBeamSchema(schema)));

如何编写将 Iterable 转换为 pCollection 的逻辑,以便对日期进行排序。

class GroupByDate extends DoFn<KV<String,Iterable<Row>>, Row> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        Iterable<Row> rows = context.element().getValue();
        
    
        
        
    }

Avro 架构:

{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "customerId",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ],
    "logicalType": "date"
    
  }, {
    "name" : "amount",
    "type" : [ "double", "null" ]
  } ]
}

更新将 PCollection 转换为 Row[]

class KVToRow extends DoFn<KV<String, Iterable<Row>>, Row[]> {

    private static final long serialVersionUID = -1345126662309830332L;

    @ProcessElement
    public void processElement(ProcessContext context) {
        String strKey = context.element().getKey();
        List<Row> rowList = new ArrayList();
        Iterable<Row> rowValue = context.element().getValue();
        rowValue.forEach(data -> {
            rowList.add(data);

        });
        Row[] rowArray = new Row[rowList.size()-1];
        rowArray=rowList.toArray(rowArray);
        context.output(rowArray);
    }
}

建议代码

Row[] rowArray = Iterables.toArray(rows, Row.class);

错误:

Iterables 类型中的 toArray(Iterable<? extends T>, Class) 方法不适用于参数 (PCollection, Class)

将可迭代对象转换为数组

Row[] rowArray =  groupedKVRows.apply(ParDo.of(new KVToRow()));

错误:

此行有多个标记 - 类型不匹配:无法从 PCollection<Row[]> 转换为 Row[] - 1 行已更改,2 行已删除

4

1 回答 1

5

Beam 不提供任何订单保证,因此您必须像以前那样对它们进行分组。

但据我所知,您需要按customerId. 之后,您可以应用 ParDo 之类的 PTransform 对分组的行进行排序,date并根据需要填充缺失值。

通过转换为 Array 进行排序的示例

static class SortAndForwardFillFn extends DoFn<KV<String, Iterable<Row>>> {

    @ProcessElement
    public void processElement(@Element KV<String, Iterable<Row>> element, OutputReceiver<KV<String, Iterable<Row>>> outputReceiver) {

        // Create a formatter for parsing dates
        DateTimeFormatter formatter = DateTimeFormat.forPattern("dd/MM/yyyy HH:mm:ss");

        // Convert iterable to array
        Row[] rowArray = Iterables.toArray(rows, Row.class);

        // Sort array using dates
        Arrays
            .sort(
                rowArray,
                Comparator
                .comparingLong(row -> formatter.parseDateTime(row.getString("date")).getMillis())
        );

        // Store the last amount
        Double lastAmount = 0.0;

        // Create a List for storing sorted and filled rows
        List<Row> resultRows = new ArrayList<>(rowArray.length);

        // Iterate over the array and fill in the missing parts
        for (Row row : rowArray) {

            // Get current amount
            Double currentAmount = row.getDouble("amount");

            // If null, fill the previous value and add to results, 
            // otherwise add as it is
            resultRows.add(...);
        }

        // Output using the output receiver
        outputReceiver
            .output(
                KV.of(element.getKey(), resultRows)
            )
        );
    }
}
于 2021-11-04T01:52:39.413 回答