示例骨架代码如下所示,我基本上是从 bigquery 读取 RDD 并选择 my_field_name 值为 null 的所有数据点
JavaPairRDD<String, GenericData.Record> input = sc
.newAPIHadoopRDD(hadoopConfig, AvroBigQueryInputFormat.class, LongWritable.class, GenericData.Record.class)
.mapToPair( tuple -> {
GenericData.Record record = tuple._2;
Object rawValue = record.get(my_field_name); // Problematic !! want to get my_field_name of this bq row, but just gave something not making sense
String partitionValue = rawValue == null ? "EMPTY" : rawValue.toString();
return new Tuple2<String, GenericData.Record>(partitionValue, record);
}).cache();
JavaPairRDD<String, GenericData.Record> emptyData =
input.filter(tuple -> StringUtils.equals("EMPTY", tuple._1));
emptyData.values().saveAsTextFile(my_file_path)
然而,输出 RDD 似乎完全出乎意料。特别是 my_field_name 的值似乎完全随机。经过一些调试,似乎过滤是按预期进行的,但问题在于我从GenericData.Record
(基本上record.get(my_field_name)
)提取的值似乎完全随机。
因此,在我从 AvroBigQueryInputFormat 切换到 GsonBigQueryInputFormat 以读取 json 中的 bq 之后,这段代码似乎工作正常。
但是,理想情况下,我真的想改用 Avro(它应该比处理 json 快得多),但是它在我的代码中的当前行为完全令人不安。我只是使用 AvroBigQueryInputFormat 错误?