How do I create a simple Spark GraphFrame using Java?

Basically I am a Java developer, and I now have a chance to work with Spark. I have gone through the basics of the Spark API — SparkConf, SparkContext, RDD, SQLContext, DataFrame, Dataset — and I am able to perform some simple transformations with RDDs and SQL. But when I try to work through some sample GraphFrame applications in Java, I cannot get them to run. I have gone through many YouTube tutorials, forums, and StackOverflow threads, but I did not find any direct suggestion anywhere. The problem actually occurs when I try to create an object of the GraphFrame class. I have downloaded the relevant jar (graphframes-0.2.0-spark2.0-s_2.11.jar), but I am still facing the issue. I am posting my analysis up to the point I have reached; being new to Spark I cannot move any further, so if someone helps me out it would be very helpful. Thanks in advance. The exception I am facing is: the constructor GraphFrame(DataFrame, DataFrame) is undefined.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.storage.StorageLevel;
    import org.graphframes.GraphFrame;

    import com.fasterxml.jackson.core.JsonParseException;
    import com.fasterxml.jackson.databind.JsonMappingException;

    public class SparkJavaGraphFrameOne {
        public static void main(String[] args) throws JsonParseException, JsonMappingException, IOException {
            SparkConf conf = new SparkConf().setAppName("test").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

            JavaRDD<Row> verRow = sc.parallelize(Arrays.asList(RowFactory.create(1, "A"), RowFactory.create(2, "B")));
            JavaRDD<Row> edgRow = sc.parallelize(Arrays.asList(RowFactory.create(1, 2, "Edge")));

            List<StructField> verFields = new ArrayList<StructField>();
            verFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
            verFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));

            List<StructField> edgFields = new ArrayList<StructField>();
            edgFields.add(DataTypes.createStructField("fromId", DataTypes.IntegerType, true));
            edgFields.add(DataTypes.createStructField("toId", DataTypes.IntegerType, true));
            edgFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));

            StructType verSchema = DataTypes.createStructType(verFields);
            StructType edgSchema = DataTypes.createStructType(edgFields);

            DataFrame verDF = sqlContext.createDataFrame(verRow, verSchema);
            DataFrame edgDF = sqlContext.createDataFrame(edgRow, edgSchema);

            GraphFrame g = new GraphFrame(verDF, edgDF); // fails: constructor GraphFrame(DataFrame, DataFrame) is undefined
            g.vertices().show();
            g.edges().show();
            g.persist(StorageLevel.MEMORY_AND_DISK());
        }
    }
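A likely cause of the "constructor GraphFrame(DataFrame, DataFrame) is undefined" error — offered as a sketch, not a confirmed diagnosis: in Spark 2.x, `org.apache.spark.sql.DataFrame` no longer exists as a class visible to Java; it is only a Scala type alias for `Dataset<Row>`. Compiled against Spark 2.0.0 and graphframes-0.2.0-spark2.0, the constructor the Java compiler can actually see takes `Dataset<Row>` arguments. The fragment below (reusing `sqlContext`, `verRow`, `edgRow`, `verSchema`, and `edgSchema` from the code above) swaps `DataFrame` for `Dataset<Row>`:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.graphframes.GraphFrame;

// Spark 2.x: DataFrame is only a Scala alias for Dataset<Row>, invisible to Java,
// so build the GraphFrame from Dataset<Row> values instead.
Dataset<Row> verDS = sqlContext.createDataFrame(verRow, verSchema);
Dataset<Row> edgDS = sqlContext.createDataFrame(edgRow, edgSchema);
GraphFrame g = new GraphFrame(verDS, edgDS);
```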

I wrote a sample program in Java using Spark 2.0.0 and GraphFrames 0.2.0. The program is based on the example provided at http://graphframes.github.io/quick-start.html#start-using-graphframes. Hope this helps.

pom.xml

    <project>
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.abaghel.examples.spark</groupId>
      <artifactId>spark-graphframe</artifactId>
      <version>1.0.0-SNAPSHOT</version>
      <dependencies>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-core_2.11</artifactId>
          <version>2.0.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-graphx_2.11</artifactId>
          <version>2.0.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-sql_2.11</artifactId>
          <version>2.0.0</version>
        </dependency>
        <dependency>
          <groupId>graphframes</groupId>
          <artifactId>graphframes</artifactId>
          <version>0.2.0-spark2.0-s_2.11</version>
        </dependency>
      </dependencies>
      <repositories>
        <repository>
          <id>SparkPackagesRepo</id>
          <url>http://dl.bintray.com/spark-packages/maven</url>
        </repository>
      </repositories>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.8</source>
              <target>1.8</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
    </project>

SparkGraphFrameSample.java

    package com.abaghel.examples.spark.graphframe;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.graphframes.GraphFrame;
    import org.graphframes.lib.PageRank;

    /**
     * Sample application that shows how to create a GraphFrame, query it, and run the PageRank algorithm.
     *
     * @author abaghel
     */
    public class SparkGraphFrameSample {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("SparkGraphFrameSample")
                    .config("spark.sql.warehouse.dir", "/file:C:/temp")
                    .master("local[2]")
                    .getOrCreate();

            // Create a Vertex DataFrame with unique ID column "id"
            List<User> uList = new ArrayList<User>() {
                {
                    add(new User("a", "Alice", 34));
                    add(new User("b", "Bob", 36));
                    add(new User("c", "Charlie", 30));
                }
            };
            Dataset<Row> verDF = spark.createDataFrame(uList, User.class);

            // Create an Edge DataFrame with "src" and "dst" columns
            List<Relation> rList = new ArrayList<Relation>() {
                {
                    add(new Relation("a", "b", "friend"));
                    add(new Relation("b", "c", "follow"));
                    add(new Relation("c", "b", "follow"));
                }
            };
            Dataset<Row> edgDF = spark.createDataFrame(rList, Relation.class);

            // Create a GraphFrame
            GraphFrame gFrame = new GraphFrame(verDF, edgDF);

            // Get in-degree of each vertex
            gFrame.inDegrees().show();

            // Count the number of "follow" connections in the graph
            long count = gFrame.edges().filter("relationship = 'follow'").count();

            // Run PageRank algorithm and show results
            PageRank pRank = gFrame.pageRank().resetProbability(0.01).maxIter(5);
            pRank.run().vertices().select("id", "pagerank").show();

            // Stop
            spark.stop();
        }
    }

User.java

    package com.abaghel.examples.spark.graphframe;

    /**
     * User class
     *
     * @author abaghel
     */
    public class User {
        private String id;
        private String name;
        private int age;

        public User() {
        }

        public User(String id, String name, int age) {
            super();
            this.id = id;
            this.name = name;
            this.age = age;
        }

        public String getId() {
            return id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public String getName() {
            return name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public int getAge() {
            return age;
        }

        public void setAge(int age) {
            this.age = age;
        }
    }
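As a side note on why `User` must be a full JavaBean: `spark.createDataFrame(list, beanClass)` derives the DataFrame's columns from the bean's getter/setter pairs. The Spark-free sketch below shows that discovery using the standard `java.beans` introspector (the `BeanColumns` class name and its nested copy of `User` are invented here purely for illustration):

```java
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.util.ArrayList;
import java.util.List;

public class BeanColumns {
    // Minimal stand-in for the User bean above, so this sketch runs without Spark.
    public static class User {
        private String id;
        private String name;
        private int age;
        public String getId() { return id; }
        public void setId(String id) { this.id = id; }
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public int getAge() { return age; }
        public void setAge(int age) { this.age = age; }
    }

    // List the bean properties, i.e. the column names Spark would derive.
    // Passing Object.class as the stop class excludes Object's "class" property.
    public static List<String> columnsFor(Class<?> beanClass) throws Exception {
        List<String> cols = new ArrayList<>();
        for (PropertyDescriptor pd : Introspector.getBeanInfo(beanClass, Object.class)
                                                 .getPropertyDescriptors()) {
            cols.add(pd.getName());
        }
        return cols;
    }

    public static void main(String[] args) throws Exception {
        // Introspector returns properties sorted alphabetically by name
        System.out.println(columnsFor(User.class)); // prints [age, id, name]
    }
}
```

This also explains why the quick-start output shows columns in alphabetical order rather than declaration order.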

Relation.java

    package com.abaghel.examples.spark.graphframe;

    /**
     * Relation class
     *
     * @author abaghel
     */
    public class Relation {
        private String src;
        private String dst;
        private String relationship;

        public Relation() {
        }

        public Relation(String src, String dst, String relationship) {
            super();
            this.src = src;
            this.dst = dst;
            this.relationship = relationship;
        }

        public String getSrc() {
            return src;
        }

        public void setSrc(String src) {
            this.src = src;
        }

        public String getDst() {
            return dst;
        }

        public void setDst(String dst) {
            this.dst = dst;
        }

        public String getRelationship() {
            return relationship;
        }

        public void setRelationship(String relationship) {
            this.relationship = relationship;
        }
    }

Console output

    16/08/27 22:34:45 INFO DAGScheduler: Job 10 finished: show at SparkGraphFrameSample.java:56, took 0.938910 s
    16/08/27 22:34:45 INFO CodeGenerator: Code generated in 6.599005 ms
    +---+-------------------+
    | id|           pagerank|
    +---+-------------------+
    |  a|               0.01|
    |  b|0.08763274109799998|
    |  c|     0.077926810699|
    +---+-------------------+
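To make `resetProbability(0.01)` in the output above less opaque, here is a plain-Java power-iteration sketch of the PageRank update `rank = reset + (1 - reset) * Σ rank[u]/outDeg[u]` on the same three-vertex graph. GraphX's initialization and normalization details differ, so the values for `b` and `c` will not match the console output; the invariant that does carry over is that a vertex with no in-links settles at the reset value, which is why `a` shows exactly 0.01 above. The class name `TinyPageRank` is invented for this sketch:

```java
import java.util.Arrays;

public class TinyPageRank {
    // Graph from the sample: a->b, b->c, c->b (indices: 0=a, 1=b, 2=c).
    static final int[][] IN_LINKS = { {}, {0, 2}, {1} }; // incoming neighbors per vertex
    static final int[] OUT_DEG = {1, 1, 1};              // out-degree of each vertex

    // Synchronous power iteration of: rank = reset + (1 - reset) * sum(rank[u] / outDeg[u]).
    static double[] ranks(double reset, int iters) {
        double[] rank = {1.0, 1.0, 1.0}; // assumed initial ranks
        for (int it = 0; it < iters; it++) {
            double[] next = new double[rank.length];
            for (int v = 0; v < rank.length; v++) {
                double sum = 0.0;
                for (int u : IN_LINKS[v]) {
                    sum += rank[u] / OUT_DEG[u];
                }
                next[v] = reset + (1.0 - reset) * sum;
            }
            rank = next;
        }
        return rank;
    }

    public static void main(String[] args) {
        // Same knobs as resetProbability(0.01).maxIter(5) in the sample.
        double[] r = ranks(0.01, 5);
        System.out.println(Arrays.toString(r)); // r[0] stays at 0.01: "a" has no in-links
    }
}
```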

I don't know whether you have been able to solve your problem; I only just saw your question. I think if you are getting Exception in thread "main" java.lang.NoClassDefFoundError: com/typesafe/scalalogging/slf4j/LazyLogging, you need to add the following dependency to your pom.xml:

    <dependency>
      <groupId>com.typesafe.scala-logging</groupId>
      <artifactId>scala-logging-slf4j_2.10</artifactId>
      <version>2.1.2</version>
    </dependency>

I ran into the same problem, and adding this jar resolved it for me.

I was able to reproduce the problem with 0.5.0-spark2.1-s_2.11 (it keeps running continuously), while it works fine with 0.4.0-spark2.1-s_2.11.