Tag: apache spark

数据集-API模拟JavaSparkContext.wholeTextFiles

我们可以调用JavaSparkContext.wholeTextFiles并获取JavaPairRDD ,其中第一个String是文件名,第二个String是整个文件内容。 在Dataset API中是否有类似的方法,或者我所能做的就是将文件加载到JavaPairRDD然后转换为Dataset(这是有效的,但我正在寻找非RDD解决方案)。

Spark – 可以在JAVA中将MultiMap转换为DataFrame

我正在尝试将数十亿数据值的MultiMap转换为Spark DataFrame以运行计算,然后将结果写入cassandra表。 我从以下cassandra查询和循环生成多图。 我很乐意接受建议,如果有更好的方法来获取和操纵数据到DataFrame,就像我在循环中一样。 代码更新了答案: //Build ResultSet from cassandra query for data manipulation. Statement stmt = new SimpleStatement(“SELECT \”Power\”,\”Bandwidth\”,\”Start_Frequency\” FROM \”SB1000_49552019\”.\”Measured_Value\”;”); //Statement stmt = new SimpleStatement(“SELECT power, bandwidth, start_frequency FROM model.reports;”); stmt.setFetchSize(1000); ResultSet results = session.execute(stmt); // Get the Variables from each Row of Cassandra Data Multimap data = LinkedListMultimap.create(); for (Row row : results){ […]

SPARK到HBase写作

我的SPARK计划的流程如下: 驱动程序 – >创建Hbase连接 – >广播Hbase句柄现在从执行程序,我们获取此句柄并尝试写入hbase 在Driver程序中,我正在创建HBase conf对象和Connection Object,然后通过JavaSPARK Context广播它,如下所示: SparkConf sparkConf = JobConfigHelper.getSparkConfig(); Configuration conf = new Configuration(); UserGroupInformation.setConfiguration(conf); jsc = new JavaStreamingContext(sparkConf, Durations.milliseconds(Long.parseLong(batchDuration))); Configuration hconf=HBaseConfiguration.create(); hconf.addResource(new Path(“/etc/hbase/conf/core-site.xml”)); hconf.addResource(new Path(“/etc/hbase/conf/hbase-site.xml”)); UserGroupInformation.setConfiguration(hconf); JavaSparkContext js = jsc.sparkContext(); Connection connection = ConnectionFactory.createConnection(hconf); connectionbroadcast=js.broadcast(connection); 执行器的内部call()方法, Table table = connectionbroadcast.getValue().getTable(TableName.valueOf(“gfttsdgn:FRESHHBaseRushi”)) ; Put p = new Put(Bytes.toBytes(“row1”)); p.add(Bytes.toBytes(“c1”), Bytes.toBytes(“output”), Bytes.toBytes(“rohan”)); […]

使用Apache Spark从Amazon S3解析文件

我正在使用Apache Spark,我必须从Amazon S3解析文件。 从Amazon S3路径获取文件时,如何知道文件扩展名?

带有DataFrame API的Apache Spark MLlib在createDataFrame()或read()时会产生java.net.URISyntaxException .csv(…)

在一个独立的应用程序(运行在java8,Windows 10上,使用spark-xxx_2.11:2.0.0作为jar依赖项)下一代码会出错: /* this: */ Dataset logData = spark_session.createDataFrame(Arrays.asList( new LabeledPoint(1.0, Vectors.dense(4.9,3,1.4,0.2)), new LabeledPoint(1.0, Vectors.dense(4.7,3.2,1.3,0.2)) ), LabeledPoint.class); /* or this: */ /* logFile: “C:\files\project\file.csv”, “C:\\files\\project\\file.csv”, “C:/files/project/file.csv”, “file:/C:/files/project/file.csv”, “file:///C:/files/project/file.csv”, “/file.csv” */ Dataset logData = spark_session.read().csv(logFile); 例外: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:C:/files/project/spark-warehouse at org.apache.hadoop.fs.Path.initialize(Path.java:206) at org.apache.hadoop.fs.Path.(Path.java:172) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.makeQualifiedPath(SessionCatalog.scala:114) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.createDatabase(SessionCatalog.scala:145) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.(SessionCatalog.scala:89) at org.apache.spark.sql.internal.SessionState.catalog$lzycompute(SessionState.scala:95) […]

