0

我需要遍历磁盘上的大量文件列表,打开每个文件并解析它。我有一个带有文件名的文件,我需要遍历这些文件名。

我将此函数传递给map()

%python

def parse(filename):
  try:
    tf = sc.textFile(filename)
    # run parsing code, produce text
    return text
  except:
      return None

当我尝试运行以下命令时:

parsed_contents = filenames.map(parse)
parsed_contents.top(5)

我收到此错误:

例外:您似乎正试图从广播变量、操作或转换中引用 SparkContext。SparkContext 只能在驱动程序上使用,不能在它在工作人员上运行的代码中使用。有关详细信息,请参阅 SPARK-5063。

如果我单独运行 try 块中的代码并指定文件名,则该代码将起作用。

我应该如何遍历指定的文件列表,提取它们的内容?

4

1 回答 1

1

当您对 rdd 执行转换时(在这种情况下是您的 call filnames.map(parse)),驱动程序分配工作人员来处理您的 rdd 的每个分区。因此,您的地图调用本质上是发送给工作人员以应用于您的 rdd。在您提供的代码中,您基本上sparkContext是从工作人员上运行的代码调用实例,这会导致错误。需要在驱动程序进程上进行文件读取。

sc.textFile接受逗号分隔的字符串,指定要读取的文件名。因此您可以执行以下操作:

filenames = sc.textFile("filesToRead.txt")

parsed_contents = sc.textFile(",".join(filenames.collect()))

parsed_contents.top(5)

您还可以指定模式作为sc.textFile方法的输入。例如,

parsed_contents = sc.textFile("file[0-5].txt")

UPDATE 用于过滤磁盘上存在的文件。

def check_exists(name):
    try:
        open(name, 'r')
        True
    except:
        False

existingFiles = filenames.filter(check_exists)
于 2017-02-06T17:57:19.163 回答