Java, Spark and Cassandra: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade

I am getting an error when trying to write data to my Cassandra database.

Here is what I have: 1) Dictionary.java

    package com.chatSparkConnactionTest;

    import java.io.Serializable;

    public class Dictionary implements Serializable {
        private String value_id;
        private String d_name;
        private String d_value;

        public Dictionary() {}

        public Dictionary(String value_id, String d_name, String d_value) {
            this.setValue_id(value_id);
            this.setD_name(d_name);
            this.setD_value(d_value);
        }

        public String getValue_id() { return value_id; }
        public void setValue_id(String value_id) { this.value_id = value_id; }

        public String getD_name() { return d_name; }
        public void setD_name(String d_name) { this.d_name = d_name; }

        public String getD_value() { return d_value; }
        public void setD_value(String d_value) { this.d_value = d_value; }
    }

My main class:

    package com.chatSparkConnactionTest;

    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class JavaDemoRDDWrite implements Serializable {
        private static final long serialVersionUID = 1L;

        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                .setAppName("chat")
                .setMaster("local")
                .set("spark.cassandra.connection.host", "127.0.0.1");
            JavaSparkContext sc = new JavaSparkContext(conf);

            List<Dictionary> dictionary = Arrays.asList(
                new Dictionary("7", "n1", "v1"),
                new Dictionary("8", "n2", "v2"),
                new Dictionary("9", "n3", "v3")
            );

            for (Dictionary dictionaryRow : dictionary) {
                System.out.println("id: " + dictionaryRow.getValue_id());
                System.out.println("name: " + dictionaryRow.getD_name());
                System.out.println("value: " + dictionaryRow.getD_value());
            }

            JavaRDD<Dictionary> rdd = sc.parallelize(dictionary);
            System.out.println("Total rdd rows: " + rdd.collect().size());

            javaFunctions(rdd)
                .writerBuilder("chat", "dictionary", mapToRow(Dictionary.class))
                .saveToCassandra();
        }
    }

The pom.xml:

    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>chat_connaction_test</groupId>
        <artifactId>ChatSparkConnectionTest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>com.datastax.cassandra</groupId>
                <artifactId>cassandra-driver-core</artifactId>
                <version>3.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.0.0</version>
            </dependency>
            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector_2.11</artifactId>
                <version>2.0.0-M3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>2.0.0</version>
            </dependency>
        </dependencies>
    </project>

Here is the error text:

    java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
        at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
        at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
        at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
        at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
        at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
        at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)
    16/10/11 17:43:03 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
    16/10/11 17:43:03 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
    16/10/11 17:43:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
    16/10/11 17:43:03 INFO TaskSchedulerImpl: Cancelling stage 1
    16/10/11 17:43:03 INFO DAGScheduler: ResultStage 1 (runJob at RDDFunctions.scala:37) failed in 0.274 s
    16/10/11 17:43:03 INFO DAGScheduler: Job 1 failed: runJob at RDDFunctions.scala:37, took 0.291592 s
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
        at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
        at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61)
        at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486)
        at com.chatSparkConnactionTest.JavaDemoRDDWrite.main(JavaDemoRDDWrite.java:69)
    Caused by: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
        at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)

The problem is that, despite the error, the first value from the RDD still gets inserted into the database table. The other two rows are simply ignored.

And, just in case, here is my Cassandra table:

    CREATE TABLE dictionary (
        value_id text,
        d_value text,
        d_name text,
        PRIMARY KEY (value_id, d_name)
    ) WITH comment = 'dictionary values'
    AND CLUSTERING ORDER BY (d_name ASC);

Updated pom.xml:

    <project>
        <modelVersion>4.0.0</modelVersion>
        <groupId>chat_connaction_test</groupId>
        <artifactId>ChatSparkConnectionTest</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>2.0.0</version>
            </dependency>
            <dependency>
                <groupId>com.datastax.spark</groupId>
                <artifactId>spark-cassandra-connector_2.11</artifactId>
                <version>2.0.0-M3</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.10</artifactId>
                <version>2.0.0</version>
            </dependency>
        </dependencies>
    </project>

Remove the "cassandra-driver-core" dependency from your pom.xml file, since it is what causes the problem: the connector ships with a shaded copy of the driver, and a second, unshaded driver on the classpath produces the ClassCastException. To interact with a Cassandra DB you only need the "spark-cassandra-connector" dependency alongside the Spark dependencies.

    <dependency>
        <groupId>com.datastax.cassandra</groupId>
        <artifactId>cassandra-driver-core</artifactId>
        <version>3.1.0</version>
    </dependency>
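If you still need to run raw CQL after dropping that dependency, you don't lose anything: the connector pulls in a compatible driver transitively. A minimal sketch (assuming the connector 2.0.x API; the keyspace check query is illustrative) of opening a driver Session through `CassandraConnector` rather than through your own `Cluster`:

```java
import org.apache.spark.SparkConf;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;

public class ConnectorSessionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("chat")
            .setMaster("local")
            .set("spark.cassandra.connection.host", "127.0.0.1");

        // CassandraConnector builds its Cluster/Session from the SparkConf,
        // so the session uses exactly the driver version the connector bundles.
        CassandraConnector connector = CassandraConnector.apply(conf);

        // Session is Closeable; try-with-resources returns it to the connector's pool.
        try (Session session = connector.openSession()) {
            session.execute(
                "CREATE KEYSPACE IF NOT EXISTS chat " +
                "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        }
    }
}
```

Because the `Session` comes from the connector itself, there is no second driver copy on the classpath, which is precisely the mismatch behind the `DefaultResultSetFuture` cast failure.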