
I'm just trying to run the sample code for a stateful stream, but it fails with an error and I can't understand why it happens.

Spark 2.3 with Python 3.6 on the Cloudera VM 5.13.3.

Run options:

--master local[*] --queue PyCharmSpark pyspark-shell 

My code:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.sql.functions import *

conf = (SparkConf()
       .setAppName("ch2_dstreams_t1.py"))

spark = SparkSession.builder \
     .appName(" ") \
     .config(conf=conf) \
     .getOrCreate()


# Create a StreamingContext from the session's SparkContext with a batch interval of 10 seconds
sc = spark.sparkContext
ssc = StreamingContext(sc, 10)
ssc.checkpoint("checkpoint")

# define the update function
def updatetotalcount(currentcount, countstate):
    if countstate is None:
        countstate = 0
    return sum(currentcount, countstate)

# Create a DStream that will connect to hostname:port, here localhost:7777
lines = ssc.socketTextStream("localhost", 7777)

# Split each line into words
words = lines.flatMap(lambda line: line.split(" "))

# Count each word in each batch
pairs = words.map(lambda word: (word, 1))

wordCounts = pairs.reduceByKey(lambda x, y: x + y)

totalCounts = wordCounts.updateStateByKey(updatetotalcount)

totalCounts.pprint()

# Start the computation
ssc.start()

# Wait for the computation to terminate
ssc.awaitTermination()
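
For context, I drive the stream by typing lines into a separate terminal. A minimal stand-in for that feeder (a hypothetical test helper; nc -lk 7777 does the same job) could look like this:

import socket

# One-connection line feeder for the socketTextStream receiver above.
# Start it before the Spark job, then type lines; an empty line stops it.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 7777))
server.listen(1)
conn, _ = server.accept()  # Spark's receiver connects here
for line in iter(input, ""):
    conn.sendall((line + "\n").encode())
conn.close()
server.close()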

The stream starts and is listening on the socket, but it fails as soon as I start typing lines into the terminal application.

The error:

To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
-------------------------------------------
Time: 2018-08-27 11:40:10
-------------------------------------------

[Stage 0:>                                                          (0 + 1) / 1]18/08/27 11:40:15 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:15 WARN storage.BlockManager: Block input-0-1535395215600 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395215800 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216000 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216200 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:16 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/08/27 11:40:16 WARN storage.BlockManager: Block input-0-1535395216400 replicated to only 0 peer(s) instead of 1 peers
18/08/27 11:40:20 WARN storage.BlockManager: Putting block rdd_30_0 failed due to exception org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given
.
18/08/27 11:40:20 WARN storage.BlockManager: Block rdd_30_0 could not be removed as it was not found on disk or in memory
18/08/27 11:40:20 ERROR executor.Executor: Exception in task 0.0 in stage 8.0 (TID 22)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/08/27 11:40:20 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 8.0 (TID 22, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
    process()
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/opt/cloudera/parcels/SPARK2-2.3.0.cloudera2-1.cdh5.13.3.p0.316101/lib/spark2/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/rdd.py", line 1979, in <lambda>
  File "/opt/cloudera/parcels/SPARK2/lib/spark2/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 594, in <lambda>
  File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
    return sum(currentcount, countstate)
TypeError: _() takes 1 positional argument but 2 were given

    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:298)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:438)
    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRunner.scala:421)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:252)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:216)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

The root cause is probably my function updatetotalcount: when I comment out the updateStateByKey(updatetotalcount) transformation, the batch counts are printed to the output just fine, and the traceback points straight at the function:

File "/home/cloudera/PycharmProjects/spark_streaming/ch2_dstreams_t1.py", line 27, in updatetotalcount
        return sum(currentcount, countstate)
    TypeError: _() takes 1 positional argument but 2 were given

Can anyone suggest why I am getting this error?


1 Answer


The problem is that

from pyspark.sql.functions import *

shadows Python's built-in functions, here sum: after the wildcard import, sum refers to pyspark.sql.functions.sum, which takes a single column argument. (In Spark 2.x these SQL functions are generated by a factory whose inner wrapper is literally named _, which is why the traceback says "_() takes 1 positional argument but 2 were given".) If you want to use pyspark.sql.functions, import the module under a namespace instead, e.g.

import pyspark.sql.functions as f
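
A minimal sketch of the fix (assuming the rest of the script is unchanged): keep the SQL functions behind the f alias, and if a wildcard import must stay for some reason, reach the built-in explicitly through the builtins module:

import builtins

import pyspark.sql.functions as f  # SQL functions now live under f (f.sum, f.col, ...)

def updatetotalcount(currentcount, countstate):
    # currentcount: list of new per-key counts from the current batch
    # countstate: running total from earlier batches (None on the first batch)
    if countstate is None:
        countstate = 0
    # builtins.sum is the Python built-in, immune to any wildcard import
    return builtins.sum(currentcount, countstate)

With the namespaced import the built-in sum keeps its usual meaning anyway, so the builtins.sum call is just belt and braces.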