我为 BucketingSink 编写了一个 Writer。Sink 和 Writer 都能正常运行,但是当 Writer 把 Avro GenericRecord 写入 Parquet 时,文件虽然被创建,并依次经历了 in-progress、pending、finished 三个状态,最终却是空的,只有 0 字节。谁能告诉我代码有什么问题?我尝试过把 AvroParquetWriter 的初始化放到 open() 方法中,但结果仍然相同。
调试代码时,我确认 writer.write(element) 确实被执行了,并且 element 中包含 Avro GenericRecord 数据。
流数据
// Build the HDFS bucketing sink that rolls its output into per-minute bucket directories.
BucketingSink<DataEventRecord> sink =
        new BucketingSink<DataEventRecord>("hdfs://localhost:9000/tmp/");

// Bucket directory name is derived from this date-time pattern.
DateTimeBucketer<DataEventRecord> minuteBucketer =
        new DateTimeBucketer<DataEventRecord>("yyyy-MM-dd--HHmm");
sink.setBucketer(minuteBucketer);

// Records are persisted through the custom Parquet writer instead of the sink's default writer.
ParquetSinkWriter<DataEventRecord> parquetWriter = new ParquetSinkWriter<DataEventRecord>();
sink.setWriter(parquetWriter);
ParquetSinkWriter
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.streaming.connectors.fs.StreamWriterBase;
import org.apache.flink.streaming.connectors.fs.Writer;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.any.DataEventRecord;
/**
 * Sink writer that stores the Avro {@link GenericRecord} carried by each {@link DataEventRecord}
 * in a Snappy-compressed Parquet file.
 *
 * <p>The Parquet writer cannot be built in {@link #open(FileSystem, Path)} because the Avro schema
 * is only available from an element, so it is created lazily by the first {@link #write(Object)}
 * after each {@code open}. {@link #close()} discards it again, so that every opened path gets its
 * own Parquet writer.
 *
 * <p>NOTE(review): {@code super.open(fs, path)} is handed the very same path that the Parquet
 * writer later creates. If the base class opens its own output stream on that path, two writers
 * target one file, which may explain empty part files - confirm against the Flink version in use
 * and consider implementing {@link Writer} directly instead of extending {@link StreamWriterBase}.
 *
 * @param <T> element type accepted by the sink; every element is cast to {@link DataEventRecord}
 */
public class ParquetSinkWriter<T> extends StreamWriterBase<T> {

    /** Parquet writer for the currently open path; {@code null} until the first write. */
    private transient ParquetWriter<GenericRecord> writer;

    /** Path passed to the most recent {@link #open(FileSystem, Path)} call; runtime-only state. */
    private transient Path path;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int blockSize = 256 * 1024 * 1024;
    private final int pageSize = 64 * 1024;

    /**
     * Opens the base writer and remembers the target path for the lazily created Parquet writer.
     *
     * @param fs   file system forwarded to the base class
     * @param path file the next Parquet writer will be created for
     * @throws IOException if the base class fails to open
     */
    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.path = path;
    }

    /**
     * Writes the Avro record of {@code event}, creating the Parquet writer on first use.
     *
     * @param event element to write; must be a {@link DataEventRecord}
     * @throws IOException        if the Parquet writer cannot be created or the write fails
     * @throws ClassCastException if {@code event} is not a {@link DataEventRecord}
     */
    @Override
    public void write(T event) throws IOException {
        DataEventRecord element = (DataEventRecord) event;
        if (writer == null) {
            // The schema comes from the first element seen after open().
            writer = new AvroParquetWriter<GenericRecord>(
                    this.path, element.getSchema(), compressionCodecName, blockSize, pageSize);
        }
        writer.write(element.getRecord());
    }

    /**
     * Closes the Parquet writer (if one was created) and then the base writer.
     *
     * <p>The Parquet writer reference is always cleared, so a later {@code open} on a new path
     * starts a fresh writer instead of reusing the closed one. The base class is closed even when
     * closing the Parquet writer fails.
     *
     * @throws IOException if either close operation fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (writer != null) {
                writer.close();
            }
        } finally {
            writer = null;
            super.close();
        }
    }

    /**
     * Returns a new, unopened writer with the same fixed configuration.
     *
     * @return a fresh {@code ParquetSinkWriter}
     */
    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<T>();
    }
}