0

嗨,我有一个 Spark DataFrame,我使用 SQL 上下文进行了一些转换,例如,在所有数据中只选择两列。

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

但现在我想将这个 sqlcontext 转换为 pandas 数据框,我正在使用

pddf = df_oraAS.toPandas()

但是输出在这里停止,我需要重新启动 IDE(spyder)

6/01/22 16:04:01 INFO DAGScheduler: Got job 0 (toPandas at <stdin>:1) with 3 output partitions
16/01/22 16:04:01 INFO DAGScheduler: Final stage: ResultStage 0 (toPandas at <stdin>:1)
16/01/22 16:04:01 INFO DAGScheduler: Parents of final stage: List()
16/01/22 16:04:01 INFO DAGScheduler: Missing parents: List()
16/01/22 16:04:01 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1), which has no missing parents
16/01/22 16:04:01 INFO SparkContext: Starting job: toPandas at <stdin>:1
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KB, free 9.4 KB)
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.9 KB, free 14.3 KB)
16/01/22 16:04:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50877 (size: 4.9 KB, free: 511.1 MB)
16/01/22 16:04:01 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/22 16:04:01 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1)
16/01/22 16:04:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
16/01/22 16:04:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 119523958 bytes)
16/01/22 16:04:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 117876401 bytes)
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:200)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:252)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:247)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:317)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:63)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

我做错了什么?谢谢

编辑:更完整:我从 Oracle 数据库 (cx_Oracle) 加载日期并将数据放入 pandas 数据框中

df_ora = pd.read_sql('SELECT* FROM DEC_CLIENTES', con=connection) 

接下来我创建了一个 sparkContext 来操作数据框

sqlContext = SQLContext(sc)

df_oraAS = sqlContext.createDataFrame(df_ora)

df_oraAS.registerTempTable("df_oraAS")

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

我想再次从 sqlcontext 转换为 pandas 数据框

 pddf = df_oraAS.toPandas() 
4

2 回答 2

3

toPandas基本上collect是变相了。输出是本地 Pandas DataFrame。如果数据不适合驱动程序内存,它只会失败,因此您会看到错误。

于 2016-01-22T16:20:55.863 回答
0

您的 pd.read_sql 调用将整个数据库读入 pandas 数据帧。这是司机本地的。当您调用 createDataFrame 时,它​​会从您的 python pandas 数据帧创建一个 Spark DataFrame,这会导致非常大的任务大小(请参见下面的日志行):

16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.

即使您只选择 5 行,您实际上首先使用该 pd.read_sql 调用将整个数据库加载到内存中。如果您从 Oracle SQL 数据库读取数据,为什么不使用 spark JDBC 驱动程序,然后执行您的选择过滤器,然后调用 toPandas?

您的代码正在做的是将整个数据库读取到 pandas,写入 Spark,过滤并读回 Pandas。

于 2016-01-22T18:40:43.647 回答