I am trying to build a basic pipeline with SparkNLP:

from sparknlp.base import DocumentAssembler
from sparknlp.annotator import SentenceDetectorDLApproach
from pyspark.ml import Pipeline

# Turn the raw text column into Spark NLP DOCUMENT annotations
documentAssembler = DocumentAssembler() \
    .setInputCol("S81000") \
    .setOutputCol("document")

# Trainable deep-learning sentence detector
sentenceDetector = SentenceDetectorDLApproach() \
    .setInputCols(["document"]) \
    .setOutputCol("sentences") \
    .setEpochsNumber(100)

nlp_pipeline = Pipeline(stages=[documentAssembler, sentenceDetector])

use_model = nlp_pipeline.fit(sample_text)

use_output = use_model.transform(sample_text)

display(use_output)

Here is the schema of the sample_text DataFrame:

sample_text:pyspark.sql.dataframe.DataFrame
S81000:string

I have made sure there are no null values.
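
For reproducibility, sample_text only has that single string column; a minimal stand-in could be built like this (the text values below are just placeholders, not my actual data):

# Hypothetical stand-in for sample_text with the same schema (S81000: string)
sample_text = spark.createDataFrame(
    [
        ("This is the first document. It has two sentences.",),
        ("Another short document consisting of one sentence.",),
    ],
    ["S81000"],
)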

When I try to display the output, I get the following error:

SparkException: Failed to execute user defined function(HasSimpleAnnotate$$Lambda$8171/1165655549: (array<array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>>) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
Caused by: TensorFlowException: file is too short to be an sstable
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2519.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2519.0 (TID 78791) (10.139.64.21 executor 8): org.apache.spark.SparkException: Failed to execute user defined function(HasSimpleAnnotate$$Lambda$8171/1165655549: (array<array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>>) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.tensorflow.exceptions.TensorFlowException: file is too short to be an sstable
     [[{{node save/RestoreV2}}]]
    at org.tensorflow.internal.c_api.AbstractTF_Status.throwExceptionIfNotOK(AbstractTF_Status.java:101)
    at org.tensorflow.Session.run(Session.java:691)
    at org.tensorflow.Session.access$100(Session.java:72)
    at org.tensorflow.Session$Runner.runHelper(Session.java:381)
    at org.tensorflow.Session$Runner.run(Session.java:329)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper.getSession(TensorflowWrapper.scala:90)
    at com.johnsnowlabs.ml.tensorflow.TensorflowSentenceDetectorDL.predict(TensorflowSentenceDetectorDL.scala:215)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$processText$1(SentenceDetectorDLModel.scala:329)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.Iterator$ConcatIterator.foreach(Iterator.scala:179)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$annotate$2(SentenceDetectorDLModel.scala:364)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$annotate$2$adapted(SentenceDetectorDLModel.scala:362)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.annotate(SentenceDetectorDLModel.scala:362)
    at com.johnsnowlabs.nlp.HasSimpleAnnotate.$anonfun$dfAnnotate$1(HasSimpleAnnotate.scala:42)
    ... 24 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028)
    at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446)
    at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:289)
    at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:299)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82)
    at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88)
    at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75)
    at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62)
    at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:512)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:511)
    at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:399)
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:59)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3018)
    at org.apache.spark.sql.Dataset.$anonfun$collectResult$1(Dataset.scala:3009)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3802)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3800)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3008)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:194)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:57)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:411)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:839)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:375)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:897)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:775)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:854)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:652)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:817)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:224)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:775)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:211)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$13(DriverLocal.scala:544)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:53)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:279)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:271)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:53)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:521)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(HasSimpleAnnotate$$Lambda$8171/1165655549: (array<array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>>) => array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:757)
    at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
    at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.tensorflow.exceptions.TensorFlowException: file is too short to be an sstable
     [[{{node save/RestoreV2}}]]
    at org.tensorflow.internal.c_api.AbstractTF_Status.throwExceptionIfNotOK(AbstractTF_Status.java:101)
    at org.tensorflow.Session.run(Session.java:691)
    at org.tensorflow.Session.access$100(Session.java:72)
    at org.tensorflow.Session$Runner.runHelper(Session.java:381)
    at org.tensorflow.Session$Runner.run(Session.java:329)
    at com.johnsnowlabs.ml.tensorflow.TensorflowWrapper.getSession(TensorflowWrapper.scala:90)
    at com.johnsnowlabs.ml.tensorflow.TensorflowSentenceDetectorDL.predict(TensorflowSentenceDetectorDL.scala:215)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$processText$1(SentenceDetectorDLModel.scala:329)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:512)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.Iterator$ConcatIterator.foreach(Iterator.scala:179)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$annotate$2(SentenceDetectorDLModel.scala:364)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.$anonfun$annotate$2$adapted(SentenceDetectorDLModel.scala:362)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at com.johnsnowlabs.nlp.annotators.sentence_detector_dl.SentenceDetectorDLModel.annotate(SentenceDetectorDLModel.scala:362)
    at com.johnsnowlabs.nlp.HasSimpleAnnotate.$anonfun$dfAnnotate$1(HasSimpleAnnotate.scala:42)
    ... 24 more

This is on Azure Databricks. I made sure to configure the cluster as described in the SparkNLP documentation on the GitHub page. The error does not occur when the pipeline contains only the DocumentAssembler. I plan to add a USE (Universal Sentence Encoder) stage next (hence the use_ naming convention), but the error already appears at the sentence detection stage. Any ideas?
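
In case it helps narrow things down, this is roughly the variant I could switch to, using the pretrained sentence detector instead of training SentenceDetectorDLApproach myself. The model name and language here are just the defaults from the Spark NLP models hub, and I have not yet verified whether this avoids the error:

from sparknlp.base import DocumentAssembler
from sparknlp.annotator import SentenceDetectorDLModel
from pyspark.ml import Pipeline

documentAssembler = DocumentAssembler() \
    .setInputCol("S81000") \
    .setOutputCol("document")

# Load the pretrained English sentence detector instead of training one from scratch
sentenceDetector = SentenceDetectorDLModel.pretrained("sentence_detector_dl", "en") \
    .setInputCols(["document"]) \
    .setOutputCol("sentences")

nlp_pipeline = Pipeline(stages=[documentAssembler, sentenceDetector])
use_model = nlp_pipeline.fit(sample_text)
use_output = use_model.transform(sample_text)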
