1

示例骨架代码如下所示,我基本上是从 bigquery 读取 RDD 并选择 my_field_name 值为 null 的所有数据点

    JavaPairRDD<String, GenericData.Record> input = sc
            .newAPIHadoopRDD(hadoopConfig, AvroBigQueryInputFormat.class, LongWritable.class, GenericData.Record.class)
            .mapToPair( tuple -> {
                GenericData.Record record = tuple._2;
                Object rawValue = record.get(my_field_name); // Problematic !! want to get my_field_name of this bq row, but just gave something not making sense
                String partitionValue = rawValue == null ? "EMPTY" : rawValue.toString();
                return new Tuple2<String, GenericData.Record>(partitionValue, record);
            }).cache();
    JavaPairRDD<String, GenericData.Record> emptyData = 
            input.filter(tuple -> StringUtils.equals("EMPTY", tuple._1));
    emptyData.values().saveAsTextFile(my_file_path)

然而,输出 RDD 似乎完全出乎意料。特别是 my_field_name 的值似乎完全随机。经过一些调试,似乎过滤是按预期进行的,但问题在于我从GenericData.Record(基本上record.get(my_field_name))提取的值似乎完全随机。

因此,在我从 AvroBigQueryInputFormat 切换到 GsonBigQueryInputFormat 以读取 json 中的 bq 之后,这段代码似乎工作正常。

但是,理想情况下,我真的想改用 Avro(它应该比处理 json 快得多),但是它在我的代码中的当前行为完全令人不安。我只是使用 AvroBigQueryInputFormat 错误?

4

0 回答 0