Spark 2.0.1 write error: Caused by: java.util.NoSuchElementException

I am trying to attach a sentiment value to each message, and I have already downloaded all of the Stanford CoreNLP jars as dependencies:

    import sqlContext.implicits._
    import com.databricks.spark.corenlp.functions._
    import org.apache.spark.sql.functions._

    val version = "3.6.0"
    val model = s"stanford-corenlp-$version-models-english"
    val jars = sc.listJars
    if (!jars.exists(jar => jar.contains(model))) {
      import scala.sys.process._
      s"wget http://repo1.maven.org/maven2/edu/stanford/nlp/stanford-corenlp/$version/$model.jar -O /tmp/$model.jar".!!
      sc.addJar(s"/tmp/$model.jar")
    }

    val all_messages = spark.read.parquet("/home/ubuntu/messDS.parquet")
    case class AllMessSent(user_id: Int, sent_at: java.sql.Timestamp, message: String)
    val messDS = all_messages.as[AllMessSent]

So far so good: I can run computations on the Dataset and save it.

    case class AllMessSentiment(user_id: Int, sent_at: java.sql.Timestamp, message: String, sentiment: Int)

    val output = messDS
      .select('user_id, 'message, 'sent_at, sentiment('message).as('sentiment))
      .as[AllMessSentiment]

    output.write.parquet("/home/ubuntu/AllMessSent.parquet")

I can display the result with output.show(truncate = false) and see the sentiment scores, but when writing to CSV or Parquet I get the error below. Does anyone know how to fix it?

    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 9, localhost): java.util.NoSuchElementException
      at java.util.ArrayList$Itr.next(ArrayList.java:854)
      at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
      at scala.collection.IterableLike$class.head(IterableLike.scala:107)
      at scala.collection.AbstractIterable.head(Iterable.scala:54)
      at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
      at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
      at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
      at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
      at org.apache.spark.scheduler.Task.run(Task.scala:85)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)
    Caused by: java.util.NoSuchElementException
      at java.util.ArrayList$Itr.next(ArrayList.java:854)
      at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
      at scala.collection.IterableLike$class.head(IterableLike.scala:107)
      at scala.collection.AbstractIterable.head(Iterable.scala:54)
      at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:163)
      at com.databricks.spark.corenlp.functions$$anonfun$sentiment$1.apply(functions.scala:158)
      at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
      at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
      at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
      at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply$mcV$sp(WriterContainer.scala:253)
      at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
      at org.apache.spark.sql.execution.datasources.DefaultWriterContainer$$anonfun$writeRows$1.apply(WriterContainer.scala:252)
      at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1325)
      at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:258)
      ... 8 more
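The trace points at the sentiment UDF in spark-corenlp (functions.scala:163), which apparently calls head on the list of sentences CoreNLP produces; my guess is that a message with no sentences (null or whitespace-only text) makes that head call throw NoSuchElementException. show() only materialises the first few rows, while write() processes every row, which would explain why show() succeeds but the write fails. A minimal check I can run, assuming the column names from the code above (not a confirmed diagnosis):

    // Sketch: count messages that CoreNLP would likely see as containing no sentences.
    // show() never reached these rows, but write() does.
    val suspect = messDS.filter(m => m.message == null || m.message.trim.isEmpty)
    println(s"Messages with null/blank text: ${suspect.count()}")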

I am able to run the algorithm once all messages are split into individual sentences and cleaned of special characters and blank entries.
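For reference, this is roughly the preprocessing that made the job run for me: drop null messages, strip special characters, drop anything left blank, then split each message into sentences with spark-corenlp's ssplit before scoring. Treat it as an illustrative sketch under the assumption that blank messages are the trigger, not a confirmed fix; column and case class names follow the code above.

    // Sketch, assuming blank/whitespace-only messages are what trips the sentiment UDF.
    val cleaned = messDS
      .filter(m => m.message != null)                                                    // drop null text
      .map(m => m.copy(message = m.message.replaceAll("[^\\p{L}\\p{N}\\s.,!?']", " ").trim)) // strip special characters
      .filter(m => m.message.nonEmpty)                                                   // drop messages left empty

    // Split each message into sentences so the UDF always sees exactly one sentence per row.
    val bySentence = cleaned
      .select('user_id, 'sent_at, explode(ssplit('message)).as('message))

    val outputClean = bySentence
      .select('user_id, 'message, 'sent_at, sentiment('message).as('sentiment))
      .as[AllMessSentiment]

    outputClean.write.parquet("/home/ubuntu/AllMessSent.parquet")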