0

Apache Flink 的新手并在玩,我正在尝试通过 JSON 字段键实现接收器分区。

以下是插入 Kinesis Data Stream 的示例数据:

{"user_id": 1337, "some_field": "data"}
{"user_id": 55, "some_field": "data"}

我希望 Apache Flink 作业通过 Kinesis Data Stream 使用该数据,然后通过为键添加“user_id”值作为前缀来保存到 S3 中,例如/user-1337/data-partition.json仅保存该 user_id 字段的位置。

这是一个示例代码:

public class LogProcessingJob {
    private static final ObjectMapper jsonParser = new ObjectMapper();
    private static final String region = "us-east-1";
    private static final String inputStreamName = "testing-apache-flink";
    private static final String s3SinkPath = "s3a://testing-apache-flink/data";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    private static StreamingFileSink<Tuple2> createS3SinkFromStaticConfig() {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix") // HOW TO GET user_id here?
                .withPartSuffix(".json")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        input.map(value -> { // Parse the JSON
            JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
            return new Tuple2(jsonNode.get("user_id").asInt(),
                    jsonNode.get("status").asText());
        }).returns(Types.TUPLE(Types.INT, Types.STRING))
                .keyBy(event -> event.f0) // partition by user_id
                .addSink(createS3SinkFromStaticConfig());

        env.execute("Process log files");
    }
}

如何让 user_id 进入 OutputFileConfig 或者有更好的方法吗?

4

1 回答 1

1

Flink 提供了一个BucketAssigner接口,允许你指定 Bucket每个传入的元素应该放入。

根据文件,

/**
 * A BucketAssigner is used with a {@link StreamingFileSink} to determine the {@link Bucket} each incoming element
 * should be put into.
 *
 * <p>The {@code StreamingFileSink} can be writing to many buckets at a time, and it is responsible for managing
 * a set of active buckets. Whenever a new element arrives it will ask the {@code BucketAssigner} for the bucket the
 * element should fall in. The {@code BucketAssigner} can, for example, determine buckets based on system time.
 *
 * @param <IN> The type of input elements.
 * @param <BucketID> The type of the object returned by the {@link #getBucketId(Object, BucketAssigner.Context)}. This has to have
 *                  a correct {@link #hashCode()} and {@link #equals(Object)} method. In addition, the {@link Path}
 *                  to the created bucket will be the result of the {@link #toString()} of this method, appended to
 *                  the {@code basePath} specified in the {@link StreamingFileSink StreamingFileSink}.
 */

因此,在您的情况下,您可以实现这样的自BucketAssigner定义

public class UserIdBucketAssigner<T extends Tuple2> implements BucketAssigner<T, String> {
  private Long serialVersionUID = 1L

  public String getBucketId(T element, BucketAssigner.Context context) {
    return element._1
  }

  public SimpleVersionedSerializer<String> getSerializer() {
    SimpleVersionedStringSerializer.INSTANCE
}

}

并在StreamingFileSink施工时指定:

private static StreamingFileSink<Tuple2> createS3SinkFromStaticConfig() {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix") // HOW TO GET user_id here?
                .withPartSuffix(".json")
                .build();

        return StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withBucketAssigner(new UserIdBucketAssigner<Tuple2>)
                .withOutputFileConfig(config)
                .build();
    }
于 2020-08-07T09:37:12.217 回答