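I'm exploring Spark Streaming through PySpark, and I hit an error when I try to use the transform function with take.

I can successfully use sortBy on a DStream via transform and pprint the result.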

author_counts_sorted_dstream = author_counts_dstream.transform\
  (lambda foo:foo\
   .sortBy(lambda x:x[0].lower())\
   .sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()

But if I use take following the same pattern and try to pprint it:

top_five = author_counts_sorted_dstream.transform\
  (lambda rdd:rdd.take(5))
top_five.pprint()

the job fails with

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'

You can see the full code and output in the notebook here.

What am I doing wrong?

1 Answer

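The function you pass to transform must map an RDD to an RDD. take is an action, so it returns a plain Python list rather than an RDD, which is why the job fails with "'list' object has no attribute '_jrdd'". If you use an action like take, you have to convert the result back to an RDD: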

sc: SparkContext = ...

author_counts_sorted_dstream.transform(
  lambda rdd: sc.parallelize(rdd.take(5))
)

In contrast, RDD.sortBy is a transformation (it returns an RDD), so no further parallelization is needed.
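
The conversion back to an RDD can also be wrapped in a small reusable helper. The following is only a sketch: take_as_rdd is a hypothetical name, not part of PySpark, and it assumes that reading the SparkContext from the RDD's own context property is acceptable in your setup, as an alternative to referring to a global sc.

```python
def take_as_rdd(n):
    """Build a function for DStream.transform that keeps the first n records of each batch.

    take_as_rdd is a hypothetical helper name used for illustration only.
    """
    def _transform(rdd):
        # rdd.take(n) is an action: it returns a plain Python list, not an RDD.
        first_n = rdd.take(n)
        # Wrap the list back into an RDD so that transform receives an RDD again.
        # rdd.context is the SparkContext the RDD was created on.
        return rdd.context.parallelize(first_n)
    return _transform


# Intended usage with the sorted DStream from the question (not executed here):
# top_five = author_counts_sorted_dstream.transform(take_as_rdd(5))
# top_five.pprint()
```

Because the helper only relies on rdd.take, rdd.context and parallelize, it can be checked against a small stand-in object before running it on a real stream.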

As a side note, the following function:

lambda foo: foo \
    .sortBy(lambda x:x[0].lower()) \
    .sortBy(lambda x:x[1], ascending=False)

doesn't make much sense. Remember that Spark sorts by shuffling, so the sort is not stable: the second sortBy does not preserve the order produced by the first. If you want to sort by multiple fields, use a composite key such as:

lambda x: (x[0].lower(), -x[1])
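
To see how the order of fields in a composite key decides the result, here is a small sketch in plain Python, using made-up (author, count) pairs in place of a real batch. It assumes the intent in the question is "highest count first, ties broken by lower-cased name", in which case the negated count goes first in the tuple; the key shown above orders by name first instead. The same key function can be passed as the keyfunc argument of RDD.sortBy.

```python
# Hypothetical (author, count) pairs standing in for one batch of the stream.
author_counts = [("bob", 3), ("Alice", 3), ("carol", 5), ("dave", 1)]


def by_count_then_name(pair):
    # Negating the count makes larger counts sort first;
    # the lower-cased name breaks ties between equal counts.
    name, count = pair
    return (-count, name.lower())


ranked = sorted(author_counts, key=by_count_then_name)
print(ranked)
# [('carol', 5), ('Alice', 3), ('bob', 3), ('dave', 1)]

# With an RDD the equivalent call would be (not executed here):
# rdd.sortBy(by_count_then_name)
```

A single sort on the tuple makes the tie-breaking explicit, so the result no longer depends on whether the underlying sort is stable.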
answered 2017-01-05T12:40:29.710