1

我有以下简单代码导致有关缓存的错误:

trips_in = sc.textFile("trip_data.csv")
trips = trips_in.map(lambda l: l.split(",")).map(lambda x: parseTrip(x)).cache()

trips.count()

该函数parseTrip()获取字符串列表并创建并返回一个类 Trip:

class Trip:
  def __init__(self, id, duration):
    self.id = id
    self.duration = duration

我在操作后立即收到错误消息count()。但是,如果我删除cache()第二行末尾的 一切正常。根据错误,问题是无法腌制类 Trip:

PicklingError: Can't pickle __main__.Trip: attribute lookup __main__.Trip failed

那么我怎样才能让它变得可腌制(如果它是一个实际的词)?请注意,我使用的是 Databricks 笔记本,因此我无法为类定义制作单独的 .py 以使其可腌制。

4

1 回答 1

1

环境不会影响答案 - 如果您想使用自定义类,它必须在集群中的每个节点上都是可导入的。

  • 对于单个模块,您可以轻松地使用指向GitHub GistSparkContext.addPyFile的 URL (或其他支持的格式:“ HDFS 中的文件(或其他 Hadoop 支持的文件系统),或 HTTP、HTTPS 或 FTP URI ”)

    • 创建一个要点。
    • 单击原始链接并复制 URL。
    • 在您的笔记本电话中:

      sc.addPyFile(raw_gist_url)
      
  • 对于复杂的依赖关系,您分发 egg 文件。

    • 使用. _ _setuptools

      目录结构:

      .
      ├── setup.py
      └── trip
          └── __init__.py
      

      示例设置文件:

      #!/usr/bin/env python
      
      from setuptools import setup
      
      setup(name='trip',
            version='0.0.1',
            description='Trip',
            author='Jane Doe',
            author_email='jane@example.com',
            url='https://example.com',
            packages=['trip'],)
      
    • 创建鸡蛋文件:

      python setup.py bdist_egg
      

      这将创建dist带有trip-0.0.1-pyX.Y.egg文件的目录

    • 转到 Databricks 仪表板 -> 新建 -> 库并从 dist 目录上传 egg 文件:

      在此处输入图像描述

    • 将库附加到要使用的集群。

  • 最后,如果您想要的只是一种记录类型,则namedtuple无需任何额外步骤即可使用:

    from collections import namedtuple
    
    Trip = namedtuple('Trip', ['id', 'duration'])
    
于 2017-02-27T19:48:31.473 回答