We created a MapReduce job to ingest data into BigQuery. Our job doesn't do much filtering, so we would like to make it map-only to make it faster and more efficient.
However, the Java class that BigQuery accepts, "com.google.gson.JsonObject", does not implement the Writable interface required by the Hadoop Mapper interface. JsonObject is also final, so we cannot extend it...
Any suggestions on how we can work around this?
Thanks,
To add to William's response: I wanted to test this myself, so I created a new cluster with the BigQuery connector installed and ran the following map-only job:
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryOutputFormat;
import com.google.common.base.Splitter;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.regex.Pattern;
/**
 * An example map-only job with BigQuery output.
 */
public class MapOnlyJob {
  public static class MapOnlyMapper extends Mapper<LongWritable, Text, LongWritable, JsonObject> {
    private static final LongWritable KEY_OUT = new LongWritable(0L);
    // This requires a newer version of Guava be included in a shaded / repackaged libjar.
    private static final Splitter SPLITTER =
        Splitter.on(Pattern.compile("\\s+"))
            .trimResults()
            .omitEmptyStrings();

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String line = value.toString();
      // Emit one JSON record per whitespace-delimited word; the key is a dummy
      // value that BigQueryOutputFormat will discard.
      for (String word : SPLITTER.split(line)) {
        JsonObject json = new JsonObject();
        json.addProperty("word", word);
        json.addProperty("mapKey", key.get());
        context.write(KEY_OUT, json);
      }
    }
  }
  /**
   * Configures and runs the main Hadoop job.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();
    if (args.length != 3) {
      System.out.println("Usage: hadoop MapOnlyJob "
          + "[projectId] [input_file] [fullyQualifiedOutputTableId]");
      String indent = "    ";
      System.out.println(indent
          + "projectId - Project under which to issue the BigQuery operations. "
          + "Also serves as the default project for table IDs which don't explicitly specify a "
          + "project for the table.");
      System.out.println(indent
          + "input_file - Input file pattern of the form "
          + "gs://foo/bar*.txt or hdfs:///foo/bar*.txt or foo*.txt");
      System.out.println(indent
          + "fullyQualifiedOutputTableId - Output table ID of the form "
          + "<optional projectId>:<datasetId>.<tableId>");
      System.exit(1);
    }
    // Global parameters from args.
    String projectId = args[0];
    // Set InputFormat parameters from args.
    String inputPattern = args[1];
    // Set OutputFormat parameters from args.
    String fullyQualifiedOutputTableId = args[2];
    // Default OutputFormat parameters for this sample.
    String outputTableSchema =
        "[{'name': 'word','type': 'STRING'},{'name': 'mapKey','type': 'INTEGER'}]";
    Configuration conf = parser.getConfiguration();
    Job job = Job.getInstance(conf);
    // Set the job-level projectId on the job's own Configuration; Job.getInstance
    // copied conf, so setting it on conf at this point would have no effect.
    job.getConfiguration().set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
    // Set classes and configure them:
    job.setOutputFormatClass(BigQueryOutputFormat.class);
    BigQueryConfiguration.configureBigQueryOutput(
        job.getConfiguration() /* Required as Job made a new Configuration object */,
        fullyQualifiedOutputTableId,
        outputTableSchema);
    // Configure file-based input:
    FileInputFormat.setInputPaths(job, inputPattern);
    job.setJarByClass(MapOnlyMapper.class);
    job.setMapperClass(MapOnlyMapper.class);
    // The key will be discarded by BigQueryOutputFormat.
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(JsonObject.class);
    // Make the job map-only.
    job.setNumReduceTasks(0);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
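For reference, once packaged, a job like this could be launched along these lines (the jar name, project, bucket, and table IDs below are placeholders, not values from my test):

hadoop jar maponlyjob.jar MapOnlyJob my-project gs://my-bucket/input*.txt my-project:my_dataset.words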
I have the following dependencies:
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-core</artifactId>
  <version>1.2.1</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>bigquery-connector</artifactId>
  <version>0.7.0-hadoop1</version>
</dependency>
<dependency>
  <groupId>com.google.guava</groupId>
  <artifactId>guava</artifactId>
  <version>18.0</version>
</dependency>
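As the code comment above notes, Guava needs to be shaded into the job jar, since the cluster classpath can carry an older Guava version. One way to do this is a maven-shade-plugin relocation; this is just a sketch, and the shadedPattern prefix is an arbitrary choice:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>2.3</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <!-- Rewrite Guava's packages so the job uses its own bundled copy -->
          <relocation>
            <pattern>com.google.common</pattern>
            <shadedPattern>repackaged.com.google.common</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>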
You should be able to use the BigQuery connector for Hadoop (see https://cloud.google.com/hadoop/bigquery-connector), which provides an implementation of the Hadoop OutputFormat class.
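Concretely, BigQueryOutputFormat accepts JsonObject values directly, so the Mapper never needs JsonObject to be Writable; the wiring reduces to something like this (a minimal fragment, see the complete example earlier in this thread):

job.setOutputFormatClass(BigQueryOutputFormat.class);
job.setOutputKeyClass(LongWritable.class);       // key is discarded by the output format
job.setOutputValueClass(JsonObject.class);       // value is serialized as a JSON record
BigQueryConfiguration.configureBigQueryOutput(
    job.getConfiguration(), fullyQualifiedOutputTableId, outputTableSchema);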