2

我没有得到谷歌示例工作

https://cloud.google.com/hadoop/examples/bigquery-connector-spark-example PySpark

我认为代码中有一些错误,例如:

'# 输出参数'mapred.bq.project.id': '',

应该是:'mapred.bq.output.project.id':'',

'# 将数据写回到新的 BigQuery 表中。
'# BigQueryOutputFormat 丢弃键,因此将键设置为无。
(word_counts
.map(lambda 对:无,json.dumps(pair))
.saveAsNewAPIHadoopDataset(conf))

会给出错误信息。如果我将其更改为:
(word_counts
.map(lambda pair: (None, json.dumps(pair)))
.saveAsNewAPIHadoopDataset(conf))

我收到错误消息:
org.apache.hadoop.io.Text 无法转换为 com.google.gson.JsonObject

无论我尝试什么,我都无法完成这项工作。
在 BigQuery 中创建了一个数据集,其名称是我在 'conf' 中给它的名称,结尾带有 '_hadoop_temporary_job_201512081419_0008'
并且最后创建了一个带有 '_attempt_201512081419_0008_r_000000_0' 的表。但总是空的

有人可以帮我吗?
谢谢

4

1 回答 1

2

我们正在努力更新文档,因为正如您所指出的,在这种情况下文档不正确。对于那个很抱歉!在我们努力更新文档时,我想尽快给您答复。

铸造问题

你提到的最重要的问题是选角问题。不幸的是,PySpark 不能使用BigQueryOutputFormat创建 Java GSON 对象。解决方案(解决方法)是将输出数据保存到Google Cloud Storage (GCS) 中,然后使用bq命令手动加载。

代码示例

这是导出到 GCS 并将数据加载到 BigQuery 的代码示例。您还可以使用subprocessPython 以编程方式执行bq命令。

#!/usr/bin/python
"""BigQuery I/O PySpark example."""
import json
import pprint
import pyspark

sc = pyspark.SparkContext()

# Use the Google Cloud Storage bucket for temporary BigQuery export data used
# by the InputFormat. This assumes the Google Cloud Storage connector for
# Hadoop is configured.
bucket = sc._jsc.hadoopConfiguration().get('fs.gs.system.bucket')
project = sc._jsc.hadoopConfiguration().get('fs.gs.project.id')
input_directory ='gs://{}/hadoop/tmp/bigquery/pyspark_input'.format(bucket)

conf = {
    # Input Parameters
    'mapred.bq.project.id': project,
    'mapred.bq.gcs.bucket': bucket,
    'mapred.bq.temp.gcs.path': input_directory,
    'mapred.bq.input.project.id': 'publicdata',
    'mapred.bq.input.dataset.id': 'samples',
    'mapred.bq.input.table.id': 'shakespeare',
}

# Load data in from BigQuery.
table_data = sc.newAPIHadoopRDD(
    'com.google.cloud.hadoop.io.bigquery.JsonTextBigQueryInputFormat',
    'org.apache.hadoop.io.LongWritable',
    'com.google.gson.JsonObject',
    conf=conf)

# Perform word count.
word_counts = (
    table_data
    .map(lambda (_, record): json.loads(record))
    .map(lambda x: (x['word'].lower(), int(x['word_count'])))
    .reduceByKey(lambda x, y: x + y))

# Display 10 results.
pprint.pprint(word_counts.take(10))

# Stage data formatted as newline delimited json in Google Cloud Storage.
output_directory = 'gs://{}/hadoop/tmp/bigquery/pyspark_output'.format(bucket)
partitions = range(word_counts.getNumPartitions())
output_files = [output_directory + '/part-{:05}'.format(i) for i in partitions]

(word_counts
 .map(lambda (w, c): json.dumps({'word': w, 'word_count': c}))
 .saveAsTextFile(output_directory))

# Manually clean up the input_directory, otherwise there will be BigQuery export
# files left over indefinitely.
input_path = sc._jvm.org.apache.hadoop.fs.Path(input_directory)
input_path.getFileSystem(sc._jsc.hadoopConfiguration()).delete(input_path, True)

print """
###########################################################################
# Finish uploading data to BigQuery using a client e.g.
bq load --source_format NEWLINE_DELIMITED_JSON \
    --schema 'word:STRING,word_count:INTEGER' \
    wordcount_dataset.wordcount_table {files}
# Clean up the output
gsutil -m rm -r {output_directory}
###########################################################################
""".format(
    files=','.join(output_files),
    output_directory=output_directory)
于 2015-12-18T00:25:50.203 回答