如何使用Spark DataFrame计算Cassandra表的汇总统计量?
我试图得到一些Cassandra / SPARK数据的最小值,最大值,但我需要用JAVA来做。
import org.apache.spark.sql.DataFrame; import static org.apache.spark.sql.functions.*; DataFrame df = sqlContext.read() .format("org.apache.spark.sql.cassandra") .option("table", "someTable") .option("keyspace", "someKeyspace") .load(); df.groupBy(col("keyColumn")) .agg(min("valueColumn"), max("valueColumn"), avg("valueColumn")) .show();
编辑显示工作版本:确保“围绕someTable和someKeyspace
只需将您的数据导入为DataFrame
并应用所需的聚合:
import org.apache.spark.sql.DataFrame; import static org.apache.spark.sql.functions.*; DataFrame df = sqlContext.read() .format("org.apache.spark.sql.cassandra") .option("table", someTable) .option("keyspace", someKeyspace) .load(); df.groupBy(col("keyColumn")) .agg(min("valueColumn"), max("valueColumn"), avg("valueColumn")) .show();
其中someTable
和someKeyspace
分别存储表名和键空间。
我建议查看https://github.com/datastax/spark-cassandra-connector/tree/master/spark-cassandra-connector-demos
其中包含Scala和等效Java中的演示。
您还可以查看: http : //spark.apache.org/documentation.html
您可以在Scala,Java和Python版本之间进行大量示例。
我几乎100%肯定在这些链接之间,你会找到你正在寻找的东西。
如果您之后遇到任何问题,请随时更新您的问题并提出更具体的错误/问题。
一般来说,
编译scala文件:$ scalac Main.scala
从Main.class文件创建您的java源文件:$ javap Main
有关更多信息,请访问以下url: http : //alvinalexander.com/scala/scala-class-to-decompiled-java-source-code-classes
- 此语言级别不支持Lambda表达式
- 强制分区存储在特定执行程序中
- 由于java.io.NotSerializableException:org.apache.spark.SparkContext,Spark作业失败
- 如何在不使用collect函数的情况下有效地将rdd转换为list
- 如何更新火花流中的广播变量?
- sparkContext JavaSparkContext SQLContext SparkSession之间的区别?
- Apache Spark – 添加两列
- Spark序列化和Java序列化有什么区别?
- Spark 2.0.1写入错误:引起:java.util.NoSuchElementException