0

google.api_core.exceptions.ServiceUnavailable: 503 Deadline Exceeded
using python 3.7 ,google-cloud-pubsub ==1.1.0 在主题上发布数据。在我的本地机器上,它工作得非常好,能够发布关于该主题的数据,还能够通过订阅者从该主题中提取数据。但不明白当我在服务器上部署代码时它不起作用并且它因内联错误而失败但是当我在服务器上显式调用发布者方法时它也在服务器框上发布正常。代码在发布时在下一行失败:

future = publisher.publish(topic_path, data=data)

**ERROR:2020-02-20 14:24:42,714 ERROR Failed to publish 1 messages.**
Trackback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 57, in error_remapped_callable
    return callable_(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 826, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/usr/local/lib/python3.7/site-packages/grpc/_channel.py", line 729, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Deadline Exceeded"
        debug_error_string = "{"created":"@1582208682.711481693","description":"Deadline Exceeded","file":"src/core/ext/filters/deadline/deadline_filter.cc","file_line":69,"grpc_status":14}"
The above exception was the direct cause of the following exception: 
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/google/api_core/retry.py", line 184, in retry_target
    return target()
  File "/usr/local/lib/python3.7/site-packages/google/api_core/timeout.py", line 214, in func_with_timeout
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/google/api_core/grpc_helpers.py", line 59, in error_remapped_callable
    six.raise_from(exceptions.from_grpc_error(exc), exc)
  File "<string>", line 3, in raise_from
google.api_core.exceptions.ServiceUnavailable: 503 Deadline Exceeded

上述异常是以下异常的直接原因:

Traceback(最近一次调用最后一次):文件“/usr/local/lib/python3.7/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py”,第 219 行,在 _commit response = self._client .api.publish(self._topic, self._messages) 文件“/usr/local/lib/python3.7/site-packages/google/cloud/pubsub_v1/gapic/publisher_client.py”,第 498 行,在发布请求中,重试=重试,超时=超时,元数据=元数据文件“/usr/local/lib/python3.7/site-packages/google/api_core/gapic_v1/method.py”,第 143 行,调用中 返回wrapped_func(*args,**kwargs)文件“/usr/local/lib/python3.7/site-packages/google/api_core/retry.py”,第286行,在retry_wrapped_func on_error=on_error,文件“/usr/ local/lib/python3.7/site-packages/google/api_core/retry.py”,第 206 行,在 retry_target last_exc 中,文件“”,第 3 行,在 raise_from google.api_core.exceptions.RetryError 中:超过 60.0s 的最后期限在 0x7f67d064e950> 调用 functools.partial(.error_remapped_callable>

4

1 回答 1

0

您应该尝试将数据分块为合理大小的块 (max_messages),并且不要忘记添加 done 回调。

# Loop over json containing records/rows
for idx, row in enumerate(rows_json):
    publish_json(row, idx, rowmax=len(rows_json), topic_name)

# Publish messages asynchronous 
def publish_json(msg, rowcount, rowmax, topic_project_id, topic_name):
    batch_settings = pubsub_v1.types.BatchSettings(max_messages=100)
    publisher = pubsub_v1.PublisherClient(batch_settings)
    topic_path = publisher.topic_path(topic_project_id, topic_name)
    future = publisher.publish(
        topic_path, bytes(json.dumps(msg).encode('utf-8')))
    future.add_done_callback(
        lambda x: logging.info(
            'Published msg with ID {} ({}/{} rows).'.format(
                future.result(), rowcount, rowmax))
    )
于 2020-02-21T14:19:42.807 回答