
I have the following function, which is supposed to recurse through all the data lake assets, but calling the line files = dbutils.fs.ls(deep_ls(root, max_depth=20)), which is meant to fetch the files recursively, gives me this error: TypeError: <generator object deep_ls at 0x7fc512538e40> has the wrong type - (<class 'str'>,) is expected. Do you have any ideas or suggestions on how to fix this?

def deep_ls(path: str, max_depth=1, reverse=False, key=None, keep_hidden=False):

    # Hidden files may be filtered out
    condition = None if keep_hidden else lambda x: x.name[0] not in ('_', '.')

    # List all files in path and apply sorting rules
    li = sorted(filter(condition, dbutils.fs.ls(path)), reverse=reverse, key=key)

    # Return all files (not ending with '/')
    for x in li:
        if x.path[-1] != '/':
            yield x

    # If the max_depth has not been reached, start
    # listing files and folders in subdirectories
    if max_depth > 1:
        for x in li:
            if x.path[-1] != '/':
                continue
            for y in deep_ls(x.path, max_depth - 1, reverse, key, keep_hidden):
                yield y

    # If max_depth has been reached,
    # return the folders
    else:
        for x in li:
            if x.path[-1] == '/':
                yield x
            

def key(val):
    try:
        return int(list(filter(bool, val.path.split('/'))).pop().split('=').pop())
    except ValueError as e:
        return -1
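
For reference, a short sketch of what the key helper sorts on (the FileEntry stand-in and the sample paths below are hypothetical, purely to show the behaviour): it pulls the trailing value out of Hive-style partition folders such as year=2021 and falls back to -1 for anything non-numeric.

from collections import namedtuple

# Hypothetical stand-in for the FileInfo objects returned by dbutils.fs.ls
FileEntry = namedtuple('FileEntry', ['path', 'name'])

print(key(FileEntry('dbfs:/lake/year=2021/', 'year=2021/')))  # -> 2021
print(key(FileEntry('dbfs:/lake/raw/', 'raw/')))              # -> -1 (non-numeric segment)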

Here is the full Databricks notebook code that tries to call this function:

# Example Implementation
# ----------------------

# Library Imports

import os
import requests
import json
import jmespath
import pandas as pd

from pprint import pprint
from pyapacheatlas.auth import ServicePrincipalAuthentication
from pyapacheatlas.core import PurviewClient, AtlasEntity, AtlasProcess, TypeCategory
from pyapacheatlas.core.typedef import *

from pyspark.sql import *
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Authentication

# Service Principal with "Purview Data Source Administrator" permissions on Purview
tenant_id = "enter-tenant-id"
client_id = "enter-client-id"
client_secret = "enter-client-secret"
resource_url = "https://purview.azure.net"
data_catalog_name = "demo-purview"
adls_sas_token = 'enter-sas-token-here'

# Retrieve authentication objects
azuread_access_token = azuread_auth(tenant_id, client_id, client_secret, resource_url)
purview_client = purview_auth(tenant_id, client_id, client_secret, data_catalog_name)

# Asset details

# Asset parameters
storage_account = "adls"
container = "lake"

# The root level path we want to begin populating assets from
top_path = f"/azure_storage_account#{storage_account}.core.windows.net/azure_datalake_gen2_service#{storage_account}.dfs.core.windows.net/azure_datalake_gen2_filesystem#{container}"

# Retrieve full list of assets
assets_all = list(get_all_adls_assets(top_path, data_catalog_name, azuread_access_token, max_depth=20))


# Grab SAS token
#adls_sas_token = dbutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Configure Spark to access from DFS endpoint
root = 'https://%s@%s.dfs.core.windows.net/' % (container, storage_account)

spark.conf.set('fs.azure.sas.%s.%s.dfs.core.windows.net' % (container, storage_account), adls_sas_token)
print('Remote adls root path: ' + root)

# Get ADLS files recursively
files = dbutils.fs.ls(deep_ls(root, max_depth=20))
files_df = convertfiles2df(files) # Note this is a Pandas DataFrame

# Generate asset-aligned names
files_df['asset'] = files_df['name'].str.replace(r'\d+', '{N}')

# Append schema row-wise from Purview
files_df['schema'] = files_df.apply(lambda row: get_adls_asset_schema(assets_all, row['asset'], purview_client), axis=1)

# Display Asset DataFrame
display(files_df)

1 Answer


The dbutils.fs.ls function takes only one argument - a string with a path - but you are passing a generator to it. You can convert it to something like this:

for file in deep_ls(root, max_depth=20):
  res = dbutils.fs.ls(file.path)  # deep_ls yields FileInfo objects, so pass the .path string
  # do something with the result ...
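
Alternatively, since deep_ls already calls dbutils.fs.ls internally and yields FileInfo objects, the generator can simply be materialized into a list (a minimal sketch, assuming convertfiles2df accepts the same kind of FileInfo list that dbutils.fs.ls returns):

# Materialize the generator directly; no extra dbutils.fs.ls call is needed
files = list(deep_ls(root, max_depth=20))
files_df = convertfiles2df(files)  # assumes convertfiles2df takes a list of FileInfo entries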
Answered 2021-05-11T06:19:08.317