为什么Apache Spark在客户端上执行filter

作为新手上的apache引发了一些关于在Spark上获取Cassandra数据的问题。

List dates = Arrays.asList("2015-01-21","2015-01-22"); CassandraJavaRDD aRDD = CassandraJavaUtil.javaFunctions(sc). cassandraTable("testing", "cf_text",CassandraJavaUtil.mapRowTo(A.class, colMap)). where("Id=? and date IN ?","Open",dates); 

此查询不过滤cassandra服务器上的数据。 虽然这个java语句正在执行它的内存并最终抛出spark java.lang.OutOfMemoryErrorexception。 查询应该过滤掉cassandra服务器而不是客户端上的数据,如https://github.com/datastax/spark-cassandra-connector/blob/master/doc/3_selection.md所述 。

虽然我正在使用cassandra cqlsh上的filter执行查询,但它执行正常但执行查询而没有filter(where子句)正在给出预期的超时。 因此很明显,火花并没有在客户端应用filter。

 SparkConf conf = new SparkConf(); conf.setAppName("Test"); conf.setMaster("local[8]"); conf.set("spark.cassandra.connection.host", "192.168.1.15") 

为什么在客户端应用filter以及如何改进它以在服务器端应用filter。

我们如何在Windows平台上的cassandra集群上配置spark集群?

没有使用Cassandra和Spark,从阅读你提供的部分(感谢你),我看到:

注意:虽然ALLOW FILTERING子句隐式添加到生成的CQL查询中,但Cassandra引擎当前不允许所有谓词。 这种限制将在未来的Cassandra版本中得到解决。 目前,ALLOW FILTERING适用于由二级索引或聚类列索引的列。

我很确定(但尚未测试)不支持“IN”谓词:请参阅https://github.com/datastax/spark-cassandra-connector/blob/24fbe6a10e083ddc3f770d1f52c07dfefeb7f59a/spark-cassandra-connector-java /src/main/java/com/datastax/spark/connector/japi/rdd/CassandraJavaRDD.java#L80

因此,您可以尝试将where子句限制为Id(假设存在二级索引)并对日期范围使用spark过滤。

我建议将表格作为DataFrame而不是RDD来阅读。 这些可用于Spark 1.3及更高版本。 然后,您可以将CQL查询指定为如下字符串:

 CassandraSQLContext sqlContext = new CassandraSQLContext(sc); String query = "SELECT * FROM testing.cf_text where id='Open' and date IN ('2015-01-21','2015-01-22')"; DataFrame resultsFrame = sqlContext.sql(query); System.out.println(resultsFrame.count()); 

所以尝试一下,看看它是否适合你。

在DataFrame中获得数据后,可以在其上运行Spark SQL操作。 如果您想要RDD中的数据,您可以将DataFrame转换为RDD。

在SparkConfing中设置spark.cassandra.input.split.size_in_mb解决了这个问题。

 conf = new SparkConf(); conf.setAppName("Test"); conf.setMaster("local[4]"); conf.set("spark.cassandra.connection.host", "192.168.1.15"). set("spark.executor.memory", "2g"). set("spark.cassandra.input.split.size_in_mb", "67108864"); 

Spark-cassnadra-connector读取spark.cassandra.input.split.size_in_mb的错误值,因此在SparkConf中覆盖此值可以完成工作。 现在IN子句也很好用。