1

df.head()加载弹性搜索数据后我可以做得很好。但是在我做之后withColumn,我不能做df.headdf.show()

我不知道发生了什么,withColumn如果我创建相同的代码df2 = sqlContext.createDataFrame( [(1, "a", 23.0), (3, "B", -23.0)], ("x1", "x2", "x3"))但不能使用来自 elasticsearch 的数据,则相同的代码可以正常工作。

我正在使用来自https://www.elastic.co/guide/en/kibana/current/tutorial-load-dataset.html的官方演示数据(accounts.zip)

import findspark
findspark.init('/usr/local/spark')
from pyspark import SparkContext, SparkConf

import pyspark

if 'sc' in locals():
    sc.stop()
conf = pyspark.SparkConf()
conf.set("spark.driver.allowMultipleContexts", "true")
conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.0.jar")
conf.set("es.index.auto.create", "true")
conf.set("es.nodes.discovery", "true")
#conf.set("es.nodes.wan.only", "true")
sc = pyspark.SparkContext(conf=conf)

sqlContext = pyspark.SQLContext(sc)

df = sqlContext.read.option("es.resource", "relay-foods").format("org.elasticsearch.spark.sql").load()

# ==============> 
# at this state I can print df fine
df.show() # works


from dateutil import parser
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def order_period(order_date):
    order_date = parser.parse(order_date)
    result = order_date.strftime('%Y-%m')

    return result
udf_order_period = udf(order_period, StringType())
df_new2 = df.withColumn("OrderPeriod", udf_order_period("OrderDate"))
# ==================> 
df_new2.head() # this results in error

错误消息是

Py4JJavaError: An error occurred while calling o152.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): java.lang.IllegalArgumentException: The value (Buffer(_jsonparsefailure, _split_type_failure)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3258)
    at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3255)
    at org.apache.spark.sql.Dataset$$anonfun$53.apply(Dataset.scala:3365)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3364)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3255)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: The value (Buffer(_jsonparsefailure, _split_type_failure)) of the type (scala.collection.convert.Wrappers.JListWrapper) cannot be converted to the string type
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:290)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StringConverter$.toCatalystImpl(CatalystTypeConverters.scala:285)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:103)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:396)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:60)
    at org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1$$anonfun$apply$3.apply(ExistingRDD.scala:57)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1073)
    at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089)
    at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1127)
    at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
    at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$2.writeIteratorToStream(PythonUDFRunner.scala:50)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:345)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1945)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:194)

编辑

这是 df.show() 和 df.dtypes

+--------------------+--------+--------+----------+-------+----------+-----+------------+------+-----------------+--------------------+--------------------+------+----+
|          @timestamp|@version|CommonId| OrderDate|OrderId|PickupDate|PupId|TotalCharges|UserId|             host|             message|                path|sentAt|tags|
+--------------------+--------+--------+----------+-------+----------+-----+------------+------+-----------------+--------------------+--------------------+------+----+
|2019-01-28 22:02:...|       1|   2N3WF|03/12/2009|   1937|04/12/2009|    5|    $147.24 | 39588|devserver-MS-7758|1937,03/12/2009,3...|/tmp/relay-foods.csv|  null|null|


[('@timestamp', 'timestamp'),
 ('@version', 'string'),
 ('CommonId', 'string'),
 ('OrderDate', 'string'),
 ('OrderId', 'string'),
 ('PickupDate', 'string'),
 ('PupId', 'string'),
 ('TotalCharges', 'string'),
 ('UserId', 'string'),
 ('host', 'string'),
 ('message', 'string'),
 ('path', 'string'),
 ('sentAt', 'timestamp'),
 ('tags', 'string')]
4

0 回答 0