
I'm trying to build a backup system for storage accounts. Just to make the example as clear as possible, here is my storage account structure:

storageA:
 |---container1
 |  |---Blob1.txt
 |---container2
 |  |---Blob2.txt
 |  |---Blob3.txt
 |  |---Blob4.txt
 |---container3
   |---Blob5.txt

I have a script that loops through the containers and blobs and replicates the same structure to another storage account. The script is below.

from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas
from datetime import datetime, timedelta

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source storage account
account_key = '' # The account key for the source storage account
source_container_name = 'newblob' # Name of the container whose blobs will be copied

# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    
    print(container['name'], container['metadata'])
    container_client = client.get_container_client(container.name)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create a blob client for the source blob
        source_blob = BlobClient(
            client.url,
            container_name=container['name'],
            blob_name=blob.name,
            credential=sas_token
        )
        print(blob.name)
        print("==========================")
    # ============================= TARGET =======================================

    # Target Client
    target_connection_string = ''
    target_account_key = ''
    source_container_name = source_container_name
    target_blob_name = blob.name
    target_destination_blob = container['name'] + today
    print(target_destination_blob)
    
    # Create target client
    target_client = BlobServiceClient.from_connection_string(target_connection_string)
    container = ContainerClient.from_connection_string(target_connection_string, target_destination_blob)
    try:
        container_client = target_client.create_container(target_destination_blob)
    except:
        # Container already exists: create the new blob and start the copy operation.
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)

This script makes a full copy of the containers and blobs without any errors. But when I go to my target storage account, I can see the same containers; containers 1 and 3 have the correct blobs, but container 2 only has 2 blobs, and even when I upload new files to the source storage and run my script again, the new files never get copied.

Can anyone help me understand this problem? Thanks very much.

UPDATE: after some debugging I found something interesting. In my code I placed some print statements to trace the loops as they happen, particularly when a blob gets copied.

Here is the updated version of my code that reproduces the issue:

from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas
from datetime import datetime, timedelta

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source storage account
account_key = '' # The account key for the source storage account

# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    
    print("==========================")
    print(container['name'], container['metadata'])
    container_client = client.get_container_client(container.name)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create a blob client for the source blob
        source_blob = BlobClient(
            client.url,
            container_name=container['name'],
            blob_name=blob.name,
            credential=sas_token
        )
        print(blob.name)
        
    # ============================= TARGET =======================================

    # Target Client

    target_connection_string = ''
    target_account_key = ''
    source_container_name = container['name']
    target_blob_name = blob.name
    target_destination_blob = container['name'] + today
    print(target_destination_blob)

    # Create target client
    target_client = BlobServiceClient.from_connection_string(target_connection_string)
    container = ContainerClient.from_connection_string(target_connection_string, target_destination_blob)
    try:
        container_client = target_client.create_container(target_destination_blob)
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)
        print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
    except:
        # Container already exists: create the new blob and start the copy operation.
        new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
        new_blob.start_copy_from_url(source_blob.url)
        print("COPY TO")
        print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")

Now, when I run the code, I get the following output:

==========================
blob1 {}
lastfile.txt
blob12021-09-22
COPY TO
EXCEPT: saving blob lastfile.txt into blob12021-09-22 
==========================
blob2 {}
lastfile.txt
lastupdate.txt
newFile.txt
blob22021-09-22
COPY TO
EXCEPT: saving blob newFile.txt into blob22021-09-22 
==========================
blob3 {}
lastupdate.txt
blob32021-09-22
COPY TO
EXCEPT: saving blob lastupdate.txt into blob32021-09-22 

As you can see, the loop only copies the last file of each list. This is where the nested loops confuse me. Can anyone explain what I'm doing wrong, and how to make the loop target each file and copy it to the new storage account? Thank you very much for any help you can give me.
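
For context, a minimal, Azure-free illustration of the behaviour in the output above: once a Python for loop finishes, the loop variable still holds the last item, so any code that is not indented under the loop body only ever sees that final value.

# Plain Python, no Azure: the loop variable keeps its final value
# after the loop has finished.
for blob_name in ['Blob2.txt', 'Blob3.txt', 'Blob4.txt']:
    pass  # imagine the copy code is NOT indented under this loop

print(blob_name)  # -> 'Blob4.txt': only the last name survives the loop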


2 Answers


Half-solution: thanks everyone for the support and help. I finally solved all the problems with the blob backup process.

Here is the code:

from azure.cosmosdb.table.tableservice import TableService, ListGenerator
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas
from datetime import datetime, timedelta

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source storage account
account_key = '' # The account key for the source storage account


# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    print("==========================")
    print(container['name'], container['metadata'])
    
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
        client.url,
        container_name = container['name'],
        blob_name = blob.name,
        credential = sas_token
    )
        target_connection_string = ''
        target_account_key = ''
        source_container_name = container['name']
        target_blob_name = blob.name
        target_destination_blob = container['name'] + today
        print(target_blob_name)
        target_client = BlobServiceClient.from_connection_string(target_connection_string)
        try:
            container_client = target_client.create_container(target_destination_blob)
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
        except:
            # Container already exists: create the blob and start the copy operation.
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")
        

This is the blob storage part. What it does is loop through all the containers in a given (source) storage account, then iterate over every blob inside each container it encounters. While listing the blobs, it automatically copies the containers and blobs to the target storage account. The backup container name is formed as: container name + today's date. This approach was chosen with future automation in mind, so if you decide to run the script weekly, you will know which date each batch refers to.
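
As a quick illustration of that naming scheme (the container name 'container1' below is only an example):

from datetime import datetime

today = str(datetime.now().date())   # e.g. '2021-09-22'
backup_name = 'container1' + today   # -> 'container12021-09-22'
print(backup_name)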

Now I'm trying to solve the last piece of this automation, moving on from blob storage to tables. The code is structured as follows.

from azure.cosmosdb.table.tableservice import TableService, ListGenerator
from azure.storage.blob import BlobClient, BlobServiceClient, ContainerClient
from azure.storage.blob import ResourceTypes, AccountSasPermissions
from azure.storage.blob import generate_account_sas
from datetime import datetime, timedelta

today = str(datetime.now().date())
print(today)




#================================ SOURCE ===============================
# Source Client
connection_string = '' # The connection string for the source storage account
account_key = '' # The account key for the source storage account
table_service_out = TableService(account_name='', account_key='')
table_service_in = TableService(account_name='', account_key='')

# Create client
client = BlobServiceClient.from_connection_string(connection_string)
all_containers = client.list_containers(include_metadata=True)
for container in all_containers:
    # Create sas token for blob
    sas_token = generate_account_sas(
        account_name = client.account_name,
        account_key = account_key, 
        resource_types = ResourceTypes(object=True, container=True),
        permission= AccountSasPermissions(read=True,list=True),
        # start = datetime.now(),
        expiry = datetime.utcnow() + timedelta(hours=4) # Token valid for 4 hours
    )

    
    print("==========================")
    print(container['name'], container['metadata'])
    
    # print("==========================")
    container_client = client.get_container_client(container.name)
    # print(container_client)
    blobs_list = container_client.list_blobs()
    for blob in blobs_list:
        # Create blob client for source blob
        source_blob = BlobClient(
        client.url,
        container_name = container['name'],
        blob_name = blob.name,
        credential = sas_token
    )
        target_connection_string = ''
        target_account_key = ''
        source_container_name = container['name']
        target_blob_name = blob.name
        target_destination_blob = container['name'] + today
        print(target_blob_name)
        target_client = BlobServiceClient.from_connection_string(target_connection_string)
        try:
            container_client = target_client.create_container(target_destination_blob)
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"TRY: saving blob {target_blob_name} into {target_destination_blob} ")
        except:
            # Container already exists: create the blob and start the copy operation.
            new_blob = target_client.get_blob_client(target_destination_blob, target_blob_name)
            new_blob.start_copy_from_url(source_blob.url)
            print("COPY TO")
            print(f"EXCEPT: saving blob {target_blob_name} into {target_destination_blob} ")
        

# Query 100 items per request, to avoid consuming too much memory by loading all data at once
query_size = 100

# Save data to storage2; if there is data left in the current table, recurse
def queryAndSaveAllDataBySize(tb_name, resp_data: ListGenerator, table_out: TableService, table_in: TableService, query_size: int):
    for item in resp_data:
        # Remove the etag and Timestamp appended by the table service
        del item.etag
        del item.Timestamp
        print("insert data: " + str(item) + " into table: " + tb_name)
        table_in.insert_or_replace_entity(tb_name, item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name, num_results=query_size, marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name, data, table_out, table_in, query_size)


tbs_out = table_service_out.list_tables()

for tb in tbs_out:
    # Create a table with the same name in storage2
    table_service_in.create_table(table_name=tb.name, fail_on_exist=False)
    # First query
    data = table_service_out.query_entities(tb.name, num_results=query_size)
    queryAndSaveAllDataBySize(tb.name, data, table_service_out, table_service_in, query_size)

This last part relates to the tables. If I run this code the first time, it works fine; if I run it a second time, it throws this message:

Client-Request-ID= Retry policy did not allow for a retry: Server-Timestamp=Wed, 22 Sep 2021 14:47:38 GMT, Server-Request-ID=, HTTP status code=409, Exception=Conflict{"odata.error":{"code":"TableAlreadyExists","message":{"lang":"en-US","value":"The table specified already exists.\nRequestId:\nTime:2021-09-22T14:47:38.7583927Z"}}}

I would like the script to check whether the table exists while it runs, not create it if it does, and just apply any updates if there are any.

Any help or suggestions on this?
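
For reference, a minimal sketch of that existence check, assuming the exists() method on azure-cosmosdb-table's TableService; since insert_or_replace_entity already upserts, a rerun would then update existing rows instead of raising a conflict:

# Sketch only: exists() is assumed to be available on TableService.
for tb in table_service_out.list_tables():
    if not table_service_in.exists(tb.name):
        table_service_in.create_table(table_name=tb.name)
    # The helper's insert_or_replace_entity upserts, so reruns update rows
    data = table_service_out.query_entities(tb.name, num_results=query_size)
    queryAndSaveAllDataBySize(tb.name, data, table_service_out, table_service_in, query_size)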

Answered 2021-09-22T14:59:18.493

First of all, the code looks fine; the problem seems to appear when a container has more than one file. So, if a container has multiple files, get the list of files and loop over them so that each file gets uploaded.
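
As a rough sketch of that per-blob loop (the connection strings and container names below are placeholders, and the copy assumes the source blob URL is readable by the target account, e.g. because it carries a SAS token as in the question):

from azure.storage.blob import BlobServiceClient

# Placeholders: substitute your own connection strings and container names.
source_client = BlobServiceClient.from_connection_string('<source-connection-string>')
target_client = BlobServiceClient.from_connection_string('<target-connection-string>')

source_container = source_client.get_container_client('container2')
# Copy every blob in the container, not just the last one the loop variable holds.
for blob in source_container.list_blobs():
    source_url = source_container.get_blob_client(blob.name).url
    target_blob = target_client.get_blob_client('container2-backup', blob.name)
    target_blob.start_copy_from_url(source_url)  # server-side copy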

Make sure you have the correct function.json:

Below is an example of copying a blob within a single storage account.

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "blob",
      "direction": "in",
      "name": "blob_client",
      "path": "source/source.txt",
      "connection": "STORAGE_CONN_STR"
    },
    {
      "type": "blob",
      "direction": "out",
      "name": "output_file_dest",
      "path": "target/target.csv",
      "connection": "STORAGE_CONN_STR"
    }
  ]
}

Apply the functionality of the code below to your own code so that a blob is copied as soon as it is uploaded to the first storage account.

import logging

import azure.functions as func

from azure.storage.blob import BlobClient, BlobServiceClient

CONN_STR = ''  # Fill in: connection string for the storage account used below

def main():
    logging.info('start Python Main function processed a request.')

    # CONNECTION STRING
    blob_service_client = BlobServiceClient.from_connection_string(CONN_STR)

    # MAP SOURCE FILE
    blob_client = blob_service_client.get_blob_client(container="source", blob="source.txt")

    # SOURCE CONTENTS
    content = blob_client.download_blob().content_as_text()

    # WRITE HEADER TO AN OUTPUT FILE
    output_file_dest = blob_service_client.get_blob_client(container="target", blob="target.csv")

    # INITIALIZE OUTPUT
    output_str = ""

    # STORE COLUMN HEADERS
    data = list()
    data.append(list(["column1", "column2", "column3", "column4"]))

    output_str += ('"' + '","'.join(data[0]) + '"\n')

    output_file_dest.upload_blob(output_str, overwrite=True)
    logging.info('END OF FILE UPLOAD')

if __name__ == "__main__":
    main()
Answered 2021-09-22T11:42:33.420