我正在尝试从 GCP 存储桶下载加密文件和密钥,然后解密文件并将其加载回存储桶。所以我构建了这个 DataFlow 管道,如下所示:
class downloadFile(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'test.csv.gpg'
self.destination_file_name = "/tmp/test.csv.gpg"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
这里我使用self.destination_file_name = "/tmp/test.csv.gpg"是因为我从其他人那里了解到 DataFlow 作业将在 Linux VM 上运行,因此将文件下载到这个 /tmp/ 路径是完全安全的。
class downloadKey(beam.DoFn):
def __init__(self):
self.bucket_name = 'bucket_name'
self.source_blob_name = 'privateKey.txt'
self.destination_file_name = "/tmp/privateKey.txt"
def process(self, element):
from google.cloud import storage
storage_client = storage.Client()
bucket = storage_client.bucket(self.bucket_name)
blob = bucket.blob(self.source_blob_name)
blob.download_to_filename(self.destination_file_name)
基本上,两个下载DoFns具有相同的结构。下载文件和密钥后,密钥将被导入到运行 VM 的 DataFlow:
class importKey(beam.DoFn):
def process(self, element):
import subprocess
subprocess.call(['gpg', '--import','/tmp/privateKey.txt'])
然后解密DoFn:
class decryption(beam.DoFn):
def process(self, element, *args, **kwargs):
import subprocess
subprocess.call(['gpg', '-d', '/tmp/test.csv.gpg > test.csv'])
# load file back to bucket
bucket_name = 'bucket_name'
source_file_name = '/tmp/test.csv'
destination_blob_name = "clearText.csv"
storage_client = storage.Client()
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(destination_blob_name)
blob.upload_from_filename(source_file_name)
所以这个解密 DoFn 将调用带有子进程的 gpg 命令来解密文件。
最后是管道本身:
dummyMessage = {"projectID":"fakeProjectID",
"bucketID":"fakeBucketID"}
setp= (
p
| 'Create Sample'
>> beam.Create([dummyMessage["projectID"]])
|"testDecrypt" >> beam.ParDo(downloadLookupFile())
|"testDecrypt2" >> beam.ParDo(downloadKey())
|"testDecrypt3" >> beam.ParDo(importKey())
|"testDecrypt4" >> beam.ParDo(decryption())
)
这里我只是创建一个虚拟消息来调用管道,稍后将替换为真实消息。
当我运行管道时,一切正常,我可以看到 DataFlow 中已创建作业,并且显示作业状态为成功。但在存储桶中我看不到解密的文件。
我在要调试的代码中添加了几个打印语句,似乎在 downloadFile() 和 downloadKey() 方法中,从未达到 process(),这意味着没有处理过任何文件。任何人都可以分享一些有关如何在 DoFn 中访问 GCS 存储桶的知识吗?我不确定代码的哪一部分是磨损的,对我来说一切都很好。
任何帮助将不胜感激。
