Tag: apache spark

是否可以在Apache Spark中创建嵌套的RDD?

我试图在Spark中实现K-最近邻算法。 我想知道是否可以使用嵌套的RDD。 这将使我的生活更轻松。 请考虑以下代码段。 public static void main (String[] args){ //blah blah code JavaRDD temp1 = testData.map( new Function(){ public Double call(final Vector z) throws Exception{ JavaRDD temp2 = trainData.map( new Function() { public Double call(Vector vector) throws Exception { return (double) vector.length(); } } ); return (double)z.length(); } } ); } 目前我收到这个嵌套设置的错误(我可以在这里发布完整的日志)。 它是否允许在拳头位置? 谢谢

使用Java将spark RDD保存到本地文件系统

我有一个使用Spark生成的RDD。 现在,如果我将此RDD写入csv文件,我将获得一些方法,如“saveAsTextFile()”,它将csv文件输出到HDFS。 我想将文件写入我的本地文件系统,以便我的SSIS进程可以从系统中选择文件并将它们加载到数据库中。 我目前无法使用sqoop。 除了编写shell脚本之外,它是否可以在Java中实现。 需要任何清晰度,请告知。

如何使用Hive支持创建SparkSession(未找到“Hive类”)?

当我尝试运行此代码时,我收到此错误。 import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; public class App { public static void main(String[] args) throws Exception { String warehouseLocation = “file:” + System.getProperty(“user.dir”) + “spark-warehouse”; SparkSession spark = SparkSession .builder().master(“local”) .appName(“Java Spark Hive Example”) .config(“spark.sql.warehouse.dir”, warehouseLocation).enableHiveSupport() .getOrCreate(); String path = “/home/cloudera/Downloads/NetBeansProjects/sparksql1/src/test/Employee.json”; spark.sql(“CREATE TABLE IF NOT EXISTS src (key INT, value STRING)”); spark.sql(“LOAD DATA […]

使用Apache Spark和Java将CSV解析为DataFrame / DataSet

我是新来的火花,我想使用group-by&reduce从CSV中找到以下内容(使用一行): Department, Designation, costToCompany, State Sales, Trainee, 12000, UP Sales, Lead, 32000, AP Sales, Lead, 32000, LA Sales, Lead, 32000, TN Sales, Lead, 32000, AP Sales, Lead, 32000, TN Sales, Lead, 32000, LA Sales, Lead, 32000, LA Marketing, Associate, 18000, TN Marketing, Associate, 18000, TN HR, Manager, 58000, TN 我希望通过Department,Designation,State简化包含其他列和sum(costToCompany)和TotalEmployeeCount的组的CSV 应得到如下结果: Dept, Desg, state, empCount, […]

为什么启动StreamingContext失败并出现“IllegalArgumentException:要求失败:没有注册输出操作,所以无需执行”?

我正在尝试使用Twitter作为源执行Spark Streaming示例,如下所示: public static void main (String.. args) { SparkConf conf = new SparkConf().setAppName(“Spark_Streaming_Twitter”).setMaster(“local”); JavaSparkContext sc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(2)); JavaSQLContext sqlCtx = new JavaSQLContext(sc); String[] filters = new String[] {“soccer”}; JavaReceiverInputDStream receiverStream = TwitterUtils.createStream(jssc,filters); jssc.start(); jssc.awaitTermination(); } 但我得到以下例外 Exception in thread “main” java.lang.AssertionError: assertion failed: No output streams […]

如何在spark数据框中展平结构?

我有一个具有以下结构的dataframe: |– data: struct (nullable = true) | |– id: long (nullable = true) | |– keyNote: struct (nullable = true) | | |– key: string (nullable = true) | | |– note: string (nullable = true) | |– details: map (nullable = true) | | |– key: string | | |– value: string (valueContainsNull […]

TaskSchedulerImpl:初始作业未接受任何资源;

这是我想要做的。 我创建了两个DataStax企业集群节点,在此基础上我创建了一个java程序来获取一个表的计数(Cassandra数据库表)。 这个程序是在eclipse中构建的,它实际上来自一个Windows框。 在从Windows运行此程序时,它在运行时失败并出现以下错误: 初始工作没有接受任何资源; 检查群集UI以确保已注册工作并具有足够的内存 已经在这些集群上成功编译和运行相同的代码而没有任何问题。 可能是什么原因导致错误? 码: import org.apache.spark.SparkConf; import org.apache.spark.SparkContext; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SchemaRDD; import org.apache.spark.sql.cassandra.CassandraSQLContext; import com.datastax.bdp.spark.DseSparkConfHelper; public class SparkProject { public static void main(String[] args) { SparkConf conf = DseSparkConfHelper.enrichSparkConf(new SparkConf()).setMaster(“spark://10.63.24.14X:7077”).setAppName(“DatastaxTests”).set(“spark.cassandra.connection.host”,”10.63.24.14x”).set(“spark.executor.memory”, “2048m”).set(“spark.driver.memory”, “1024m”).set(“spark.local.ip”,”10.63.24.14X”); JavaSparkContext sc = new JavaSparkContext(conf); CassandraSQLContext cassandraContext = new CassandraSQLContext(sc.sc()); SchemaRDD employees = cassandraContext.sql(“SELECT * FROM portware_ants.orders”); […]

如何将模型从ML Pipeline保存到S3或HDFS?

我正在努力保存ML Pipeline生产的数千种型号。 如此答案所示,模型可以保存如下: import java.io._ def saveModel(name: String, model: PipelineModel) = { val oos = new ObjectOutputStream(new FileOutputStream(s”/some/path/$name”)) oos.writeObject(model) oos.close } schools.zip(bySchoolArrayModels).foreach{ case (name, model) => saveModel(name, Model) } 我已经尝试使用s3://some/path/$name和/user/hadoop/some/path/$name因为我希望最终将模型保存到amazon s3,但它们都会失败,并显示路径不能是找到。 如何将模型保存到Amazon S3?

Spark Strutured Streaming自动将时间戳转换为本地时间

我有UTC和ISO8601的时间戳,但使用结构化流,它会自动转换为本地时间。 有没有办法阻止这种转换? 我想在UTC中使用它。 我正在从Kafka读取json数据,然后使用from_json Spark函数解析它们。 输入: {“Timestamp”:”2015-01-01T00:00:06.222Z”} 流: SparkSession .builder() .master(“local[*]”) .appName(“my-app”) .getOrCreate() .readStream() .format(“kafka”) … //some magic .writeStream() .format(“console”) .start() .awaitTermination(); 架构: StructType schema = DataTypes.createStructType(new StructField[] { DataTypes.createStructField(“Timestamp”, DataTypes.TimestampType, true),}); 输出: +——————–+ | Timestamp| +——————–+ |2015-01-01 01:00:…| |2015-01-01 01:00:…| +——————–+ 如您所见,小时数自行增加。 PS:我试着尝试使用from_utc_timestamp Spark函数,但没有运气。

与csv文件相比,将mysql表转换为spark数据集的速度非常慢

我在amazon s3中有csv文件,大小为62mb(114000行)。 我正在将其转换为spark数据集,并从中获取前500行。 代码如下; DataFrameReader df = new DataFrameReader(spark).format(“csv”).option(“header”, true); Dataset set=df.load(“s3n://”+this.accessId.replace(“\””, “”)+”:”+this.accessToken.replace(“\””, “”)+”@”+this.bucketName.replace(“\””, “”)+”/”+this.filePath.replace(“\””, “”)+””); set.take(500) 整个操作需要20到30秒。 现在我尝试相同但是使用csv我正在使用带有119 000行的mySQL表。 MySQL服务器在亚马逊ec2中。 代码如下; String url =”jdbc:mysql://”+this.hostName+”:3306/”+this.dataBaseName+”?user=”+this.userName+”&password=”+this.password; SparkSession spark=StartSpark.getSparkSession(); SQLContext sc = spark.sqlContext(); DataFrameReader df = new DataFrameReader(spark).format(“csv”).option(“header”, true); Dataset set = sc .read() .option(“url”, url) .option(“dbtable”, this.tableName) .option(“driver”,”com.mysql.jdbc.Driver”) .format(“jdbc”) .load(); set.take(500); 这需要5到10分钟。 我在jvm里面运行火花。 在两种情况下使用相同的配置。 我可以使用partitionColumn,numParttition等但我没有任何数字列,还有一个问题是我不知道该表的模式。 我的问题不是如何减少所需的时间,因为我知道在理想情况下火花将在集群中运行但我无法理解的是为什么在上述两种情况下这个大的时间差异?