多节点hadoop集群中的Apache Spark Sql问题

嗨,我使用Spark java apis从hive获取数据。 此代码适用于hadoop单节点集群。 但是当我尝试在hadoop多节点集群中使用它时,它会抛出错误 org.apache.spark.SparkException: Detected yarn-cluster mode, but isn’t running on a cluster. Deployment to YARN is not supported directly by SparkContext. Please use spark-submit. 注意:我已将master作为本地用于单节点,而yarn-cluster用于多节点。 这是我的java代码 SparkConf sparkConf = new SparkConf().setAppName(“Hive”).setMaster(“yarn-cluster”); JavaSparkContext ctx = new JavaSparkContext(sparkConf); HiveContext sqlContext = new HiveContext(ctx.sc()); org.apache.spark.sql.Row[] result = sqlContext.sql(“Select * from Tablename”).collect(); 此外,我试图将master更改为本地,现在它抛出未知的主机名exception。 任何人都可以帮助我吗? 更新 错误日志 […]

Spark – Java UDF返回多列

我正在使用sparkSql 1.6.2(Java API),我必须处理以下DataFrame,其中包含2列中的值列表: ID AttributeName AttributeValue 0 [an1,an2,an3] [av1,av2,av3] 1 [bn1,bn2] [bv1,bv2] 所需的表是: ID AttributeName AttributeValue 0 an1 av1 0 an2 av2 0 an3 av3 1 bn1 bv1 1 bn2 bv2 我想我必须结合使用explode函数和自定义UDF函数。 我找到了以下资源: 在Spark SQL表中爆炸(转置?)多个列 如何使用JAVA在Spark DataFrame上调用UDF? 我可以成功运行一个读取两列的示例,并返回列中前两个字符串的串联 UDF2 combineUDF = new UDF2<Seq, Seq, String>() { public String call(final Seq col1, final Seq col2) throws […]

LSH Spark永远停留在approxSimilarityJoin()函数

我正在尝试实现LSH spark来为每个用户找到包含50000行和每行约5000个特征的非常大的数据集的最近邻居。 这是与此相关的代码。 MinHashLSH mh = new MinHashLSH().setNumHashTables(3).setInputCol(“features”) .setOutputCol(“hashes”); MinHashLSHModel model = mh.fit(dataset); Dataset approxSimilarityJoin = model .approxSimilarityJoin(dataset, dataset, config.getJaccardLimit(), “JaccardDistance”); approxSimilarityJoin.show(); 这项工作停留在approxSimilarityJoin()函数,永远不会超越它。 请让我知道如何解决它。

如何在spark中映射JavaPairRDD的键?

我是Spark的sc.wholeTextFiles(path); ,我使用过sc.wholeTextFiles(path); 要读取所有文件,该函数返回JavaPairRDD ,RDD的键是每个文件的完整路径,但我想要的是将密钥更改为文件名。 他们的东西是mapValues(func)但是用于键。

将JavaPairRDD转换为JavaRDD

我使用ElasticSearch-Hadoop Library从ElsticSearch获取数据。 JavaPairRDD<String, Map> esRDD = JavaEsSpark.esRDD(sc); 现在我有了JavaPairRDD。 我想在这个RDD上使用来自MLLib的随机森林。 所以我将它转换为JavaPairRDD.toRDD(esRDD)这将给我RDD。 使用RDD我再次转换为JavaRDD JavaRDD[] splits = (JavaRDD.fromRDD(JavaPairRDD.toRDD(esRDD), esRDD.classTag())).randomSplit(new double[] { 0.5, 0.5 }); JavaRDD trainingData = splits[0]; JavaRDD testData = splits[1]; 我想将trainingData和TestData传递给Random Forest算法,但它在编译时给出了转换exception。 类型不匹配:无法从JavaRDD [Tuple2 [String,Map [String,Object]]] []转换为JavaRDD [LabeledPoint] [] 添加方括号,小于和大于符号不起作用 任何人都可以建议我正确的铸造方式。 我是Spark Datastrucutres的新手。