1

更新:我会立即自己回答这个问题(此代码有效):

我的自定义上传代码基于:https ://developers.google.com/bigquery/loading-data-into-bigquery#loaddatapostrequest

import sys
import json

from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run
from apiclient.errors import HttpError

import httplib2

FLOW = OAuth2WebServerFlow(
    client_id='xxxxxxx.apps.googleusercontent.com',
    client_secret='shhhhhhhhhhhh',
    scope='https://www.googleapis.com/auth/bigquery',
    user_agent='my-program-name/1.0')

def loadTable(http, service):
  projectId = 'drc-compute'
  datasetId = 'standing'
  import time
  tableId = 'test_' + str(int(time.time()))

  url = "https://www.googleapis.com/upload/bigquery/v2/projects/" + projectId + "/jobs"
  schema = open('test_schema.json', 'r')

  # Create the body of the request, separated by a boundary of xxx
  newresource = ('--xxx\n' +
            'Content-Type: application/json; charset=UTF-8\n' + '\n' +
            '{\n' +
            '   "configuration": {\n' +
            '     "load": {\n' +
            '     "sourceFormat": "NEWLINE_DELIMITED_JSON",\n' +
            '      "schema": {\n'
            '         "fields": ' + schema.read() + '\n' +
            '      },\n' +
            '      "destinationTable": {\n' +
            '        "projectId": "' + projectId + '",\n' +
            '        "datasetId": "' + datasetId + '",\n' +
            '        "tableId": "' + tableId + '"\n' +
            '      }\n' +
            '    }\n' +
            '  }\n' +
            '}\n' +
            '--xxx\n' +
            'Content-Type: application/octet-stream\n' +
            '\n')

  # Append data from the specified file to the request body
  f = open('test.json', 'r')
  newresource += f.read().replace('\n', '\r\n')

  # Signify the end of the body
  newresource += ('--xxx--\n')

  print newresource

  headers = {'Content-Type': 'multipart/related; boundary=xxx'}
  resp, content = http.request(url, method="POST", body=newresource, headers=headers)

  if not resp.status == 200:
    print resp
    print content
  else:
    jsonResponse = json.loads(content)
    jobReference = jsonResponse['jobReference']['jobId']
    import time
    while True:
     jobCollection = service.jobs()
     getJob = jobCollection.get(projectId=projectId, jobId=jobReference).execute()
     currentStatus = getJob['status']['state']

     if 'DONE' == currentStatus:
      print "Done Loading!"
      return

     else:
      print 'Waiting to load...'
      print 'Current status: ' + currentStatus
      print time.ctime()
      time.sleep(10)

def main(argv):
  # If the credentials don't exist or are invalid, run the native client
  # auth flow. The Storage object will ensure that if successful the good
  # credentials will get written back to a file.
  storage = Storage('bigquery2.dat') # Choose a file name to store the credentials.
  credentials = storage.get()
  if credentials is None or credentials.invalid:
    credentials = run(FLOW, storage)

  # Create an httplib2.Http object to handle our HTTP requests and authorize it
  # with our good credentials.
  http = httplib2.Http()
  http = credentials.authorize(http)

  service = build('bigquery','v2', http=http)

  #datasets = service.datasets().list(projectId='917370487687').execute()

  loadTable(http, service)

if __name__ == '__main__':
  main(sys.argv)

除了在可以打开浏览器并登录到谷歌的机器上运行一次之外,您还需要自己的 bigqueryclient_id并进行复制。client_secret然后 bigquery2.dat 将存储 oauth2 刷新令牌等。我正在使用的简单测试数据只是:

测试.json

{"asdf": "dd"}
{"asdf": "ax"}

test_schema.json

[
  {
    "type": "STRING",
    "name": "asdf",
    "mode": "NULLABLE"
  }
]
4

1 回答 1

2

不要让这个问题悬而未决,因为您已经在问题部分回答了 - 感谢@noonien 评论:

“记得将加载属性中的 sourceFormat 设置为 NEWLINE_DELIMITED_JSON”

于 2013-10-30T18:16:27.787 回答