
I'm hoping to get some hints on this problem.

I wrote the following code to back up tables from one storage account to another.

query_size = 100

#save data to storage2 and check if there is data left in the current table; if so, recurse
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator ,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("insert data:" + str(item) + " into table:" + tb_name)
        table_in.insert_or_replace_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()
print(tbs_out)

for tb in tbs_out:
    table = tb.name + today
    print(target_connection_string)
    #create table with same name in storage2
    table_service_in.create_table(table_name=table, fail_on_exist=False)
    #first query
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(table,data,table_service_out,table_service_in,query_size)

This should be a simple script that iterates over the items in a table and copies them into another storage account. I have run this exact code in an Azure Function and it worked fine.

Today I tried running it against multiple storage accounts. It ran fine for a while, but then it stopped and threw this error:

Traceback (most recent call last):
  File "/Users/users/Desktop/AzCopy/blob.py", line 205, in <module>
    queryAndSaveAllDataBySize(table,data,table_service_out,table_service_in,query_size)
  File "/Users/users/Desktop/AzCopy/blob.py", line 191, in queryAndSaveAllDataBySize
    data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/tableservice.py", line 738, in query_entities
    resp = self._query_entities(*args, **kwargs)
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/tableservice.py", line 801, in _query_entities
    return self._perform_request(request, _convert_json_response_to_entities,
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/tableservice.py", line 1106, in _perform_request
    return super(TableService, self)._perform_request(request, parser, parser_args, operation_context)
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/common/storageclient.py", line 430, in _perform_request
    raise ex
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/common/storageclient.py", line 358, in _perform_request
    raise ex
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/common/storageclient.py", line 343, in _perform_request
    _http_error_handler(
  File "/Users/users/miniforge3/lib/python3.9/site-packages/azure/cosmosdb/table/common/_error.py", line 115, in _http_error_handler
    raise ex
azure.common.AzureMissingResourceHttpError: Not Found
{"odata.error":{"code":"TableNotFound","message":{"lang":"en-US","value":"The table specified does not exist.\nRequestId:bbdb\nTime:2021-09-29T16:42:17.6078186Z"}}}

I don't understand why this happens, since all it has to do is copy from one side to the other.

Please, if anyone can help with this; I'm completely exhausted and can't think anymore :(

Update: reading my code again, I think the limit I have here is this:

#query 100 items per request, to avoid consuming too much memory by loading all data at once
query_size = 100

When I check my storage table, I actually only have 100 rows. But I can't find anywhere to set the query size so that all the data is loaded in one go.

As I understand it, once I hit the query_size limit I need to use the next x_ms_continuation token to fetch the next batch.

I now have this code:

query_size = 100

#save data to storage2 and check if there is data left in the current table; if so, recurse
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator ,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("insert data:" + str(item) + " into table:" + tb_name)
        table_in.insert_or_replace_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()
print(tbs_out)

for tb in tbs_out:
    table = tb.name + today
    print(target_connection_string)
    #create table with same name in storage2
    table_service_in.create_table(table_name=table, fail_on_exist=False)

    #first query
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(table,data,table_service_out,table_service_in,query_size)
    

According to the Microsoft docs, marker should indicate whether there is a continuation token and, if so, the code should run again. But that doesn't happen in my case; as soon as I reach query_size, the code throws the error.
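The continuation-marker pattern as I understand it from the docs can be sketched like this (no Azure calls here; `Page`, `fake_query_entities` and `copy_all` are my own stand-ins for the real SDK objects, just to illustrate the paging loop):

```python
# Stand-in sketch of the continuation-marker pattern (no Azure calls):
class Page(list):
    def __init__(self, items, next_marker=None):
        super().__init__(items)
        self.next_marker = next_marker  # mimics ListGenerator.next_marker

def fake_query_entities(rows, num_results, marker=None):
    """Return one page of `rows`, like TableService.query_entities."""
    start = marker or 0
    end = start + num_results
    return Page(rows[start:end], end if end < len(rows) else None)

def copy_all(rows, query_size):
    """Drain every page by following next_marker until it is falsy."""
    copied = []
    page = fake_query_entities(rows, query_size)
    while True:
        copied.extend(page)  # the real code would insert_or_replace here
        if not page.next_marker:
            break
        page = fake_query_entities(rows, query_size, marker=page.next_marker)
    return copied

rows = list(range(250))
assert copy_all(rows, 100) == rows  # pages of 100, 100 and 50 all copied
```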

任何人都可以帮忙吗?


1 Answer


Try replacing the for block with the following, so that the table created in storage2 has the same name. (This is likely the cause of the error: in your code the recursive continuation query passes the renamed table, tb.name + today, to the source account's query_entities, and no table with that name exists in the source account, hence TableNotFound.)

for tb in tbs_out:
    #create table with same name in storage2
    table_service_in.create_table(tb.name)
    #first query 
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(tb.name,data,table_service_out,table_service_in,query_size)

Here is the complete sample code:

from azure.cosmosdb.table.tableservice import TableService,ListGenerator

table_service_out = TableService(account_name='', account_key='')
table_service_in = TableService(account_name='', account_key='')

#query 100 items per request, to avoid consuming too much memory by loading all data at once
query_size = 100

#save data to storage2 and check if there is data left in the current table; if so, recurse
def queryAndSaveAllDataBySize(tb_name,resp_data:ListGenerator ,table_out:TableService,table_in:TableService,query_size:int):
    for item in resp_data:
        #remove etag and Timestamp appended by table service
        del item.etag
        del item.Timestamp
        print("insert data:" + str(item) + " into table:" + tb_name)
        table_in.insert_entity(tb_name,item)
    if resp_data.next_marker:
        data = table_out.query_entities(table_name=tb_name,num_results=query_size,marker=resp_data.next_marker)
        queryAndSaveAllDataBySize(tb_name,data,table_out,table_in,query_size)


tbs_out = table_service_out.list_tables()

for tb in tbs_out:
    #create table with same name in storage2
    table_service_in.create_table(tb.name)
    #first query 
    data = table_service_out.query_entities(tb.name,num_results=query_size)
    queryAndSaveAllDataBySize(tb.name,data,table_service_out,table_service_in,query_size)
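If you still want the dated target-table name from your original code, one hypothetical variant (the stub services and `query_and_save_all` below are my own illustration, not the SDK) is to pass the source and target names separately, so continuation queries always use the name that exists in the source account:

```python
class _Page(list):
    def __init__(self, items, next_marker=None):
        super().__init__(items)
        self.next_marker = next_marker  # mimics ListGenerator.next_marker

class StubSourceService:
    """Stand-in for TableService on the source account."""
    def __init__(self, tables):
        self.tables = tables  # {table_name: [entities]}

    def query_entities(self, table_name, num_results, marker=None):
        rows = self.tables[table_name]  # unknown name fails, like TableNotFound
        start = marker or 0
        end = start + num_results
        return _Page(rows[start:end], end if end < len(rows) else None)

class StubTargetService:
    """Stand-in for TableService on the target account."""
    def __init__(self):
        self.tables = {}

    def create_table(self, table_name, fail_on_exist=False):
        self.tables.setdefault(table_name, [])

    def insert_or_replace_entity(self, table_name, entity):
        self.tables[table_name].append(entity)

def query_and_save_all(src_name, dst_name, resp_data, table_out, table_in, query_size):
    for item in resp_data:
        table_in.insert_or_replace_entity(dst_name, item)  # write to renamed table
    if resp_data.next_marker:
        # continuation always queries the *source* table name
        data = table_out.query_entities(table_name=src_name,
                                        num_results=query_size,
                                        marker=resp_data.next_marker)
        query_and_save_all(src_name, dst_name, data, table_out, table_in, query_size)

source = StubSourceService({"mytable": list(range(250))})
target = StubTargetService()
target.create_table("mytable20210930")
first = source.query_entities("mytable", 100)
query_and_save_all("mytable", "mytable20210930", first, source, target, 100)
assert target.tables["mytable20210930"] == list(range(250))
```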

This should work fine now. If you still have problems with query_size, fetch the table's entire data and take records from the resulting list. For example, instead of setting query_size = 100, the following gives us the 100th record:

tasks = table_service.query_entities('tasktable')
lst = list(tasks)
print(lst[99])
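The reason list(tasks) works is that a ListGenerator-style object follows continuation tokens lazily as it is iterated, so materializing it drains every page. A plain generator illustrates the idea (`paged` is my own stand-in, not the SDK):

```python
# Stand-in for a ListGenerator: yields rows page by page, so that
# list(...) transparently walks every "continuation" under the hood.
def paged(rows, page_size):
    for i in range(0, len(rows), page_size):
        yield from rows[i:i + page_size]  # one simulated page per chunk

lst = list(paged(range(250), 100))  # drains all three pages
assert len(lst) == 250
assert lst[99] == 99  # same indexing as print(lst[99]) above
```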

Also check the following sample from azure-sdk-for-python:

def sample_query_entities_values(self):
    from azure.data.tables import TableClient
    from azure.core.exceptions import HttpResponseError

    print("Entities with 25 < Value < 50")
    # [START query_entities]
    with TableClient.from_connection_string(self.connection_string, self.table_name) as table_client:
        try:
            parameters = {u"lower": 25, u"upper": 50}
            name_filter = u"Value gt @lower and Value lt @upper"
            queried_entities = table_client.query_entities(
                query_filter=name_filter, select=[u"Value"], parameters=parameters
            )

            for entity_chosen in queried_entities:
                print(entity_chosen)

        except HttpResponseError as e:
            print(e.message)
    # [END query_entities]
answered 2021-09-30T11:52:36.607