Tag: apache spark sql

带有DataFrame API的Apache Spark MLlib在createDataFrame()或read()时会产生java.net.URISyntaxException .csv(…)

在一个独立的应用程序(运行在java8,Windows 10上,使用spark-xxx_2.11:2.0.0作为jar依赖项)下一代码会出错: /* this: */ Dataset logData = spark_session.createDataFrame(Arrays.asList( new LabeledPoint(1.0, Vectors.dense(4.9,3,1.4,0.2)), new LabeledPoint(1.0, Vectors.dense(4.7,3.2,1.3,0.2)) ), LabeledPoint.class); /* or this: */ /* logFile: “C:\files\project\file.csv”, “C:\\files\\project\\file.csv”, “C:/files/project/file.csv”, “file:/C:/files/project/file.csv”, “file:///C:/files/project/file.csv”, “/file.csv” */ Dataset logData = spark_session.read().csv(logFile); 例外: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:/files/project/spark-warehouse at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.(Path.java:172) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:114) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.(SessionCatalog.scala:89) at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95) […]

多节点hadoop集群中的Apache Spark Sql问题

嗨,我使用Spark java apis从hive获取数据。 此代码适用于hadoop单节点集群。 但是当我尝试在hadoop多节点集群中使用它时,它会抛出错误 org.apache.spark.SparkException: Detected yarn-cluster mode, but isn’t running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. 注意:我已将master作为本地用于单节点,而yarn-cluster用于多节点。 这是我的java代码 SparkConf sparkConf = new SparkConf().setAppName(“Hive”).setMaster(“yarn-cluster”); JavaSparkContext ctx = new JavaSparkContext(sparkConf); HiveContext sqlContext = new HiveContext(ctx.sc()); org.apache.spark.sql.Row[] result = sqlContext.sql(“Select * from Tablename”).collect(); 此外,我试图将master更改为本地,现在它抛出未知的主机名exception。 任何人都可以帮助我吗? 更新 错误日志 […]

Spark – Java UDF返回多列

我正在使用sparkSql 1.6.2(Java API),我必须处理以下DataFrame,其中包含2列中的值列表: ID AttributeName AttributeValue 0 [an1,an2,an3] [av1,av2,av3] 1 [bn1,bn2] [bv1,bv2] 所需的表是: ID AttributeName AttributeValue 0 an1 av1 0 an2 av2 0 an3 av3 1 bn1 bv1 1 bn2 bv2 我想我必须结合使用explode函数和自定义UDF函数。 我找到了以下资源: 在Spark SQL表中爆炸(转置?)多个列 如何使用JAVA在Spark DataFrame上调用UDF? 我可以成功运行一个读取两列的示例,并返回列中前两个字符串的串联 UDF2 combineUDF = new UDF2<Seq, Seq, String>() { public String call(final Seq col1, final Seq col2) throws […]

Spark SQL:镶嵌错误的嵌套类

我似乎无法写一个JavaRDD ,其中T是一个说法, Person类。 我把它定义为 public class Person implements Serializable { private static final long serialVersionUID = 1L; private String name; private String age; private Address address; …. Address : public class Address implements Serializable { private static final long serialVersionUID = 1L; private String City; private String Block; … 然后我像这样创建一个JavaRDD : JavaRDD people = sc.textFile(“/user/johndoe/spark/data/people.txt”).map(new […]

基于第二个Dataframe的DataFrame过滤

使用Spark SQL,我有两个dataframe,它们是从一个创建的,例如: df = sqlContext.createDataFrame(…); df1 = df.filter(“value = ‘abc'”); //[path, value] df2 = df.filter(“value = ‘qwe'”); //[path, value] 我想过滤df1,如果它的’path’的一部分是df2中的任何路径。 因此,如果df1具有路径’a / b / c / d / e’的行,我会发现在df2中是否是路径为’a / b / c’的行。 在SQL中应该是这样的 SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2) 其中udf是用户定义的函数,用于缩短df1的原始路径。 天真的解决方案是使用JOIN然后过滤结果,但它很慢,因为df1和df2每行都超过10mil。 我也试过以下代码,但首先我必须从df2创建广播变量 static Broadcast bdf; bdf = sc.broadcast(df2); //variable […]

如何在GroupBy操作后从spark DataFrame列中收集字符串列表?

这里描述的解决方案(通过zero323)非常接近我想要的两个曲折: 我如何用Java做到这一点? 如果列有一个字符串列表而不是一个字符串,并且我想在GroupBy(其他一些列)之后将所有这些列表收集到一个列表中,该怎么办? 我正在使用Spark 1.6并试图使用 org.apache.spark.sql.functions.collect_list(Column col) ,如该问题的解决方案中所述,但得到以下错误 线程“main”中的exceptionorg.apache.spark.sql.AnalysisException:undefined function collect_list; at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry.scala:65)at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry $$ anonfun $ 2.apply(FunctionRegistry。 scala:65)在scala.Option.getOrElse(Option.scala:121)

处理Spark Scala中的微秒

我使用Scala将PostgreSQL表作为dataframe导入spark。 数据框看起来像 user_id | log_dt ——–| ——- 96 | 2004-10-19 10:23:54.0 1020 | 2017-01-12 12:12:14.931652 我正在转换此dataframe,使log_dt的数据格式为yyyy-MM-dd hh:mm:ss.SSSSSS 。 为了实现这一点,我使用以下代码使用unix_timestamp函数将log_dt转换为时间戳格式。 val tablereader1=tablereader1Df.withColumn(“log_dt”,unix_timestamp(tablereader1Df(“log_dt”),”yyyy-MM-dd hh:mm:ss.SSSSSS”).cast(“timestamp”)) 当我打印使用命令tablereader1.show()打印tablereader1dataframe时,我得到以下结果 user_id | log_dt ——–| ——- 96 | 2004-10-19 10:23:54.0 1020 | 2017-01-12 12:12:14.0 如何将微秒保留为时间戳的一部分? 任何建议表示赞赏。

Spark – 使用数据框语法进行HAVING分组?

在没有sql / hiveContext的Spark中使用groupby-having的语法是什么? 我知道我能做到 DataFrame df = some_df df.registreTempTable(“df”); df1 = sqlContext.sql(“SELECT * FROM df GROUP BY col1 HAVING some stuff”) 但是我怎么用这样的语法来做呢 df = df.select(df.col(“*”)).groupBy(df.col(“col1”)).having(“some stuff”) ? 这个.having()似乎不存在。

SparkSQL并在Java中的DataFrame上爆炸

有没有一种简单的方法如何在SparkSQL DataFrame上使用数组列explode ? 它在Scala中相对简单,但是这个函数似乎在Java中不可用(如javadoc中所述)。 一个选项是使用SQLContext.sql(…)并在查询中explodefunction,但我正在寻找更好,更清洁的方式。 DataFrame是从镶木地板文件加载的。

Spark SQL – 如何将DataFrame写入文本文件?

我正在使用Spark SQL来阅读镶木地板和书写镶木地板文件。 但在某些情况下,我需要将DataFrame写为文本文件而不是Json或Parquet。 是否支持任何默认方法或我必须将该DataFrame转换为RDD然后使用saveAsTextFile()方法?