我正在尝试在本地计算机上运行连接器示例,但不断收到 UnknownHostException。如何使用 Hadoop 连接器配置对 BigQuery 的访问?
package com.mycompany.dataproc;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Tuple2;
public class BigQueryAccessExample {
JavaSparkContext jsc ;
public BigQueryAccessExample(JavaSparkContext jsc){
}
public static void main(String[] args) throws Exception{
SparkConf conf = new SparkConf()
.setAppName("BigQuery Reader").setMaster("local[5]");
conf.set("spark.serializer", org.apache.spark.serializer.KryoSerializer.class.getName());
JavaSparkContext jsc = new JavaSparkContext(conf);
String projectId = "mycompany-data";
String fullyQualifiedInputTableId = "mylogs.display20151030";
Configuration hadoopConfiguration = jsc.hadoopConfiguration();
//BigQueryConfiguration.
// Set the job-level projectId.
hadoopConfiguration.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Use the systemBucket for temporary BigQuery export data used by the InputFormat.
String bucket = "my-spark-test";
hadoopConfiguration.set(BigQueryConfiguration.GCS_BUCKET_KEY, bucket);
com.google.cloud.hadoop.io.bigquery.
// Configure input and output for BigQuery access.
BigQueryConfiguration.configureBigQueryInput(hadoopConfiguration, fullyQualifiedInputTableId);
//BigQueryConfiguration.configureBigQueryOutput(conf, fullyQualifiedOutputTableId, outputTableSchema);
JavaPairRDD<LongWritable, JsonObject> tableData = jsc.newAPIHadoopRDD(hadoopConfiguration, GsonBigQueryInputFormat.class, LongWritable.class, JsonObject.class);
//tableData.count();
JavaRDD<JsonObject> myRdd = tableData.map(new Function<Tuple2<LongWritable, JsonObject>, JsonObject>() {
public JsonObject call(Tuple2<LongWritable, JsonObject> v1) throws Exception {
System.out.println(String.format("idx: %s val: %s", v1._1(), v1._2().toString()));
return v1._2();
}
});
myRdd.take(10);
}
}
但我遇到了 UnknownHostException:
java.net.UnknownHostException: metadata
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:184)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
at sun.net.www.http.HttpClient.New(HttpClient.java:308)
at sun.net.www.http.HttpClient.New(HttpClient.java:326)
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932)
at com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93)
at com.google.api.client.http.HttpRequest.execute(HttpRequest.java:972)
at com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:142)
at com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:489)
at com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:189)
at com.google.cloud.hadoop.util.CredentialConfiguration.getCredential(CredentialConfiguration.java:71)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.createBigQueryCredential(BigQueryFactory.java:81)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQuery(BigQueryFactory.java:101)
at com.google.cloud.hadoop.io.bigquery.BigQueryFactory.getBigQueryHelper(BigQueryFactory.java:89)
at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getBigQueryHelper(AbstractBigQueryInputFormat.java:363)
at com.google.cloud.hadoop.io.bigquery.AbstractBigQueryInputFormat.getSplits(AbstractBigQueryInputFormat.java:102)
at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:115)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
at org.apache.spark.rdd.RDD$$anonfun$take$1.apply(RDD.scala:1277)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:306)
at org.apache.spark.rdd.RDD.take(RDD.scala:1272)
at org.apache.spark.api.java.JavaRDDLike$class.take(JavaRDDLike.scala:494)
at org.apache.spark.api.java.AbstractJavaRDDLike.take(JavaRDDLike.scala:47)
at com.mycompany.dataproc.BigQueryAccessExample.main(BigQueryAccessExample.java:57)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
看起来我需要设置访问凭据或权限,但我没有找到任何相关文档。
我从 https://console.developers.google.com/project/<项目ID>/apiui/credential 下载了凭据,并设置了环境变量 GOOGLE_APPLICATION_CREDENTIALS,
但这似乎不起作用。
有人能帮忙看看吗?