0

我是 Apache 梁的新手,我正在使用 Apache 梁,并在 GCP 中使用 Dataflow 作为运行程序。执行管道时出现以下错误。

coder of type class org.apache.beam.sdk.coders.ListCoder has a #structuralValue method which does not return true when the encoding of the elements is equal. Element [Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:06:02.000Z, companyId=242, startTime=2020-04-01T09:00:33.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:07:47.000Z, companyId=242, startTime=2020-04-01T09:06:03.000Z], Person [businessDay=01042020, departmentId=101, endTime=2020-04-01T09:48:25.000Z, companyId=242, startTime=2020-04-01T09:07:48.000Z]]

PCollection 类似于 PCollection< KV < String,List < Person > > > 和 PCollection< KV < String,Iterable < List < Person > > >

我已经将 Person 实现为可序列化的 POJO 类并覆盖 equals 和 hash 方法。但我认为我还需要为 person 编写自定义 ListCoder 并在管道中注册。我不知道如何解决这个问题,请帮助。

4

1 回答 1

2

这是一个工作示例。如果你克隆 repo,在playground根目录下,运行./gradlew run,然后你可以验证效果。您也可以运行 with./gradlew run --args='--runner=DataflowRunner --project=$YOUR_PROJECT_ID --tempLocation=gs://xxx/staging --stagingLocation=gs://xxx/staging'在 Dataflow 上运行它。

Person如果您从头开始构建,该类应如下所示:

class Person implements Serializable {
  public Person(
      String businessDay,
      String departmentId,
      String companyId
  ) {
    this.businessDay = businessDay;
    this.departmentId = departmentId;
    this.companyId = companyId;
  }

  public String companyId() {
    return companyId;
  }

  public String businessDay() {
    return businessDay;
  }

  public String departmentId() {
    return departmentId;
  }

  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (other == null) {
      return false;
    }
    if (getClass() != other.getClass()) {
      return false;
    }
    Person otherPerson = (Person) other;
    return this.businessDay.equals(otherPerson.businessDay)
        && this.departmentId.equals(otherPerson.departmentId)
        && this.companyId.equals(otherPerson.companyId);
  }

  @Override
  public int hashCode(){
    return Objects.hash(this.businessDay, this.departmentId, this.companyId);
  }

  private final String businessDay;
  private final String departmentId;
  private final String companyId;
}

我建议

  • 使用AutoValue而不是从头开始创建 POJO。下面是一些例子您可以在这里查看整个项目。优点是您不必每次创建新对象类型时都从头开始实现equalsand 。hashCode

  • 在 KV 中,如果键是诸如 List 之类的可迭代对象,则将其包装在一个对象中并明确地确定性地序列化它(例如),因为 Java 中的序列化是不确定的。

于 2020-06-12T22:39:14.367 回答