我尝试使用工作负载身份联合(用于身份验证)将消息从 Azure 函数应用推送到 GCP Pub/Sub。
观察:
- 如果我们使用具有 GCP 服务帐户的私有“密钥”的服务主体,而不是使用工作负载联合身份验证,则“Azure 函数应用程序”能够成功地将消息发布到 GCP 发布/订阅。
- 如果我使用 Azure VM 并从该 VM 运行代码,而不是使用“Azure Function App”,我可以使用工作负载身份联合发布消息
下面是我在 Azure 函数应用中运行代码与 Azure VM 中的代码相比所做的更改。
- 由于 Azure VM 实例使用元数据服务 url ( http://169.254.169.254/metadata/identity/oauth2/token?api-version=2019-08-01 ),因此将其替换为 (http://<identity_endpoint>/ msi/token?api-version=2019-08-01)
- 使用 url (http://<identity_endpoint>/msi/token?api-version=2019-08-01) 我可以毫无问题地生成令牌
资源:
- https://cloud.google.com/iam/docs/configuring-workload-identity-federation#gcloud
- https://cloud.google.com/iam/docs/using-workload-identity-federation#oidc_4
- 示例 json 配置 - https://google.aip.dev/auth/4117(搜索天蓝色)
import logging
import os
import json
import azure.functions as func
from azure.storage.blob import BlobClient
from google.cloud import pubsub_v1
from google.auth import identity_pool
import requests
def main(myblob: func.InputStream):
blob_client = BlobClient.from_blob_url("<blob storage url>")
download_stream = blob_client.download_blob()
blob_data = download_stream.readall()
jsonKey = json.loads(blob_data)
endpoint=os.environ["IDENTITY_ENDPOINT"]
endpoint_header=os.environ["IDENTITY_HEADER"]
resource_uri = os.environ["RESOURCE_URI"]
token_auth_uri = f"{endpoint}?resource={resource_uri}&api-version=2019-08-01"
head_msi = {'X-IDENTITY-HEADER':endpoint_header}
resp = requests.get(token_auth_uri, headers=head_msi)
access_token = resp.json()
logging.info(f"access_token: {access_token}")
jsonKey_to_str = json.dumps(jsonKey)
newJson = jsonKey_to_str.replace("http://169.254.169.254/metadata/identity/oauth2/token?api-version=2018-02-01&resource=app://<app_url>",token_auth_uri)
logging.info(f"jsonstring after substitute: {newJson}")
final_json_key = json.loads(newJson)
logging.info(f"jsonKey after substitute: {final_json_key}")
credentials = identity_pool.Credentials.from_info(final_json_key)
logging.info(f"credentials from IDP: {credentials.token}")
scoped_credentials = credentials.with_scopes(['https://www.googleapis.com/auth/cloud-platform'])
publisher = pubsub_v1.PublisherClient(credentials=scoped_credentials)
topic_path = publisher.topic_path("cpn-data-highway-test", "cpndata")
data_str = f"Message number infinite"
data = data_str.encode("utf-8")
future = publisher.publish(topic_path, data)
print(future.result())
Failure Exception: RetryError: Deadline of 600.0s exceeded while calling target function,
last exception: 503 Getting metadata from plugin failed with error: ('Unable to retrieve Identity Pool subject token', '')
Stack: File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py",
line 402, in _handle__invocation_request call_result = await self._loop.run_in_executor( File "/usr/local/lib/python3.9/concurrent/futures/thread.py",
line 58, in run result = self.fn(*self.args, **self.kwargs) File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/dispatcher.py",
line 611, in _run_sync_func return ExtensionManager.get_sync_invocation_wrapper(context, File "/azure-functions-host/workers/python/3.9/LINUX/X64/azure_functions_worker/extension.py", line 215, in _raw_invocation_wrapper result = function(**args) File "/home/site/wwwroot/BlobTriggerExample/__init__.py"
, line 47, in main print(future.result()) File "/home/site/wwwroot/.python_packages/lib/site-packages/google/cloud/pubsub_v1/publisher/futures.py"
, line 62, in result return super().result(timeout=timeout) File "/usr/local/lib/python3.9/concurrent/futures/_base.py"
, line 445, in result return self.__get_result() File "/usr/local/lib/python3.9/concurrent/futures/_base.py"
, line 390, in __get_result raise self._exception File "/home/site/wwwroot/.python_packages/lib/site-packages/google/cloud/pubsub_v1/publisher/_batch/thread.py"
, line 275, in _commit response = self._client.api.publish( File "/home/site/wwwroot/.python_packages/lib/site-packages/google/pubsub_v1/services/publisher/client.py"
, line 621, in publish response = rpc(request, retry=retry, timeout=timeout, metadata=metadata,) File "/home/site/wwwroot/.python_packages/lib/site-packages/google/api_core/gapic_v1/method.py"
, line 154, in __call__ return wrapped_func(*args, **kwargs) File "/home/site/wwwroot/.python_packages/lib/site-packages/google/api_core/retry.py"
, line 283, in retry_wrapped_func return retry_target( File "/home/site/wwwroot/.python_packages/lib/site-packages/google/api_core/retry.py"
, line 205, in retry_target raise exceptions.RetryError(