I'm new to Apache Flink and just playing around with it. I'm trying to implement sink partitioning by a JSON field key.
Here is sample data being inserted into the Kinesis Data Stream:
{"user_id": 1337, "some_field": "data"}
{"user_id": 55, "some_field": "data"}
I want the Apache Flink job to consume that data from the Kinesis Data Stream and write it to S3, prefixing the key with the "user_id" value, e.g. /user-1337/data-partition.json, so that each path contains only the records for that user_id.
Here is the sample code:
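So for the two sample records above, the S3 layout I'm after would look roughly like this (file names are just illustrative):

s3a://testing-apache-flink/data/user-1337/data-partition.json   <- only records with user_id 1337
s3a://testing-apache-flink/data/user-55/data-partition.json     <- only records with user_id 55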
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.util.Properties;

public class LogProcessingJob {

    private static final ObjectMapper jsonParser = new ObjectMapper();
    private static final String region = "us-east-1";
    private static final String inputStreamName = "testing-apache-flink";
    private static final String s3SinkPath = "s3a://testing-apache-flink/data";

    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    private static StreamingFileSink<Tuple2<Integer, String>> createS3SinkFromStaticConfig() {
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("prefix") // HOW TO GET user_id here?
                .withPartSuffix(".json")
                .build();
        return StreamingFileSink
                .forRowFormat(new Path(s3SinkPath), new SimpleStringEncoder<Tuple2<Integer, String>>("UTF-8"))
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withOutputFileConfig(config)
                .build();
    }

    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /* if you would like to use runtime configuration properties, uncomment the lines below
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        input.map(value -> { // parse the JSON record into (user_id, some_field)
                    JsonNode jsonNode = jsonParser.readValue(value, JsonNode.class);
                    return new Tuple2<>(jsonNode.get("user_id").asInt(),
                            jsonNode.get("some_field").asText());
                }).returns(Types.TUPLE(Types.INT, Types.STRING))
                .keyBy(event -> event.f0) // partition by user_id
                .addSink(createS3SinkFromStaticConfig());

        env.execute("Process log files");
    }
}
How can I get the user_id into the OutputFileConfig, or is there a better way to do this?
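From reading the StreamingFileSink docs, my guess is that OutputFileConfig only controls the part-file name, and that the per-user directory would instead come from a custom BucketAssigner, but I'm not sure that's the right direction. Here is a rough sketch of what I have in mind (UserIdBucketAssigner is just a name I made up):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;

// Hypothetical assigner: routes every record into a "user-<user_id>" sub-directory under the sink's base path.
public class UserIdBucketAssigner implements BucketAssigner<Tuple2<Integer, String>, String> {

    @Override
    public String getBucketId(Tuple2<Integer, String> element, BucketAssigner.Context context) {
        // f0 carries the user_id parsed out of the JSON record
        return "user-" + element.f0;
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        // reuse the serializer that Flink's built-in String-keyed assigners use
        return SimpleVersionedStringSerializer.INSTANCE;
    }
}

I would then wire it into the builder with something like .withBucketAssigner(new UserIdBucketAssigner()) before .withRollingPolicy(...). Is that the intended approach, or is there something simpler?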